Skip to main content

pixelflow_core/
render.rs

1//! Demand-driven blocking ordered render API.
2
3use std::cell::RefCell;
4use std::collections::BTreeMap;
5use std::sync::{Arc, Condvar, Mutex, MutexGuard, mpsc};
6use std::time::Instant;
7
8use semisafe::slice::get as semisafe_get;
9
10use crate::scheduler::{
11    ConcurrencyGate, FilterTiming, OrderedCommitGate, TimingReport, WorkerPool,
12};
13use crate::{
14    ErrorCategory, ErrorCode, Frame, FrameCount, Graph, NodeId, NodeKind, PixelFlowError, Result,
15    WorkerPoolConfig,
16};
17
18thread_local! {
19    static ACTIVE_KEYS: RefCell<Vec<FrameKey>> = const { RefCell::new(Vec::new()) };
20}
21
22/// Executor that produces frames for one graph node.
23pub trait FrameExecutor: Send + Sync {
24    /// Prepares one frame and may request dependencies through the provided request object.
25    fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame>;
26
27    /// Commits prepared frame state before result becomes visible downstream.
28    fn commit(&self, _frame_number: usize, frame: Frame) -> Result<Frame> {
29        Ok(frame)
30    }
31}
32
33/// Request object passed to a frame executor.
34pub struct FrameRequest<'a> {
35    node_id: NodeId,
36    frame_number: usize,
37    scheduler: &'a dyn DependencyRequester,
38}
39
40impl<'a> FrameRequest<'a> {
41    const fn new(
42        node_id: NodeId,
43        frame_number: usize,
44        scheduler: &'a dyn DependencyRequester,
45    ) -> Self {
46        Self {
47            node_id,
48            frame_number,
49            scheduler,
50        }
51    }
52
53    /// Returns node ID currently being produced.
54    #[must_use]
55    pub const fn node_id(&self) -> NodeId {
56        self.node_id
57    }
58
59    /// Returns output frame number currently being produced.
60    #[must_use]
61    pub const fn frame_number(&self) -> usize {
62        self.frame_number
63    }
64
65    /// Requests one upstream input frame by input slot index and frame number.
66    pub fn input_frame(&self, input_index: usize, frame_number: usize) -> Result<Frame> {
67        self.scheduler
68            .request_input(self.node_id, self.frame_number, input_index, frame_number)
69    }
70}
71
72#[cfg(test)]
73struct NoopDependencyRequester;
74
75#[cfg(test)]
76impl DependencyRequester for NoopDependencyRequester {
77    fn request_input(
78        &self,
79        _node_id: NodeId,
80        _output_frame: usize,
81        _input_index: usize,
82        _requested_frame: usize,
83    ) -> Result<Frame> {
84        Err(render_error(
85            "render.invalid_input",
86            "test frame requests do not support dependencies",
87        ))
88    }
89}
90
91#[cfg(test)]
92impl<'a> FrameRequest<'a> {
93    /// Creates frame request with no dependency support for unit tests.
94    #[must_use]
95    pub fn for_tests(node_id: NodeId, frame_number: usize) -> Self {
96        static REQUESTER: NoopDependencyRequester = NoopDependencyRequester;
97        Self::new(node_id, frame_number, &REQUESTER)
98    }
99}
100
101trait DependencyRequester: Send + Sync {
102    fn request_input(
103        &self,
104        node_id: NodeId,
105        output_frame: usize,
106        input_index: usize,
107        requested_frame: usize,
108    ) -> Result<Frame>;
109}
110
111/// Public map from graph node IDs to concrete render executors.
112#[derive(Clone, Default)]
113pub struct RenderExecutorMap {
114    executors: BTreeMap<NodeId, Arc<dyn FrameExecutor>>,
115}
116
117impl RenderExecutorMap {
118    /// Creates empty executor map.
119    #[must_use]
120    pub fn new() -> Self {
121        Self {
122            executors: BTreeMap::new(),
123        }
124    }
125
126    /// Inserts executor for one node ID.
127    pub fn insert(&mut self, node_id: NodeId, executor: Arc<dyn FrameExecutor>) {
128        self.executors.insert(node_id, executor);
129    }
130
131    /// Returns true when executor exists for node ID.
132    #[must_use]
133    pub fn contains(&self, node_id: NodeId) -> bool {
134        self.executors.contains_key(&node_id)
135    }
136
137    fn get(&self, node_id: NodeId) -> Option<Arc<dyn FrameExecutor>> {
138        self.executors.get(&node_id).cloned()
139    }
140}
141
142/// User-selected render frame range.
143#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
144pub struct RenderOptions {
145    start: usize,
146    end: Option<usize>,
147}
148
149impl RenderOptions {
150    /// Creates render options. End is exclusive; `None` means clip frame count.
151    #[must_use]
152    pub const fn new(start: usize, end: Option<usize>) -> Self {
153        Self { start, end }
154    }
155
156    /// Validates selected range against finite clip frame count.
157    pub fn validate(self, frame_count: usize) -> Result<RenderRange> {
158        let end = self.end.unwrap_or(frame_count);
159        if self.start > end {
160            return Err(render_error(
161                "render.invalid_range",
162                "render start must be less than or equal to end",
163            ));
164        }
165        if end > frame_count {
166            return Err(render_error(
167                "render.frame_out_of_range",
168                "render range exceeds clip frame count",
169            ));
170        }
171
172        Ok(RenderRange {
173            start: self.start,
174            end,
175        })
176    }
177}
178
179/// Validated render frame range.
180#[derive(Clone, Copy, Debug, Eq, PartialEq)]
181pub struct RenderRange {
182    start: usize,
183    end: usize,
184}
185
186impl RenderRange {
187    /// Returns inclusive start frame.
188    #[must_use]
189    pub const fn start(self) -> usize {
190        self.start
191    }
192
193    /// Returns exclusive end frame.
194    #[must_use]
195    pub const fn end(self) -> usize {
196        self.end
197    }
198
199    /// Returns frame count inside range.
200    #[must_use]
201    pub const fn len(self) -> usize {
202        self.end - self.start
203    }
204
205    /// Returns true when range is empty.
206    #[must_use]
207    pub const fn is_empty(self) -> bool {
208        self.start == self.end
209    }
210}
211
212/// Render engine configured with one per-core worker-pool setting.
213pub struct RenderEngine {
214    config: WorkerPoolConfig,
215}
216
217impl RenderEngine {
218    /// Creates render engine using provided worker configuration.
219    #[must_use]
220    pub const fn new(config: WorkerPoolConfig) -> Self {
221        Self { config }
222    }
223
224    /// Returns configured worker thread count.
225    #[must_use]
226    pub const fn worker_threads(&self) -> usize {
227        self.config.worker_threads()
228    }
229
230    /// Starts blocking ordered render iterator.
231    pub fn render_ordered(
232        &self,
233        graph: Graph,
234        executors: RenderExecutorMap,
235        options: RenderOptions,
236    ) -> Result<OrderedRender> {
237        let _ = graph.validate()?;
238        let output_node_id = semisafe_get(graph.outputs(), 0).node_id();
239        let frame_count = output_frame_count(&graph, output_node_id)?;
240        let range = options.validate(frame_count)?;
241        let scheduler = Arc::new(SharedScheduler::new(graph, executors));
242        OrderedRender::start(self.config, scheduler, output_node_id, range)
243    }
244}
245
246/// Blocking ordered render iterator.
247///
248/// Phase 1 keeps computed `(node, frame)` results alive for the lifetime of this iterator so
249/// duplicate requests can be coalesced and timing data can be reported after iteration completes.
250pub struct OrderedRender {
251    scheduler: Arc<SharedScheduler>,
252    _worker_pool: WorkerPool,
253    receiver: mpsc::Receiver<(usize, Result<Frame>)>,
254    reorder_buffer: BTreeMap<usize, Result<Frame>>,
255    next_frame: usize,
256    remaining: usize,
257}
258
259impl OrderedRender {
260    fn start(
261        config: WorkerPoolConfig,
262        scheduler: Arc<SharedScheduler>,
263        output_node_id: NodeId,
264        range: RenderRange,
265    ) -> Result<Self> {
266        let (tx, receiver) = mpsc::channel();
267        let worker_pool = WorkerPool::new(config);
268
269        for frame_number in range.start..range.end {
270            let scheduler = Arc::clone(&scheduler);
271            let tx = tx.clone();
272            worker_pool.execute(move || {
273                let result = scheduler.compute_frame(FrameKey {
274                    node_id: output_node_id,
275                    frame_number,
276                });
277                #[expect(
278                    clippy::let_underscore_must_use,
279                    reason = "cannot propagate result from threaded context, better to ignore than panic"
280                )]
281                let _ = tx.send((frame_number, result));
282            })?;
283        }
284        drop(tx);
285
286        Ok(Self {
287            scheduler,
288            _worker_pool: worker_pool,
289            receiver,
290            reorder_buffer: BTreeMap::new(),
291            next_frame: range.start,
292            remaining: range.len(),
293        })
294    }
295
296    /// Returns aggregate per-node timing snapshot collected so far.
297    #[must_use]
298    pub fn timing_report(&self) -> TimingReport {
299        self.scheduler.timing_report()
300    }
301}
302
303impl Iterator for OrderedRender {
304    type Item = Result<Frame>;
305
306    fn next(&mut self) -> Option<Self::Item> {
307        if self.remaining == 0 {
308            return None;
309        }
310
311        loop {
312            if let Some(result) = self.reorder_buffer.remove(&self.next_frame) {
313                self.next_frame += 1;
314                self.remaining -= 1;
315                return Some(result);
316            }
317
318            match self.receiver.recv() {
319                Ok((frame_number, result)) => {
320                    self.reorder_buffer.insert(frame_number, result);
321                }
322                Err(_) => {
323                    self.remaining = 0;
324                    return Some(Err(render_error(
325                        "render.worker_pool_closed",
326                        "render worker pool closed before next frame completed",
327                    )));
328                }
329            }
330        }
331    }
332}
333
334#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
335struct FrameKey {
336    node_id: NodeId,
337    frame_number: usize,
338}
339
340struct SharedScheduler {
341    graph: Graph,
342    executors: RenderExecutorMap,
343    slots: Mutex<BTreeMap<FrameKey, Arc<FrameCell>>>,
344    timings: Mutex<BTreeMap<NodeId, FilterTiming>>,
345    ordered_commit_gates: Mutex<BTreeMap<NodeId, Arc<OrderedCommitGate>>>,
346    source_gates: Mutex<BTreeMap<NodeId, Arc<ConcurrencyGate>>>,
347}
348
349impl SharedScheduler {
350    const fn new(graph: Graph, executors: RenderExecutorMap) -> Self {
351        Self {
352            graph,
353            executors,
354            slots: Mutex::new(BTreeMap::new()),
355            timings: Mutex::new(BTreeMap::new()),
356            ordered_commit_gates: Mutex::new(BTreeMap::new()),
357            source_gates: Mutex::new(BTreeMap::new()),
358        }
359    }
360
361    fn timing_report(&self) -> TimingReport {
362        let timings = lock(&self.timings).clone();
363        TimingReport::from_timings(timings)
364    }
365
366    fn compute_frame(&self, key: FrameKey) -> Result<Frame> {
367        let _active = ActiveKeyGuard::enter(key)?;
368        let (cell, should_compute) = {
369            let mut slots = lock(&self.slots);
370            match slots.entry(key) {
371                std::collections::btree_map::Entry::Vacant(entry) => {
372                    let cell = Arc::new(FrameCell::new());
373                    entry.insert(Arc::clone(&cell));
374                    (cell, true)
375                }
376                std::collections::btree_map::Entry::Occupied(entry) => {
377                    (Arc::clone(entry.get()), false)
378                }
379            }
380        };
381
382        if !should_compute {
383            return cell.wait();
384        }
385
386        let result = self.compute_uncached(key);
387        cell.store(&result);
388        result
389    }
390
391    fn compute_uncached(&self, key: FrameKey) -> Result<Frame> {
392        let node = self.graph.node(key.node_id).ok_or_else(|| {
393            render_error(
394                "render.missing_executor",
395                format!("render node {} is missing", key.node_id.index()),
396            )
397        })?;
398        validate_frame_number(node, key.frame_number)?;
399        let executor = self.executors.get(node.id()).ok_or_else(|| {
400            render_error(
401                "render.missing_executor",
402                format!("render executor for node {} is missing", node.id().index()),
403            )
404        })?;
405
406        match node.kind() {
407            NodeKind::Source { capabilities, .. } => {
408                let source_gate = capabilities
409                    .concurrency_limit()
410                    .map(|limit| self.source_gate(node.id(), limit));
411                let source_guard = source_gate.as_ref().map(|gate| gate.acquire());
412                let started = Instant::now();
413                let request = FrameRequest::new(node.id(), key.frame_number, self);
414                let prepared = executor.prepare(request)?;
415                let committed = executor.commit(key.frame_number, prepared)?;
416                drop(source_guard);
417                self.record_timing(node.id(), started.elapsed());
418                Ok(committed)
419            }
420            NodeKind::Filter { concurrency, .. } => match concurrency {
421                crate::ConcurrencyClass::OrderedStateful => {
422                    let gate = self.commit_gate(node.id());
423                    let ticket = gate.register(key.frame_number);
424                    let started = Instant::now();
425                    let request = FrameRequest::new(node.id(), key.frame_number, self);
426                    let prepared = executor.prepare(request)?;
427                    ticket.wait_turn();
428                    let committed = executor.commit(key.frame_number, prepared)?;
429                    ticket.finish();
430                    self.record_timing(node.id(), started.elapsed());
431                    Ok(committed)
432                }
433                crate::ConcurrencyClass::Stateless | crate::ConcurrencyClass::Source => {
434                    let started = Instant::now();
435                    let request = FrameRequest::new(node.id(), key.frame_number, self);
436                    let prepared = executor.prepare(request)?;
437                    let committed = executor.commit(key.frame_number, prepared)?;
438                    self.record_timing(node.id(), started.elapsed());
439                    Ok(committed)
440                }
441            },
442        }
443    }
444
445    fn commit_gate(&self, node_id: NodeId) -> Arc<OrderedCommitGate> {
446        let mut gates = lock(&self.ordered_commit_gates);
447        Arc::clone(
448            gates
449                .entry(node_id)
450                .or_insert_with(|| Arc::new(OrderedCommitGate::new())),
451        )
452    }
453
454    fn source_gate(&self, node_id: NodeId, limit: usize) -> Arc<ConcurrencyGate> {
455        let mut gates = lock(&self.source_gates);
456        Arc::clone(
457            gates
458                .entry(node_id)
459                .or_insert_with(|| Arc::new(ConcurrencyGate::new(limit))),
460        )
461    }
462
463    fn record_timing(&self, node_id: NodeId, duration: std::time::Duration) {
464        let duration = if duration.is_zero() {
465            std::time::Duration::from_nanos(1)
466        } else {
467            duration
468        };
469        let mut timings = lock(&self.timings);
470        timings.entry(node_id).or_default().record(duration);
471    }
472}
473
474impl DependencyRequester for SharedScheduler {
475    fn request_input(
476        &self,
477        node_id: NodeId,
478        output_frame: usize,
479        input_index: usize,
480        requested_frame: usize,
481    ) -> Result<Frame> {
482        let node = self.graph.node(node_id).ok_or_else(|| {
483            render_error(
484                "render.missing_executor",
485                format!("render node {} is missing", node_id.index()),
486            )
487        })?;
488        let NodeKind::Filter {
489            inputs,
490            dependencies,
491            ..
492        } = node.kind()
493        else {
494            return Err(render_error(
495                "render.invalid_input",
496                "source nodes do not have input frames",
497            ));
498        };
499
500        if !dependencies.allows(output_frame, requested_frame) {
501            return Err(render_error(
502                "render.dependency_contract",
503                format!(
504                    "node {} requested frame {} while producing frame {} outside declared contract",
505                    node_id.index(),
506                    requested_frame,
507                    output_frame
508                ),
509            ));
510        }
511
512        let input = inputs.get(input_index).ok_or_else(|| {
513            render_error(
514                "render.invalid_input",
515                format!(
516                    "node {} input index {} is out of range",
517                    node_id.index(),
518                    input_index
519                ),
520            )
521        })?;
522
523        self.compute_frame(FrameKey {
524            node_id: input.node_id(),
525            frame_number: requested_frame,
526        })
527    }
528}
529
530struct FrameCell {
531    state: Mutex<FrameState>,
532    wake: Condvar,
533}
534
535impl FrameCell {
536    const fn new() -> Self {
537        Self {
538            state: Mutex::new(FrameState::Computing),
539            wake: Condvar::new(),
540        }
541    }
542
543    fn wait(&self) -> Result<Frame> {
544        let mut state = lock(&self.state);
545        loop {
546            match &*state {
547                FrameState::Computing => {
548                    state = wait(&self.wake, state);
549                }
550                FrameState::Ready(frame) => return Ok(frame.clone()),
551                FrameState::Failed(error) => return Err(error.to_error()),
552            }
553        }
554    }
555
556    fn store(&self, result: &Result<Frame>) {
557        let mut state = lock(&self.state);
558        *state = match result {
559            Ok(frame) => FrameState::Ready(frame.clone()),
560            Err(error) => FrameState::Failed(StoredError::from_error(error)),
561        };
562        drop(state);
563        self.wake.notify_all();
564    }
565}
566
567enum FrameState {
568    Computing,
569    Ready(Frame),
570    Failed(StoredError),
571}
572
573#[derive(Clone)]
574struct StoredError {
575    category: ErrorCategory,
576    code: ErrorCode,
577    message: String,
578}
579
580impl StoredError {
581    fn from_error(error: &PixelFlowError) -> Self {
582        Self {
583            category: error.category(),
584            code: error.code(),
585            message: error.message().to_owned(),
586        }
587    }
588
589    fn to_error(&self) -> PixelFlowError {
590        PixelFlowError::new(self.category, self.code, self.message.clone())
591    }
592}
593
594struct ActiveKeyGuard {
595    key: FrameKey,
596}
597
598impl ActiveKeyGuard {
599    fn enter(key: FrameKey) -> Result<Self> {
600        ACTIVE_KEYS.with(|active| {
601            let mut active = active.borrow_mut();
602            if active.contains(&key) {
603                return Err(render_error(
604                    "render.cycle",
605                    format!(
606                        "runtime dependency cycle reached node {} frame {}",
607                        key.node_id.index(),
608                        key.frame_number
609                    ),
610                ));
611            }
612            active.push(key);
613            Ok(Self { key })
614        })
615    }
616}
617
618impl Drop for ActiveKeyGuard {
619    fn drop(&mut self) {
620        ACTIVE_KEYS.with(|active| {
621            let mut active = active.borrow_mut();
622            let popped = active.pop();
623            debug_assert_eq!(popped, Some(self.key));
624        });
625    }
626}
627
628fn output_frame_count(graph: &Graph, output_node_id: NodeId) -> Result<usize> {
629    let node = graph.node(output_node_id).ok_or_else(|| {
630        render_error(
631            "render.missing_executor",
632            format!("output node {} is missing", output_node_id.index()),
633        )
634    })?;
635
636    match node.media().frame_count() {
637        FrameCount::Finite(frame_count) => Ok(frame_count),
638        FrameCount::Unknown => Err(render_error(
639            "render.frame_out_of_range",
640            "render requires finite frame count",
641        )),
642    }
643}
644
645fn validate_frame_number(node: &crate::GraphNode, frame_number: usize) -> Result<()> {
646    match node.media().frame_count() {
647        FrameCount::Finite(frame_count) if frame_number < frame_count => Ok(()),
648        FrameCount::Finite(frame_count) => Err(render_error(
649            "render.frame_out_of_range",
650            format!(
651                "node {} frame {} is outside 0..{}",
652                node.id().index(),
653                frame_number,
654                frame_count
655            ),
656        )),
657        FrameCount::Unknown => Err(render_error(
658            "render.frame_out_of_range",
659            format!("node {} has unknown frame count", node.id().index()),
660        )),
661    }
662}
663
664fn render_error(code: &'static str, message: impl Into<String>) -> PixelFlowError {
665    PixelFlowError::new(ErrorCategory::Core, ErrorCode::new(code), message)
666}
667
668fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
669    mutex
670        .lock()
671        .unwrap_or_else(|poisoned| poisoned.into_inner())
672}
673
674fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
675    condvar
676        .wait(guard)
677        .unwrap_or_else(|poisoned| poisoned.into_inner())
678}
679
680#[cfg(test)]
681mod tests {
682    #![expect(clippy::panic, reason = "allow in tests")]
683    #![expect(clippy::panic_in_result_fn, reason = "allow in tests")]
684    #![expect(clippy::unwrap_in_result, reason = "allow in tests")]
685
686    use std::sync::Arc;
687
688    use crate::{
689        AllocatorConfig, ClipMedia, ErrorCategory, ErrorCode, Frame, FrameBuilder, GraphBuilder,
690        Metadata, MetadataSchema, MetadataValue, Rational, RenderExecutorMap, RenderOptions,
691        resolve_format_alias,
692    };
693
694    use super::{FrameExecutor, FrameRequest};
695
696    fn media(frame_count: usize) -> ClipMedia {
697        ClipMedia::fixed(
698            resolve_format_alias("gray8").expect("format should resolve"),
699            2,
700            2,
701            frame_count,
702            Rational {
703                numerator: 24,
704                denominator: 1,
705            },
706        )
707    }
708
709    fn gray_frame() -> Frame {
710        FrameBuilder::new(
711            resolve_format_alias("gray8").expect("format should resolve"),
712            2,
713            2,
714            &MetadataSchema::core(),
715            AllocatorConfig::default(),
716        )
717        .expect("frame builder should allocate")
718        .finish()
719    }
720
721    fn gray_frame_with_number(number: u8) -> Frame {
722        let schema = MetadataSchema::core();
723        let mut metadata = Metadata::new(&schema);
724        metadata
725            .set(
726                &schema,
727                "core:frame_number",
728                MetadataValue::Int(i64::from(number)),
729            )
730            .expect("core frame number should set");
731        gray_frame().with_metadata(metadata)
732    }
733
734    struct ConstantExecutor;
735
736    impl FrameExecutor for ConstantExecutor {
737        fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
738            Ok(gray_frame())
739        }
740    }
741
742    #[test]
743    fn render_options_validate_non_empty_range() {
744        let options = RenderOptions::new(1, Some(3));
745        let range = options.validate(5).expect("range should be valid");
746
747        assert_eq!(range.start(), 1);
748        assert_eq!(range.end(), 3);
749        assert_eq!(range.len(), 2);
750    }
751
752    #[test]
753    fn render_options_reject_start_after_end() {
754        let error = RenderOptions::new(3, Some(1))
755            .validate(5)
756            .expect_err("range should fail");
757
758        assert_eq!(error.category(), ErrorCategory::Core);
759        assert_eq!(error.code(), ErrorCode::new("render.invalid_range"));
760    }
761
762    #[test]
763    fn render_options_reject_end_past_frame_count() {
764        let error = RenderOptions::new(0, Some(6))
765            .validate(5)
766            .expect_err("range should fail");
767
768        assert_eq!(error.category(), ErrorCategory::Core);
769        assert_eq!(error.code(), ErrorCode::new("render.frame_out_of_range"));
770    }
771
772    #[test]
773    fn executor_map_records_node_executor() {
774        let mut builder = GraphBuilder::new();
775        let source = builder.source("source", media(2));
776        let graph = builder.build();
777        let mut executors = RenderExecutorMap::new();
778
779        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
780
781        assert!(graph.node(source.node_id()).is_some());
782        assert!(executors.contains(source.node_id()));
783    }
784
785    #[test]
786    fn duplicate_requests_for_same_node_frame_are_coalesced() {
787        use std::sync::atomic::{AtomicUsize, Ordering};
788
789        struct CountingSource {
790            calls: AtomicUsize,
791        }
792
793        impl FrameExecutor for CountingSource {
794            fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
795                self.calls.fetch_add(1, Ordering::SeqCst);
796                Ok(gray_frame())
797            }
798        }
799
800        struct DoubleRequestFilter;
801
802        impl FrameExecutor for DoubleRequestFilter {
803            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
804                let first = request.input_frame(0, request.frame_number())?;
805                let second = request.input_frame(0, request.frame_number())?;
806                assert!(first.shares_plane_storage(&second, 0));
807                Ok(first)
808            }
809        }
810
811        let mut builder = GraphBuilder::new();
812        let source = builder.source("source", media(3));
813        let filtered = builder
814            .filter(
815                "double",
816                &[source],
817                media(3),
818                crate::FilterCompatibility::Preserve,
819            )
820            .expect("filter should be added");
821        builder.set_output(filtered);
822        let graph = builder.build();
823
824        let source_exec = Arc::new(CountingSource {
825            calls: AtomicUsize::new(0),
826        });
827        let mut executors = RenderExecutorMap::new();
828        executors.insert(source.node_id(), source_exec.clone());
829        executors.insert(filtered.node_id(), Arc::new(DoubleRequestFilter));
830
831        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
832            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
833            .expect("render should start");
834
835        assert_eq!(
836            render.next().expect("one item").expect("frame ok").width(),
837            2
838        );
839        assert_eq!(source_exec.calls.load(Ordering::SeqCst), 1);
840    }
841
842    #[test]
843    fn dependency_contract_violation_returns_structured_error() {
844        struct FutureRequestFilter;
845
846        impl FrameExecutor for FutureRequestFilter {
847            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
848                request.input_frame(0, request.frame_number() + 1)
849            }
850        }
851
852        let mut builder = GraphBuilder::new();
853        let source = builder.source("source", media(3));
854        let filtered = builder
855            .filter(
856                "bad",
857                &[source],
858                media(3),
859                crate::FilterCompatibility::Preserve,
860            )
861            .expect("filter should be added");
862        builder.set_output(filtered);
863        let graph = builder.build();
864
865        let mut executors = RenderExecutorMap::new();
866        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
867        executors.insert(filtered.node_id(), Arc::new(FutureRequestFilter));
868
869        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(1))
870            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
871            .expect("render should start");
872        let Err(error) = render.next().expect("one item") else {
873            panic!("contract should fail")
874        };
875
876        assert_eq!(error.category(), ErrorCategory::Core);
877        assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
878    }
879
880    #[test]
881    fn render_api_yields_frames_in_order_when_tasks_finish_out_of_order() {
882        use std::thread;
883        use std::time::Duration;
884
885        struct SlowEvenSource;
886
887        impl FrameExecutor for SlowEvenSource {
888            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
889                if request.frame_number() == 0 {
890                    thread::sleep(Duration::from_millis(30));
891                }
892                Ok(gray_frame_with_number(request.frame_number() as u8))
893            }
894        }
895
896        let mut builder = GraphBuilder::new();
897        let source = builder.source("source", media(2));
898        builder.set_output(source);
899        let graph = builder.build();
900
901        let mut executors = RenderExecutorMap::new();
902        executors.insert(source.node_id(), Arc::new(SlowEvenSource));
903
904        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
905            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
906            .expect("render should start");
907
908        let first = render.next().expect("first item").expect("first frame");
909        let second = render.next().expect("second item").expect("second frame");
910
911        assert_eq!(
912            first.metadata().get("core:frame_number"),
913            Some(&crate::MetadataValue::Int(0))
914        );
915        assert_eq!(
916            second.metadata().get("core:frame_number"),
917            Some(&crate::MetadataValue::Int(1))
918        );
919        assert!(render.next().is_none());
920    }
921
922    #[test]
923    fn render_engine_respects_configured_worker_count() {
924        let engine = crate::RenderEngine::new(crate::WorkerPoolConfig::new(3));
925
926        assert_eq!(engine.worker_threads(), 3);
927    }
928
929    #[test]
930    fn ordered_stateful_filter_commits_in_increasing_frame_order() {
931        use std::sync::{Arc, Mutex};
932        use std::thread;
933        use std::time::Duration;
934
935        struct OrderedRecorder {
936            commits: Arc<Mutex<Vec<usize>>>,
937        }
938
939        impl FrameExecutor for OrderedRecorder {
940            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
941                if request.frame_number() == 0 {
942                    thread::sleep(Duration::from_millis(30));
943                }
944                request.input_frame(0, request.frame_number())
945            }
946
947            fn commit(&self, frame_number: usize, frame: Frame) -> crate::Result<Frame> {
948                self.commits
949                    .lock()
950                    .expect("commit log lock")
951                    .push(frame_number);
952                Ok(frame)
953            }
954        }
955
956        let commits = Arc::new(Mutex::new(Vec::new()));
957        let mut builder = GraphBuilder::new();
958        let source = builder.source("source", media(2));
959        let filtered = builder
960            .filter_with_schedule(
961                "ordered",
962                &[source],
963                media(2),
964                crate::FilterCompatibility::Preserve,
965                crate::DependencyPattern::same_frame(),
966                crate::ConcurrencyClass::OrderedStateful,
967            )
968            .expect("filter should be added");
969        builder.set_output(filtered);
970        let graph = builder.build();
971
972        let mut executors = RenderExecutorMap::new();
973        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
974        executors.insert(
975            filtered.node_id(),
976            Arc::new(OrderedRecorder {
977                commits: commits.clone(),
978            }),
979        );
980
981        let frames: Vec<_> = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
982            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
983            .expect("render should start")
984            .collect::<crate::Result<Vec<_>>>()
985            .expect("render should succeed");
986
987        assert_eq!(frames.len(), 2);
988        assert_eq!(*commits.lock().expect("commit log lock"), vec![0, 1]);
989    }
990
991    #[test]
992    fn source_concurrency_limit_is_enforced() {
993        use std::sync::atomic::{AtomicUsize, Ordering};
994        use std::thread;
995        use std::time::Duration;
996
997        struct ConcurrentSource {
998            active: AtomicUsize,
999            max_seen: AtomicUsize,
1000        }
1001
1002        impl FrameExecutor for ConcurrentSource {
1003            fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
1004                let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1005                self.max_seen.fetch_max(active, Ordering::SeqCst);
1006                thread::sleep(Duration::from_millis(10));
1007                self.active.fetch_sub(1, Ordering::SeqCst);
1008                Ok(gray_frame())
1009            }
1010        }
1011
1012        let source_exec = Arc::new(ConcurrentSource {
1013            active: AtomicUsize::new(0),
1014            max_seen: AtomicUsize::new(0),
1015        });
1016        let mut builder = GraphBuilder::new();
1017        let source = builder.source_with_capabilities(
1018            "source",
1019            media(4),
1020            crate::SourceCapabilities::random_access().with_concurrency_limit(1),
1021        );
1022        builder.set_output(source);
1023        let graph = builder.build();
1024
1025        let mut executors = RenderExecutorMap::new();
1026        executors.insert(source.node_id(), source_exec.clone());
1027
1028        crate::RenderEngine::new(crate::WorkerPoolConfig::new(4))
1029            .render_ordered(graph, executors, RenderOptions::new(0, Some(4)))
1030            .expect("render should start")
1031            .collect::<crate::Result<Vec<_>>>()
1032            .expect("render should succeed");
1033
1034        assert_eq!(source_exec.max_seen.load(Ordering::SeqCst), 1);
1035    }
1036
1037    #[test]
1038    fn render_exposes_aggregate_timing_by_node() {
1039        let mut builder = GraphBuilder::new();
1040        let source = builder.source("source", media(2));
1041        builder.set_output(source);
1042        let graph = builder.build();
1043
1044        let mut executors = RenderExecutorMap::new();
1045        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
1046
1047        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
1048            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
1049            .expect("render should start");
1050        render
1051            .by_ref()
1052            .collect::<crate::Result<Vec<_>>>()
1053            .expect("render should succeed");
1054        let report = render.timing_report();
1055        let timing = report.get(source.node_id()).expect("source timing exists");
1056
1057        assert_eq!(timing.frames(), 2);
1058        assert!(timing.total() > std::time::Duration::ZERO);
1059    }
1060}