Skip to main content

djvu_rs/
djvu_async.rs

1//! Async render surface for [`DjVuPage`] — phase 5 extension.
2//!
3//! Feature-gated: `--features async` (adds `tokio` as a dependency).
4//!
5//! All rendering is delegated to [`tokio::task::spawn_blocking`]: the CPU-bound
6//! IW44/JB2 decode work runs on the blocking thread pool and never blocks the
7//! async runtime thread.
8//!
9//! [`DjVuPage`] implements [`Clone`], so the page is cloned into the blocking
10//! closure with no unsafe code and no thread management by the caller.
11//!
12//! ## Key public functions
13//!
14//! - [`render_pixmap_async`] — async wrapper around [`djvu_render::render_pixmap`]
15//! - [`render_gray8_async`] — async wrapper around [`djvu_render::render_gray8`]
16//! - [`render_progressive_stream`] — streaming progressive render yielding one frame per BG44 chunk
17//!
18//! ## Example: concurrent multi-page rendering
19//!
20//! ```no_run
21//! use djvu_rs::djvu_document::DjVuDocument;
22//! use djvu_rs::djvu_render::RenderOptions;
23//! use djvu_rs::djvu_async::render_pixmap_async;
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     let data = std::fs::read("document.djvu").unwrap();
28//!     let doc = std::sync::Arc::new(DjVuDocument::parse(&data).unwrap());
29//!
30//!     let futures: Vec<_> = (0..doc.page_count())
31//!         .filter_map(|i| doc.page(i).ok())
32//!         .map(|page| {
33//!             let page = page.clone();
34//!             let opts = RenderOptions { width: 800, height: 600, ..Default::default() };
35//!             tokio::spawn(async move { render_pixmap_async(&page, opts).await })
36//!         })
37//!         .collect();
38//!
39//!     for handle in futures {
40//!         let pixmap = handle.await.unwrap().unwrap();
41//!         println!("{}×{}", pixmap.width, pixmap.height);
42//!     }
43//! }
44//! ```
45
46use std::{collections::BTreeMap, ops::Range, sync::Arc};
47
48use tokio::{
49    io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt},
50    sync::{Mutex, OnceCell},
51};
52
53use crate::{
54    djvu_document::{DjVuDocument, DjVuPage, DocError},
55    djvu_render::{self, RenderError, RenderOptions},
56    error::IffError,
57    iff::parse_form,
58    pixmap::{GrayPixmap, Pixmap},
59};
60
61// ── Error types ───────────────────────────────────────────────────────────────
62
/// Errors from async rendering.
///
/// Returned by [`render_pixmap_async`], [`render_gray8_async`], and the
/// frames of [`render_progressive_stream`].
#[derive(Debug, thiserror::Error)]
pub enum AsyncRenderError {
    /// The underlying render failed.
    #[error("render error: {0}")]
    Render(#[from] RenderError),

    /// The blocking task was cancelled or panicked.
    ///
    /// NOTE(review): holds the join error's display text rather than the
    /// error value — presumably to avoid carrying tokio's `JoinError` in
    /// the public API; confirm before changing.
    #[error("spawn_blocking join error: {0}")]
    Join(String),
}
74
/// Errors from async document loading.
///
/// Returned by [`load_document_async`] and [`load_document_async_streaming`].
#[derive(Debug, thiserror::Error)]
pub enum AsyncLoadError {
    /// I/O error from the underlying async reader.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// The buffered bytes failed to parse as a DjVu document.
    #[error("parse error: {0}")]
    Parse(#[from] DocError),
}
86
/// Errors from true lazy async document loading (#233 Phase 3 PR1).
///
/// Returned by the [`LazyDocument`] constructors and [`LazyDocument::page_async`].
#[derive(Debug, thiserror::Error)]
pub enum AsyncLazyError {
    /// I/O error from the underlying async reader.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// The fetched bytes failed to parse as a DjVu document.
    #[error("parse error: {0}")]
    Parse(#[from] DocError),

    /// IFF container parse error while inspecting lazy page bytes.
    #[error("IFF error: {0}")]
    Iff(#[from] IffError),

    /// Page index is out of range.
    #[error("page index {index} is out of range (document has {count} pages)")]
    PageOutOfRange { index: usize, count: usize },

    /// This lazy-loading slice intentionally rejects a document shape.
    ///
    /// Used both for genuinely malformed input and for shapes (indirect
    /// documents, non-DJVU/DJVM forms) deferred to later issue slices.
    #[error("unsupported lazy document shape: {0}")]
    Unsupported(&'static str),
}
110
111// ── True lazy async document loader (#233 Phase 3 PR1) ───────────────────────
112
/// Component kind decoded from a DIRM flag byte (low 6 bits; see
/// [`parse_lazy_dirm`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LazyComponentType {
    /// Shared component (e.g. a `DJVI` dictionary) referenced via `INCL`.
    Shared,
    /// A renderable page.
    Page,
    /// Thumbnail component — ignored by the lazy loader.
    Thumbnail,
}
119
/// One entry of a bundled document's DIRM directory.
#[derive(Debug, Clone)]
struct LazyDirmEntry {
    /// What kind of component the entry describes.
    comp_type: LazyComponentType,
    /// Component id, used to resolve `INCL` references.
    id: String,
    /// Byte offset of the component's FORM header from the start of the file.
    offset: u32,
}
126
/// Native async lazy DjVu document.
///
/// This is the first Phase 3 slice for #233: it indexes a seekable async
/// reader up front, then fetches and parses each page only when
/// [`LazyDocument::page_async`] is called. Parsed pages are cached as
/// `Arc<DjVuPage>` so callers can render them concurrently without borrowing
/// the document across awaits.
///
/// Current scope:
/// - single-page `FORM:DJVU`
/// - bundled `FORM:DJVM` pages, including shared `DJVI` dictionaries referenced
///   via `INCL`
///
/// WASM `!Send` readers are intentionally left to the next issue slice.
pub struct LazyDocument<R> {
    /// Shared seekable source; the mutex keeps each seek+read pair atomic.
    reader: Arc<Mutex<R>>,
    /// Byte range of each page, in directory order.
    pages: Vec<LazyPageIndex>,
    /// Shared components (id → byte range) for `INCL` resolution.
    shared: BTreeMap<String, LazyComponentIndex>,
    /// Per-page memoized parse results (same indexing as `pages`).
    cache: Vec<OnceCell<Arc<DjVuPage>>>,
    /// Per-shared-component memoized `Djbz` bytes (same keys as `shared`).
    shared_cache: BTreeMap<String, OnceCell<Arc<Vec<u8>>>>,
}
148
/// Byte range of one page within the source reader.
#[derive(Debug, Clone)]
struct LazyPageIndex {
    range: Range<u64>,
}
153
/// Byte range of one shared (non-page) component within the source reader.
#[derive(Debug, Clone)]
struct LazyComponentIndex {
    range: Range<u64>,
}
158
impl<R> LazyDocument<R>
where
    R: AsyncRead + AsyncSeek + Unpin + 'static,
{
    /// Build a native lazy document index from an async seekable reader.
    ///
    /// Learns the file length with a seek-to-end, rewinds, then reads the
    /// 16-byte outer header (`AT&T` magic + `FORM` id + BE length + form
    /// type) and dispatches on the form type: a `DJVU` file is a single
    /// page covering the whole file; a `DJVM` bundle is indexed via
    /// [`index_bundled_djvm`]. Anything else is rejected.
    ///
    /// # Errors
    ///
    /// `Unsupported` for non-DjVu input, unsupported shapes, or a document
    /// with no pages; `Io` for any read/seek failure.
    pub async fn from_async_reader_lazy(mut reader: R) -> Result<Self, AsyncLazyError> {
        let file_len = reader.seek(std::io::SeekFrom::End(0)).await?;
        reader.seek(std::io::SeekFrom::Start(0)).await?;

        let mut head = [0u8; 16];
        reader.read_exact(&mut head).await?;
        if &head[..4] != b"AT&T" || &head[4..8] != b"FORM" {
            return Err(AsyncLazyError::Unsupported("not an AT&T FORM document"));
        }

        // head[8..12] is the FORM length (unused here); head[12..16] is the type.
        let form_type = &head[12..16];
        let (pages, shared) = if form_type == b"DJVU" {
            // Single-page document: the one page's range is the entire file.
            (vec![LazyPageIndex { range: 0..file_len }], BTreeMap::new())
        } else if form_type == b"DJVM" {
            index_bundled_djvm(&mut reader).await?
        } else {
            return Err(AsyncLazyError::Unsupported(
                "lazy loader supports only FORM:DJVU and bundled FORM:DJVM",
            ));
        };

        if pages.is_empty() {
            return Err(AsyncLazyError::Unsupported(
                "document has no lazy-loadable pages",
            ));
        }

        // One empty cache cell per page / shared component; filled on demand.
        let cache = (0..pages.len()).map(|_| OnceCell::new()).collect();
        let shared_cache = shared
            .keys()
            .map(|id| (id.clone(), OnceCell::new()))
            .collect();
        Ok(Self {
            reader: Arc::new(Mutex::new(reader)),
            pages,
            shared,
            cache,
            shared_cache,
        })
    }

    /// Number of lazy-loadable pages.
    pub fn page_count(&self) -> usize {
        self.pages.len()
    }

    /// Fetch, parse, cache, and return page `index`.
    ///
    /// The parse result is memoized in the page's `OnceCell`; a failed
    /// attempt leaves the cell empty, so the fetch is retried on the next
    /// call rather than caching the error.
    pub async fn page_async(&self, index: usize) -> Result<Arc<DjVuPage>, AsyncLazyError> {
        let page = self
            .pages
            .get(index)
            .ok_or(AsyncLazyError::PageOutOfRange {
                index,
                count: self.pages.len(),
            })?
            .clone();

        self.cache[index]
            .get_or_try_init(|| async move {
                let bytes = self.read_page_bytes(page.range).await?;
                let form = parse_form(&bytes)?;
                // Pages using a shared JB2 dictionary carry an INCL chunk
                // naming the DJVI component; resolve it before parsing.
                // NOTE(review): only the first INCL chunk is honored —
                // confirm multi-INCL pages are out of scope for this slice.
                let shared_djbz = if let Some(incl) = form.chunks.iter().find(|c| &c.id == b"INCL")
                {
                    Some(self.shared_djbz(incl.data).await?)
                } else {
                    None
                };
                let page = DjVuDocument::parse_single_page_with_shared(&bytes, index, shared_djbz)?;
                Ok(Arc::new(page))
            })
            .await
            .cloned()
    }

    /// Resolve an `INCL` payload to the raw `Djbz` bytes of the shared
    /// `FORM:DJVI` component it names, memoized per component id.
    async fn shared_djbz(&self, incl: &[u8]) -> Result<Arc<Vec<u8>>, AsyncLazyError> {
        // The INCL payload is the component id, possibly padded with
        // trailing ASCII whitespace; it must be UTF-8 to match DIRM ids.
        let name = core::str::from_utf8(incl.trim_ascii_end())
            .map_err(|_| AsyncLazyError::Unsupported("INCL name is not valid UTF-8"))?;
        let cell = self
            .shared_cache
            .get(name)
            .ok_or(AsyncLazyError::Unsupported("INCL target is not in DIRM"))?;
        cell.get_or_try_init(|| async move {
            let component = self
                .shared
                .get(name)
                .ok_or(AsyncLazyError::Unsupported("INCL target is not in DIRM"))?
                .clone();
            let bytes = self.read_page_bytes(component.range).await?;
            let form = parse_form(&bytes)?;
            if form.form_type != *b"DJVI" {
                return Err(AsyncLazyError::Unsupported("INCL target is not FORM:DJVI"));
            }
            let djbz = form.chunks.iter().find(|c| &c.id == b"Djbz").ok_or(
                AsyncLazyError::Unsupported("DJVI component is missing Djbz"),
            )?;
            Ok(Arc::new(djbz.data.to_vec()))
        })
        .await
        .cloned()
    }

    /// Read `range` from the shared reader into memory.
    ///
    /// Components embedded in a bundle (range starting past 0) begin at
    /// their FORM header, so the 4-byte `AT&T` magic is re-synthesized in
    /// front to make the buffer parse like a standalone document. The
    /// reader mutex is held across the seek + read so concurrent fetches
    /// cannot interleave their positioning.
    async fn read_page_bytes(&self, range: Range<u64>) -> Result<Vec<u8>, AsyncLazyError> {
        let len = usize::try_from(range.end.saturating_sub(range.start))
            .map_err(|_| AsyncLazyError::Unsupported("page range exceeds addressable memory"))?;
        let mut bytes = Vec::with_capacity(len.saturating_add(4));
        if range.start != 0 {
            bytes.extend_from_slice(b"AT&T");
        }
        let mut reader = self.reader.lock().await;
        reader.seek(std::io::SeekFrom::Start(range.start)).await?;
        let mut chunk = vec![0u8; len];
        reader.read_exact(&mut chunk).await?;
        bytes.extend_from_slice(&chunk);
        Ok(bytes)
    }
}
280
281/// Build a native lazy document index from an async seekable reader.
282///
283/// Convenience wrapper around [`LazyDocument::from_async_reader_lazy`].
284pub async fn from_async_reader_lazy<R>(reader: R) -> Result<LazyDocument<R>, AsyncLazyError>
285where
286    R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
287{
288    LazyDocument::from_async_reader_lazy(reader).await
289}
290
/// Build a lazy document from a single-threaded WASM-local async reader.
///
/// Identical to [`from_async_reader_lazy`] except the native `Send` bound is
/// intentionally dropped, for browser readers such as
/// `wasm-bindgen-futures`/`gloo` streams.
#[cfg(target_arch = "wasm32")]
pub async fn from_async_reader_lazy_local<R>(reader: R) -> Result<LazyDocument<R>, AsyncLazyError>
where
    R: AsyncRead + AsyncSeek + Unpin + 'static,
{
    LazyDocument::<R>::from_async_reader_lazy(reader).await
}
302
303async fn index_bundled_djvm<R>(
304    reader: &mut R,
305) -> Result<(Vec<LazyPageIndex>, BTreeMap<String, LazyComponentIndex>), AsyncLazyError>
306where
307    R: AsyncRead + AsyncSeek + Unpin + 'static,
308{
309    let mut chunk_hdr = [0u8; 8];
310    reader.read_exact(&mut chunk_hdr).await?;
311    if &chunk_hdr[..4] != b"DIRM" {
312        return Err(AsyncLazyError::Unsupported(
313            "lazy DJVM loader requires DIRM as the first inner chunk",
314        ));
315    }
316    let dirm_len =
317        u32::from_be_bytes([chunk_hdr[4], chunk_hdr[5], chunk_hdr[6], chunk_hdr[7]]) as usize;
318    let padded = dirm_len + (dirm_len & 1);
319    let mut dirm = vec![0u8; padded];
320    reader.read_exact(&mut dirm).await?;
321
322    let entries = parse_lazy_dirm(&dirm[..dirm_len])?;
323    let mut pages = Vec::new();
324    let mut shared = BTreeMap::new();
325    for entry in entries {
326        let start = entry.offset as u64;
327        reader.seek(std::io::SeekFrom::Start(start + 4)).await?;
328        let mut size_bytes = [0u8; 4];
329        reader.read_exact(&mut size_bytes).await?;
330        let size = u32::from_be_bytes(size_bytes) as u64;
331        let range = start..start.saturating_add(8).saturating_add(size);
332        match entry.comp_type {
333            LazyComponentType::Page => pages.push(LazyPageIndex { range }),
334            LazyComponentType::Shared => {
335                shared.insert(entry.id, LazyComponentIndex { range });
336            }
337            LazyComponentType::Thumbnail => {}
338        }
339    }
340    Ok((pages, shared))
341}
342
343fn parse_lazy_dirm(data: &[u8]) -> Result<Vec<LazyDirmEntry>, AsyncLazyError> {
344    if data.len() < 3 {
345        return Err(AsyncLazyError::Unsupported("DIRM chunk too short"));
346    }
347    let dflags = data[0];
348    if (dflags >> 7) == 0 {
349        return Err(AsyncLazyError::Unsupported(
350            "indirect DJVM lazy loading is not implemented yet",
351        ));
352    }
353    let nfiles = u16::from_be_bytes([data[1], data[2]]) as usize;
354    let offsets_start = 3usize;
355    let offsets_end = offsets_start
356        .checked_add(nfiles.saturating_mul(4))
357        .ok_or(AsyncLazyError::Unsupported("DIRM offset table overflow"))?;
358    if offsets_end > data.len() {
359        return Err(AsyncLazyError::Unsupported("DIRM offset table truncated"));
360    }
361
362    let mut offsets = Vec::with_capacity(nfiles);
363    for i in 0..nfiles {
364        let base = offsets_start + i * 4;
365        offsets.push(u32::from_be_bytes([
366            data[base],
367            data[base + 1],
368            data[base + 2],
369            data[base + 3],
370        ]));
371    }
372
373    let meta = djvu_bzz::bzz_decode(&data[offsets_end..]).unwrap_or_default();
374    let mut comp_types = Vec::with_capacity(nfiles);
375    let mut ids = Vec::with_capacity(nfiles);
376    let flags_start = nfiles * 3;
377    if flags_start + nfiles <= meta.len() {
378        let mut pos = flags_start + nfiles;
379        for flag in &meta[flags_start..flags_start + nfiles] {
380            let comp_type = match flag & 0x3f {
381                1 => LazyComponentType::Page,
382                2 => LazyComponentType::Thumbnail,
383                _ => LazyComponentType::Shared,
384            };
385            comp_types.push(comp_type);
386            ids.push(read_lazy_dirm_string(&meta, &mut pos).unwrap_or_default());
387
388            if (flag & 0x80) != 0 {
389                let _ = read_lazy_dirm_string(&meta, &mut pos);
390            }
391            if (flag & 0x40) != 0 {
392                let _ = read_lazy_dirm_string(&meta, &mut pos);
393            }
394        }
395    } else {
396        comp_types.resize(nfiles, LazyComponentType::Page);
397        ids.extend((0..nfiles).map(|i| format!("p{i:04}")));
398    }
399
400    Ok(offsets
401        .into_iter()
402        .zip(comp_types)
403        .zip(ids)
404        .map(|((offset, comp_type), id)| LazyDirmEntry {
405            comp_type,
406            id,
407            offset,
408        })
409        .collect())
410}
411
/// Read one NUL-terminated string from `data` starting at `*pos`.
///
/// On success, returns the string and advances `*pos` past the terminator.
/// Returns `None` if `*pos` is out of bounds, no NUL follows, or the bytes
/// are not UTF-8 — in the UTF-8 case `*pos` is still advanced, so callers
/// can keep scanning subsequent records.
fn read_lazy_dirm_string(data: &[u8], pos: &mut usize) -> Option<String> {
    let tail = data.get(*pos..)?;
    let nul_at = tail.iter().position(|&b| b == 0)?;
    // Consume the bytes plus the terminator even if the text is invalid.
    *pos += nul_at + 1;
    core::str::from_utf8(&tail[..nul_at]).ok().map(String::from)
}
421
422// ── Async document loader ─────────────────────────────────────────────────────
423
424/// Asynchronously load and parse a DjVu document from any [`AsyncRead`].
425///
426/// **Phase 1 of #196.** Convenience constructor that buffers the full reader
427/// into memory before handing the bytes to [`DjVuDocument::parse`]. Memory
428/// still peaks at full file size, but removes the synchronous `std::fs::read`
429/// boundary at the call site — works directly with async file readers, HTTP
430/// body streams, S3 GetObject, etc.
431///
432/// Phases 2/3 will add genuine streaming: Phase 2 reads only the IFF
433/// header and DIRM up front and exposes per-page byte offsets; Phase 3
434/// makes [`DjVuDocument::page`] async and fetches each page's bytes on
435/// demand (HTTP Range requests, etc.).
436///
437/// # Example
438///
439/// ```no_run
440/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
441/// use djvu_rs::djvu_async::load_document_async;
442///
443/// let data = std::fs::read("document.djvu")?;
444/// let doc = load_document_async(std::io::Cursor::new(data)).await?;
445/// println!("loaded {} pages", doc.page_count());
446/// # Ok(()) }
447/// ```
448pub async fn load_document_async<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
449where
450    R: AsyncRead + Unpin + Send,
451{
452    let mut buf = Vec::new();
453    reader.read_to_end(&mut buf).await?;
454    Ok(DjVuDocument::parse(&buf)?)
455}
456
457/// Async loader that reads the IFF + FORM + DIRM head separately from the
458/// page bodies (#196 Phase 2).
459///
460/// **Phase 2 of #196.** Issues two `read_exact` calls for the document head
461/// (IFF magic + FORM length + form_type, then the DIRM chunk header + payload),
462/// then a single `read_to_end` for the remainder. The total bytes received
463/// match Phase 1 — this constructor still returns an in-memory
464/// [`DjVuDocument`] — but a bandwidth-instrumented `AsyncRead` implementation
465/// can observe the head-first read pattern, and the resulting document
466/// exposes [`DjVuDocument::page_byte_range`] for any caller that wants to
467/// fan out per-page byte fetches via HTTP `Range` requests on a separate
468/// connection.
469///
470/// For documents that aren't bundled DJVM (single-page DJVU, indirect DJVM,
471/// or anything without a DIRM in the first chunk), this falls back to the
472/// Phase 1 buffered-read behavior — there's nothing useful to stream.
473///
474/// # Errors
475///
476/// - `AsyncLoadError::Io` — any underlying read fails
477/// - `AsyncLoadError::Parse` — the assembled buffer fails [`DjVuDocument::parse`]
478pub async fn load_document_async_streaming<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
479where
480    R: AsyncRead + Unpin + Send,
481{
482    // 1) IFF outer header: 4-byte magic "AT&T" + "FORM" + 4-byte length + 4-byte form_type = 16 bytes.
483    let mut head = [0u8; 16];
484    reader.read_exact(&mut head).await?;
485
486    // If it isn't a DJVM bundle, the rest of the file is just page payload —
487    // no per-chunk streaming benefit, so fall back to bulk read.
488    let is_djvm = &head[..4] == b"AT&T" && &head[4..8] == b"FORM" && &head[12..16] == b"DJVM";
489
490    let mut buf = Vec::with_capacity(if is_djvm {
491        // Pre-size: 1 MB head guess; Vec grows as needed.
492        1 << 20
493    } else {
494        16 * 1024
495    });
496    buf.extend_from_slice(&head);
497
498    if is_djvm {
499        // 2) Next chunk header: 4-byte id + 4-byte BE length.
500        let mut chunk_hdr = [0u8; 8];
501        reader.read_exact(&mut chunk_hdr).await?;
502        buf.extend_from_slice(&chunk_hdr);
503
504        // If the first inner chunk is DIRM, read its payload separately so
505        // a recording reader sees the head-first pattern. Otherwise just
506        // continue with read_to_end — the document layout is non-canonical
507        // and Phase 2's offset map wouldn't apply anyway.
508        if &chunk_hdr[..4] == b"DIRM" {
509            let dirm_len =
510                u32::from_be_bytes([chunk_hdr[4], chunk_hdr[5], chunk_hdr[6], chunk_hdr[7]])
511                    as usize;
512            // IFF chunks pad to 2-byte boundary; the parser handles this, but
513            // we must read those padding bytes too to keep alignment.
514            let padded = dirm_len + (dirm_len & 1);
515            let mut dirm_buf = vec![0u8; padded];
516            reader.read_exact(&mut dirm_buf).await?;
517            buf.extend_from_slice(&dirm_buf);
518        }
519    }
520
521    // 3) Bulk-read the remainder.
522    reader.read_to_end(&mut buf).await?;
523
524    Ok(DjVuDocument::parse(&buf)?)
525}
526
527// ── Async render functions ────────────────────────────────────────────────────
528
529/// Render `page` to an RGBA [`Pixmap`] asynchronously.
530///
531/// Clones the page and delegates to [`djvu_render::render_pixmap`] via
532/// [`tokio::task::spawn_blocking`]. The render runs on the blocking thread
533/// pool and does not block the async runtime.
534///
535/// # Example
536///
537/// ```no_run
538/// # async fn example() {
539/// use djvu_rs::djvu_document::DjVuDocument;
540/// use djvu_rs::djvu_render::RenderOptions;
541/// use djvu_rs::djvu_async::render_pixmap_async;
542///
543/// let data = std::fs::read("file.djvu").unwrap();
544/// let doc = DjVuDocument::parse(&data).unwrap();
545/// let page = doc.page(0).unwrap();
546/// let opts = RenderOptions { width: 400, height: 300, ..Default::default() };
547/// let pixmap = render_pixmap_async(page, opts).await.unwrap();
548/// println!("{}×{}", pixmap.width, pixmap.height);
549/// # }
550/// ```
551pub async fn render_pixmap_async(
552    page: &DjVuPage,
553    opts: RenderOptions,
554) -> Result<Pixmap, AsyncRenderError> {
555    let page = Arc::new(page.clone());
556    tokio::task::spawn_blocking(move || {
557        djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
558    })
559    .await
560    .map_err(|e| AsyncRenderError::Join(e.to_string()))?
561}
562
563/// Render `page` to an 8-bit grayscale [`GrayPixmap`] asynchronously.
564///
565/// Clones the page and delegates to [`djvu_render::render_gray8`] via
566/// [`tokio::task::spawn_blocking`].
567pub async fn render_gray8_async(
568    page: &DjVuPage,
569    opts: RenderOptions,
570) -> Result<GrayPixmap, AsyncRenderError> {
571    let page = Arc::new(page.clone());
572    tokio::task::spawn_blocking(move || {
573        djvu_render::render_gray8(&page, &opts).map_err(AsyncRenderError::Render)
574    })
575    .await
576    .map_err(|e| AsyncRenderError::Join(e.to_string()))?
577}
578
579/// Render a `DjVuPage` as a lazy progressive stream of [`Pixmap`] frames.
580///
581/// Yields one frame per BG44 wavelet refinement chunk: the first frame is the
582/// coarsest (fastest to produce), and each subsequent frame adds detail. The
583/// final frame is equivalent to [`render_pixmap`][djvu_render::render_pixmap].
584///
585/// If the page has no BG44 chunks (bilevel JB2-only pages), exactly one frame
586/// is yielded via [`render_pixmap`][djvu_render::render_pixmap].
587///
588/// Each frame is produced via [`tokio::task::spawn_blocking`] just before it is
589/// yielded, so the stream never blocks the async runtime thread.
590///
591/// # Example
592///
593/// ```no_run
594/// # async fn example() {
595/// use djvu_rs::djvu_document::DjVuDocument;
596/// use djvu_rs::djvu_render::RenderOptions;
597/// use djvu_rs::djvu_async::render_progressive_stream;
598/// use futures::StreamExt;
599///
600/// let data = std::fs::read("file.djvu").unwrap();
601/// let doc = DjVuDocument::parse(&data).unwrap();
602/// let page = doc.page(0).unwrap();
603/// let opts = RenderOptions { width: 800, height: 600, ..Default::default() };
604///
605/// let stream = render_progressive_stream(page, opts);
606/// futures::pin_mut!(stream);
607/// while let Some(pixmap) = stream.next().await {
608///     let pixmap = pixmap.unwrap();
609///     println!("{}×{}", pixmap.width, pixmap.height);
610/// }
611/// # }
612/// ```
pub fn render_progressive_stream(
    page: &DjVuPage,
    opts: RenderOptions,
) -> impl futures_core::Stream<Item = Result<Pixmap, AsyncRenderError>> {
    // Single clone wrapped in Arc — all spawn_blocking closures share
    // this one allocation instead of cloning the full page each time.
    let page = Arc::new(page.clone());
    // Frame count is fixed up front from the page's BG44 chunk list.
    let n_chunks = page.bg44_chunks().len();

    async_stream::stream! {
        if n_chunks == 0 {
            // No BG44 refinements (e.g. a JB2-only page): yield exactly one
            // full render and finish.
            let page = Arc::clone(&page);
            let opts = opts.clone();
            let result = tokio::task::spawn_blocking(move || {
                djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
            })
            .await
            .map_err(|e| AsyncRenderError::Join(e.to_string()));
            // Flatten Result<Result<_, E>, E> (join layer + render layer).
            yield result.and_then(|r| r);
        } else {
            // One frame per refinement level, coarse to fine. Each render is
            // only scheduled when the consumer polls for that frame, so
            // abandoning the stream skips the remaining work.
            for chunk_n in 0..n_chunks {
                let page = Arc::clone(&page);
                let opts = opts.clone();
                let result = tokio::task::spawn_blocking(move || {
                    djvu_render::render_progressive(&page, &opts, chunk_n)
                        .map_err(AsyncRenderError::Render)
                })
                .await
                .map_err(|e| AsyncRenderError::Join(e.to_string()));
                yield result.and_then(|r| r);
            }
        }
    }
}
647
648// ── Tests ─────────────────────────────────────────────────────────────────────
649
650#[cfg(test)]
651mod tests {
652    use super::*;
653    use crate::djvu_document::DjVuDocument;
654
655    fn assets_path() -> std::path::PathBuf {
656        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
657            .join("references/djvujs/library/assets")
658    }
659
660    fn load_doc(name: &str) -> DjVuDocument {
661        let data =
662            std::fs::read(assets_path().join(name)).unwrap_or_else(|_| panic!("{name} must exist"));
663        DjVuDocument::parse(&data).unwrap_or_else(|e| panic!("{e}"))
664    }
665
    /// `render_pixmap_async` returns a pixmap with correct dimensions.
    #[tokio::test]
    async fn render_pixmap_async_correct_dims() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        // Render at the page's native size so the output dims match exactly.
        let pw = page.width() as u32;
        let ph = page.height() as u32;

        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let pm = render_pixmap_async(page, opts)
            .await
            .expect("async render must succeed");
        assert_eq!(pm.width, pw);
        assert_eq!(pm.height, ph);
    }
685
    /// `render_gray8_async` returns a grayscale pixmap with the right size.
    #[tokio::test]
    async fn render_gray8_async_correct_dims() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;

        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let gm = render_gray8_async(page, opts)
            .await
            .expect("async gray render must succeed");
        assert_eq!(gm.width, pw);
        assert_eq!(gm.height, ph);
        // One byte per pixel for gray8.
        assert_eq!(gm.data.len(), (pw * ph) as usize);
    }
706
    /// Async and sync renders produce identical results.
    ///
    /// The async path only moves the work to the blocking pool, so the
    /// output bytes must be bit-for-bit equal to the sync render.
    #[tokio::test]
    async fn async_matches_sync() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;

        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let sync_pm = djvu_render::render_pixmap(page, &opts).expect("sync render must succeed");
        let async_pm = render_pixmap_async(page, opts.clone())
            .await
            .expect("async render must succeed");

        assert_eq!(
            sync_pm.data, async_pm.data,
            "async and sync renders must match"
        );
    }
730
    /// Concurrent rendering of multiple instances of the same page succeeds.
    #[tokio::test]
    async fn concurrent_render_multiple_tasks() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;
        // Half-size render keeps the 4x workload cheap.
        let opts = RenderOptions {
            width: pw / 2,
            height: ph / 2,
            scale: 0.5,
            ..Default::default()
        };

        // Spawn 4 concurrent renders of the same page; each task gets its
        // own clone, exercising the Clone-into-closure pattern.
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let page_clone = page.clone();
                let opts_clone = opts.clone();
                tokio::spawn(async move { render_pixmap_async(&page_clone, opts_clone).await })
            })
            .collect();

        for handle in handles {
            let pm = handle
                .await
                .expect("task must not panic")
                .expect("render must succeed");
            assert_eq!(pm.width, pw / 2);
            assert_eq!(pm.height, ph / 2);
        }
    }
763
    /// `AsyncRenderError::Render` wraps `RenderError`.
    ///
    /// Checks the thiserror `#[error("render error: {0}")]` prefix survives
    /// in the Display output.
    #[test]
    fn async_render_error_display() {
        let err = AsyncRenderError::Render(crate::djvu_render::RenderError::InvalidDimensions {
            width: 0,
            height: 0,
        });
        let s = err.to_string();
        assert!(
            s.contains("render error"),
            "error must mention 'render error'"
        );
    }
777
778    // ── render_progressive_stream tests ──────────────────────────────────────
779
    /// Last frame from the progressive stream matches `render_pixmap`.
    ///
    /// The stream's final refinement includes every BG44 chunk, so it must
    /// be identical to the one-shot full render.
    #[tokio::test]
    async fn progressive_stream_last_frame_matches_pixmap() {
        use futures::StreamExt;
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let opts = RenderOptions {
            width: 100,
            height: 80,
            ..Default::default()
        };

        let stream = render_progressive_stream(page, opts.clone());
        futures::pin_mut!(stream);

        // Drain the stream, collecting every frame in order.
        let mut frames: Vec<Pixmap> = Vec::new();
        while let Some(result) = stream.next().await {
            frames.push(result.expect("frame should succeed"));
        }

        assert!(!frames.is_empty(), "stream must yield at least one frame");

        let expected = djvu_render::render_pixmap(page, &opts).expect("render_pixmap must succeed");
        assert_eq!(
            frames.last().unwrap().data,
            expected.data,
            "last frame must match render_pixmap"
        );
    }
809
    /// Each successive frame has the same dimensions.
    ///
    /// Also checks the frame count: one per BG44 chunk, or exactly one for
    /// a page with no BG44 chunks.
    #[tokio::test]
    async fn progressive_stream_consistent_dimensions() {
        use futures::StreamExt;
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let n_chunks = page.bg44_chunks().len();
        let opts = RenderOptions {
            width: 100,
            height: 80,
            ..Default::default()
        };

        let stream = render_progressive_stream(page, opts);
        futures::pin_mut!(stream);

        let mut count = 0usize;
        while let Some(result) = stream.next().await {
            let frame = result.expect("frame should succeed");
            assert_eq!(frame.width, 100);
            assert_eq!(frame.height, 80);
            count += 1;
        }

        let expected_count = if n_chunks == 0 { 1 } else { n_chunks };
        assert_eq!(
            count, expected_count,
            "frame count must equal BG44 chunk count"
        );
    }
840
841    // ── load_document_async tests ────────────────────────────────────────────
842
843    /// `load_document_async` over an async reader matches `DjVuDocument::parse`.
844    #[tokio::test]
845    async fn load_document_async_matches_sync_parse() {
846        let path = assets_path().join("chicken.djvu");
847        let sync_data = std::fs::read(&path).expect("sync read must succeed");
848        let async_doc = load_document_async(std::io::Cursor::new(sync_data.clone()))
849            .await
850            .expect("async load must succeed");
851        let sync_doc = DjVuDocument::parse(&sync_data).expect("sync parse must succeed");
852
853        assert_eq!(async_doc.page_count(), sync_doc.page_count());
854        for i in 0..sync_doc.page_count() {
855            let a = async_doc.page(i).expect("async page");
856            let s = sync_doc.page(i).expect("sync page");
857            assert_eq!(a.width(), s.width());
858            assert_eq!(a.height(), s.height());
859        }
860    }
861
    /// `load_document_async` works with an in-memory reader (e.g. a fully
    /// buffered HTTP body wrapped in a `Cursor`).
    #[tokio::test]
    async fn load_document_async_from_in_memory_reader() {
        let path = assets_path().join("chicken.djvu");
        let bytes = std::fs::read(&path).expect("read");

        // `std::io::Cursor<Vec<u8>>` implements tokio's `AsyncRead`, making it
        // a convenient stand-in for any in-memory byte source.
        let reader = std::io::Cursor::new(bytes.clone());
        let doc = load_document_async(reader)
            .await
            .expect("async load from cursor must succeed");
        assert!(doc.page_count() > 0);
    }
875
876    /// Truncated / non-DjVu bytes surface as `AsyncLoadError::Parse`, not panic.
877    #[tokio::test]
878    async fn load_document_async_propagates_parse_error() {
879        let bogus = b"not a djvu file at all".to_vec();
880        let reader = std::io::Cursor::new(bogus);
881        let err = load_document_async(reader)
882            .await
883            .expect_err("must fail to parse garbage");
884        assert!(
885            matches!(err, AsyncLoadError::Parse(_)),
886            "expected Parse error, got {err:?}"
887        );
888    }
889
890    /// `LazyDocument` fetches and parses a single-page document only when
891    /// `page_async` is called, then returns the cached `Arc` on repeat access.
892    #[tokio::test]
893    async fn lazy_document_single_page_caches_arc_page() {
894        let path = assets_path().join("chicken.djvu");
895        let bytes = std::fs::read(&path).expect("read");
896        let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
897
898        let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
899            .await
900            .expect("lazy index");
901        assert_eq!(lazy.page_count(), 1);
902
903        let page_a = lazy.page_async(0).await.expect("lazy page");
904        let page_b = lazy.page_async(0).await.expect("lazy cached page");
905        assert!(
906            Arc::ptr_eq(&page_a, &page_b),
907            "repeat access must reuse cache"
908        );
909
910        let sync_page = sync_doc.page(0).expect("sync page");
911        assert_eq!(page_a.width(), sync_page.width());
912        assert_eq!(page_a.height(), sync_page.height());
913    }
914
915    /// `LazyDocument` indexes bundled DJVM ranges up front and can fetch a
916    /// no-INCL page without reading/parsing the full document body.
917    #[tokio::test]
918    async fn lazy_document_bundled_page_without_incl_matches_sync() {
919        let path = assets_path().join("colorbook.djvu");
920        let Ok(bytes) = std::fs::read(&path) else {
921            eprintln!("skip: {} missing", path.display());
922            return;
923        };
924        let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
925        let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
926            .await
927            .expect("lazy index");
928
929        assert_eq!(lazy.page_count(), sync_doc.page_count());
930        let page_index = (0..sync_doc.page_count())
931            .find(|&i| {
932                sync_doc
933                    .page(i)
934                    .expect("sync page")
935                    .chunk_ids()
936                    .iter()
937                    .all(|id| id != b"INCL")
938            })
939            .expect("fixture must contain at least one page without INCL");
940
941        let lazy_page = lazy.page_async(page_index).await.expect("lazy page");
942        let sync_page = sync_doc.page(page_index).expect("sync page");
943        assert_eq!(lazy_page.width(), sync_page.width());
944        assert_eq!(lazy_page.height(), sync_page.height());
945    }
946
947    #[tokio::test]
948    async fn lazy_document_bundled_page_with_incl_uses_shared_dict() {
949        let mut p1 = crate::bitmap::Bitmap::new(32, 12);
950        let mut p2 = crate::bitmap::Bitmap::new(32, 12);
951        for y in 2..10 {
952            for x in 3..9 {
953                p1.set(x, y, true);
954                p2.set(x, y, true);
955            }
956        }
957        for y in 3..9 {
958            for x in 16..22 {
959                p1.set(x, y, true);
960                p2.set(x, y, true);
961            }
962        }
963
964        let bytes = crate::jb2_encode::encode_djvm_bundle_jb2(&[p1.clone(), p2.clone()], 2);
965        let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
966        let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
967            .await
968            .expect("lazy index");
969
970        assert_eq!(lazy.page_count(), 2);
971        let lazy_page = lazy.page_async(0).await.expect("lazy page");
972        assert!(lazy_page.raw_chunk(b"INCL").is_some());
973        let lazy_mask = lazy_page
974            .extract_mask()
975            .expect("lazy mask")
976            .expect("lazy mask present");
977        let sync_mask = sync_doc
978            .page(0)
979            .expect("sync page")
980            .extract_mask()
981            .expect("sync mask")
982            .expect("sync mask present");
983        assert_eq!(lazy_mask, sync_mask);
984        assert_eq!(lazy_mask, p1);
985    }
986
987    #[tokio::test]
988    async fn lazy_document_page_out_of_range() {
989        let path = assets_path().join("chicken.djvu");
990        let bytes = std::fs::read(&path).expect("read");
991        let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
992            .await
993            .expect("lazy index");
994
995        let err = lazy
996            .page_async(1)
997            .await
998            .expect_err("page 1 is out of range");
999        assert!(
1000            matches!(err, AsyncLazyError::PageOutOfRange { index: 1, count: 1 }),
1001            "unexpected error: {err:?}"
1002        );
1003    }
1004
1005    /// `load_document_async_streaming` produces the same document as
1006    /// the buffered Phase 1 loader on a bundled DJVM.
1007    #[tokio::test]
1008    async fn streaming_loader_matches_buffered() {
1009        let path = assets_path().join("DjVu3Spec_bundled.djvu");
1010        let Ok(bytes) = std::fs::read(&path) else {
1011            eprintln!("skip: {} missing", path.display());
1012            return;
1013        };
1014        let streamed = load_document_async_streaming(std::io::Cursor::new(bytes.clone()))
1015            .await
1016            .expect("streaming load must succeed");
1017        let buffered = DjVuDocument::parse(&bytes).expect("buffered parse");
1018
1019        assert_eq!(streamed.page_count(), buffered.page_count());
1020        for i in 0..buffered.page_count() {
1021            assert_eq!(streamed.page_byte_range(i), buffered.page_byte_range(i));
1022        }
1023    }
1024
    /// `load_document_async_streaming` reads the head before the body
    /// (#196 Phase 2 DoD).
    ///
    /// A custom `AsyncRead` records every requested read size. The first
    /// three calls must be small and bounded (IFF head 16 B, chunk header
    /// 8 B, DIRM payload — typically a few KB on a real document).
    #[tokio::test]
    async fn streaming_loader_reads_head_before_body() {
        use std::sync::{Arc, Mutex};

        let path = assets_path().join("DjVu3Spec_bundled.djvu");
        let Ok(bytes) = std::fs::read(&path) else {
            eprintln!("skip: {} missing", path.display());
            return;
        };

        // Wraps an in-memory cursor and logs, via the shared `sizes` vector,
        // how many bytes each `poll_read` call actually delivered.
        struct RecordingReader {
            inner: std::io::Cursor<Vec<u8>>,
            sizes: Arc<Mutex<Vec<usize>>>,
        }
        impl tokio::io::AsyncRead for RecordingReader {
            fn poll_read(
                mut self: std::pin::Pin<&mut Self>,
                _cx: &mut std::task::Context<'_>,
                buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<std::io::Result<()>> {
                // Serve as many bytes as requested, capped at what remains
                // in the backing buffer (n == 0 signals EOF).
                let want = buf.remaining();
                let pos = self.inner.position() as usize;
                let src = self.inner.get_ref();
                let n = want.min(src.len().saturating_sub(pos));
                if n > 0 {
                    buf.put_slice(&src[pos..pos + n]);
                    self.inner.set_position((pos + n) as u64);
                }
                // Record every delivered size, including 0-byte EOF reads —
                // those are filtered out by the assertions below.
                self.sizes.lock().unwrap().push(n);
                std::task::Poll::Ready(Ok(()))
            }
        }

        let sizes = Arc::new(Mutex::new(Vec::new()));
        let reader = RecordingReader {
            inner: std::io::Cursor::new(bytes.clone()),
            sizes: Arc::clone(&sizes),
        };
        let _ = load_document_async_streaming(reader)
            .await
            .expect("streaming load must succeed");

        let sizes = sizes.lock().unwrap().clone();
        // Strip 0-byte tail reads (EOF signals from read_to_end).
        let nonzero: Vec<usize> = sizes.into_iter().filter(|&n| n > 0).collect();

        // First read: the 16-byte IFF + FORM + form_type head.
        assert_eq!(nonzero[0], 16, "first read must be 16-byte IFF head");
        // Second read: the 8-byte DIRM chunk header.
        assert_eq!(nonzero[1], 8, "second read must be 8-byte chunk header");
        // Third read: the DIRM payload — must be smaller than the full body.
        assert!(
            nonzero[2] < bytes.len() / 4,
            "third read should be the DIRM payload, well under the full body \
             (got {} bytes for a {} byte file)",
            nonzero[2],
            bytes.len()
        );
    }
1090
1091    /// I/O failure surfaces as `AsyncLoadError::Io`, not panic.
1092    #[tokio::test]
1093    async fn load_document_async_propagates_io_error() {
1094        struct FailingReader;
1095        impl tokio::io::AsyncRead for FailingReader {
1096            fn poll_read(
1097                self: std::pin::Pin<&mut Self>,
1098                _cx: &mut std::task::Context<'_>,
1099                _buf: &mut tokio::io::ReadBuf<'_>,
1100            ) -> std::task::Poll<std::io::Result<()>> {
1101                std::task::Poll::Ready(Err(std::io::Error::other("simulated I/O failure")))
1102            }
1103        }
1104        let err = load_document_async(FailingReader)
1105            .await
1106            .expect_err("must fail on I/O error");
1107        assert!(
1108            matches!(err, AsyncLoadError::Io(_)),
1109            "expected Io error, got {err:?}"
1110        );
1111    }
1112
1113    /// A JB2-only page (no BG44 chunks) yields exactly one frame.
1114    #[tokio::test]
1115    async fn progressive_stream_jb2_only_yields_one_frame() {
1116        use futures::StreamExt;
1117        let doc = load_doc("boy_jb2.djvu");
1118        let page = doc.page(0).unwrap();
1119        if !page.bg44_chunks().is_empty() {
1120            // Page is not JB2-only; skip
1121            return;
1122        }
1123        let opts = RenderOptions {
1124            width: 80,
1125            height: 60,
1126            ..Default::default()
1127        };
1128
1129        let stream = render_progressive_stream(page, opts);
1130        futures::pin_mut!(stream);
1131
1132        let mut count = 0;
1133        while let Some(result) = stream.next().await {
1134            result.expect("frame should succeed");
1135            count += 1;
1136        }
1137        assert_eq!(count, 1, "JB2-only page must yield exactly one frame");
1138    }
1139}