1use std::sync::{Arc, Condvar, Mutex};
4
5use pixelflow_core::{
6 ErrorCategory, ErrorCode, Frame, FrameExecutor, FrameRequest, Metadata, MetadataSchema,
7 MetadataValue, NodeId, PixelFlowError, Result,
8};
9
10use crate::synthetic_u8_frame;
11
/// The kind of step captured by a [`TraceEvent`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TraceEventKind {
    /// Recorded by `TraceSource::prepare` right after it is entered.
    PrepareStart,
    /// Recorded by `TraceSource::prepare` once the frame has been built (and
    /// any gate wait has completed), just before it returns.
    PrepareEnd,
    /// Recorded by `TracePassthroughFilter::prepare` before it asks for an
    /// upstream frame.
    DependencyRequest {
        /// Index of the input the frame is requested from.
        input_index: usize,
        /// Frame number requested from that input.
        requested_frame: usize,
    },
    /// Recorded by `TracePassthroughFilter::commit`.
    Commit,
    /// Recorded by callers (the tests in this file) when a rendered frame is
    /// handed back to them.
    Output,
}
31
/// A single entry in the trace log kept by a [`TraceRecorder`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceEvent {
    /// Node that produced the event, or `None` when the recording site has no
    /// node id available (e.g. `commit`, or events recorded by the test body).
    pub node_id: Option<NodeId>,
    /// Frame number the event refers to.
    pub frame_number: usize,
    /// What happened.
    pub kind: TraceEventKind,
}
42
/// Cloneable handle to a shared, append-only list of [`TraceEvent`]s.
///
/// Clones share the same underlying list (the `Vec` lives behind an `Arc`),
/// so events recorded through any clone are visible through all of them.
#[derive(Clone, Default)]
pub struct TraceRecorder {
    // Shared event log; the mutex serialises appends and snapshots.
    events: Arc<Mutex<Vec<TraceEvent>>>,
}
48
49impl TraceRecorder {
50 pub fn record(&self, event: TraceEvent) {
52 self.events.lock().expect("trace lock poisoned").push(event);
53 }
54
55 #[must_use]
57 pub fn events(&self) -> Vec<TraceEvent> {
58 self.events.lock().expect("trace lock poisoned").clone()
59 }
60}
61
/// Bookkeeping shared by every clone of a [`TraceGate`].
#[derive(Default)]
struct GateState {
    /// Frame numbers passed to `TraceGate::started`, in call order.
    started: Vec<usize>,
    /// Frame numbers passed to `TraceGate::finished`, in call order.
    finished: Vec<usize>,
}

/// Forces one frame to complete after another.
///
/// The frame numbered `blocked_frame` is held inside
/// `wait_for_other_frame_to_finish` until `finished` has been called for
/// `wait_for_frame`. All clones share the same state, so the two frames may
/// be driven from different threads.
///
/// A gate whose `blocked_frame` equals its `wait_for_frame` has no other
/// frame to wait for; it never blocks.
#[derive(Clone)]
pub struct TraceGate {
    blocked_frame: usize,
    wait_for_frame: usize,
    state: Arc<(Mutex<GateState>, Condvar)>,
}

impl TraceGate {
    /// Creates a gate that holds `blocked_frame` back until `wait_for_frame`
    /// has finished.
    #[must_use]
    pub fn new(blocked_frame: usize, wait_for_frame: usize) -> Self {
        Self {
            blocked_frame,
            wait_for_frame,
            state: Arc::new((Mutex::new(GateState::default()), Condvar::new())),
        }
    }

    /// Returns `true` when `frame_number` is the blocked frame *and* there is
    /// a distinct frame to wait for.
    ///
    /// Without the second condition a gate configured with
    /// `blocked_frame == wait_for_frame` would wait for its own `finished`
    /// call, which can only happen after the wait returns: a guaranteed hang.
    fn must_wait(&self, frame_number: usize) -> bool {
        frame_number == self.blocked_frame && self.blocked_frame != self.wait_for_frame
    }

    /// Marks `frame_number` as started and wakes every waiter.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex is poisoned.
    fn started(&self, frame_number: usize) {
        let (lock, condvar) = &*self.state;
        let mut state = lock.lock().expect("trace gate lock poisoned");
        state.started.push(frame_number);
        condvar.notify_all();
    }

    /// Blocks the calling thread until `wait_for_frame` has finished, but only
    /// when `frame_number` is the blocked frame and differs from
    /// `wait_for_frame`. Returns immediately in every other case.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex is poisoned.
    fn wait_for_other_frame_to_finish(&self, frame_number: usize) {
        if !self.must_wait(frame_number) {
            return;
        }

        let (lock, condvar) = &*self.state;
        let mut state = lock.lock().expect("trace gate lock poisoned");

        // Loop to tolerate spurious wake-ups and notifications for other frames.
        while !state.finished.contains(&self.wait_for_frame) {
            state = condvar.wait(state).expect("trace gate wait poisoned");
        }
    }

    /// Marks `frame_number` as finished and wakes every waiter.
    ///
    /// When `frame_number` is the blocked frame (and differs from
    /// `wait_for_frame`), this additionally waits until `wait_for_frame` has
    /// been marked as started.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex is poisoned.
    fn finished(&self, frame_number: usize) {
        let (lock, condvar) = &*self.state;
        let mut state = lock.lock().expect("trace gate lock poisoned");

        state.finished.push(frame_number);
        condvar.notify_all();

        if self.must_wait(frame_number) {
            while !state.started.contains(&self.wait_for_frame) {
                state = condvar.wait(state).expect("trace gate wait poisoned");
            }
        }
    }
}
121
/// Source executor that records trace events while producing synthetic
/// 1x1 `gray8` frames.
pub struct TraceSource {
    // Log that receives the `PrepareStart` / `PrepareEnd` events.
    recorder: TraceRecorder,
    // Optional gate used to hold one frame back until another has finished.
    gate: Option<TraceGate>,
}
127
128impl TraceSource {
129 #[must_use]
131 pub const fn new(recorder: TraceRecorder) -> Self {
132 Self {
133 recorder,
134 gate: None,
135 }
136 }
137
138 #[must_use]
140 pub fn with_gate(mut self, gate: TraceGate) -> Self {
141 self.gate = Some(gate);
142 self
143 }
144}
145
146impl FrameExecutor for TraceSource {
147 fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
148 if let Some(gate) = &self.gate {
149 gate.started(request.frame_number());
150 }
151
152 self.recorder.record(TraceEvent {
153 node_id: Some(request.node_id()),
154 frame_number: request.frame_number(),
155 kind: TraceEventKind::PrepareStart,
156 });
157
158 let frame_number = request.frame_number();
159 let mut frame = synthetic_u8_frame("gray8", 1, 1, |_plane, _x, _y| {
160 u8::try_from(frame_number).expect("trace frame number fits u8")
161 })?;
162 let schema = MetadataSchema::core();
163 let mut metadata = Metadata::new(&schema);
164 metadata.set(
165 &schema,
166 "core:frame_number",
167 MetadataValue::Int(i64::try_from(frame_number).expect("trace frame number fits i64")),
168 )?;
169 frame = frame.with_metadata(metadata);
170
171 if let Some(gate) = &self.gate {
172 gate.wait_for_other_frame_to_finish(request.frame_number());
173 }
174
175 self.recorder.record(TraceEvent {
176 node_id: Some(request.node_id()),
177 frame_number: request.frame_number(),
178 kind: TraceEventKind::PrepareEnd,
179 });
180
181 if let Some(gate) = &self.gate {
182 gate.finished(request.frame_number());
183 }
184
185 Ok(frame)
186 }
187}
188
/// Filter executor that forwards a frame from input 0 and records which
/// frame it asked for.
pub struct TracePassthroughFilter {
    // Log that receives the `DependencyRequest` / `Commit` events.
    recorder: TraceRecorder,
    // Signed offset added to the requested frame number to pick the upstream
    // frame (0 = same frame, positive = later frame, negative = earlier frame).
    request_offset: isize,
}
194
195impl TracePassthroughFilter {
196 #[must_use]
198 pub const fn same_frame(recorder: TraceRecorder) -> Self {
199 Self {
200 recorder,
201 request_offset: 0,
202 }
203 }
204
205 #[must_use]
207 pub const fn with_request_offset(recorder: TraceRecorder, request_offset: isize) -> Self {
208 Self {
209 recorder,
210 request_offset,
211 }
212 }
213}
214
215impl FrameExecutor for TracePassthroughFilter {
216 fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
217 let requested_frame = request
218 .frame_number()
219 .checked_add_signed(self.request_offset)
220 .ok_or_else(|| {
221 PixelFlowError::new(
222 ErrorCategory::Core,
223 ErrorCode::new("test.invalid_trace_request"),
224 "trace dependency request underflowed",
225 )
226 })?;
227
228 self.recorder.record(TraceEvent {
229 node_id: Some(request.node_id()),
230 frame_number: request.frame_number(),
231 kind: TraceEventKind::DependencyRequest {
232 input_index: 0,
233 requested_frame,
234 },
235 });
236
237 request.input_frame(0, requested_frame)
238 }
239
240 fn commit(&self, frame_number: usize, frame: Frame) -> Result<Frame> {
241 self.recorder.record(TraceEvent {
242 node_id: None,
243 frame_number,
244 kind: TraceEventKind::Commit,
245 });
246 Ok(frame)
247 }
248}
249
// Integration-style tests that drive the trace executors through the render
// engine and inspect the recorded events.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use pixelflow_core::{
        ClipMedia, ConcurrencyClass, DependencyPattern, ErrorCode, FilterCompatibility,
        GraphBuilder, MetadataValue, Rational, RenderEngine, RenderExecutorMap, RenderOptions,
        SourceCapabilities, WorkerPoolConfig, resolve_format_alias,
    };

    use super::{
        TraceEvent, TraceEventKind, TraceGate, TracePassthroughFilter, TraceRecorder, TraceSource,
    };

    /// Describes a 1x1 `gray8` clip with `frames` frames at a 24/1 rate.
    fn media(frames: usize) -> ClipMedia {
        ClipMedia::fixed(
            resolve_format_alias("gray8").expect("gray8 exists"),
            1,
            1,
            frames,
            Rational {
                numerator: 24,
                denominator: 1,
            },
        )
    }

    /// Frame 0 is gated behind frame 1, so `PrepareEnd` must be logged for
    /// frame 1 first, while the ordered render still yields frame 0 first.
    #[test]
    fn trace_verifies_out_of_order_completion_and_ordered_output() {
        let recorder = TraceRecorder::default();
        // Hold frame 0 back until frame 1 has finished.
        let gate = TraceGate::new(0, 1);
        let mut builder = GraphBuilder::new();
        let source = builder.source_with_capabilities(
            "trace",
            media(2),
            SourceCapabilities::random_access().with_concurrency_limit(2),
        );
        builder.set_output(source);
        let graph = builder.build();
        let mut executors = RenderExecutorMap::new();
        executors.insert(
            source.node_id(),
            Arc::new(TraceSource::new(recorder.clone()).with_gate(gate)),
        );

        // NOTE(review): presumably two workers are required so frames 0 and 1
        // can be prepared concurrently; with fewer the gate would never open.
        let mut render = RenderEngine::new(WorkerPoolConfig::new(2))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
            .expect("render should start");
        let first = render
            .next()
            .expect("first output exists")
            .expect("frame ok");
        // `Output` events are recorded by the test itself, in delivery order.
        recorder.record(TraceEvent {
            node_id: None,
            frame_number: 0,
            kind: TraceEventKind::Output,
        });
        let second = render
            .next()
            .expect("second output exists")
            .expect("frame ok");
        recorder.record(TraceEvent {
            node_id: None,
            frame_number: 1,
            kind: TraceEventKind::Output,
        });

        // Delivered frames carry the frame numbers 0 then 1.
        assert_eq!(
            first.metadata().get("core:frame_number"),
            Some(&MetadataValue::Int(0))
        );
        assert_eq!(
            second.metadata().get("core:frame_number"),
            Some(&MetadataValue::Int(1))
        );

        let output_frames = recorder
            .events()
            .into_iter()
            .filter(|event| event.kind == TraceEventKind::Output)
            .map(|event| event.frame_number)
            .collect::<Vec<_>>();
        assert_eq!(output_frames, vec![0, 1]);

        // Completion order is reversed by the gate: frame 1 ends before frame 0.
        let prepare_end_frames = recorder
            .events()
            .into_iter()
            .filter(|event| event.kind == TraceEventKind::PrepareEnd)
            .map(|event| event.frame_number)
            .collect::<Vec<_>>();
        assert_eq!(prepare_end_frames, vec![1, 0]);
    }

    /// A filter declared with `DependencyPattern::window(0, 1)` may request
    /// frame 1 while producing frame 0, and the request shows up in the trace.
    #[test]
    fn trace_verifies_dependency_declarations() {
        let recorder = TraceRecorder::default();
        let mut builder = GraphBuilder::new();
        let source = builder.source("trace", media(3));
        let filtered = builder
            .filter_with_schedule(
                "future",
                &[source],
                media(3),
                FilterCompatibility::Preserve,
                DependencyPattern::window(0, 1),
                ConcurrencyClass::Stateless,
            )
            .expect("filter should build");
        builder.set_output(filtered);
        let graph = builder.build();
        let mut executors = RenderExecutorMap::new();
        executors.insert(
            source.node_id(),
            Arc::new(TraceSource::new(recorder.clone())),
        );
        // Offset 1: producing frame n requests frame n + 1 from the source.
        executors.insert(
            filtered.node_id(),
            Arc::new(TracePassthroughFilter::with_request_offset(
                recorder.clone(),
                1,
            )),
        );

        RenderEngine::new(WorkerPoolConfig::new(1))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
            .expect("render should start")
            .collect::<pixelflow_core::Result<Vec<_>>>()
            .expect("declared future dependency should succeed");

        assert!(recorder.events().iter().any(|event| {
            matches!(
                &event.kind,
                TraceEventKind::DependencyRequest {
                    input_index: 0,
                    requested_frame: 1,
                }
            )
        }));
    }

    /// A filter declared as same-frame that requests frame n + 1 must fail
    /// with the `render.dependency_contract` error code.
    #[test]
    fn trace_reports_dependency_contract_violation() {
        let recorder = TraceRecorder::default();
        let mut builder = GraphBuilder::new();
        let source = builder.source("trace", media(3));
        let filtered = builder
            .filter_with_schedule(
                "bad",
                &[source],
                media(3),
                FilterCompatibility::Preserve,
                DependencyPattern::same_frame(),
                ConcurrencyClass::Stateless,
            )
            .expect("filter should build");
        builder.set_output(filtered);
        let graph = builder.build();
        let mut executors = RenderExecutorMap::new();
        executors.insert(
            source.node_id(),
            Arc::new(TraceSource::new(recorder.clone())),
        );
        // The executor deliberately disagrees with the declared pattern.
        executors.insert(
            filtered.node_id(),
            Arc::new(TracePassthroughFilter::with_request_offset(recorder, 1)),
        );

        let mut render = RenderEngine::new(WorkerPoolConfig::new(1))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
            .expect("render should start");
        let Err(error) = render.next().expect("one output exists") else {
            panic!("future request should violate same-frame contract");
        };

        assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
    }

    /// Rendering through a same-frame passthrough filter records at least one
    /// `Commit` event.
    #[test]
    fn same_frame_passthrough_records_commit() {
        let recorder = TraceRecorder::default();
        let mut builder = GraphBuilder::new();
        let source = builder.source("trace", media(1));
        let filtered = builder
            .filter_with_schedule(
                "same",
                &[source],
                media(1),
                FilterCompatibility::Preserve,
                DependencyPattern::same_frame(),
                ConcurrencyClass::Stateless,
            )
            .expect("filter should build");
        builder.set_output(filtered);
        let graph = builder.build();
        let mut executors = RenderExecutorMap::new();
        executors.insert(
            source.node_id(),
            Arc::new(TraceSource::new(recorder.clone())),
        );
        executors.insert(
            filtered.node_id(),
            Arc::new(TracePassthroughFilter::same_frame(recorder.clone())),
        );

        RenderEngine::new(WorkerPoolConfig::new(1))
            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
            .expect("render should start")
            .collect::<pixelflow_core::Result<Vec<_>>>()
            .expect("same-frame trace should render");

        assert!(
            recorder
                .events()
                .iter()
                .any(|event| event.kind == TraceEventKind::Commit)
        );
    }
}