use std::collections::HashSet;
use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender, TrySendError};
use crate::cache::PageCache;
use crate::page::{RenderSize, RenderedPage};
use crate::source::DocumentSource;
struct RenderRequest {
page_index: usize,
size: RenderSize,
}
struct RenderResult {
page_index: usize,
size: RenderSize,
page: RenderedPage,
}
pub struct RenderPipeline {
request_tx: Sender<RenderRequest>,
result_rx: Receiver<RenderResult>,
pending: HashSet<(usize, RenderSize)>,
}
pub const FALLBACK_RENDER_SIZE: RenderSize = RenderSize { width: 1920, height: 1080 };
impl RenderPipeline {
#[allow(clippy::needless_pass_by_value)]
pub fn new(doc: Arc<dyn DocumentSource>, num_workers: usize) -> Self {
let (request_tx, request_rx) = crossbeam_channel::bounded::<RenderRequest>(64);
let (result_tx, result_rx) = crossbeam_channel::unbounded::<RenderResult>();
for _ in 0..num_workers {
let rx = request_rx.clone();
let tx = result_tx.clone();
let doc = Arc::clone(&doc);
std::thread::Builder::new()
.name("dais-render".into())
.spawn(move || render_worker(doc, rx, tx))
.expect("failed to spawn render thread");
}
Self { request_tx, result_rx, pending: HashSet::new() }
}
pub fn poll_results(&mut self, cache: &mut PageCache) {
while let Ok(result) = self.result_rx.try_recv() {
let key = (result.page_index, result.size);
self.pending.remove(&key);
cache.insert(result.page_index, result.size, result.page);
}
}
pub fn ensure_rendered(&mut self, page_index: usize, size: RenderSize, cache: &mut PageCache) {
if cache.get(page_index, size).is_some() {
return;
}
let key = (page_index, size);
if self.pending.contains(&key) {
return;
}
match self.request_tx.try_send(RenderRequest { page_index, size }) {
Ok(()) => {
self.pending.insert(key);
}
Err(TrySendError::Full(_)) => {
}
Err(TrySendError::Disconnected(_)) => {
tracing::error!("Render pipeline disconnected");
}
}
}
pub fn prefetch_neighborhood(
&mut self,
current_page: usize,
total_pages: usize,
size: RenderSize,
cache: &mut PageCache,
) {
self.ensure_rendered(current_page, size, cache);
if current_page + 1 < total_pages {
self.ensure_rendered(current_page + 1, size, cache);
}
if current_page > 0 {
self.ensure_rendered(current_page - 1, size, cache);
}
if current_page + 2 < total_pages {
self.ensure_rendered(current_page + 2, size, cache);
}
}
}
#[allow(clippy::needless_pass_by_value)]
fn render_worker(
doc: Arc<dyn DocumentSource>,
rx: Receiver<RenderRequest>,
tx: Sender<RenderResult>,
) {
while let Ok(req) = rx.recv() {
match doc.render_page(req.page_index, req.size) {
Ok(page) => {
let _ = tx.send(RenderResult { page_index: req.page_index, size: req.size, page });
}
Err(e) => {
tracing::warn!("Background render failed for page {}: {e}", req.page_index);
}
}
}
}