//! Async wrappers around the synchronous DjVu loading and rendering APIs.
//!
//! Rendering is CPU-bound, so these helpers run it on Tokio's blocking
//! thread pool via `spawn_blocking`; loading accepts any `AsyncRead`.

use std::sync::Arc;

use tokio::io::{AsyncRead, AsyncReadExt};

use crate::{
    djvu_document::{DjVuDocument, DjVuPage, DocError},
    djvu_render::{self, RenderError, RenderOptions},
    pixmap::{GrayPixmap, Pixmap},
};
/// Errors produced by the async render wrappers.
#[derive(Debug, thiserror::Error)]
pub enum AsyncRenderError {
    /// The underlying synchronous renderer failed.
    #[error("render error: {0}")]
    Render(#[from] RenderError),
    /// The `spawn_blocking` task panicked or was cancelled; the
    /// `JoinError` is captured as its display string.
    #[error("spawn_blocking join error: {0}")]
    Join(String),
}
/// Errors produced by the async document loaders.
#[derive(Debug, thiserror::Error)]
pub enum AsyncLoadError {
    /// Reading from the supplied `AsyncRead` failed.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// The bytes were read, but did not parse as a DjVu document.
    #[error("parse error: {0}")]
    Parse(#[from] DocError),
}
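/// Reads the entire stream into memory, then parses it as a DjVu document.
///
/// A minimal usage sketch (asset path illustrative; assumes a Tokio
/// runtime):
///
/// ```ignore
/// let file = tokio::fs::File::open("assets/chicken.djvu").await?;
/// let doc = load_document_async(file).await?;
/// println!("pages: {}", doc.page_count());
/// ```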
pub async fn load_document_async<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
where
    R: AsyncRead + Unpin + Send,
{
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(DjVuDocument::parse(&buf)?)
}
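/// Like [`load_document_async`], but reads the 16-byte IFF head first and,
/// for bundled (`DJVM`) documents, the `DIRM` directory chunk before the
/// rest of the body, so the buffer can be sized up front.
///
/// A sketch over an in-memory reader (any `AsyncRead` works; the
/// `DjVu3Spec_bundled.djvu` test asset is used here for illustration):
///
/// ```ignore
/// let bytes = std::fs::read("references/djvujs/library/assets/DjVu3Spec_bundled.djvu")?;
/// let doc = load_document_async_streaming(std::io::Cursor::new(bytes)).await?;
/// ```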
pub async fn load_document_async_streaming<R>(mut reader: R) -> Result<DjVuDocument, AsyncLoadError>
where
    R: AsyncRead + Unpin + Send,
{
    // IFF head: bytes 0..4 are the "AT&T" magic, 4..8 "FORM", 8..12 the
    // big-endian form length, 12..16 the form type ("DJVM" for bundled
    // multi-page documents, "DJVU" for single pages).
    let mut head = [0u8; 16];
    reader.read_exact(&mut head).await?;
    let is_djvm = &head[..4] == b"AT&T" && &head[4..8] == b"FORM" && &head[12..16] == b"DJVM";
    // Bundled documents are typically much larger; reserve accordingly.
    let mut buf = Vec::with_capacity(if is_djvm { 1 << 20 } else { 16 * 1024 });
    buf.extend_from_slice(&head);
    if is_djvm {
        // A bundled document opens with its DIRM (directory) chunk.
        let mut chunk_hdr = [0u8; 8];
        reader.read_exact(&mut chunk_hdr).await?;
        buf.extend_from_slice(&chunk_hdr);
        if &chunk_hdr[..4] == b"DIRM" {
            let dirm_len =
                u32::from_be_bytes([chunk_hdr[4], chunk_hdr[5], chunk_hdr[6], chunk_hdr[7]])
                    as usize;
            // IFF chunk data is padded to an even byte boundary.
            let padded = dirm_len + (dirm_len & 1);
            let mut dirm_buf = vec![0u8; padded];
            reader.read_exact(&mut dirm_buf).await?;
            buf.extend_from_slice(&dirm_buf);
        }
    }
    // Read the remainder of the document body.
    reader.read_to_end(&mut buf).await?;
    Ok(DjVuDocument::parse(&buf)?)
}
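/// Renders a page to a [`Pixmap`] on Tokio's blocking thread pool, keeping
/// the CPU-bound render off the async executor.
///
/// Usage sketch (assumes a document loaded via [`load_document_async`]):
///
/// ```ignore
/// let page = doc.page(0).unwrap();
/// let opts = RenderOptions {
///     width: page.width() as u32,
///     height: page.height() as u32,
///     ..Default::default()
/// };
/// let pm = render_pixmap_async(page, opts).await?;
/// ```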
pub async fn render_pixmap_async(
    page: &DjVuPage,
    opts: RenderOptions,
) -> Result<Pixmap, AsyncRenderError> {
    // Clone the page so the blocking task owns its data ('static).
    let page = page.clone();
    tokio::task::spawn_blocking(move || {
        djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
    })
    .await
    .map_err(|e| AsyncRenderError::Join(e.to_string()))?
}
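/// Like [`render_pixmap_async`], but produces a single-channel 8-bit
/// [`GrayPixmap`] whose `data` holds `width * height` bytes.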
pub async fn render_gray8_async(
    page: &DjVuPage,
    opts: RenderOptions,
) -> Result<GrayPixmap, AsyncRenderError> {
    let page = page.clone();
    tokio::task::spawn_blocking(move || {
        djvu_render::render_gray8(&page, &opts).map_err(AsyncRenderError::Render)
    })
    .await
    .map_err(|e| AsyncRenderError::Join(e.to_string()))?
}
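/// Renders a page progressively, yielding one frame per IW44 background
/// (`BG44`) refinement chunk; pages with no `BG44` data (e.g. JB2-only
/// pages) yield a single, fully rendered frame. The final frame matches
/// [`djvu_render::render_pixmap`] for the same options.
///
/// Consuming the stream (a sketch; assumes the `futures` crate):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let stream = render_progressive_stream(page, opts);
/// futures::pin_mut!(stream);
/// while let Some(frame) = stream.next().await {
///     let pixmap = frame?; // each frame refines the previous one
/// }
/// ```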
pub fn render_progressive_stream(
    page: &DjVuPage,
    opts: RenderOptions,
) -> impl futures_core::Stream<Item = Result<Pixmap, AsyncRenderError>> {
    let page = Arc::new(page.clone());
    let n_chunks = page.bg44_chunks().len();
    async_stream::stream! {
        if n_chunks == 0 {
            // No progressive data: fall back to a single full render.
            let page = Arc::clone(&page);
            let opts = opts.clone();
            let result = tokio::task::spawn_blocking(move || {
                djvu_render::render_pixmap(&page, &opts).map_err(AsyncRenderError::Render)
            })
            .await
            .map_err(|e| AsyncRenderError::Join(e.to_string()));
            // Flatten the Result<Result<..>, ..> from spawn_blocking.
            yield result.and_then(|r| r);
        } else {
            for chunk_n in 0..n_chunks {
                let page = Arc::clone(&page);
                let opts = opts.clone();
                let result = tokio::task::spawn_blocking(move || {
                    djvu_render::render_progressive(&page, &opts, chunk_n)
                        .map_err(AsyncRenderError::Render)
                })
                .await
                .map_err(|e| AsyncRenderError::Join(e.to_string()));
                yield result.and_then(|r| r);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::djvu_document::DjVuDocument;

    fn assets_path() -> std::path::PathBuf {
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("references/djvujs/library/assets")
    }

    fn load_doc(name: &str) -> DjVuDocument {
        let data =
            std::fs::read(assets_path().join(name)).unwrap_or_else(|_| panic!("{name} must exist"));
        DjVuDocument::parse(&data).unwrap_or_else(|e| panic!("{e}"))
    }
    #[tokio::test]
    async fn render_pixmap_async_correct_dims() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;
        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let pm = render_pixmap_async(page, opts)
            .await
            .expect("async render must succeed");
        assert_eq!(pm.width, pw);
        assert_eq!(pm.height, ph);
    }
    #[tokio::test]
    async fn render_gray8_async_correct_dims() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;
        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let gm = render_gray8_async(page, opts)
            .await
            .expect("async gray render must succeed");
        assert_eq!(gm.width, pw);
        assert_eq!(gm.height, ph);
        assert_eq!(gm.data.len(), (pw * ph) as usize);
    }
    #[tokio::test]
    async fn async_matches_sync() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;
        let opts = RenderOptions {
            width: pw,
            height: ph,
            ..Default::default()
        };
        let sync_pm = djvu_render::render_pixmap(page, &opts).expect("sync render must succeed");
        let async_pm = render_pixmap_async(page, opts.clone())
            .await
            .expect("async render must succeed");
        assert_eq!(
            sync_pm.data, async_pm.data,
            "async and sync renders must match"
        );
    }
    #[tokio::test]
    async fn concurrent_render_multiple_tasks() {
        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let pw = page.width() as u32;
        let ph = page.height() as u32;
        let opts = RenderOptions {
            width: pw / 2,
            height: ph / 2,
            scale: 0.5,
            ..Default::default()
        };
        // Render the same page from four tasks at once to check that the
        // blocking-pool renders do not interfere with each other.
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let page_clone = page.clone();
                let opts_clone = opts.clone();
                tokio::spawn(async move { render_pixmap_async(&page_clone, opts_clone).await })
            })
            .collect();
        for handle in handles {
            let pm = handle
                .await
                .expect("task must not panic")
                .expect("render must succeed");
            assert_eq!(pm.width, pw / 2);
            assert_eq!(pm.height, ph / 2);
        }
    }
    #[test]
    fn async_render_error_display() {
        let err = AsyncRenderError::Render(crate::djvu_render::RenderError::InvalidDimensions {
            width: 0,
            height: 0,
        });
        let s = err.to_string();
        assert!(
            s.contains("render error"),
            "error must mention 'render error'"
        );
    }
    #[tokio::test]
    async fn progressive_stream_last_frame_matches_pixmap() {
        use futures::StreamExt;

        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let opts = RenderOptions {
            width: 100,
            height: 80,
            ..Default::default()
        };
        let stream = render_progressive_stream(page, opts.clone());
        futures::pin_mut!(stream);
        let mut frames: Vec<Pixmap> = Vec::new();
        while let Some(result) = stream.next().await {
            frames.push(result.expect("frame should succeed"));
        }
        assert!(!frames.is_empty(), "stream must yield at least one frame");
        let expected = djvu_render::render_pixmap(page, &opts).expect("render_pixmap must succeed");
        assert_eq!(
            frames.last().unwrap().data,
            expected.data,
            "last frame must match render_pixmap"
        );
    }
    #[tokio::test]
    async fn progressive_stream_consistent_dimensions() {
        use futures::StreamExt;

        let doc = load_doc("chicken.djvu");
        let page = doc.page(0).unwrap();
        let n_chunks = page.bg44_chunks().len();
        let opts = RenderOptions {
            width: 100,
            height: 80,
            ..Default::default()
        };
        let stream = render_progressive_stream(page, opts);
        futures::pin_mut!(stream);
        let mut count = 0usize;
        while let Some(result) = stream.next().await {
            let frame = result.expect("frame should succeed");
            assert_eq!(frame.width, 100);
            assert_eq!(frame.height, 80);
            count += 1;
        }
        let expected_count = if n_chunks == 0 { 1 } else { n_chunks };
        assert_eq!(
            count, expected_count,
            "frame count must equal the BG44 chunk count (or 1 when there are none)"
        );
    }
    #[tokio::test]
    async fn load_document_async_matches_sync_parse() {
        let path = assets_path().join("chicken.djvu");
        let file = tokio::fs::File::open(&path)
            .await
            .expect("open must succeed");
        let async_doc = load_document_async(file)
            .await
            .expect("async load must succeed");
        let sync_data = std::fs::read(&path).expect("sync read must succeed");
        let sync_doc = DjVuDocument::parse(&sync_data).expect("sync parse must succeed");
        assert_eq!(async_doc.page_count(), sync_doc.page_count());
        for i in 0..sync_doc.page_count() {
            let a = async_doc.page(i).expect("async page");
            let s = sync_doc.page(i).expect("sync page");
            assert_eq!(a.width(), s.width());
            assert_eq!(a.height(), s.height());
        }
    }
    #[tokio::test]
    async fn load_document_async_from_in_memory_reader() {
        let path = assets_path().join("chicken.djvu");
        let bytes = std::fs::read(&path).expect("read");
        let reader = std::io::Cursor::new(bytes);
        let doc = load_document_async(reader)
            .await
            .expect("async load from cursor must succeed");
        assert!(doc.page_count() > 0);
    }
    #[tokio::test]
    async fn load_document_async_propagates_parse_error() {
        let bogus = b"not a djvu file at all".to_vec();
        let reader = std::io::Cursor::new(bogus);
        let err = load_document_async(reader)
            .await
            .expect_err("must fail to parse garbage");
        assert!(
            matches!(err, AsyncLoadError::Parse(_)),
            "expected Parse error, got {err:?}"
        );
    }
    #[tokio::test]
    async fn streaming_loader_matches_buffered() {
        let path = assets_path().join("DjVu3Spec_bundled.djvu");
        let Ok(bytes) = std::fs::read(&path) else {
            eprintln!("skip: {} missing", path.display());
            return;
        };
        let streamed = load_document_async_streaming(std::io::Cursor::new(bytes.clone()))
            .await
            .expect("streaming load must succeed");
        let buffered = DjVuDocument::parse(&bytes).expect("buffered parse");
        assert_eq!(streamed.page_count(), buffered.page_count());
        for i in 0..buffered.page_count() {
            assert_eq!(streamed.page_byte_range(i), buffered.page_byte_range(i));
        }
    }
    #[tokio::test]
    async fn streaming_loader_reads_head_before_body() {
        use std::sync::{Arc, Mutex};

        let path = assets_path().join("DjVu3Spec_bundled.djvu");
        let Ok(bytes) = std::fs::read(&path) else {
            eprintln!("skip: {} missing", path.display());
            return;
        };

        // Records the size of every poll_read so the test can assert on
        // the order and granularity of the loader's reads.
        struct RecordingReader {
            inner: std::io::Cursor<Vec<u8>>,
            sizes: Arc<Mutex<Vec<usize>>>,
        }

        impl tokio::io::AsyncRead for RecordingReader {
            fn poll_read(
                mut self: std::pin::Pin<&mut Self>,
                _cx: &mut std::task::Context<'_>,
                buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<std::io::Result<()>> {
                let want = buf.remaining();
                let pos = self.inner.position() as usize;
                let src = self.inner.get_ref();
                let n = want.min(src.len().saturating_sub(pos));
                if n > 0 {
                    buf.put_slice(&src[pos..pos + n]);
                    self.inner.set_position((pos + n) as u64);
                }
                self.sizes.lock().unwrap().push(n);
                std::task::Poll::Ready(Ok(()))
            }
        }

        let sizes = Arc::new(Mutex::new(Vec::new()));
        let reader = RecordingReader {
            inner: std::io::Cursor::new(bytes.clone()),
            sizes: Arc::clone(&sizes),
        };
        let _ = load_document_async_streaming(reader)
            .await
            .expect("streaming load must succeed");

        let sizes = sizes.lock().unwrap().clone();
        let nonzero: Vec<usize> = sizes.into_iter().filter(|&n| n > 0).collect();
        assert_eq!(nonzero[0], 16, "first read must be the 16-byte IFF head");
        assert_eq!(nonzero[1], 8, "second read must be the 8-byte chunk header");
        assert!(
            nonzero[2] < bytes.len() / 4,
            "third read should be the DIRM payload, well under the full body \
             (got {} bytes for a {} byte file)",
            nonzero[2],
            bytes.len()
        );
    }
    #[tokio::test]
    async fn load_document_async_propagates_io_error() {
        // A reader whose every poll fails, to exercise the Io error path.
        struct FailingReader;

        impl tokio::io::AsyncRead for FailingReader {
            fn poll_read(
                self: std::pin::Pin<&mut Self>,
                _cx: &mut std::task::Context<'_>,
                _buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<std::io::Result<()>> {
                std::task::Poll::Ready(Err(std::io::Error::other("simulated I/O failure")))
            }
        }

        let err = load_document_async(FailingReader)
            .await
            .expect_err("must fail on I/O error");
        assert!(
            matches!(err, AsyncLoadError::Io(_)),
            "expected Io error, got {err:?}"
        );
    }
    #[tokio::test]
    async fn progressive_stream_jb2_only_yields_one_frame() {
        use futures::StreamExt;

        let doc = load_doc("boy_jb2.djvu");
        let page = doc.page(0).unwrap();
        if !page.bg44_chunks().is_empty() {
            eprintln!("skip: boy_jb2.djvu unexpectedly has BG44 chunks");
            return;
        }
        let opts = RenderOptions {
            width: 80,
            height: 60,
            ..Default::default()
        };
        let stream = render_progressive_stream(page, opts);
        futures::pin_mut!(stream);
        let mut count = 0;
        while let Some(result) = stream.next().await {
            result.expect("frame should succeed");
            count += 1;
        }
        assert_eq!(count, 1, "JB2-only page must yield exactly one frame");
    }
}