use std::sync::{Arc, Condvar, Mutex};
use pixelflow_core::{
ErrorCategory, ErrorCode, Frame, FrameExecutor, FrameRequest, Metadata, MetadataSchema,
MetadataValue, NodeId, PixelFlowError, Result,
};
use crate::synthetic_u8_frame;
/// The kind of step captured by a [`TraceEvent`].
///
/// Variants are compared structurally (`Eq`/`PartialEq`), so tests can filter
/// the recorded event list with plain `==` or `matches!`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TraceEventKind {
/// Recorded by `TraceSource::prepare` before it builds the frame.
PrepareStart,
/// Recorded by `TraceSource::prepare` after the frame is built and any gate
/// wait has completed.
PrepareEnd,
/// Recorded by `TracePassthroughFilter::prepare` just before it asks its
/// input for a frame.
DependencyRequest {
/// Index of the input the frame is requested from.
input_index: usize,
/// Frame number requested from that input.
requested_frame: usize,
},
/// Recorded by `TracePassthroughFilter::commit`.
Commit,
/// Not recorded by the executors in this file; the tests below record it
/// themselves after pulling a frame from the render iterator.
Output,
}
/// A single entry in the trace log kept by `TraceRecorder`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceEvent {
/// Node that produced the event. `None` when no request is available at the
/// recording site (e.g. `TracePassthroughFilter::commit`, or events recorded
/// directly by a test).
pub node_id: Option<NodeId>,
/// Frame number the event refers to.
pub frame_number: usize,
/// What happened.
pub kind: TraceEventKind,
}
/// Shared, append-only log of [`TraceEvent`]s.
///
/// Cloning a recorder clones the inner `Arc`, so every clone appends to and
/// reads from the same event list.
#[derive(Clone, Default)]
pub struct TraceRecorder {
    /// Event list guarded by a mutex and shared between all clones.
    events: Arc<Mutex<Vec<TraceEvent>>>,
}

impl TraceRecorder {
    /// Appends `event` to the end of the shared log.
    ///
    /// # Panics
    ///
    /// Panics if the log mutex is poisoned.
    pub fn record(&self, event: TraceEvent) {
        let mut log = self.events.lock().expect("trace lock poisoned");
        log.push(event);
    }

    /// Returns a copy of every event recorded so far, in recording order.
    ///
    /// # Panics
    ///
    /// Panics if the log mutex is poisoned.
    #[must_use]
    pub fn events(&self) -> Vec<TraceEvent> {
        let log = self.events.lock().expect("trace lock poisoned");
        log.to_vec()
    }
}
/// Frame numbers a [`TraceGate`] has been told about, in arrival order.
#[derive(Default)]
struct GateState {
    /// Frames passed to `TraceGate::started`.
    started: Vec<usize>,
    /// Frames passed to `TraceGate::finished`.
    finished: Vec<usize>,
}

/// Rendezvous helper that holds one frame back until another frame is done.
///
/// `blocked_frame` is parked in `wait_for_other_frame_to_finish` until
/// `wait_for_frame` has been reported through `finished`. Clones share the
/// same state through the inner `Arc`.
#[derive(Clone)]
pub struct TraceGate {
    /// Frame that must wait.
    blocked_frame: usize,
    /// Frame that `blocked_frame` waits for.
    wait_for_frame: usize,
    /// Shared bookkeeping plus the condvar used to wake waiters.
    state: Arc<(Mutex<GateState>, Condvar)>,
}

impl TraceGate {
    /// Creates a gate that holds `blocked_frame` back until `wait_for_frame`
    /// has finished.
    #[must_use]
    pub fn new(blocked_frame: usize, wait_for_frame: usize) -> Self {
        let shared = (Mutex::new(GateState::default()), Condvar::new());
        Self {
            blocked_frame,
            wait_for_frame,
            state: Arc::new(shared),
        }
    }

    /// Notes that `frame_number` has started and wakes every waiter.
    fn started(&self, frame_number: usize) {
        let (mutex, wakeup) = &*self.state;
        let mut guard = mutex.lock().expect("trace gate lock poisoned");
        guard.started.push(frame_number);
        // Notify while still holding the lock, as waiters re-check under it.
        wakeup.notify_all();
    }

    /// Blocks the caller while `frame_number` is the blocked frame and the
    /// awaited frame has not finished yet; returns immediately for any other
    /// frame.
    fn wait_for_other_frame_to_finish(&self, frame_number: usize) {
        if frame_number == self.blocked_frame {
            let (mutex, wakeup) = &*self.state;
            let guard = mutex.lock().expect("trace gate lock poisoned");
            let _released = wakeup
                .wait_while(guard, |seen| !seen.finished.contains(&self.wait_for_frame))
                .expect("trace gate wait poisoned");
        }
    }

    /// Notes that `frame_number` has finished and wakes every waiter.
    ///
    /// For the blocked frame this additionally waits until the awaited frame
    /// has at least been reported as started.
    fn finished(&self, frame_number: usize) {
        let (mutex, wakeup) = &*self.state;
        let mut guard = mutex.lock().expect("trace gate lock poisoned");
        guard.finished.push(frame_number);
        wakeup.notify_all();
        if frame_number != self.blocked_frame {
            return;
        }
        let _released = wakeup
            .wait_while(guard, |seen| !seen.started.contains(&self.wait_for_frame))
            .expect("trace gate wait poisoned");
    }
}
/// Source executor for tests that logs its `prepare` calls to a
/// [`TraceRecorder`] and can optionally be sequenced by a [`TraceGate`].
pub struct TraceSource {
    /// Log that receives the events emitted by this source.
    recorder: TraceRecorder,
    /// Optional gate consulted while preparing a frame.
    gate: Option<TraceGate>,
}

impl TraceSource {
    /// Creates an ungated source that logs to `recorder`.
    #[must_use]
    pub const fn new(recorder: TraceRecorder) -> Self {
        Self {
            gate: None,
            recorder,
        }
    }

    /// Returns this source with `gate` installed, replacing any previous gate.
    #[must_use]
    pub fn with_gate(self, gate: TraceGate) -> Self {
        Self {
            gate: Some(gate),
            ..self
        }
    }
}
impl FrameExecutor for TraceSource {
fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
if let Some(gate) = &self.gate {
gate.started(request.frame_number());
}
self.recorder.record(TraceEvent {
node_id: Some(request.node_id()),
frame_number: request.frame_number(),
kind: TraceEventKind::PrepareStart,
});
let frame_number = request.frame_number();
let mut frame = synthetic_u8_frame("gray8", 1, 1, |_plane, _x, _y| {
u8::try_from(frame_number).expect("trace frame number fits u8")
})?;
let schema = MetadataSchema::core();
let mut metadata = Metadata::new(&schema);
metadata.set(
&schema,
"core:frame_number",
MetadataValue::Int(i64::try_from(frame_number).expect("trace frame number fits i64")),
)?;
frame = frame.with_metadata(metadata);
if let Some(gate) = &self.gate {
gate.wait_for_other_frame_to_finish(request.frame_number());
}
self.recorder.record(TraceEvent {
node_id: Some(request.node_id()),
frame_number: request.frame_number(),
kind: TraceEventKind::PrepareEnd,
});
if let Some(gate) = &self.gate {
gate.finished(request.frame_number());
}
Ok(frame)
}
}
/// Filter executor for tests that forwards a frame from its first input and
/// logs which frame it asked for.
pub struct TracePassthroughFilter {
    /// Log that receives the events emitted by this filter.
    recorder: TraceRecorder,
    /// Signed offset added to the output frame number to pick the input frame.
    request_offset: isize,
}

impl TracePassthroughFilter {
    /// Creates a filter that requests the same frame number it is asked for.
    #[must_use]
    pub const fn same_frame(recorder: TraceRecorder) -> Self {
        Self::with_request_offset(recorder, 0)
    }

    /// Creates a filter that requests `frame_number + request_offset` from its
    /// input.
    #[must_use]
    pub const fn with_request_offset(recorder: TraceRecorder, request_offset: isize) -> Self {
        Self {
            request_offset,
            recorder,
        }
    }
}
impl FrameExecutor for TracePassthroughFilter {
    /// Logs a `DependencyRequest` for input 0 and returns that input's frame
    /// at `frame_number + request_offset`.
    ///
    /// # Errors
    ///
    /// Returns a `test.invalid_trace_request` error when the offset frame
    /// number is not representable as `usize`, and propagates any error from
    /// `input_frame`.
    fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
        let current_frame = request.frame_number();

        // Built lazily: only needed when the checked addition fails.
        let out_of_range = || {
            PixelFlowError::new(
                ErrorCategory::Core,
                ErrorCode::new("test.invalid_trace_request"),
                "trace dependency request underflowed",
            )
        };
        let requested_frame = current_frame
            .checked_add_signed(self.request_offset)
            .ok_or_else(out_of_range)?;

        let dependency_event = TraceEvent {
            node_id: Some(request.node_id()),
            frame_number: current_frame,
            kind: TraceEventKind::DependencyRequest {
                input_index: 0,
                requested_frame,
            },
        };
        self.recorder.record(dependency_event);

        request.input_frame(0, requested_frame)
    }

    /// Logs a `Commit` event for `frame_number` and returns `frame` untouched.
    ///
    /// No request is available here, so the event carries no node id.
    fn commit(&self, frame_number: usize, frame: Frame) -> Result<Frame> {
        let commit_event = TraceEvent {
            node_id: None,
            frame_number,
            kind: TraceEventKind::Commit,
        };
        self.recorder.record(commit_event);
        Ok(frame)
    }
}
// Integration-style tests that drive the trace executors through the render
// engine and inspect the recorded event log.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use pixelflow_core::{
ClipMedia, ConcurrencyClass, DependencyPattern, ErrorCode, FilterCompatibility,
GraphBuilder, MetadataValue, Rational, RenderEngine, RenderExecutorMap, RenderOptions,
SourceCapabilities, WorkerPoolConfig, resolve_format_alias,
};
use super::{
TraceEvent, TraceEventKind, TraceGate, TracePassthroughFilter, TraceRecorder, TraceSource,
};
// Describes a fixed 1x1 `gray8` clip with `frames` frames and a 24/1 rational
// (presumably the frame rate — confirm against `ClipMedia::fixed`).
fn media(frames: usize) -> ClipMedia {
ClipMedia::fixed(
resolve_format_alias("gray8").expect("gray8 exists"),
1,
1,
frames,
Rational {
numerator: 24,
denominator: 1,
},
)
}
// Frame 0 is gated behind frame 1, so frame 1 must finish preparing first,
// yet the ordered render must still hand out frame 0 before frame 1.
#[test]
fn trace_verifies_out_of_order_completion_and_ordered_output() {
let recorder = TraceRecorder::default();
// Block frame 0 until frame 1 has finished.
let gate = TraceGate::new(0, 1);
let mut builder = GraphBuilder::new();
let source = builder.source_with_capabilities(
"trace",
media(2),
SourceCapabilities::random_access().with_concurrency_limit(2),
);
builder.set_output(source);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(
source.node_id(),
Arc::new(TraceSource::new(recorder.clone()).with_gate(gate)),
);
// NOTE(review): the gate only releases frame 0 if frame 1 can run while
// frame 0 is parked; `WorkerPoolConfig::new(2)` presumably provides the two
// workers needed for that — confirm.
let mut render = RenderEngine::new(WorkerPoolConfig::new(2))
.render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
.expect("render should start");
let first = render
.next()
.expect("first output exists")
.expect("frame ok");
// Output events are recorded by the test itself, right after each frame is
// pulled, so their order reflects the order the iterator yields frames.
recorder.record(TraceEvent {
node_id: None,
frame_number: 0,
kind: TraceEventKind::Output,
});
let second = render
.next()
.expect("second output exists")
.expect("frame ok");
recorder.record(TraceEvent {
node_id: None,
frame_number: 1,
kind: TraceEventKind::Output,
});
// The metadata written by `TraceSource::prepare` identifies which frame
// each output really is.
assert_eq!(
first.metadata().get("core:frame_number"),
Some(&MetadataValue::Int(0))
);
assert_eq!(
second.metadata().get("core:frame_number"),
Some(&MetadataValue::Int(1))
);
let output_frames = recorder
.events()
.into_iter()
.filter(|event| event.kind == TraceEventKind::Output)
.map(|event| event.frame_number)
.collect::<Vec<_>>();
assert_eq!(output_frames, vec![0, 1]);
// Frame 1 records `PrepareEnd` before reporting itself finished, and frame 0
// records it only after being released, hence the reversed order.
let prepare_end_frames = recorder
.events()
.into_iter()
.filter(|event| event.kind == TraceEventKind::PrepareEnd)
.map(|event| event.frame_number)
.collect::<Vec<_>>();
assert_eq!(prepare_end_frames, vec![1, 0]);
}
// A filter that requests frame N+1 renders successfully when it is declared
// with `DependencyPattern::window(0, 1)`.
#[test]
fn trace_verifies_dependency_declarations() {
let recorder = TraceRecorder::default();
let mut builder = GraphBuilder::new();
let source = builder.source("trace", media(3));
let filtered = builder
.filter_with_schedule(
"future",
&[source],
media(3),
FilterCompatibility::Preserve,
DependencyPattern::window(0, 1),
ConcurrencyClass::Stateless,
)
.expect("filter should build");
builder.set_output(filtered);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(
source.node_id(),
Arc::new(TraceSource::new(recorder.clone())),
);
// Offset 1: the filter asks its input for the frame after the one requested.
executors.insert(
filtered.node_id(),
Arc::new(TracePassthroughFilter::with_request_offset(
recorder.clone(),
1,
)),
);
RenderEngine::new(WorkerPoolConfig::new(1))
.render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
.expect("render should start")
.collect::<pixelflow_core::Result<Vec<_>>>()
.expect("declared future dependency should succeed");
// The log must contain the request for input 0, frame 1.
assert!(recorder.events().iter().any(|event| {
matches!(
&event.kind,
TraceEventKind::DependencyRequest {
input_index: 0,
requested_frame: 1,
}
)
}));
}
// The same N+1 request must be rejected when the filter is declared with
// `DependencyPattern::same_frame()`.
#[test]
fn trace_reports_dependency_contract_violation() {
let recorder = TraceRecorder::default();
let mut builder = GraphBuilder::new();
let source = builder.source("trace", media(3));
let filtered = builder
.filter_with_schedule(
"bad",
&[source],
media(3),
FilterCompatibility::Preserve,
DependencyPattern::same_frame(),
ConcurrencyClass::Stateless,
)
.expect("filter should build");
builder.set_output(filtered);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(
source.node_id(),
Arc::new(TraceSource::new(recorder.clone())),
);
// The event log is not inspected in this test, so the recorder is moved in.
executors.insert(
filtered.node_id(),
Arc::new(TracePassthroughFilter::with_request_offset(recorder, 1)),
);
let mut render = RenderEngine::new(WorkerPoolConfig::new(1))
.render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
.expect("render should start");
// The first yielded item must be the contract-violation error.
let Err(error) = render.next().expect("one output exists") else {
panic!("future request should violate same-frame contract");
};
assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
}
// Rendering through a same-frame passthrough must leave a `Commit` event in
// the log.
#[test]
fn same_frame_passthrough_records_commit() {
let recorder = TraceRecorder::default();
let mut builder = GraphBuilder::new();
let source = builder.source("trace", media(1));
let filtered = builder
.filter_with_schedule(
"same",
&[source],
media(1),
FilterCompatibility::Preserve,
DependencyPattern::same_frame(),
ConcurrencyClass::Stateless,
)
.expect("filter should build");
builder.set_output(filtered);
let graph = builder.build();
let mut executors = RenderExecutorMap::new();
executors.insert(
source.node_id(),
Arc::new(TraceSource::new(recorder.clone())),
);
executors.insert(
filtered.node_id(),
Arc::new(TracePassthroughFilter::same_frame(recorder.clone())),
);
RenderEngine::new(WorkerPoolConfig::new(1))
.render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
.expect("render should start")
.collect::<pixelflow_core::Result<Vec<_>>>()
.expect("same-frame trace should render");
assert!(
recorder
.events()
.iter()
.any(|event| event.kind == TraceEventKind::Commit)
);
}
}