use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, mpsc};
use std::time::Instant;
use semisafe::slice::get as semisafe_get;
use crate::scheduler::{
ConcurrencyGate, FilterTiming, OrderedCommitGate, TimingReport, WorkerPool,
};
use crate::{
ErrorCategory, ErrorCode, Frame, FrameCount, Graph, NodeId, NodeKind, PixelFlowError, Result,
WorkerPoolConfig,
};
// Per-thread stack of the frame keys whose computation is currently in
// progress on this thread. `ActiveKeyGuard::enter` pushes a key and the
// guard's `Drop` pops it; finding a key already on the stack is reported as a
// runtime dependency cycle.
thread_local! {
static ACTIVE_KEYS: RefCell<Vec<FrameKey>> = const { RefCell::new(Vec::new()) };
}
/// Produces frames for a single graph node.
///
/// The scheduler calls `prepare` and then `commit` for every frame it
/// computes for the node this executor is registered under.
pub trait FrameExecutor: Send + Sync {
/// Builds the frame described by `request`. Input frames can be pulled
/// through `FrameRequest::input_frame`.
fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame>;
/// Finalizes a prepared frame. The default implementation returns `frame`
/// unchanged. For ordered-stateful filters the scheduler calls this only
/// after the frame's commit ticket has waited for its turn.
fn commit(&self, _frame_number: usize, frame: Frame) -> Result<Frame> {
Ok(frame)
}
}
/// Context handed to `FrameExecutor::prepare` for one output frame.
pub struct FrameRequest<'a> {
/// Node whose frame is being produced.
node_id: NodeId,
/// Number of the frame being produced.
frame_number: usize,
/// Back-reference used to resolve input frames on demand.
scheduler: &'a dyn DependencyRequester,
}
impl<'a> FrameRequest<'a> {
    /// Bundles the node, the frame being produced, and the requester used to
    /// resolve inputs into a request.
    const fn new(
        node_id: NodeId,
        frame_number: usize,
        scheduler: &'a dyn DependencyRequester,
    ) -> Self {
        Self { node_id, frame_number, scheduler }
    }

    /// Node this request is producing a frame for.
    #[must_use]
    pub const fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Number of the frame this request is producing.
    #[must_use]
    pub const fn frame_number(&self) -> usize {
        self.frame_number
    }

    /// Requests frame `frame_number` from the input at `input_index`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying requester reports.
    pub fn input_frame(&self, input_index: usize, frame_number: usize) -> Result<Frame> {
        let requester = self.scheduler;
        let producing = self.frame_number;
        requester.request_input(self.node_id, producing, input_index, frame_number)
    }
}
/// Test-only requester whose `request_input` always fails.
#[cfg(test)]
struct NoopDependencyRequester;
#[cfg(test)]
impl DependencyRequester for NoopDependencyRequester {
    /// Rejects every request with a `render.invalid_input` error.
    fn request_input(
        &self,
        _node_id: NodeId,
        _output_frame: usize,
        _input_index: usize,
        _requested_frame: usize,
    ) -> Result<Frame> {
        let unsupported = render_error(
            "render.invalid_input",
            "test frame requests do not support dependencies",
        );
        Err(unsupported)
    }
}
#[cfg(test)]
impl<'a> FrameRequest<'a> {
    /// Builds a request backed by a requester that rejects all input
    /// requests, for executors that are tested without dependencies.
    #[must_use]
    pub fn for_tests(node_id: NodeId, frame_number: usize) -> Self {
        static NOOP: NoopDependencyRequester = NoopDependencyRequester;
        let requester: &'static dyn DependencyRequester = &NOOP;
        Self::new(node_id, frame_number, requester)
    }
}
/// Resolves input frames on behalf of an executor that is producing a frame.
trait DependencyRequester: Send + Sync {
/// Returns frame `requested_frame` of input `input_index` of `node_id`,
/// requested while that node produces `output_frame`.
fn request_input(
&self,
node_id: NodeId,
output_frame: usize,
input_index: usize,
requested_frame: usize,
) -> Result<Frame>;
}
/// Maps graph nodes to the executors that produce their frames.
#[derive(Clone, Default)]
pub struct RenderExecutorMap {
/// Executors keyed by the node they serve.
executors: BTreeMap<NodeId, Arc<dyn FrameExecutor>>,
}
impl RenderExecutorMap {
#[must_use]
pub fn new() -> Self {
Self {
executors: BTreeMap::new(),
}
}
pub fn insert(&mut self, node_id: NodeId, executor: Arc<dyn FrameExecutor>) {
self.executors.insert(node_id, executor);
}
#[must_use]
pub fn contains(&self, node_id: NodeId) -> bool {
self.executors.contains_key(&node_id)
}
fn get(&self, node_id: NodeId) -> Option<Arc<dyn FrameExecutor>> {
self.executors.get(&node_id).cloned()
}
}
/// Caller-supplied frame range for a render, not yet checked against a clip.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RenderOptions {
/// First frame to render.
start: usize,
/// Exclusive end frame; `None` means "up to the clip's frame count".
end: Option<usize>,
}
impl RenderOptions {
    /// Creates options covering `start..end`; a `None` end is resolved against
    /// the clip's frame count during validation.
    #[must_use]
    pub const fn new(start: usize, end: Option<usize>) -> Self {
        Self { start, end }
    }

    /// Resolves these options against a clip of `frame_count` frames.
    ///
    /// # Errors
    ///
    /// * `render.invalid_range` when the start lies after the resolved end.
    /// * `render.frame_out_of_range` when the resolved end exceeds
    ///   `frame_count`.
    pub fn validate(self, frame_count: usize) -> Result<RenderRange> {
        let Self { start, end } = self;
        let end = end.unwrap_or(frame_count);

        // The ordering check comes first so an inverted range is always
        // reported as such, even when it also overruns the clip.
        if end < start {
            let inverted = render_error(
                "render.invalid_range",
                "render start must be less than or equal to end",
            );
            return Err(inverted);
        }
        if frame_count < end {
            let overrun = render_error(
                "render.frame_out_of_range",
                "render range exceeds clip frame count",
            );
            return Err(overrun);
        }
        Ok(RenderRange { start, end })
    }
}
/// Half-open frame range `start..end` produced by `RenderOptions::validate`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RenderRange {
    /// First frame in the range.
    start: usize,
    /// One past the last frame in the range.
    end: usize,
}

impl RenderRange {
    /// First frame in the range.
    #[must_use]
    pub const fn start(self) -> usize {
        let Self { start, .. } = self;
        start
    }

    /// One past the last frame in the range.
    #[must_use]
    pub const fn end(self) -> usize {
        let Self { end, .. } = self;
        end
    }

    /// Number of frames covered by the range.
    #[must_use]
    pub const fn len(self) -> usize {
        let Self { start, end } = self;
        end - start
    }

    /// Whether the range covers no frames.
    #[must_use]
    pub const fn is_empty(self) -> bool {
        let Self { start, end } = self;
        end == start
    }
}
/// Entry point for rendering a graph's output frames in order.
pub struct RenderEngine {
/// Worker pool configuration used for every render started by this engine.
config: WorkerPoolConfig,
}
impl RenderEngine {
    /// Creates an engine that renders with the given worker pool settings.
    #[must_use]
    pub const fn new(config: WorkerPoolConfig) -> Self {
        Self { config }
    }

    /// Number of worker threads named by the engine's configuration.
    #[must_use]
    pub const fn worker_threads(&self) -> usize {
        self.config.worker_threads()
    }

    /// Validates `graph`, resolves `options` against the frame count of the
    /// graph's first output, and starts rendering that output.
    ///
    /// # Errors
    ///
    /// Propagates graph validation failures, an unknown output frame count,
    /// an invalid render range, and failures to submit work to the pool.
    pub fn render_ordered(
        &self,
        graph: Graph,
        executors: RenderExecutorMap,
        options: RenderOptions,
    ) -> Result<OrderedRender> {
        let _ = graph.validate()?;
        let output_node_id = semisafe_get(graph.outputs(), 0).node_id();
        let range = output_frame_count(&graph, output_node_id)
            .and_then(|frame_count| options.validate(frame_count))?;
        let scheduler = SharedScheduler::new(graph, executors);
        OrderedRender::start(self.config, Arc::new(scheduler), output_node_id, range)
    }
}
/// In-progress render that yields output frames in increasing frame order.
pub struct OrderedRender {
/// Scheduler shared with the worker tasks; also the source of timings.
scheduler: Arc<SharedScheduler>,
/// Held only so the pool lives as long as the render.
_worker_pool: WorkerPool,
/// Receives `(frame_number, result)` pairs as workers finish.
receiver: mpsc::Receiver<(usize, Result<Frame>)>,
/// Results that arrived before their turn.
reorder_buffer: BTreeMap<usize, Result<Frame>>,
/// Frame number the iterator will yield next.
next_frame: usize,
/// Number of frames still to be yielded.
remaining: usize,
}
impl OrderedRender {
    /// Submits one task per frame in `range` to a fresh worker pool and
    /// returns the handle that collects their results.
    ///
    /// # Errors
    ///
    /// Returns the pool's error if a task cannot be submitted.
    fn start(
        config: WorkerPoolConfig,
        scheduler: Arc<SharedScheduler>,
        output_node_id: NodeId,
        range: RenderRange,
    ) -> Result<Self> {
        let (sender, receiver) = mpsc::channel();
        let worker_pool = WorkerPool::new(config);

        for frame_number in range.start()..range.end() {
            let key = FrameKey {
                node_id: output_node_id,
                frame_number,
            };
            let job_scheduler = Arc::clone(&scheduler);
            let job_sender = sender.clone();
            worker_pool.execute(move || {
                let outcome = job_scheduler.compute_frame(key);
                #[expect(
                    clippy::let_underscore_must_use,
                    reason = "cannot propagate result from threaded context, better to ignore than panic"
                )]
                let _ = job_sender.send((frame_number, outcome));
            })?;
        }

        // Only the per-task clones may keep the channel open, so the receiver
        // observes disconnection once every task is gone.
        drop(sender);

        Ok(Self {
            scheduler,
            _worker_pool: worker_pool,
            receiver,
            reorder_buffer: BTreeMap::new(),
            next_frame: range.start(),
            remaining: range.len(),
        })
    }

    /// Snapshot of the per-node timings recorded so far.
    #[must_use]
    pub fn timing_report(&self) -> TimingReport {
        let scheduler = &self.scheduler;
        scheduler.timing_report()
    }
}
impl Iterator for OrderedRender {
    type Item = Result<Frame>;

    /// Yields the result for the next frame in order, blocking until a worker
    /// delivers it.
    ///
    /// Results that arrive early are parked in the reorder buffer. If the
    /// channel disconnects before the next frame arrives, a single
    /// `render.worker_pool_closed` error is yielded and iteration ends.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        loop {
            if let Some(result) = self.reorder_buffer.remove(&self.next_frame) {
                self.next_frame += 1;
                self.remaining -= 1;
                return Some(result);
            }
            match self.receiver.recv() {
                Ok((frame_number, result)) => {
                    self.reorder_buffer.insert(frame_number, result);
                }
                Err(_) => {
                    // Every sender is gone, so the awaited frame can never
                    // arrive; report it once and stop.
                    self.remaining = 0;
                    return Some(Err(render_error(
                        "render.worker_pool_closed",
                        "render worker pool closed before next frame completed",
                    )));
                }
            }
        }
    }

    /// At most `remaining` more items are yielded. The lower bound stays zero
    /// because a closed worker pool cuts the iteration short.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, Some(self.remaining))
    }
}
/// Identifies one frame of one node; used as the cache and cycle-check key.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct FrameKey {
/// Node that produces the frame.
node_id: NodeId,
/// Frame number within that node's output.
frame_number: usize,
}
/// State shared by all worker tasks of one render.
struct SharedScheduler {
/// Graph being rendered.
graph: Graph,
/// Executors for the graph's nodes.
executors: RenderExecutorMap,
/// One cell per requested frame; entries are inserted on first request and
/// never removed by the code in this file.
slots: Mutex<BTreeMap<FrameKey, Arc<FrameCell>>>,
/// Accumulated timings per node.
timings: Mutex<BTreeMap<NodeId, FilterTiming>>,
/// Commit gates for ordered-stateful filters, created on first use.
ordered_commit_gates: Mutex<BTreeMap<NodeId, Arc<OrderedCommitGate>>>,
/// Concurrency gates for sources that declare a limit, created on first use.
source_gates: Mutex<BTreeMap<NodeId, Arc<ConcurrencyGate>>>,
}
impl SharedScheduler {
const fn new(graph: Graph, executors: RenderExecutorMap) -> Self {
Self {
graph,
executors,
slots: Mutex::new(BTreeMap::new()),
timings: Mutex::new(BTreeMap::new()),
ordered_commit_gates: Mutex::new(BTreeMap::new()),
source_gates: Mutex::new(BTreeMap::new()),
}
}
fn timing_report(&self) -> TimingReport {
let timings = lock(&self.timings).clone();
TimingReport::from_timings(timings)
}
fn compute_frame(&self, key: FrameKey) -> Result<Frame> {
let _active = ActiveKeyGuard::enter(key)?;
let (cell, should_compute) = {
let mut slots = lock(&self.slots);
match slots.entry(key) {
std::collections::btree_map::Entry::Vacant(entry) => {
let cell = Arc::new(FrameCell::new());
entry.insert(Arc::clone(&cell));
(cell, true)
}
std::collections::btree_map::Entry::Occupied(entry) => {
(Arc::clone(entry.get()), false)
}
}
};
if !should_compute {
return cell.wait();
}
let result = self.compute_uncached(key);
cell.store(&result);
result
}
fn compute_uncached(&self, key: FrameKey) -> Result<Frame> {
let node = self.graph.node(key.node_id).ok_or_else(|| {
render_error(
"render.missing_executor",
format!("render node {} is missing", key.node_id.index()),
)
})?;
validate_frame_number(node, key.frame_number)?;
let executor = self.executors.get(node.id()).ok_or_else(|| {
render_error(
"render.missing_executor",
format!("render executor for node {} is missing", node.id().index()),
)
})?;
match node.kind() {
NodeKind::Source { capabilities, .. } => {
let source_gate = capabilities
.concurrency_limit()
.map(|limit| self.source_gate(node.id(), limit));
let source_guard = source_gate.as_ref().map(|gate| gate.acquire());
let started = Instant::now();
let request = FrameRequest::new(node.id(), key.frame_number, self);
let prepared = executor.prepare(request)?;
let committed = executor.commit(key.frame_number, prepared)?;
drop(source_guard);
self.record_timing(node.id(), started.elapsed());
Ok(committed)
}
NodeKind::Filter { concurrency, .. } => match concurrency {
crate::ConcurrencyClass::OrderedStateful => {
let gate = self.commit_gate(node.id());
let ticket = gate.register(key.frame_number);
let started = Instant::now();
let request = FrameRequest::new(node.id(), key.frame_number, self);
let prepared = executor.prepare(request)?;
ticket.wait_turn();
let committed = executor.commit(key.frame_number, prepared)?;
ticket.finish();
self.record_timing(node.id(), started.elapsed());
Ok(committed)
}
crate::ConcurrencyClass::Stateless | crate::ConcurrencyClass::Source => {
let started = Instant::now();
let request = FrameRequest::new(node.id(), key.frame_number, self);
let prepared = executor.prepare(request)?;
let committed = executor.commit(key.frame_number, prepared)?;
self.record_timing(node.id(), started.elapsed());
Ok(committed)
}
},
}
}
fn commit_gate(&self, node_id: NodeId) -> Arc<OrderedCommitGate> {
let mut gates = lock(&self.ordered_commit_gates);
Arc::clone(
gates
.entry(node_id)
.or_insert_with(|| Arc::new(OrderedCommitGate::new())),
)
}
fn source_gate(&self, node_id: NodeId, limit: usize) -> Arc<ConcurrencyGate> {
let mut gates = lock(&self.source_gates);
Arc::clone(
gates
.entry(node_id)
.or_insert_with(|| Arc::new(ConcurrencyGate::new(limit))),
)
}
fn record_timing(&self, node_id: NodeId, duration: std::time::Duration) {
let duration = if duration.is_zero() {
std::time::Duration::from_nanos(1)
} else {
duration
};
let mut timings = lock(&self.timings);
timings.entry(node_id).or_default().record(duration);
}
}
impl DependencyRequester for SharedScheduler {
    /// Resolves an input frame for a filter node after checking the request
    /// against the node's declared dependency contract.
    ///
    /// # Errors
    ///
    /// * `render.missing_executor` when `node_id` is not in the graph.
    /// * `render.invalid_input` when the node is a source or `input_index`
    ///   is out of range.
    /// * `render.dependency_contract` when the requested frame is not
    ///   allowed for `output_frame`.
    /// * Any error from computing the input frame itself.
    fn request_input(
        &self,
        node_id: NodeId,
        output_frame: usize,
        input_index: usize,
        requested_frame: usize,
    ) -> Result<Frame> {
        let Some(node) = self.graph.node(node_id) else {
            return Err(render_error(
                "render.missing_executor",
                format!("render node {} is missing", node_id.index()),
            ));
        };

        let (inputs, dependencies) = match node.kind() {
            NodeKind::Filter {
                inputs,
                dependencies,
                ..
            } => (inputs, dependencies),
            NodeKind::Source { .. } => {
                return Err(render_error(
                    "render.invalid_input",
                    "source nodes do not have input frames",
                ));
            }
        };

        // The contract is checked before the input index, matching the order
        // in which callers observe the two errors.
        let within_contract = dependencies.allows(output_frame, requested_frame);
        if !within_contract {
            let message = format!(
                "node {} requested frame {} while producing frame {} outside declared contract",
                node_id.index(),
                requested_frame,
                output_frame
            );
            return Err(render_error("render.dependency_contract", message));
        }

        let Some(input) = inputs.get(input_index) else {
            let message = format!(
                "node {} input index {} is out of range",
                node_id.index(),
                input_index
            );
            return Err(render_error("render.invalid_input", message));
        };

        let upstream = FrameKey {
            node_id: input.node_id(),
            frame_number: requested_frame,
        };
        self.compute_frame(upstream)
    }
}
/// Slot holding the outcome of one frame computation, with a condition
/// variable for threads waiting on it.
struct FrameCell {
/// Current state of the computation.
state: Mutex<FrameState>,
/// Notified when `store` replaces the state.
wake: Condvar,
}
impl FrameCell {
const fn new() -> Self {
Self {
state: Mutex::new(FrameState::Computing),
wake: Condvar::new(),
}
}
fn wait(&self) -> Result<Frame> {
let mut state = lock(&self.state);
loop {
match &*state {
FrameState::Computing => {
state = wait(&self.wake, state);
}
FrameState::Ready(frame) => return Ok(frame.clone()),
FrameState::Failed(error) => return Err(error.to_error()),
}
}
}
fn store(&self, result: &Result<Frame>) {
let mut state = lock(&self.state);
*state = match result {
Ok(frame) => FrameState::Ready(frame.clone()),
Err(error) => FrameState::Failed(StoredError::from_error(error)),
};
drop(state);
self.wake.notify_all();
}
}
/// State of a `FrameCell`.
enum FrameState {
/// No result has been stored yet.
Computing,
/// The computation produced this frame.
Ready(Frame),
/// The computation failed with this error.
Failed(StoredError),
}
/// Copy of a `PixelFlowError`'s category, code, and message, from which an
/// equivalent error can be rebuilt for each waiter.
#[derive(Clone)]
struct StoredError {
/// Category of the original error.
category: ErrorCategory,
/// Code of the original error.
code: ErrorCode,
/// Message of the original error.
message: String,
}
impl StoredError {
    /// Captures the category, code, and message of `error`.
    fn from_error(error: &PixelFlowError) -> Self {
        let category = error.category();
        let code = error.code();
        let message = error.message().to_owned();
        Self { category, code, message }
    }

    /// Rebuilds a `PixelFlowError` from the captured parts.
    fn to_error(&self) -> PixelFlowError {
        let message = self.message.clone();
        PixelFlowError::new(self.category, self.code, message)
    }
}
/// Marks a frame key as in progress on the current thread until dropped.
struct ActiveKeyGuard {
/// Key that was pushed onto `ACTIVE_KEYS` when the guard was created.
key: FrameKey,
}
impl ActiveKeyGuard {
    /// Pushes `key` onto this thread's active stack.
    ///
    /// # Errors
    ///
    /// Returns `render.cycle` when `key` is already on the stack, meaning the
    /// thread re-entered a computation it has not finished.
    fn enter(key: FrameKey) -> Result<Self> {
        ACTIVE_KEYS.with(|stack| {
            let mut stack = stack.borrow_mut();
            let already_active = stack.iter().any(|in_progress| *in_progress == key);
            if already_active {
                let message = format!(
                    "runtime dependency cycle reached node {} frame {}",
                    key.node_id.index(),
                    key.frame_number
                );
                return Err(render_error("render.cycle", message));
            }
            stack.push(key);
            Ok(Self { key })
        })
    }
}
impl Drop for ActiveKeyGuard {
    /// Pops this thread's active stack; debug builds assert that the popped
    /// key is the one this guard pushed.
    fn drop(&mut self) {
        let expected = Some(self.key);
        ACTIVE_KEYS.with(|stack| {
            let popped = stack.borrow_mut().pop();
            debug_assert_eq!(popped, expected);
        });
    }
}
/// Returns the finite frame count of the output node.
///
/// # Errors
///
/// * `render.missing_executor` when the node is not in the graph.
/// * `render.frame_out_of_range` when the node's frame count is unknown.
fn output_frame_count(graph: &Graph, output_node_id: NodeId) -> Result<usize> {
    let Some(node) = graph.node(output_node_id) else {
        return Err(render_error(
            "render.missing_executor",
            format!("output node {} is missing", output_node_id.index()),
        ));
    };

    if let FrameCount::Finite(frame_count) = node.media().frame_count() {
        return Ok(frame_count);
    }
    Err(render_error(
        "render.frame_out_of_range",
        "render requires finite frame count",
    ))
}
/// Checks that `frame_number` lies inside the node's finite frame range.
///
/// # Errors
///
/// Returns `render.frame_out_of_range` when the node's frame count is unknown
/// or when `frame_number` is not below it.
fn validate_frame_number(node: &crate::GraphNode, frame_number: usize) -> Result<()> {
    let frame_count = match node.media().frame_count() {
        FrameCount::Finite(frame_count) => frame_count,
        FrameCount::Unknown => {
            return Err(render_error(
                "render.frame_out_of_range",
                format!("node {} has unknown frame count", node.id().index()),
            ));
        }
    };

    if frame_number < frame_count {
        return Ok(());
    }

    let message = format!(
        "node {} frame {} is outside 0..{}",
        node.id().index(),
        frame_number,
        frame_count
    );
    Err(render_error("render.frame_out_of_range", message))
}
/// Builds a core-category error with the given code and message.
fn render_error(code: &'static str, message: impl Into<String>) -> PixelFlowError {
    let category = ErrorCategory::Core;
    let code = ErrorCode::new(code);
    PixelFlowError::new(category, code, message)
}
/// Locks `mutex`, recovering the guard when the mutex is poisoned.
fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        // A panic elsewhere poisoned the lock; keep using the data anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Waits on `condvar` with `guard`, recovering the guard when the mutex is
/// poisoned.
fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    match condvar.wait(guard) {
        Ok(reacquired) => reacquired,
        // A panic elsewhere poisoned the lock; keep using the data anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}
// Unit tests for render option validation, the executor map, and the
// end-to-end behavior of `RenderEngine::render_ordered`.
#[cfg(test)]
mod tests {
#![expect(clippy::panic, reason = "allow in tests")]
#![expect(clippy::panic_in_result_fn, reason = "allow in tests")]
#![expect(clippy::unwrap_in_result, reason = "allow in tests")]
use std::sync::Arc;
use crate::{
AllocatorConfig, ClipMedia, ErrorCategory, ErrorCode, Frame, FrameBuilder, GraphBuilder,
Metadata, MetadataSchema, MetadataValue, Rational, RenderExecutorMap, RenderOptions,
resolve_format_alias,
};
use super::{FrameExecutor, FrameRequest};
// Fixed 2x2 "gray8" clip description at 24/1 with the given frame count.
fn media(frame_count: usize) -> ClipMedia {
ClipMedia::fixed(
resolve_format_alias("gray8").expect("format should resolve"),
2,
2,
frame_count,
Rational {
numerator: 24,
denominator: 1,
},
)
}
// Freshly allocated 2x2 "gray8" frame using the core metadata schema.
fn gray_frame() -> Frame {
FrameBuilder::new(
resolve_format_alias("gray8").expect("format should resolve"),
2,
2,
&MetadataSchema::core(),
AllocatorConfig::default(),
)
.expect("frame builder should allocate")
.finish()
}
// Gray frame tagged with `core:frame_number` so tests can check ordering.
fn gray_frame_with_number(number: u8) -> Frame {
let schema = MetadataSchema::core();
let mut metadata = Metadata::new(&schema);
metadata
.set(
&schema,
"core:frame_number",
MetadataValue::Int(i64::from(number)),
)
.expect("core frame number should set");
gray_frame().with_metadata(metadata)
}
// Executor that returns a plain gray frame for every request.
struct ConstantExecutor;
impl FrameExecutor for ConstantExecutor {
fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
Ok(gray_frame())
}
}
// A range inside the clip validates and reports its bounds and length.
#[test]
fn render_options_validate_non_empty_range() {
let options = RenderOptions::new(1, Some(3));
let range = options.validate(5).expect("range should be valid");
assert_eq!(range.start(), 1);
assert_eq!(range.end(), 3);
assert_eq!(range.len(), 2);
}
// A start after the end is rejected as `render.invalid_range`.
#[test]
fn render_options_reject_start_after_end() {
let error = RenderOptions::new(3, Some(1))
.validate(5)
.expect_err("range should fail");
assert_eq!(error.category(), ErrorCategory::Core);
assert_eq!(error.code(), ErrorCode::new("render.invalid_range"));
}
// An end beyond the clip is rejected as `render.frame_out_of_range`.
#[test]
fn render_options_reject_end_past_frame_count() {
let error = RenderOptions::new(0, Some(6))
.validate(5)
.expect_err("range should fail");
assert_eq!(error.category(), ErrorCategory::Core);
assert_eq!(error.code(), ErrorCode::new("render.frame_out_of_range"));
}
// Inserting an executor makes `contains` report the node.
#[test]
fn executor_map_records_node_executor() {
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(2));
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), Arc::new(ConstantExecutor));
assert!(graph.node(source.node_id()).is_some());
assert!(executors.contains(source.node_id()));
}
// Two requests for the same input frame run the source executor once and
// hand back frames that share plane storage.
#[test]
fn duplicate_requests_for_same_node_frame_are_coalesced() {
use std::sync::atomic::{AtomicUsize, Ordering};
struct CountingSource {
calls: AtomicUsize,
}
impl FrameExecutor for CountingSource {
fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(gray_frame())
}
}
struct DoubleRequestFilter;
impl FrameExecutor for DoubleRequestFilter {
fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
let first = request.input_frame(0, request.frame_number())?;
let second = request.input_frame(0, request.frame_number())?;
assert!(first.shares_plane_storage(&second, 0));
Ok(first)
}
}
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(3));
let filtered = builder
.filter(
"double",
&[source],
media(3),
crate::FilterCompatibility::Preserve,
)
.expect("filter should be added");
builder.set_output(filtered);
let graph = builder.build();
let source_exec = Arc::new(CountingSource {
calls: AtomicUsize::new(0),
});
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), source_exec.clone());
executors.insert(filtered.node_id(), Arc::new(DoubleRequestFilter));
let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
.render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
.expect("render should start");
assert_eq!(
render.next().expect("one item").expect("frame ok").width(),
2
);
assert_eq!(source_exec.calls.load(Ordering::SeqCst), 1);
}
// A filter that asks for the next frame is expected to get a
// `render.dependency_contract` error from the scheduler.
#[test]
fn dependency_contract_violation_returns_structured_error() {
struct FutureRequestFilter;
impl FrameExecutor for FutureRequestFilter {
fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
request.input_frame(0, request.frame_number() + 1)
}
}
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(3));
let filtered = builder
.filter(
"bad",
&[source],
media(3),
crate::FilterCompatibility::Preserve,
)
.expect("filter should be added");
builder.set_output(filtered);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), Arc::new(ConstantExecutor));
executors.insert(filtered.node_id(), Arc::new(FutureRequestFilter));
let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(1))
.render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
.expect("render should start");
let Err(error) = render.next().expect("one item") else {
panic!("contract should fail")
};
assert_eq!(error.category(), ErrorCategory::Core);
assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
}
// Frame 0 is delayed so frame 1 can finish first; the iterator must still
// yield frame 0 before frame 1.
#[test]
fn render_api_yields_frames_in_order_when_tasks_finish_out_of_order() {
use std::thread;
use std::time::Duration;
struct SlowEvenSource;
impl FrameExecutor for SlowEvenSource {
fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
if request.frame_number() == 0 {
thread::sleep(Duration::from_millis(30));
}
Ok(gray_frame_with_number(request.frame_number() as u8))
}
}
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(2));
builder.set_output(source);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), Arc::new(SlowEvenSource));
let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
.render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
.expect("render should start");
let first = render.next().expect("first item").expect("first frame");
let second = render.next().expect("second item").expect("second frame");
assert_eq!(
first.metadata().get("core:frame_number"),
Some(&crate::MetadataValue::Int(0))
);
assert_eq!(
second.metadata().get("core:frame_number"),
Some(&crate::MetadataValue::Int(1))
);
assert!(render.next().is_none());
}
// The engine reports the worker count from its configuration.
#[test]
fn render_engine_respects_configured_worker_count() {
let engine = crate::RenderEngine::new(crate::WorkerPoolConfig::new(3));
assert_eq!(engine.worker_threads(), 3);
}
// Frame 0's prepare is delayed; commits must still be logged as 0 then 1.
#[test]
fn ordered_stateful_filter_commits_in_increasing_frame_order() {
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct OrderedRecorder {
commits: Arc<Mutex<Vec<usize>>>,
}
impl FrameExecutor for OrderedRecorder {
fn prepare(&self, request: FrameRequest<'_>) -> crate::Result<Frame> {
if request.frame_number() == 0 {
thread::sleep(Duration::from_millis(30));
}
request.input_frame(0, request.frame_number())
}
fn commit(&self, frame_number: usize, frame: Frame) -> crate::Result<Frame> {
self.commits
.lock()
.expect("commit log lock")
.push(frame_number);
Ok(frame)
}
}
let commits = Arc::new(Mutex::new(Vec::new()));
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(2));
let filtered = builder
.filter_with_schedule(
"ordered",
&[source],
media(2),
crate::FilterCompatibility::Preserve,
crate::DependencyPattern::same_frame(),
crate::ConcurrencyClass::OrderedStateful,
)
.expect("filter should be added");
builder.set_output(filtered);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), Arc::new(ConstantExecutor));
executors.insert(
filtered.node_id(),
Arc::new(OrderedRecorder {
commits: commits.clone(),
}),
);
let frames: Vec<_> = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
.render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
.expect("render should start")
.collect::<crate::Result<Vec<_>>>()
.expect("render should succeed");
assert_eq!(frames.len(), 2);
assert_eq!(*commits.lock().expect("commit log lock"), vec![0, 1]);
}
// With a concurrency limit of 1 and four workers, the source must never see
// more than one `prepare` call in flight.
#[test]
fn source_concurrency_limit_is_enforced() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
struct ConcurrentSource {
active: AtomicUsize,
max_seen: AtomicUsize,
}
impl FrameExecutor for ConcurrentSource {
fn prepare(&self, _request: FrameRequest<'_>) -> crate::Result<Frame> {
let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
self.max_seen.fetch_max(active, Ordering::SeqCst);
thread::sleep(Duration::from_millis(10));
self.active.fetch_sub(1, Ordering::SeqCst);
Ok(gray_frame())
}
}
let source_exec = Arc::new(ConcurrentSource {
active: AtomicUsize::new(0),
max_seen: AtomicUsize::new(0),
});
let mut builder = GraphBuilder::new();
let source = builder.source_with_capabilities(
"source",
media(4),
crate::SourceCapabilities::random_access().with_concurrency_limit(1),
);
builder.set_output(source);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), source_exec.clone());
crate::RenderEngine::new(crate::WorkerPoolConfig::new(4))
.render_ordered(graph, executors, RenderOptions::new(0, Some(4)))
.expect("render should start")
.collect::<crate::Result<Vec<_>>>()
.expect("render should succeed");
assert_eq!(source_exec.max_seen.load(Ordering::SeqCst), 1);
}
// After rendering two frames the timing report has an entry for the source
// covering two frames with a non-zero total.
#[test]
fn render_exposes_aggregate_timing_by_node() {
let mut builder = GraphBuilder::new();
let source = builder.source("source", media(2));
builder.set_output(source);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(source.node_id(), Arc::new(ConstantExecutor));
let mut render = crate::RenderEngine::new(crate::WorkerPoolConfig::new(2))
.render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
.expect("render should start");
render
.by_ref()
.collect::<crate::Result<Vec<_>>>()
.expect("render should succeed");
let report = render.timing_report();
let timing = report.get(source.node_id()).expect("source timing exists");
assert_eq!(timing.frames(), 2);
assert!(timing.total() > std::time::Duration::ZERO);
}
}