1use std::cell::RefCell;
4use std::collections::BTreeMap;
5use std::sync::{Arc, Condvar, Mutex, MutexGuard, mpsc};
6use std::time::Instant;
7
8use semisafe::slice::get as semisafe_get;
9
10use crate::scheduler::{
11 ConcurrencyGate, FilterTiming, OrderedCommitGate, TimingReport, WorkerPool,
12};
13use crate::{
14 ErrorCategory, ErrorCode, Frame, FrameCount, Graph, NodeId, NodeKind, PixelFlowError, Result,
15 WorkerPoolConfig,
16};
17
18thread_local! {
19 static ACTIVE_KEYS: RefCell<Vec<FrameKey>> = const { RefCell::new(Vec::new()) };
20}
21
22pub trait FrameExecutor: Send + Sync {
24 fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame>;
26
27 fn commit(&self, _frame_number: usize, frame: Frame) -> Result<Frame> {
29 Ok(frame)
30 }
31}
32
33pub struct FrameRequest<'a> {
35 node_id: NodeId,
36 frame_number: usize,
37 scheduler: &'a dyn DependencyRequester,
38}
39
40impl<'a> FrameRequest<'a> {
41 const fn new(
42 node_id: NodeId,
43 frame_number: usize,
44 scheduler: &'a dyn DependencyRequester,
45 ) -> Self {
46 Self {
47 node_id,
48 frame_number,
49 scheduler,
50 }
51 }
52
53 #[must_use]
55 pub const fn node_id(&self) -> NodeId {
56 self.node_id
57 }
58
59 #[must_use]
61 pub const fn frame_number(&self) -> usize {
62 self.frame_number
63 }
64
65 pub fn input_frame(&self, input_index: usize, frame_number: usize) -> Result<Frame> {
67 self.scheduler
68 .request_input(self.node_id, self.frame_number, input_index, frame_number)
69 }
70}
71
/// Test-only requester that refuses every input-frame request.
#[cfg(test)]
struct NoopDependencyRequester;

#[cfg(test)]
impl DependencyRequester for NoopDependencyRequester {
    /// Always fails with `render.invalid_input`; all arguments are ignored.
    fn request_input(
        &self,
        _node_id: NodeId,
        _output_frame: usize,
        _input_index: usize,
        _requested_frame: usize,
    ) -> Result<Frame> {
        let refusal = render_error(
            "render.invalid_input",
            "test frame requests do not support dependencies",
        );
        Err(refusal)
    }
}
90
#[cfg(test)]
impl<'a> FrameRequest<'a> {
    /// Builds a request for unit tests; any input-frame request made through
    /// it fails because it is backed by `NoopDependencyRequester`.
    #[must_use]
    pub fn for_tests(node_id: NodeId, frame_number: usize) -> Self {
        static REQUESTER: NoopDependencyRequester = NoopDependencyRequester;
        let scheduler: &'static dyn DependencyRequester = &REQUESTER;
        Self {
            node_id,
            frame_number,
            scheduler,
        }
    }
}
100
101trait DependencyRequester: Send + Sync {
102 fn request_input(
103 &self,
104 node_id: NodeId,
105 output_frame: usize,
106 input_index: usize,
107 requested_frame: usize,
108 ) -> Result<Frame>;
109}
110
111#[derive(Clone, Default)]
113pub struct RenderExecutorMap {
114 executors: BTreeMap<NodeId, Arc<dyn FrameExecutor>>,
115}
116
117impl RenderExecutorMap {
118 #[must_use]
120 pub fn new() -> Self {
121 Self {
122 executors: BTreeMap::new(),
123 }
124 }
125
126 pub fn insert(&mut self, node_id: NodeId, executor: Arc<dyn FrameExecutor>) {
128 self.executors.insert(node_id, executor);
129 }
130
131 #[must_use]
133 pub fn contains(&self, node_id: NodeId) -> bool {
134 self.executors.contains_key(&node_id)
135 }
136
137 fn get(&self, node_id: NodeId) -> Option<Arc<dyn FrameExecutor>> {
138 self.executors.get(&node_id).cloned()
139 }
140}
141
142#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
144pub struct RenderOptions {
145 start: usize,
146 end: Option<usize>,
147}
148
149impl RenderOptions {
150 #[must_use]
152 pub const fn new(start: usize, end: Option<usize>) -> Self {
153 Self { start, end }
154 }
155
156 pub fn validate(self, frame_count: usize) -> Result<RenderRange> {
158 let end = self.end.unwrap_or(frame_count);
159 if self.start > end {
160 return Err(render_error(
161 "render.invalid_range",
162 "render start must be less than or equal to end",
163 ));
164 }
165 if end > frame_count {
166 return Err(render_error(
167 "render.frame_out_of_range",
168 "render range exceeds clip frame count",
169 ));
170 }
171
172 Ok(RenderRange {
173 start: self.start,
174 end,
175 })
176 }
177}
178
/// Half-open frame range `start..end` produced by `RenderOptions::validate`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RenderRange {
    start: usize,
    end: usize,
}

impl RenderRange {
    /// First frame of the range (inclusive).
    #[must_use]
    pub const fn start(self) -> usize {
        let Self { start, .. } = self;
        start
    }

    /// One past the last frame of the range (exclusive).
    #[must_use]
    pub const fn end(self) -> usize {
        let Self { end, .. } = self;
        end
    }

    /// Number of frames in the range.
    #[must_use]
    pub const fn len(self) -> usize {
        let Self { start, end } = self;
        end - start
    }

    /// `true` when the range contains no frames.
    #[must_use]
    pub const fn is_empty(self) -> bool {
        self.end == self.start
    }
}
211
212pub struct RenderEngine {
214 config: WorkerPoolConfig,
215}
216
217impl RenderEngine {
218 #[must_use]
220 pub const fn new(config: WorkerPoolConfig) -> Self {
221 Self { config }
222 }
223
224 #[must_use]
226 pub const fn worker_threads(&self) -> usize {
227 self.config.worker_threads()
228 }
229
230 pub fn render_ordered(
232 &self,
233 graph: Graph,
234 executors: RenderExecutorMap,
235 options: RenderOptions,
236 ) -> Result<OrderedRender> {
237 let _ = graph.validate()?;
238 let output_node_id = semisafe_get(graph.outputs(), 0).node_id();
239 let frame_count = output_frame_count(&graph, output_node_id)?;
240 let range = options.validate(frame_count)?;
241 let scheduler = Arc::new(SharedScheduler::new(graph, executors));
242 OrderedRender::start(self.config, scheduler, output_node_id, range)
243 }
244}
245
246pub struct OrderedRender {
251 scheduler: Arc<SharedScheduler>,
252 _worker_pool: WorkerPool,
253 receiver: mpsc::Receiver<(usize, Result<Frame>)>,
254 reorder_buffer: BTreeMap<usize, Result<Frame>>,
255 next_frame: usize,
256 remaining: usize,
257}
258
259impl OrderedRender {
260 fn start(
261 config: WorkerPoolConfig,
262 scheduler: Arc<SharedScheduler>,
263 output_node_id: NodeId,
264 range: RenderRange,
265 ) -> Result<Self> {
266 let (tx, receiver) = mpsc::channel();
267 let worker_pool = WorkerPool::new(config);
268
269 for frame_number in range.start..range.end {
270 let scheduler = Arc::clone(&scheduler);
271 let tx = tx.clone();
272 worker_pool.execute(move || {
273 let result = scheduler.compute_frame(FrameKey {
274 node_id: output_node_id,
275 frame_number,
276 });
277 #[expect(
278 clippy::let_underscore_must_use,
279 reason = "cannot propagate result from threaded context, better to ignore than panic"
280 )]
281 let _ = tx.send((frame_number, result));
282 })?;
283 }
284 drop(tx);
285
286 Ok(Self {
287 scheduler,
288 _worker_pool: worker_pool,
289 receiver,
290 reorder_buffer: BTreeMap::new(),
291 next_frame: range.start,
292 remaining: range.len(),
293 })
294 }
295
296 #[must_use]
298 pub fn timing_report(&self) -> TimingReport {
299 self.scheduler.timing_report()
300 }
301}
302
303impl Iterator for OrderedRender {
304 type Item = Result<Frame>;
305
306 fn next(&mut self) -> Option<Self::Item> {
307 if self.remaining == 0 {
308 return None;
309 }
310
311 loop {
312 if let Some(result) = self.reorder_buffer.remove(&self.next_frame) {
313 self.next_frame += 1;
314 self.remaining -= 1;
315 return Some(result);
316 }
317
318 match self.receiver.recv() {
319 Ok((frame_number, result)) => {
320 self.reorder_buffer.insert(frame_number, result);
321 }
322 Err(_) => {
323 self.remaining = 0;
324 return Some(Err(render_error(
325 "render.worker_pool_closed",
326 "render worker pool closed before next frame completed",
327 )));
328 }
329 }
330 }
331 }
332}
333
334#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
335struct FrameKey {
336 node_id: NodeId,
337 frame_number: usize,
338}
339
340struct SharedScheduler {
341 graph: Graph,
342 executors: RenderExecutorMap,
343 slots: Mutex<BTreeMap<FrameKey, Arc<FrameCell>>>,
344 timings: Mutex<BTreeMap<NodeId, FilterTiming>>,
345 ordered_commit_gates: Mutex<BTreeMap<NodeId, Arc<OrderedCommitGate>>>,
346 source_gates: Mutex<BTreeMap<NodeId, Arc<ConcurrencyGate>>>,
347}
348
349impl SharedScheduler {
350 const fn new(graph: Graph, executors: RenderExecutorMap) -> Self {
351 Self {
352 graph,
353 executors,
354 slots: Mutex::new(BTreeMap::new()),
355 timings: Mutex::new(BTreeMap::new()),
356 ordered_commit_gates: Mutex::new(BTreeMap::new()),
357 source_gates: Mutex::new(BTreeMap::new()),
358 }
359 }
360
361 fn timing_report(&self) -> TimingReport {
362 let timings = lock(&self.timings).clone();
363 TimingReport::from_timings(timings)
364 }
365
366 fn compute_frame(&self, key: FrameKey) -> Result<Frame> {
367 let _active = ActiveKeyGuard::enter(key)?;
368 let (cell, should_compute) = {
369 let mut slots = lock(&self.slots);
370 match slots.entry(key) {
371 std::collections::btree_map::Entry::Vacant(entry) => {
372 let cell = Arc::new(FrameCell::new());
373 entry.insert(Arc::clone(&cell));
374 (cell, true)
375 }
376 std::collections::btree_map::Entry::Occupied(entry) => {
377 (Arc::clone(entry.get()), false)
378 }
379 }
380 };
381
382 if !should_compute {
383 return cell.wait();
384 }
385
386 let result = self.compute_uncached(key);
387 cell.store(&result);
388 result
389 }
390
391 fn compute_uncached(&self, key: FrameKey) -> Result<Frame> {
392 let node = self.graph.node(key.node_id).ok_or_else(|| {
393 render_error(
394 "render.missing_executor",
395 format!("render node {} is missing", key.node_id.index()),
396 )
397 })?;
398 validate_frame_number(node, key.frame_number)?;
399 let executor = self.executors.get(node.id()).ok_or_else(|| {
400 render_error(
401 "render.missing_executor",
402 format!("render executor for node {} is missing", node.id().index()),
403 )
404 })?;
405
406 match node.kind() {
407 NodeKind::Source { capabilities, .. } => {
408 let source_gate = capabilities
409 .concurrency_limit()
410 .map(|limit| self.source_gate(node.id(), limit));
411 let source_guard = source_gate.as_ref().map(|gate| gate.acquire());
412 let started = Instant::now();
413 let request = FrameRequest::new(node.id(), key.frame_number, self);
414 let prepared = executor.prepare(request)?;
415 let committed = executor.commit(key.frame_number, prepared)?;
416 drop(source_guard);
417 self.record_timing(node.id(), started.elapsed());
418 Ok(committed)
419 }
420 NodeKind::Filter { concurrency, .. } => match concurrency {
421 crate::ConcurrencyClass::OrderedStateful => {
422 let gate = self.commit_gate(node.id());
423 let ticket = gate.register(key.frame_number);
424 let started = Instant::now();
425 let request = FrameRequest::new(node.id(), key.frame_number, self);
426 let prepared = executor.prepare(request)?;
427 ticket.wait_turn();
428 let committed = executor.commit(key.frame_number, prepared)?;
429 ticket.finish();
430 self.record_timing(node.id(), started.elapsed());
431 Ok(committed)
432 }
433 crate::ConcurrencyClass::Stateless | crate::ConcurrencyClass::Source => {
434 let started = Instant::now();
435 let request = FrameRequest::new(node.id(), key.frame_number, self);
436 let prepared = executor.prepare(request)?;
437 let committed = executor.commit(key.frame_number, prepared)?;
438 self.record_timing(node.id(), started.elapsed());
439 Ok(committed)
440 }
441 },
442 }
443 }
444
445 fn commit_gate(&self, node_id: NodeId) -> Arc<OrderedCommitGate> {
446 let mut gates = lock(&self.ordered_commit_gates);
447 Arc::clone(
448 gates
449 .entry(node_id)
450 .or_insert_with(|| Arc::new(OrderedCommitGate::new())),
451 )
452 }
453
454 fn source_gate(&self, node_id: NodeId, limit: usize) -> Arc<ConcurrencyGate> {
455 let mut gates = lock(&self.source_gates);
456 Arc::clone(
457 gates
458 .entry(node_id)
459 .or_insert_with(|| Arc::new(ConcurrencyGate::new(limit))),
460 )
461 }
462
463 fn record_timing(&self, node_id: NodeId, duration: std::time::Duration) {
464 let duration = if duration.is_zero() {
465 std::time::Duration::from_nanos(1)
466 } else {
467 duration
468 };
469 let mut timings = lock(&self.timings);
470 timings.entry(node_id).or_default().record(duration);
471 }
472}
473
474impl DependencyRequester for SharedScheduler {
475 fn request_input(
476 &self,
477 node_id: NodeId,
478 output_frame: usize,
479 input_index: usize,
480 requested_frame: usize,
481 ) -> Result<Frame> {
482 let node = self.graph.node(node_id).ok_or_else(|| {
483 render_error(
484 "render.missing_executor",
485 format!("render node {} is missing", node_id.index()),
486 )
487 })?;
488 let NodeKind::Filter {
489 inputs,
490 dependencies,
491 ..
492 } = node.kind()
493 else {
494 return Err(render_error(
495 "render.invalid_input",
496 "source nodes do not have input frames",
497 ));
498 };
499
500 if !dependencies.allows(output_frame, requested_frame) {
501 return Err(render_error(
502 "render.dependency_contract",
503 format!(
504 "node {} requested frame {} while producing frame {} outside declared contract",
505 node_id.index(),
506 requested_frame,
507 output_frame
508 ),
509 ));
510 }
511
512 let input = inputs.get(input_index).ok_or_else(|| {
513 render_error(
514 "render.invalid_input",
515 format!(
516 "node {} input index {} is out of range",
517 node_id.index(),
518 input_index
519 ),
520 )
521 })?;
522
523 self.compute_frame(FrameKey {
524 node_id: input.node_id(),
525 frame_number: requested_frame,
526 })
527 }
528}
529
530struct FrameCell {
531 state: Mutex<FrameState>,
532 wake: Condvar,
533}
534
535impl FrameCell {
536 const fn new() -> Self {
537 Self {
538 state: Mutex::new(FrameState::Computing),
539 wake: Condvar::new(),
540 }
541 }
542
543 fn wait(&self) -> Result<Frame> {
544 let mut state = lock(&self.state);
545 loop {
546 match &*state {
547 FrameState::Computing => {
548 state = wait(&self.wake, state);
549 }
550 FrameState::Ready(frame) => return Ok(frame.clone()),
551 FrameState::Failed(error) => return Err(error.to_error()),
552 }
553 }
554 }
555
556 fn store(&self, result: &Result<Frame>) {
557 let mut state = lock(&self.state);
558 *state = match result {
559 Ok(frame) => FrameState::Ready(frame.clone()),
560 Err(error) => FrameState::Failed(StoredError::from_error(error)),
561 };
562 drop(state);
563 self.wake.notify_all();
564 }
565}
566
567enum FrameState {
568 Computing,
569 Ready(Frame),
570 Failed(StoredError),
571}
572
573#[derive(Clone)]
574struct StoredError {
575 category: ErrorCategory,
576 code: ErrorCode,
577 message: String,
578}
579
580impl StoredError {
581 fn from_error(error: &PixelFlowError) -> Self {
582 Self {
583 category: error.category(),
584 code: error.code(),
585 message: error.message().to_owned(),
586 }
587 }
588
589 fn to_error(&self) -> PixelFlowError {
590 PixelFlowError::new(self.category, self.code, self.message.clone())
591 }
592}
593
594struct ActiveKeyGuard {
595 key: FrameKey,
596}
597
598impl ActiveKeyGuard {
599 fn enter(key: FrameKey) -> Result<Self> {
600 ACTIVE_KEYS.with(|active| {
601 let mut active = active.borrow_mut();
602 if active.contains(&key) {
603 return Err(render_error(
604 "render.cycle",
605 format!(
606 "runtime dependency cycle reached node {} frame {}",
607 key.node_id.index(),
608 key.frame_number
609 ),
610 ));
611 }
612 active.push(key);
613 Ok(Self { key })
614 })
615 }
616}
617
618impl Drop for ActiveKeyGuard {
619 fn drop(&mut self) {
620 ACTIVE_KEYS.with(|active| {
621 let mut active = active.borrow_mut();
622 let popped = active.pop();
623 debug_assert_eq!(popped, Some(self.key));
624 });
625 }
626}
627
628fn output_frame_count(graph: &Graph, output_node_id: NodeId) -> Result<usize> {
629 let node = graph.node(output_node_id).ok_or_else(|| {
630 render_error(
631 "render.missing_executor",
632 format!("output node {} is missing", output_node_id.index()),
633 )
634 })?;
635
636 match node.media().frame_count() {
637 FrameCount::Finite(frame_count) => Ok(frame_count),
638 FrameCount::Unknown => Err(render_error(
639 "render.frame_out_of_range",
640 "render requires finite frame count",
641 )),
642 }
643}
644
645fn validate_frame_number(node: &crate::GraphNode, frame_number: usize) -> Result<()> {
646 match node.media().frame_count() {
647 FrameCount::Finite(frame_count) if frame_number < frame_count => Ok(()),
648 FrameCount::Finite(frame_count) => Err(render_error(
649 "render.frame_out_of_range",
650 format!(
651 "node {} frame {} is outside 0..{}",
652 node.id().index(),
653 frame_number,
654 frame_count
655 ),
656 )),
657 FrameCount::Unknown => Err(render_error(
658 "render.frame_out_of_range",
659 format!("node {} has unknown frame count", node.id().index()),
660 )),
661 }
662}
663
664fn render_error(code: &'static str, message: impl Into<String>) -> PixelFlowError {
665 PixelFlowError::new(ErrorCategory::Core, ErrorCode::new(code), message)
666}
667
/// Locks `mutex`, taking the guard out of the poison error if the mutex is
/// poisoned instead of panicking.
fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
673
/// Waits on `condvar` with `guard`, taking the guard out of the poison error
/// if the mutex is poisoned instead of panicking.
fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    match condvar.wait(guard) {
        Ok(reacquired) => reacquired,
        Err(poisoned) => poisoned.into_inner(),
    }
}
679
#[cfg(test)]
mod tests {
    #![expect(clippy::panic, reason = "allow in tests")]
    #![expect(clippy::panic_in_result_fn, reason = "allow in tests")]
    #![expect(clippy::unwrap_in_result, reason = "allow in tests")]

    use std::sync::Arc;

    use crate::{
        AllocatorConfig, ClipMedia, ErrorCategory, ErrorCode, Frame, FrameBuilder, GraphBuilder,
        Metadata, MetadataSchema, MetadataValue, Rational, RenderExecutorMap, RenderOptions,
        resolve_format_alias,
    };

    use super::{FrameExecutor, FrameRequest};

    /// 2x2 `gray8` clip description at 24/1 with `frame_count` frames.
    fn media(frame_count: usize) -> ClipMedia {
        ClipMedia::fixed(
            resolve_format_alias("gray8").expect("format should resolve"),
            2,
            2,
            frame_count,
            Rational {
                numerator: 24,
                denominator: 1,
            },
        )
    }

    /// Freshly allocated 2x2 `gray8` frame with core-schema metadata.
    fn gray_frame() -> Frame {
        FrameBuilder::new(
            resolve_format_alias("gray8").expect("format should resolve"),
            2,
            2,
            &MetadataSchema::core(),
            AllocatorConfig::default(),
        )
        .expect("frame builder should allocate")
        .finish()
    }

    /// Gray frame whose `core:frame_number` metadata is set to `number`.
    fn gray_frame_with_number(number: u8) -> Frame {
        let schema = MetadataSchema::core();
        let mut metadata = Metadata::new(&schema);
        metadata
            .set(
                &schema,
                "core:frame_number",
                MetadataValue::Int(i64::from(number)),
            )
            .expect("core frame number should set");
        gray_frame().with_metadata(metadata)
    }

    /// Executor that returns a new gray frame for every request.
    struct ConstantExecutor;

    impl FrameExecutor for ConstantExecutor {
        fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
            Ok(gray_frame())
        }
    }

    /// A sub-range inside the clip validates and reports its bounds and length.
    #[test]
    fn render_options_validate_non_empty_range() {
        let options = RenderOptions::new(1, Some(3));
        let range = options.validate(5).expect("range should be valid");

        assert_eq!(range.start(), 1);
        assert_eq!(range.end(), 3);
        assert_eq!(range.len(), 2);
    }

    /// A start past the end is rejected with `render.invalid_range`.
    #[test]
    fn render_options_reject_start_after_end() {
        let error = RenderOptions::new(3, Some(1))
            .validate(5)
            .expect_err("range should fail");

        assert_eq!(error.category(), ErrorCategory::Core);
        assert_eq!(error.code(), ErrorCode::new("render.invalid_range"));
    }

    /// An end past the clip is rejected with `render.frame_out_of_range`.
    #[test]
    fn render_options_reject_end_past_frame_count() {
        let error = RenderOptions::new(0, Some(6))
            .validate(5)
            .expect_err("range should fail");

        assert_eq!(error.category(), ErrorCategory::Core);
        assert_eq!(error.code(), ErrorCode::new("render.frame_out_of_range"));
    }

    /// An inserted executor is visible through `contains`.
    #[test]
    fn executor_map_records_node_executor() {
        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(2));
        let graph = builder.build();
        let mut executors = RenderExecutorMap::new();

        executors.insert(source.node_id(), Arc::new(ConstantExecutor));

        assert!(graph.node(source.node_id()).is_some());
        assert!(executors.contains(source.node_id()));
    }

    /// Requesting the same input frame twice runs the source executor once and
    /// hands back frames that share plane storage.
    #[test]
    fn duplicate_requests_for_same_node_frame_are_coalesced() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        struct CountingSource {
            calls: AtomicUsize,
        }

        impl FrameExecutor for CountingSource {
            fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
                self.calls.fetch_add(1, Ordering::SeqCst);
                Ok(gray_frame())
            }
        }

        struct DoubleRequestFilter;

        impl FrameExecutor for DoubleRequestFilter {
            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
                let first = request.input_frame(0, request.frame_number())?;
                let second = request.input_frame(0, request.frame_number())?;
                assert!(first.shares_plane_storage(&second, 0));
                Ok(first)
            }
        }

        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(3));
        let filtered = builder
            .filter(
                "double",
                &[source],
                media(3),
                crate::FilterCompatibility::Preserve,
            )
            .expect("filter should be added");
        builder.set_output(filtered);
        let graph = builder.build();

        let source_exec = Arc::new(CountingSource {
            calls: AtomicUsize::new(0),
        });
        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), source_exec.clone());
        executors.insert(filtered.node_id(), Arc::new(DoubleRequestFilter));

        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
            .expect("render should start");

        assert_eq!(
            render.next().expect("one item").expect("frame ok").width(),
            2
        );
        assert_eq!(source_exec.calls.load(Ordering::SeqCst), 1);
    }

    /// A filter that requests the following input frame is rejected with
    /// `render.dependency_contract`.
    #[test]
    fn dependency_contract_violation_returns_structured_error() {
        struct FutureRequestFilter;

        impl FrameExecutor for FutureRequestFilter {
            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
                request.input_frame(0, request.frame_number() + 1)
            }
        }

        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(3));
        let filtered = builder
            .filter(
                "bad",
                &[source],
                media(3),
                crate::FilterCompatibility::Preserve,
            )
            .expect("filter should be added");
        builder.set_output(filtered);
        let graph = builder.build();

        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
        executors.insert(filtered.node_id(), Arc::new(FutureRequestFilter));

        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(1))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
            .expect("render should start");
        let Err(error) = render.next().expect("one item") else {
            panic!("contract should fail")
        };

        assert_eq!(error.category(), ErrorCategory::Core);
        assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
    }

    /// Frames are yielded in frame order even though frame 0 is delayed.
    #[test]
    fn render_api_yields_frames_in_order_when_tasks_finish_out_of_order() {
        use std::thread;
        use std::time::Duration;

        /// Source that delays only frame 0.
        struct SlowFirstFrameSource;

        impl FrameExecutor for SlowFirstFrameSource {
            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
                if request.frame_number() == 0 {
                    thread::sleep(Duration::from_millis(30));
                }
                // Checked conversion: a silent wrap would tag the frame with
                // the wrong number and invalidate the ordering assertions.
                let number = u8::try_from(request.frame_number())
                    .expect("test frame number should fit in u8");
                Ok(gray_frame_with_number(number))
            }
        }

        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(2));
        builder.set_output(source);
        let graph = builder.build();

        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), Arc::new(SlowFirstFrameSource));

        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
            .expect("render should start");

        let first = render.next().expect("first item").expect("first frame");
        let second = render.next().expect("second item").expect("second frame");

        assert_eq!(
            first.metadata().get("core:frame_number"),
            Some(&crate::MetadataValue::Int(0))
        );
        assert_eq!(
            second.metadata().get("core:frame_number"),
            Some(&crate::MetadataValue::Int(1))
        );
        assert!(render.next().is_none());
    }

    /// The engine reports the worker count it was configured with.
    #[test]
    fn render_engine_respects_configured_worker_count() {
        let engine = crate::RenderEngine::new(crate::WorkerPoolConfig::new(3));

        assert_eq!(engine.worker_threads(), 3);
    }

    /// An ordered-stateful filter commits frame 0 before frame 1 even though
    /// preparing frame 0 is delayed.
    #[test]
    fn ordered_stateful_filter_commits_in_increasing_frame_order() {
        use std::sync::Mutex;
        use std::thread;
        use std::time::Duration;

        struct OrderedRecorder {
            commits: Arc<Mutex<Vec<usize>>>,
        }

        impl FrameExecutor for OrderedRecorder {
            fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
                if request.frame_number() == 0 {
                    thread::sleep(Duration::from_millis(30));
                }
                request.input_frame(0, request.frame_number())
            }

            fn commit(&self, frame_number: usize, frame: Frame) -> crate::Result<Frame> {
                self.commits
                    .lock()
                    .expect("commit log lock")
                    .push(frame_number);
                Ok(frame)
            }
        }

        let commits = Arc::new(Mutex::new(Vec::new()));
        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(2));
        let filtered = builder
            .filter_with_schedule(
                "ordered",
                &[source],
                media(2),
                crate::FilterCompatibility::Preserve,
                crate::DependencyPattern::same_frame(),
                crate::ConcurrencyClass::OrderedStateful,
            )
            .expect("filter should be added");
        builder.set_output(filtered);
        let graph = builder.build();

        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), Arc::new(ConstantExecutor));
        executors.insert(
            filtered.node_id(),
            Arc::new(OrderedRecorder {
                commits: commits.clone(),
            }),
        );

        let frames: Vec<_> = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
            .expect("render should start")
            .collect::<crate::Result<Vec<_>>>()
            .expect("render should succeed");

        assert_eq!(frames.len(), 2);
        assert_eq!(*commits.lock().expect("commit log lock"), vec![0, 1]);
    }

    /// With a concurrency limit of 1, the source never sees two overlapping
    /// `prepare` calls although four workers are available.
    #[test]
    fn source_concurrency_limit_is_enforced() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::thread;
        use std::time::Duration;

        struct ConcurrentSource {
            active: AtomicUsize,
            max_seen: AtomicUsize,
        }

        impl FrameExecutor for ConcurrentSource {
            fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
                let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
                self.max_seen.fetch_max(active, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10));
                self.active.fetch_sub(1, Ordering::SeqCst);
                Ok(gray_frame())
            }
        }

        let source_exec = Arc::new(ConcurrentSource {
            active: AtomicUsize::new(0),
            max_seen: AtomicUsize::new(0),
        });
        let mut builder = GraphBuilder::new();
        let source = builder.source_with_capabilities(
            "source",
            media(4),
            crate::SourceCapabilities::random_access().with_concurrency_limit(1),
        );
        builder.set_output(source);
        let graph = builder.build();

        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), source_exec.clone());

        crate::RenderEngine::new(crate::WorkerPoolConfig::new(4))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(4)))
            .expect("render should start")
            .collect::<crate::Result<Vec<_>>>()
            .expect("render should succeed");

        assert_eq!(source_exec.max_seen.load(Ordering::SeqCst), 1);
    }

    /// After a full render the timing report has one entry for the source,
    /// covering both frames with a non-zero total.
    #[test]
    fn render_exposes_aggregate_timing_by_node() {
        let mut builder = GraphBuilder::new();
        let source = builder.source("source", media(2));
        builder.set_output(source);
        let graph = builder.build();

        let mut executors = RenderExecutorMap::new();
        executors.insert(source.node_id(), Arc::new(ConstantExecutor));

        let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
            .expect("render should start");
        render
            .by_ref()
            .collect::<crate::Result<Vec<_>>>()
            .expect("render should succeed");
        let report = render.timing_report();
        let timing = report.get(source.node_id()).expect("source timing exists");

        assert_eq!(timing.frames(), 2);
        assert!(timing.total() > std::time::Duration::ZERO);
    }
}