Skip to main content

pixelflow_test_support/
scheduler_trace.rs

1//! Deterministic scheduler trace helpers.
2
3use std::sync::{Arc, Condvar, Mutex};
4
5use pixelflow_core::{
6    ErrorCategory, ErrorCode, Frame, FrameExecutor, FrameRequest, Metadata, MetadataSchema,
7    MetadataValue, NodeId, PixelFlowError, Result,
8};
9
10use crate::synthetic_u8_frame;
11
12/// Kind of scheduler event recorded by trace executors.
13#[derive(Clone, Debug, Eq, PartialEq)]
14pub enum TraceEventKind {
15    /// Executor `prepare` started.
16    PrepareStart,
17    /// Executor `prepare` completed.
18    PrepareEnd,
19    /// Executor requested an input dependency.
20    DependencyRequest {
21        /// Input slot requested.
22        input_index: usize,
23        /// Upstream frame requested.
24        requested_frame: usize,
25    },
26    /// Executor `commit` completed.
27    Commit,
28    /// Test observed ordered output frame.
29    Output,
30}
31
32/// One scheduler trace event.
33#[derive(Clone, Debug, Eq, PartialEq)]
34pub struct TraceEvent {
35    /// Node that emitted event.
36    pub node_id: Option<NodeId>,
37    /// Output frame associated with event.
38    pub frame_number: usize,
39    /// Event kind.
40    pub kind: TraceEventKind,
41}
42
43/// Thread-safe recorder for scheduler trace events.
44#[derive(Clone, Default)]
45pub struct TraceRecorder {
46    events: Arc<Mutex<Vec<TraceEvent>>>,
47}
48
49impl TraceRecorder {
50    /// Records one event.
51    pub fn record(&self, event: TraceEvent) {
52        self.events.lock().expect("trace lock poisoned").push(event);
53    }
54
55    /// Returns recorded events in insertion order.
56    #[must_use]
57    pub fn events(&self) -> Vec<TraceEvent> {
58        self.events.lock().expect("trace lock poisoned").clone()
59    }
60}
61
62#[derive(Default)]
63struct GateState {
64    started: Vec<usize>,
65    finished: Vec<usize>,
66}
67
68/// Deterministic gate used to force one frame to finish after another starts.
69#[derive(Clone)]
70pub struct TraceGate {
71    blocked_frame: usize,
72    wait_for_frame: usize,
73    state: Arc<(Mutex<GateState>, Condvar)>,
74}
75
76impl TraceGate {
77    /// Creates gate where `blocked_frame` waits until `wait_for_frame` enters.
78    #[must_use]
79    pub fn new(blocked_frame: usize, wait_for_frame: usize) -> Self {
80        Self {
81            blocked_frame,
82            wait_for_frame,
83            state: Arc::new((Mutex::new(GateState::default()), Condvar::new())),
84        }
85    }
86
87    fn started(&self, frame_number: usize) {
88        let (lock, condvar) = &*self.state;
89        let mut state = lock.lock().expect("trace gate lock poisoned");
90        state.started.push(frame_number);
91        condvar.notify_all();
92    }
93
94    fn wait_for_other_frame_to_finish(&self, frame_number: usize) {
95        if frame_number != self.blocked_frame {
96            return;
97        }
98
99        let (lock, condvar) = &*self.state;
100        let mut state = lock.lock().expect("trace gate lock poisoned");
101
102        while !state.finished.contains(&self.wait_for_frame) {
103            state = condvar.wait(state).expect("trace gate wait poisoned");
104        }
105    }
106
107    fn finished(&self, frame_number: usize) {
108        let (lock, condvar) = &*self.state;
109        let mut state = lock.lock().expect("trace gate lock poisoned");
110
111        state.finished.push(frame_number);
112        condvar.notify_all();
113
114        if frame_number == self.blocked_frame {
115            while !state.started.contains(&self.wait_for_frame) {
116                state = condvar.wait(state).expect("trace gate wait poisoned");
117            }
118        }
119    }
120}
121
122/// Source executor that records prepare ordering and tags frame metadata.
123pub struct TraceSource {
124    recorder: TraceRecorder,
125    gate: Option<TraceGate>,
126}
127
128impl TraceSource {
129    /// Creates trace source without gating.
130    #[must_use]
131    pub const fn new(recorder: TraceRecorder) -> Self {
132        Self {
133            recorder,
134            gate: None,
135        }
136    }
137
138    /// Adds deterministic gate to source.
139    #[must_use]
140    pub fn with_gate(mut self, gate: TraceGate) -> Self {
141        self.gate = Some(gate);
142        self
143    }
144}
145
146impl FrameExecutor for TraceSource {
147    fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
148        if let Some(gate) = &self.gate {
149            gate.started(request.frame_number());
150        }
151
152        self.recorder.record(TraceEvent {
153            node_id: Some(request.node_id()),
154            frame_number: request.frame_number(),
155            kind: TraceEventKind::PrepareStart,
156        });
157
158        let frame_number = request.frame_number();
159        let mut frame = synthetic_u8_frame("gray8", 1, 1, |_plane, _x, _y| {
160            u8::try_from(frame_number).expect("trace frame number fits u8")
161        })?;
162        let schema = MetadataSchema::core();
163        let mut metadata = Metadata::new(&schema);
164        metadata.set(
165            &schema,
166            "core:frame_number",
167            MetadataValue::Int(i64::try_from(frame_number).expect("trace frame number fits i64")),
168        )?;
169        frame = frame.with_metadata(metadata);
170
171        if let Some(gate) = &self.gate {
172            gate.wait_for_other_frame_to_finish(request.frame_number());
173        }
174
175        self.recorder.record(TraceEvent {
176            node_id: Some(request.node_id()),
177            frame_number: request.frame_number(),
178            kind: TraceEventKind::PrepareEnd,
179        });
180
181        if let Some(gate) = &self.gate {
182            gate.finished(request.frame_number());
183        }
184
185        Ok(frame)
186    }
187}
188
189/// Passthrough filter that records dependency requests and commit ordering.
190pub struct TracePassthroughFilter {
191    recorder: TraceRecorder,
192    request_offset: isize,
193}
194
195impl TracePassthroughFilter {
196    /// Creates same-frame passthrough filter.
197    #[must_use]
198    pub const fn same_frame(recorder: TraceRecorder) -> Self {
199        Self {
200            recorder,
201            request_offset: 0,
202        }
203    }
204
205    /// Creates passthrough filter that requests `output + offset`.
206    #[must_use]
207    pub const fn with_request_offset(recorder: TraceRecorder, request_offset: isize) -> Self {
208        Self {
209            recorder,
210            request_offset,
211        }
212    }
213}
214
215impl FrameExecutor for TracePassthroughFilter {
216    fn prepare(&self, request: FrameRequest<'_>) -> Result<Frame> {
217        let requested_frame = request
218            .frame_number()
219            .checked_add_signed(self.request_offset)
220            .ok_or_else(|| {
221                PixelFlowError::new(
222                    ErrorCategory::Core,
223                    ErrorCode::new("test.invalid_trace_request"),
224                    "trace dependency request underflowed",
225                )
226            })?;
227
228        self.recorder.record(TraceEvent {
229            node_id: Some(request.node_id()),
230            frame_number: request.frame_number(),
231            kind: TraceEventKind::DependencyRequest {
232                input_index: 0,
233                requested_frame,
234            },
235        });
236
237        request.input_frame(0, requested_frame)
238    }
239
240    fn commit(&self, frame_number: usize, frame: Frame) -> Result<Frame> {
241        self.recorder.record(TraceEvent {
242            node_id: None,
243            frame_number,
244            kind: TraceEventKind::Commit,
245        });
246        Ok(frame)
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use std::sync::Arc;
253
254    use pixelflow_core::{
255        ClipMedia, ConcurrencyClass, DependencyPattern, ErrorCode, FilterCompatibility,
256        GraphBuilder, MetadataValue, Rational, RenderEngine, RenderExecutorMap, RenderOptions,
257        SourceCapabilities, WorkerPoolConfig, resolve_format_alias,
258    };
259
260    use super::{
261        TraceEvent, TraceEventKind, TraceGate, TracePassthroughFilter, TraceRecorder, TraceSource,
262    };
263
264    fn media(frames: usize) -> ClipMedia {
265        ClipMedia::fixed(
266            resolve_format_alias("gray8").expect("gray8 exists"),
267            1,
268            1,
269            frames,
270            Rational {
271                numerator: 24,
272                denominator: 1,
273            },
274        )
275    }
276
277    #[test]
278    fn trace_verifies_out_of_order_completion_and_ordered_output() {
279        let recorder = TraceRecorder::default();
280        let gate = TraceGate::new(0, 1);
281        let mut builder = GraphBuilder::new();
282        let source = builder.source_with_capabilities(
283            "trace",
284            media(2),
285            SourceCapabilities::random_access().with_concurrency_limit(2),
286        );
287        builder.set_output(source);
288        let graph = builder.build();
289        let mut executors = RenderExecutorMap::new();
290        executors.insert(
291            source.node_id(),
292            Arc::new(TraceSource::new(recorder.clone()).with_gate(gate)),
293        );
294
295        let mut render = RenderEngine::new(WorkerPoolConfig::new(2))
296            .render_ordered(graph, executors, RenderOptions::new(0, Some(2)))
297            .expect("render should start");
298        let first = render
299            .next()
300            .expect("first output exists")
301            .expect("frame ok");
302        recorder.record(TraceEvent {
303            node_id: None,
304            frame_number: 0,
305            kind: TraceEventKind::Output,
306        });
307        let second = render
308            .next()
309            .expect("second output exists")
310            .expect("frame ok");
311        recorder.record(TraceEvent {
312            node_id: None,
313            frame_number: 1,
314            kind: TraceEventKind::Output,
315        });
316
317        assert_eq!(
318            first.metadata().get("core:frame_number"),
319            Some(&MetadataValue::Int(0))
320        );
321        assert_eq!(
322            second.metadata().get("core:frame_number"),
323            Some(&MetadataValue::Int(1))
324        );
325
326        let output_frames = recorder
327            .events()
328            .into_iter()
329            .filter(|event| event.kind == TraceEventKind::Output)
330            .map(|event| event.frame_number)
331            .collect::<Vec<_>>();
332        assert_eq!(output_frames, vec![0, 1]);
333
334        let prepare_end_frames = recorder
335            .events()
336            .into_iter()
337            .filter(|event| event.kind == TraceEventKind::PrepareEnd)
338            .map(|event| event.frame_number)
339            .collect::<Vec<_>>();
340        assert_eq!(prepare_end_frames, vec![1, 0]);
341    }
342
343    #[test]
344    fn trace_verifies_dependency_declarations() {
345        let recorder = TraceRecorder::default();
346        let mut builder = GraphBuilder::new();
347        let source = builder.source("trace", media(3));
348        let filtered = builder
349            .filter_with_schedule(
350                "future",
351                &[source],
352                media(3),
353                FilterCompatibility::Preserve,
354                DependencyPattern::window(0, 1),
355                ConcurrencyClass::Stateless,
356            )
357            .expect("filter should build");
358        builder.set_output(filtered);
359        let graph = builder.build();
360        let mut executors = RenderExecutorMap::new();
361        executors.insert(
362            source.node_id(),
363            Arc::new(TraceSource::new(recorder.clone())),
364        );
365        executors.insert(
366            filtered.node_id(),
367            Arc::new(TracePassthroughFilter::with_request_offset(
368                recorder.clone(),
369                1,
370            )),
371        );
372
373        RenderEngine::new(WorkerPoolConfig::new(1))
374            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
375            .expect("render should start")
376            .collect::<pixelflow_core::Result<Vec<_>>>()
377            .expect("declared future dependency should succeed");
378
379        assert!(recorder.events().iter().any(|event| {
380            matches!(
381                &event.kind,
382                TraceEventKind::DependencyRequest {
383                    input_index: 0,
384                    requested_frame: 1,
385                }
386            )
387        }));
388    }
389
390    #[test]
391    fn trace_reports_dependency_contract_violation() {
392        let recorder = TraceRecorder::default();
393        let mut builder = GraphBuilder::new();
394        let source = builder.source("trace", media(3));
395        let filtered = builder
396            .filter_with_schedule(
397                "bad",
398                &[source],
399                media(3),
400                FilterCompatibility::Preserve,
401                DependencyPattern::same_frame(),
402                ConcurrencyClass::Stateless,
403            )
404            .expect("filter should build");
405        builder.set_output(filtered);
406        let graph = builder.build();
407        let mut executors = RenderExecutorMap::new();
408        executors.insert(
409            source.node_id(),
410            Arc::new(TraceSource::new(recorder.clone())),
411        );
412        executors.insert(
413            filtered.node_id(),
414            Arc::new(TracePassthroughFilter::with_request_offset(recorder, 1)),
415        );
416
417        let mut render = RenderEngine::new(WorkerPoolConfig::new(1))
418            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
419            .expect("render should start");
420        let Err(error) = render.next().expect("one output exists") else {
421            panic!("future request should violate same-frame contract");
422        };
423
424        assert_eq!(error.code(), ErrorCode::new("render.dependency_contract"));
425    }
426
427    #[test]
428    fn same_frame_passthrough_records_commit() {
429        let recorder = TraceRecorder::default();
430        let mut builder = GraphBuilder::new();
431        let source = builder.source("trace", media(1));
432        let filtered = builder
433            .filter_with_schedule(
434                "same",
435                &[source],
436                media(1),
437                FilterCompatibility::Preserve,
438                DependencyPattern::same_frame(),
439                ConcurrencyClass::Stateless,
440            )
441            .expect("filter should build");
442        builder.set_output(filtered);
443        let graph = builder.build();
444        let mut executors = RenderExecutorMap::new();
445        executors.insert(
446            source.node_id(),
447            Arc::new(TraceSource::new(recorder.clone())),
448        );
449        executors.insert(
450            filtered.node_id(),
451            Arc::new(TracePassthroughFilter::same_frame(recorder.clone())),
452        );
453
454        RenderEngine::new(WorkerPoolConfig::new(1))
455            .render_ordered(graph, executors, RenderOptions::new(0, Some(1)))
456            .expect("render should start")
457            .collect::<pixelflow_core::Result<Vec<_>>>()
458            .expect("same-frame trace should render");
459
460        assert!(
461            recorder
462                .events()
463                .iter()
464                .any(|event| event.kind == TraceEventKind::Commit)
465        );
466    }
467}