use std::{collections::BTreeMap, ops::Range, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt},
sync::{Mutex, OnceCell},
};
use crate::{
djvu_document::{DjVuDocument, DjVuPage, DocError},
djvu_render::{self, RenderError, RenderOptions},
error::IffError,
iff::parse_form,
pixmap::{GrayPixmap, Pixmap},
};
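/// Errors produced by the async rendering helpers: either a propagated
/// [`RenderError`] or a `spawn_blocking` task that panicked or was cancelled.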
#[derive(Debug, thiserror::Error)]
pub enum AsyncRenderError {
#[error("render error: {0}")]
Render(#[from] RenderError),
#[error("spawn_blocking join error: {0}")]
Join(String),
}
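/// Errors produced by the whole-document async loaders.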
#[derive(Debug, thiserror::Error)]
pub enum AsyncLoadError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("parse error: {0}")]
Parse(#[from] DocError),
}
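/// Errors produced by [`LazyDocument`]: I/O, IFF, and parse failures, plus
/// the cases specific to lazy indexing (bad page index, unsupported layout).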
#[derive(Debug, thiserror::Error)]
pub enum AsyncLazyError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("parse error: {0}")]
Parse(#[from] DocError),
#[error("IFF error: {0}")]
Iff(#[from] IffError),
#[error("page index {index} is out of range (document has {count} pages)")]
PageOutOfRange { index: usize, count: usize },
#[error("unsupported lazy document shape: {0}")]
Unsupported(&'static str),
}
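/// Component kinds recognized in the `DIRM` directory of a bundled document.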
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LazyComponentType {
Shared,
Page,
Thumbnail,
}
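/// One `DIRM` directory record: component kind, its ID, and its byte offset
/// within the bundled file.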
#[derive(Debug, Clone)]
struct LazyDirmEntry {
comp_type: LazyComponentType,
id: String,
offset: u32,
}
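/// A DjVu document indexed for on-demand page loading.
///
/// Construction reads only the 16-byte header and, for bundled `FORM:DJVM`
/// files, the `DIRM` directory. Page bytes are fetched and parsed on first
/// access and cached behind `Arc`s, so repeated
/// [`page_async`](Self::page_async) calls for the same index are cheap.
///
/// A minimal usage sketch (`ignore`d: it assumes an on-disk file and a Tokio
/// runtime; `tokio::fs::File` satisfies the reader bounds):
///
/// ```ignore
/// let file = tokio::fs::File::open("example.djvu").await?;
/// let doc = from_async_reader_lazy(file).await?;
/// let first = doc.page_async(0).await?;
/// println!("{} x {}", first.width(), first.height());
/// ```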
pub struct LazyDocument<R> {
reader: Arc<Mutex<R>>,
pages: Vec<LazyPageIndex>,
shared: BTreeMap<String, LazyComponentIndex>,
cache: Vec<OnceCell<Arc<DjVuPage>>>,
shared_cache: BTreeMap<String, OnceCell<Arc<Vec<u8>>>>,
}
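/// Byte range of one page component within the underlying reader.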
#[derive(Debug, Clone)]
struct LazyPageIndex {
range: Range<u64>,
}
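/// Byte range of one shared (`FORM:DJVI`) component within the underlying reader.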
#[derive(Debug, Clone)]
struct LazyComponentIndex {
range: Range<u64>,
}
impl<R> LazyDocument<R>
where
R: AsyncRead + AsyncSeek + Unpin + 'static,
{
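    /// Index a DjVu document without decoding any pages.
    ///
    /// Only the 16-byte header (and, for bundled `FORM:DJVM`, the `DIRM`
    /// directory) is read up front. Single-page `FORM:DJVU` files are
    /// indexed as one page spanning the whole file; indirect documents
    /// are rejected.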
pub async fn from_async_reader_lazy(mut reader: R) -> Result<Self, AsyncLazyError> {
let file_len = reader.seek(std::io::SeekFrom::End(0)).await?;
reader.seek(std::io::SeekFrom::Start(0)).await?;
let mut head = [0u8; 16];
reader.read_exact(&mut head).await?;
if &head[..4] != b"AT&T" || &head[4..8] != b"FORM" {
return Err(AsyncLazyError::Unsupported("not an AT&T FORM document"));
}
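        // Bytes 12..16 carry the FORM type: DJVU is a single page,
        // DJVM a multipage bundle.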
let form_type = &head[12..16];
let (pages, shared) = if form_type == b"DJVU" {
(vec![LazyPageIndex { range: 0..file_len }], BTreeMap::new())
} else if form_type == b"DJVM" {
index_bundled_djvm(&mut reader).await?
} else {
return Err(AsyncLazyError::Unsupported(
"lazy loader supports only FORM:DJVU and bundled FORM:DJVM",
));
};
if pages.is_empty() {
return Err(AsyncLazyError::Unsupported(
"document has no lazy-loadable pages",
));
}
let cache = (0..pages.len()).map(|_| OnceCell::new()).collect();
let shared_cache = shared
.keys()
.map(|id| (id.clone(), OnceCell::new()))
.collect();
Ok(Self {
reader: Arc::new(Mutex::new(reader)),
pages,
shared,
cache,
shared_cache,
})
}
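    /// Number of pages found while indexing.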
pub fn page_count(&self) -> usize {
self.pages.len()
}
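    /// Load, parse, and cache the page at `index`.
    ///
    /// The page bytes are read under the reader mutex on first access;
    /// later calls return the cached `Arc` without touching the reader.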
pub async fn page_async(&self, index: usize) -> Result<Arc<DjVuPage>, AsyncLazyError> {
let page = self
.pages
.get(index)
.ok_or(AsyncLazyError::PageOutOfRange {
index,
count: self.pages.len(),
})?
.clone();
self.cache[index]
.get_or_try_init(|| async move {
let bytes = self.read_page_bytes(page.range).await?;
let form = parse_form(&bytes)?;
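                // A page that uses a shared shape dictionary names its
                // FORM:DJVI component in an INCL chunk; resolve it before
                // parsing the page.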
let shared_djbz = if let Some(incl) = form.chunks.iter().find(|c| &c.id == b"INCL")
{
Some(self.shared_djbz(incl.data).await?)
} else {
None
};
let page = DjVuDocument::parse_single_page_with_shared(&bytes, index, shared_djbz)?;
Ok(Arc::new(page))
})
.await
.cloned()
}
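    /// Resolve and cache the shared `Djbz` dictionary named by an `INCL`
    /// payload. `shared_cache` keys mirror `shared`, so a miss here means
    /// the `INCL` target was absent from the `DIRM` directory.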
async fn shared_djbz(&self, incl: &[u8]) -> Result<Arc<Vec<u8>>, AsyncLazyError> {
let name = core::str::from_utf8(incl.trim_ascii_end())
.map_err(|_| AsyncLazyError::Unsupported("INCL name is not valid UTF-8"))?;
let cell = self
.shared_cache
.get(name)
.ok_or(AsyncLazyError::Unsupported("INCL target is not in DIRM"))?;
cell.get_or_try_init(|| async move {
let component = self
.shared
.get(name)
.ok_or(AsyncLazyError::Unsupported("INCL target is not in DIRM"))?
.clone();
let bytes = self.read_page_bytes(component.range).await?;
let form = parse_form(&bytes)?;
if form.form_type != *b"DJVI" {
return Err(AsyncLazyError::Unsupported("INCL target is not FORM:DJVI"));
}
let djbz = form.chunks.iter().find(|c| &c.id == b"Djbz").ok_or(
AsyncLazyError::Unsupported("DJVI component is missing Djbz"),
)?;
Ok(Arc::new(djbz.data.to_vec()))
})
.await
.cloned()
}
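    /// Read the bytes for one component range, re-prefixing the `AT&T`
    /// magic for components that start mid-file so the result parses like
    /// a standalone DjVu stream. A range starting at zero already includes
    /// the magic.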
async fn read_page_bytes(&self, range: Range<u64>) -> Result<Vec<u8>, AsyncLazyError> {
let len = usize::try_from(range.end.saturating_sub(range.start))
.map_err(|_| AsyncLazyError::Unsupported("page range exceeds addressable memory"))?;
let mut bytes = Vec::with_capacity(len.saturating_add(4));
if range.start != 0 {
bytes.extend_from_slice(b"AT&T");
}
let mut reader = self.reader.lock().await;
reader.seek(std::io::SeekFrom::Start(range.start)).await?;
let mut chunk = vec![0u8; len];
reader.read_exact(&mut chunk).await?;
bytes.extend_from_slice(&chunk);
Ok(bytes)
}
}
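/// Build a [`LazyDocument`] from any seekable async reader
/// (e.g. `tokio::fs::File`).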
pub async fn from_async_reader_lazy<R>(reader: R) -> Result<LazyDocument<R>, AsyncLazyError>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
{
LazyDocument::from_async_reader_lazy(reader).await
}
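/// Like [`from_async_reader_lazy`], but without the `Send` bound for
/// single-threaded wasm targets.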
#[cfg(target_arch = "wasm32")]
pub async fn from_async_reader_lazy_local<R>(reader: R) -> Result<LazyDocument<R>, AsyncLazyError>
where
R: AsyncRead + AsyncSeek + Unpin + 'static,
{
LazyDocument::from_async_reader_lazy(reader).await
}
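/// Index a bundled `FORM:DJVM` document via its `DIRM` directory, which this
/// loader requires to be the first inner chunk. Returns the page ranges in
/// document order and the shared components keyed by ID.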
async fn index_bundled_djvm<R>(
reader: &mut R,
) -> Result<(Vec<LazyPageIndex>, BTreeMap<String, LazyComponentIndex>), AsyncLazyError>
where
R: AsyncRead + AsyncSeek + Unpin + 'static,
{
let mut chunk_hdr = [0u8; 8];
reader.read_exact(&mut chunk_hdr).await?;
if &chunk_hdr[..4] != b"DIRM" {
return Err(AsyncLazyError::Unsupported(
"lazy DJVM loader requires DIRM as the first inner chunk",
));
}
let dirm_len =
u32::from_be_bytes([chunk_hdr[4], chunk_hdr[5], chunk_hdr[6], chunk_hdr[7]]) as usize;
let padded = dirm_len + (dirm_len & 1);
let mut dirm = vec![0u8; padded];
reader.read_exact(&mut dirm).await?;
let entries = parse_lazy_dirm(&dirm[..dirm_len])?;
let mut pages = Vec::new();
let mut shared = BTreeMap::new();
for entry in entries {
let start = entry.offset as u64;
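        // A component is laid out as "FORM" (4 bytes) + big-endian length
        // (4 bytes) + payload, so the length field sits at offset + 4.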
reader.seek(std::io::SeekFrom::Start(start + 4)).await?;
let mut size_bytes = [0u8; 4];
reader.read_exact(&mut size_bytes).await?;
let size = u32::from_be_bytes(size_bytes) as u64;
let range = start..start.saturating_add(8).saturating_add(size);
match entry.comp_type {
LazyComponentType::Page => pages.push(LazyPageIndex { range }),
LazyComponentType::Shared => {
shared.insert(entry.id, LazyComponentIndex { range });
}
LazyComponentType::Thumbnail => {}
}
}
Ok((pages, shared))
}
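/// Parse the `DIRM` chunk of a bundled document: a flags byte, a big-endian
/// file count, a table of 4-byte component offsets, then BZZ-compressed
/// metadata (3-byte size entries, one flags byte per file, and
/// NUL-terminated IDs with optional name and title strings).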
fn parse_lazy_dirm(data: &[u8]) -> Result<Vec<LazyDirmEntry>, AsyncLazyError> {
if data.len() < 3 {
return Err(AsyncLazyError::Unsupported("DIRM chunk too short"));
}
let dflags = data[0];
if (dflags >> 7) == 0 {
return Err(AsyncLazyError::Unsupported(
"indirect DJVM lazy loading is not implemented yet",
));
}
let nfiles = u16::from_be_bytes([data[1], data[2]]) as usize;
let offsets_start = 3usize;
let offsets_end = offsets_start
.checked_add(nfiles.saturating_mul(4))
.ok_or(AsyncLazyError::Unsupported("DIRM offset table overflow"))?;
if offsets_end > data.len() {
return Err(AsyncLazyError::Unsupported("DIRM offset table truncated"));
}
let mut offsets = Vec::with_capacity(nfiles);
for i in 0..nfiles {
let base = offsets_start + i * 4;
offsets.push(u32::from_be_bytes([
data[base],
data[base + 1],
data[base + 2],
data[base + 3],
]));
}
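    // The remainder of DIRM is BZZ-compressed metadata. If decoding fails
    // or comes up short, fall back to treating every component as a page
    // with a synthesized ID.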
let meta = djvu_bzz::bzz_decode(&data[offsets_end..]).unwrap_or_default();
let mut comp_types = Vec::with_capacity(nfiles);
let mut ids = Vec::with_capacity(nfiles);
let flags_start = nfiles * 3;
if flags_start + nfiles <= meta.len() {
let mut pos = flags_start + nfiles;
for flag in &meta[flags_start..flags_start + nfiles] {
let comp_type = match flag & 0x3f {
1 => LazyComponentType::Page,
2 => LazyComponentType::Thumbnail,
_ => LazyComponentType::Shared,
};
comp_types.push(comp_type);
ids.push(read_lazy_dirm_string(&meta, &mut pos).unwrap_or_default());
if (flag & 0x80) != 0 {
let _ = read_lazy_dirm_string(&meta, &mut pos);
}
if (flag & 0x40) != 0 {
let _ = read_lazy_dirm_string(&meta, &mut pos);
}
}
} else {
comp_types.resize(nfiles, LazyComponentType::Page);
ids.extend((0..nfiles).map(|i| format!("p{i:04}")));
}
Ok(offsets
.into_iter()
.zip(comp_types)
.zip(ids)
.map(|((offset, comp_type), id)| LazyDirmEntry {
comp_type,
id,
offset,
})
.collect())
}
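/// Read one NUL-terminated UTF-8 string from the decoded DIRM metadata,
/// advancing `pos` past the terminator.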
fn read_lazy_dirm_string(data: &[u8], pos: &mut usize) -> Option<String> {
let start = *pos;
let rest = data.get(start..)?;
let nul = rest.iter().position(|&b| b == 0)?;
*pos = start + nul + 1;
core::str::from_utf8(&rest[..nul])
.ok()
.map(ToOwned::to_owned)
}
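/// Buffer an entire document from an async reader and parse it eagerly.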
pub async fn load_document_async<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
where
R: AsyncRead + Unpin + Send,
{
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
Ok(DjVuDocument::parse(&buf)?)
}
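/// Like [`load_document_async`], but reads the 16-byte header and (for
/// bundled documents) the `DIRM` chunk in separate small reads before
/// pulling the remainder, so the buffer can be pre-sized and the directory
/// arrives in its own read.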
pub async fn load_document_async_streaming<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
where
R: AsyncRead + Unpin + Send,
{
let mut head = [0u8; 16];
reader.read_exact(&mut head).await?;
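    // Sniff the header: a bundled FORM:DJVM gets a larger initial buffer
    // and an eager DIRM read.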
let is_djvm = &head[..4] == b"AT&T" && &head[4..8] == b"FORM" && &head[12..16] == b"DJVM";
let mut buf = Vec::with_capacity(if is_djvm {
1 << 20
} else {
16 * 1024
});
buf.extend_from_slice(&head);
if is_djvm {
let mut chunk_hdr = [0u8; 8];
reader.read_exact(&mut chunk_hdr).await?;
buf.extend_from_slice(&chunk_hdr);
if &chunk_hdr[..4] == b"DIRM" {
let dirm_len =
u32::from_be_bytes([chunk_hdr[4], chunk_hdr[5], chunk_hdr[6], chunk_hdr[7]])
as usize;
let padded = dirm_len + (dirm_len & 1);
let mut dirm_buf = vec![0u8; padded];
reader.read_exact(&mut dirm_buf).await?;
buf.extend_from_slice(&dirm_buf);
}
}
reader.read_to_end(&mut buf).await?;
Ok(DjVuDocument::parse(&buf)?)
}
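/// Render a page to an RGB [`Pixmap`] on the blocking thread pool. The page
/// is cloned so the borrow does not need to outlive the spawned task.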
pub async fn render_pixmap_async(
page: &DjVuPage,
opts: RenderOptions,
) -> Result<Pixmap, AsyncRenderError> {
let page = Arc::new(page.clone());
tokio::task::spawn_blocking(move || {
djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
})
.await
.map_err(|e| AsyncRenderError::Join(e.to_string()))?
}
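/// Render a page to an 8-bit [`GrayPixmap`] on the blocking thread pool.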
pub async fn render_gray8_async(
page: &DjVuPage,
opts: RenderOptions,
) -> Result<GrayPixmap, AsyncRenderError> {
let page = Arc::new(page.clone());
tokio::task::spawn_blocking(move || {
djvu_render::render_gray8(&page, &opts).map_err(AsyncRenderError::Render)
})
.await
.map_err(|e| AsyncRenderError::Join(e.to_string()))?
}
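/// Render a page progressively, yielding one [`Pixmap`] per `BG44`
/// refinement chunk, or a single frame for pages with no `BG44` data.
/// Each frame is rendered on the blocking thread pool.
///
/// Consumption sketch (mirrors the tests below; `ignore`d doctest):
///
/// ```ignore
/// use futures::StreamExt;
/// let stream = render_progressive_stream(&page, opts);
/// futures::pin_mut!(stream);
/// while let Some(frame) = stream.next().await {
///     show(frame?); // `show` stands in for the caller's display code
/// }
/// ```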
pub fn render_progressive_stream(
page: &DjVuPage,
opts: RenderOptions,
) -> impl futures_core::Stream<Item = Result<Pixmap, AsyncRenderError>> {
let page = Arc::new(page.clone());
let n_chunks = page.bg44_chunks().len();
async_stream::stream! {
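        // Pages with no BG44 refinement data (e.g. JB2-only) yield a
        // single frame.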
if n_chunks == 0 {
let page = Arc::clone(&page);
let opts = opts.clone();
let result = tokio::task::spawn_blocking(move || {
djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
})
.await
.map_err(|e| AsyncRenderError::Join(e.to_string()));
yield result.and_then(|r| r);
} else {
for chunk_n in 0..n_chunks {
let page = Arc::clone(&page);
let opts = opts.clone();
let result = tokio::task::spawn_blocking(move || {
djvu_render::render_progressive(&page, &opts, chunk_n)
.map_err(AsyncRenderError::Render)
})
.await
.map_err(|e| AsyncRenderError::Join(e.to_string()));
yield result.and_then(|r| r);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::djvu_document::DjVuDocument;
fn assets_path() -> std::path::PathBuf {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("references/djvujs/library/assets")
}
fn load_doc(name: &str) -> DjVuDocument {
let data =
std::fs::read(assets_path().join(name)).unwrap_or_else(|_| panic!("{name} must exist"));
DjVuDocument::parse(&data).unwrap_or_else(|e| panic!("{e}"))
}
#[tokio::test]
async fn render_pixmap_async_correct_dims() {
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let pw = page.width() as u32;
let ph = page.height() as u32;
let opts = RenderOptions {
width: pw,
height: ph,
..Default::default()
};
let pm = render_pixmap_async(page, opts)
.await
.expect("async render must succeed");
assert_eq!(pm.width, pw);
assert_eq!(pm.height, ph);
}
#[tokio::test]
async fn render_gray8_async_correct_dims() {
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let pw = page.width() as u32;
let ph = page.height() as u32;
let opts = RenderOptions {
width: pw,
height: ph,
..Default::default()
};
let gm = render_gray8_async(page, opts)
.await
.expect("async gray render must succeed");
assert_eq!(gm.width, pw);
assert_eq!(gm.height, ph);
assert_eq!(gm.data.len(), (pw * ph) as usize);
}
#[tokio::test]
async fn async_matches_sync() {
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let pw = page.width() as u32;
let ph = page.height() as u32;
let opts = RenderOptions {
width: pw,
height: ph,
..Default::default()
};
let sync_pm = djvu_render::render_pixmap(page, &opts).expect("sync render must succeed");
let async_pm = render_pixmap_async(page, opts.clone())
.await
.expect("async render must succeed");
assert_eq!(
sync_pm.data, async_pm.data,
"async and sync renders must match"
);
}
#[tokio::test]
async fn concurrent_render_multiple_tasks() {
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let pw = page.width() as u32;
let ph = page.height() as u32;
let opts = RenderOptions {
width: pw / 2,
height: ph / 2,
scale: 0.5,
..Default::default()
};
let handles: Vec<_> = (0..4)
.map(|_| {
let page_clone = page.clone();
let opts_clone = opts.clone();
tokio::spawn(async move { render_pixmap_async(&page_clone, opts_clone).await })
})
.collect();
for handle in handles {
let pm = handle
.await
.expect("task must not panic")
.expect("render must succeed");
assert_eq!(pm.width, pw / 2);
assert_eq!(pm.height, ph / 2);
}
}
#[test]
fn async_render_error_display() {
let err = AsyncRenderError::Render(crate::djvu_render::RenderError::InvalidDimensions {
width: 0,
height: 0,
});
let s = err.to_string();
assert!(
s.contains("render error"),
"error must mention 'render error'"
);
}
#[tokio::test]
async fn progressive_stream_last_frame_matches_pixmap() {
use futures::StreamExt;
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let opts = RenderOptions {
width: 100,
height: 80,
..Default::default()
};
let stream = render_progressive_stream(page, opts.clone());
futures::pin_mut!(stream);
let mut frames: Vec<Pixmap> = Vec::new();
while let Some(result) = stream.next().await {
frames.push(result.expect("frame should succeed"));
}
assert!(!frames.is_empty(), "stream must yield at least one frame");
let expected = djvu_render::render_pixmap(page, &opts).expect("render_pixmap must succeed");
assert_eq!(
frames.last().unwrap().data,
expected.data,
"last frame must match render_pixmap"
);
}
#[tokio::test]
async fn progressive_stream_consistent_dimensions() {
use futures::StreamExt;
let doc = load_doc("chicken.djvu");
let page = doc.page(0).unwrap();
let n_chunks = page.bg44_chunks().len();
let opts = RenderOptions {
width: 100,
height: 80,
..Default::default()
};
let stream = render_progressive_stream(page, opts);
futures::pin_mut!(stream);
let mut count = 0usize;
while let Some(result) = stream.next().await {
let frame = result.expect("frame should succeed");
assert_eq!(frame.width, 100);
assert_eq!(frame.height, 80);
count += 1;
}
let expected_count = if n_chunks == 0 { 1 } else { n_chunks };
        assert_eq!(
            count, expected_count,
            "frame count must equal the BG44 chunk count, or one when there are none"
        );
}
#[tokio::test]
async fn load_document_async_matches_sync_parse() {
let path = assets_path().join("chicken.djvu");
let sync_data = std::fs::read(&path).expect("sync read must succeed");
let async_doc = load_document_async(std::io::Cursor::new(sync_data.clone()))
.await
.expect("async load must succeed");
let sync_doc = DjVuDocument::parse(&sync_data).expect("sync parse must succeed");
assert_eq!(async_doc.page_count(), sync_doc.page_count());
for i in 0..sync_doc.page_count() {
let a = async_doc.page(i).expect("async page");
let s = sync_doc.page(i).expect("sync page");
assert_eq!(a.width(), s.width());
assert_eq!(a.height(), s.height());
}
}
#[tokio::test]
async fn load_document_async_from_in_memory_reader() {
let path = assets_path().join("chicken.djvu");
let bytes = std::fs::read(&path).expect("read");
let reader = std::io::Cursor::new(bytes.clone());
let doc = load_document_async(reader)
.await
.expect("async load from cursor must succeed");
assert!(doc.page_count() > 0);
}
#[tokio::test]
async fn load_document_async_propagates_parse_error() {
let bogus = b"not a djvu file at all".to_vec();
let reader = std::io::Cursor::new(bogus);
let err = load_document_async(reader)
.await
.expect_err("must fail to parse garbage");
assert!(
matches!(err, AsyncLoadError::Parse(_)),
"expected Parse error, got {err:?}"
);
}
#[tokio::test]
async fn lazy_document_single_page_caches_arc_page() {
let path = assets_path().join("chicken.djvu");
let bytes = std::fs::read(&path).expect("read");
let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
.await
.expect("lazy index");
assert_eq!(lazy.page_count(), 1);
let page_a = lazy.page_async(0).await.expect("lazy page");
let page_b = lazy.page_async(0).await.expect("lazy cached page");
assert!(
Arc::ptr_eq(&page_a, &page_b),
"repeat access must reuse cache"
);
let sync_page = sync_doc.page(0).expect("sync page");
assert_eq!(page_a.width(), sync_page.width());
assert_eq!(page_a.height(), sync_page.height());
}
#[tokio::test]
async fn lazy_document_bundled_page_without_incl_matches_sync() {
let path = assets_path().join("colorbook.djvu");
let Ok(bytes) = std::fs::read(&path) else {
eprintln!("skip: {} missing", path.display());
return;
};
let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
.await
.expect("lazy index");
assert_eq!(lazy.page_count(), sync_doc.page_count());
let page_index = (0..sync_doc.page_count())
.find(|&i| {
sync_doc
.page(i)
.expect("sync page")
.chunk_ids()
.iter()
.all(|id| id != b"INCL")
})
.expect("fixture must contain at least one page without INCL");
let lazy_page = lazy.page_async(page_index).await.expect("lazy page");
let sync_page = sync_doc.page(page_index).expect("sync page");
assert_eq!(lazy_page.width(), sync_page.width());
assert_eq!(lazy_page.height(), sync_page.height());
}
#[tokio::test]
async fn lazy_document_bundled_page_with_incl_uses_shared_dict() {
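        // Two pages drawn with identical glyphs so the encoder emits a
        // shared Djbz dictionary referenced via INCL.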
let mut p1 = crate::bitmap::Bitmap::new(32, 12);
let mut p2 = crate::bitmap::Bitmap::new(32, 12);
for y in 2..10 {
for x in 3..9 {
p1.set(x, y, true);
p2.set(x, y, true);
}
}
for y in 3..9 {
for x in 16..22 {
p1.set(x, y, true);
p2.set(x, y, true);
}
}
let bytes = crate::jb2_encode::encode_djvm_bundle_jb2(&[p1.clone(), p2.clone()], 2);
let sync_doc = DjVuDocument::parse(&bytes).expect("sync parse");
let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
.await
.expect("lazy index");
assert_eq!(lazy.page_count(), 2);
let lazy_page = lazy.page_async(0).await.expect("lazy page");
assert!(lazy_page.raw_chunk(b"INCL").is_some());
let lazy_mask = lazy_page
.extract_mask()
.expect("lazy mask")
.expect("lazy mask present");
let sync_mask = sync_doc
.page(0)
.expect("sync page")
.extract_mask()
.expect("sync mask")
.expect("sync mask present");
assert_eq!(lazy_mask, sync_mask);
assert_eq!(lazy_mask, p1);
}
#[tokio::test]
async fn lazy_document_page_out_of_range() {
let path = assets_path().join("chicken.djvu");
let bytes = std::fs::read(&path).expect("read");
let lazy = from_async_reader_lazy(std::io::Cursor::new(bytes))
.await
.expect("lazy index");
let err = lazy
.page_async(1)
.await
.expect_err("page 1 is out of range");
assert!(
matches!(err, AsyncLazyError::PageOutOfRange { index: 1, count: 1 }),
"unexpected error: {err:?}"
);
}
#[tokio::test]
async fn streaming_loader_matches_buffered() {
let path = assets_path().join("DjVu3Spec_bundled.djvu");
let Ok(bytes) = std::fs::read(&path) else {
eprintln!("skip: {} missing", path.display());
return;
};
let streamed = load_document_async_streaming(std::io::Cursor::new(bytes.clone()))
.await
.expect("streaming load must succeed");
let buffered = DjVuDocument::parse(&bytes).expect("buffered parse");
assert_eq!(streamed.page_count(), buffered.page_count());
for i in 0..buffered.page_count() {
assert_eq!(streamed.page_byte_range(i), buffered.page_byte_range(i));
}
}
#[tokio::test]
async fn streaming_loader_reads_head_before_body() {
use std::sync::{Arc, Mutex};
let path = assets_path().join("DjVu3Spec_bundled.djvu");
let Ok(bytes) = std::fs::read(&path) else {
eprintln!("skip: {} missing", path.display());
return;
};
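        // Wraps a cursor and records the size of every poll_read so the
        // test can assert the loader's read staging.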
struct RecordingReader {
inner: std::io::Cursor<Vec<u8>>,
sizes: Arc<Mutex<Vec<usize>>>,
}
impl tokio::io::AsyncRead for RecordingReader {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let want = buf.remaining();
let pos = self.inner.position() as usize;
let src = self.inner.get_ref();
let n = want.min(src.len().saturating_sub(pos));
if n > 0 {
buf.put_slice(&src[pos..pos + n]);
self.inner.set_position((pos + n) as u64);
}
self.sizes.lock().unwrap().push(n);
std::task::Poll::Ready(Ok(()))
}
}
let sizes = Arc::new(Mutex::new(Vec::new()));
let reader = RecordingReader {
inner: std::io::Cursor::new(bytes.clone()),
sizes: Arc::clone(&sizes),
};
let _ = load_document_async_streaming(reader)
.await
.expect("streaming load must succeed");
let sizes = sizes.lock().unwrap().clone();
let nonzero: Vec<usize> = sizes.into_iter().filter(|&n| n > 0).collect();
assert_eq!(nonzero[0], 16, "first read must be 16-byte IFF head");
assert_eq!(nonzero[1], 8, "second read must be 8-byte chunk header");
assert!(
nonzero[2] < bytes.len() / 4,
"third read should be the DIRM payload, well under the full body \
(got {} bytes for a {} byte file)",
nonzero[2],
bytes.len()
);
}
#[tokio::test]
async fn load_document_async_propagates_io_error() {
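        // A reader whose every poll_read fails, to exercise the Io error path.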
struct FailingReader;
impl tokio::io::AsyncRead for FailingReader {
fn poll_read(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
_buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::task::Poll::Ready(Err(std::io::Error::other("simulated I/O failure")))
}
}
let err = load_document_async(FailingReader)
.await
.expect_err("must fail on I/O error");
assert!(
matches!(err, AsyncLoadError::Io(_)),
"expected Io error, got {err:?}"
);
}
#[tokio::test]
async fn progressive_stream_jb2_only_yields_one_frame() {
use futures::StreamExt;
let doc = load_doc("boy_jb2.djvu");
let page = doc.page(0).unwrap();
if !page.bg44_chunks().is_empty() {
return;
}
let opts = RenderOptions {
width: 80,
height: 60,
..Default::default()
};
let stream = render_progressive_stream(page, opts);
futures::pin_mut!(stream);
let mut count = 0;
while let Some(result) = stream.next().await {
result.expect("frame should succeed");
count += 1;
}
assert_eq!(count, 1, "JB2-only page must yield exactly one frame");
}
}