Skip to main content

pixelflow_core/
scheduler.rs

1//! Scheduling contracts and per-core worker configuration.
2
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::sync::{
5    Arc, Condvar, Mutex, MutexGuard,
6    atomic::{AtomicUsize, Ordering},
7};
8use std::thread::JoinHandle;
9use std::time::Duration;
10
11use crate::{ErrorCategory, ErrorCode, NodeId, PixelFlowError, Result};
12
13/// Declares which upstream frames a node may request while producing one output frame.
14#[derive(Clone, Debug, Eq, PartialEq)]
15pub enum DependencyPattern {
16    /// Input frame must equal output frame.
17    SameFrame,
18    /// Input frame may be in `[output - before, output + after]`.
19    Window {
20        /// Maximum number of frames allowed before output frame.
21        before: usize,
22        /// Maximum number of frames allowed after output frame.
23        after: usize,
24    },
25    /// Input frame is mapped by filter logic but must stay within declared bounds.
26    FrameMap(DynamicDependencyBounds),
27    /// Input frame is chosen at runtime but must stay within declared bounds.
28    Dynamic(DynamicDependencyBounds),
29}
30
31impl DependencyPattern {
32    /// Creates same-frame dependency contract.
33    #[must_use]
34    pub const fn same_frame() -> Self {
35        Self::SameFrame
36    }
37
38    /// Creates bounded window dependency contract.
39    #[must_use]
40    pub const fn window(before: usize, after: usize) -> Self {
41        Self::Window { before, after }
42    }
43
44    /// Creates frame-map dependency contract.
45    #[must_use]
46    pub const fn frame_map(bounds: DynamicDependencyBounds) -> Self {
47        Self::FrameMap(bounds)
48    }
49
50    /// Creates dynamic dependency contract.
51    #[must_use]
52    pub const fn dynamic(bounds: DynamicDependencyBounds) -> Self {
53        Self::Dynamic(bounds)
54    }
55
56    /// Returns true when requested frame is allowed for one output frame.
57    #[must_use]
58    pub const fn allows(&self, output: usize, requested: usize) -> bool {
59        match self {
60            Self::SameFrame => requested == output,
61            Self::Window { before, after } => in_window(output, requested, *before, *after),
62            Self::FrameMap(bounds) | Self::Dynamic(bounds) => bounds.allows(output, requested),
63        }
64    }
65}
66
67/// Runtime bounds for frame-map and dynamic dependencies.
68#[derive(Clone, Copy, Debug, Eq, PartialEq)]
69pub enum DynamicDependencyBounds {
70    /// Any frame in finite clip range may be requested.
71    Any,
72    /// Requested frame must be less than or equal to output frame.
73    PastOnly,
74    /// Requested frame must be in `[output, output + after]`.
75    FutureWindow {
76        /// Maximum number of frames allowed after output frame.
77        after: usize,
78    },
79    /// Requested frame must be in `[output - before, output + after]`.
80    Bounded {
81        /// Maximum number of frames allowed before output frame.
82        before: usize,
83        /// Maximum number of frames allowed after output frame.
84        after: usize,
85    },
86}
87
88impl DynamicDependencyBounds {
89    /// Creates unbounded dynamic access.
90    #[must_use]
91    pub const fn any() -> Self {
92        Self::Any
93    }
94
95    /// Creates past-only dynamic access.
96    #[must_use]
97    pub const fn past_only() -> Self {
98        Self::PastOnly
99    }
100
101    /// Creates future-window dynamic access.
102    #[must_use]
103    pub const fn future_window(after: usize) -> Self {
104        Self::FutureWindow { after }
105    }
106
107    /// Creates bounded dynamic access.
108    #[must_use]
109    pub const fn bounded(before: usize, after: usize) -> Self {
110        Self::Bounded { before, after }
111    }
112
113    /// Returns true when requested frame is allowed for one output frame.
114    #[must_use]
115    pub const fn allows(self, output: usize, requested: usize) -> bool {
116        match self {
117            Self::Any => true,
118            Self::PastOnly => requested <= output,
119            Self::FutureWindow { after } => {
120                requested >= output && requested <= output.saturating_add(after)
121            }
122            Self::Bounded { before, after } => in_window(output, requested, before, after),
123        }
124    }
125}
126
/// Describes how the scheduler is allowed to run a node's callbacks across frames.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConcurrencyClass {
    /// The callback keeps no mutable state between frames.
    Stateless,
    /// The callback may prepare out of order but has to commit in increasing frame order.
    OrderedStateful,
    /// The callback is a source and may declare source-specific capability limits.
    Source,
}

impl ConcurrencyClass {
    /// Returns the stable name used for this class in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Source => "source",
            Self::OrderedStateful => "ordered_stateful",
            Self::Stateless => "stateless",
        }
    }
}
149
/// Scheduling capabilities declared by a source node.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SourceCapabilities {
    /// Whether random frame access is supported.
    random_access: bool,
    /// Whether the source requires indexing before render.
    indexing_required: bool,
    /// Whether the source has a known finite frame count.
    known_frame_count: bool,
    /// Declared concurrency limit, if any.
    concurrency_limit: Option<usize>,
}

impl SourceCapabilities {
    /// Builds the Phase 1 default capabilities: every flag set and a concurrency limit of one.
    #[must_use]
    pub const fn random_access() -> Self {
        Self {
            random_access: true,
            indexing_required: true,
            known_frame_count: true,
            concurrency_limit: Some(1),
        }
    }

    /// Returns a copy with an explicit concurrency limit; a limit of zero is raised to one.
    #[must_use]
    pub const fn with_concurrency_limit(self, limit: usize) -> Self {
        let effective = match limit {
            0 => 1,
            nonzero => nonzero,
        };
        Self {
            concurrency_limit: Some(effective),
            ..self
        }
    }

    /// Reports whether random frame access is supported.
    #[must_use]
    pub const fn supports_random_access(self) -> bool {
        self.random_access
    }

    /// Reports whether the source requires indexing before render.
    #[must_use]
    pub const fn indexing_required(self) -> bool {
        self.indexing_required
    }

    /// Reports whether the source has a known finite frame count.
    #[must_use]
    pub const fn known_frame_count(self) -> bool {
        self.known_frame_count
    }

    /// Returns the declared concurrency limit, or `None` when no limit is declared.
    #[must_use]
    pub const fn concurrency_limit(self) -> Option<usize> {
        self.concurrency_limit
    }
}
202
/// Configuration of the per-core worker pool.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WorkerPoolConfig {
    /// Number of worker threads; never zero when built through `new`.
    worker_threads: usize,
}

impl WorkerPoolConfig {
    /// Builds a config for `worker_threads` workers; a request for zero is raised to one.
    #[must_use]
    pub const fn new(worker_threads: usize) -> Self {
        match worker_threads {
            0 => Self { worker_threads: 1 },
            count => Self {
                worker_threads: count,
            },
        }
    }

    /// Returns the configured number of worker threads.
    #[must_use]
    pub const fn worker_threads(self) -> usize {
        self.worker_threads
    }
}
228
/// Accumulated timing of a single graph node.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FilterTiming {
    /// How many frame durations have been recorded.
    frames: usize,
    /// Sum of all recorded frame durations.
    total: Duration,
}

impl FilterTiming {
    /// Adds one frame's `duration` to the aggregate and bumps the frame counter.
    pub fn record(&mut self, duration: Duration) {
        self.frames += 1;
        self.total += duration;
    }

    /// Returns how many frames have been recorded so far.
    #[must_use]
    pub const fn frames(&self) -> usize {
        self.frames
    }

    /// Returns the sum of all recorded durations.
    #[must_use]
    pub const fn total(&self) -> Duration {
        self.total
    }
}
255
256/// Snapshot of aggregate node timings.
257#[derive(Clone, Debug, Default, Eq, PartialEq)]
258pub struct TimingReport {
259    timings: BTreeMap<NodeId, FilterTiming>,
260}
261
262impl TimingReport {
263    /// Returns timing for one node.
264    #[must_use]
265    pub fn get(&self, node_id: NodeId) -> Option<&FilterTiming> {
266        self.timings.get(&node_id)
267    }
268
269    /// Returns all timings in node order.
270    pub fn iter(&self) -> impl Iterator<Item = (NodeId, &FilterTiming)> {
271        self.timings
272            .iter()
273            .map(|(node_id, timing)| (*node_id, timing))
274    }
275
276    pub(crate) const fn from_timings(timings: BTreeMap<NodeId, FilterTiming>) -> Self {
277        Self { timings }
278    }
279}
280
/// Boxed unit of work that can be sent to a worker thread and is run at most once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Mutable worker-pool state; always accessed through `PoolShared::state`.
struct PoolState {
    /// Jobs waiting for a worker; `execute` pushes to the back, workers pop from the front.
    queue: VecDeque<Job>,
    /// Set when the pool is dropped; `execute` rejects jobs once this is true.
    closed: bool,
}

/// State shared between the pool handle and its worker threads.
struct PoolShared {
    /// Job queue and closed flag.
    state: Mutex<PoolState>,
    /// Signalled when a job is queued or the pool is closed.
    wake: Condvar,
}
292
293/// Blocking worker pool owned by one core render.
294pub(crate) struct WorkerPool {
295    shared: Arc<PoolShared>,
296    workers: Vec<JoinHandle<()>>,
297}
298
299impl WorkerPool {
300    pub(crate) fn new(config: WorkerPoolConfig) -> Self {
301        let shared = Arc::new(PoolShared {
302            state: Mutex::new(PoolState {
303                queue: VecDeque::new(),
304                closed: false,
305            }),
306            wake: Condvar::new(),
307        });
308
309        let mut workers = Vec::with_capacity(config.worker_threads());
310        for _ in 0..config.worker_threads() {
311            let shared = Arc::clone(&shared);
312            workers.push(std::thread::spawn(move || worker_loop(&shared)));
313        }
314
315        Self { shared, workers }
316    }
317
318    pub(crate) fn execute<F>(&self, job: F) -> Result<()>
319    where
320        F: FnOnce() + Send + 'static,
321    {
322        let mut state = lock(&self.shared.state);
323        if state.closed {
324            return Err(PixelFlowError::new(
325                ErrorCategory::Core,
326                ErrorCode::new("render.worker_pool_closed"),
327                "render worker pool is already closed",
328            ));
329        }
330        state.queue.push_back(Box::new(job));
331        self.shared.wake.notify_one();
332        Ok(())
333    }
334}
335
336impl Drop for WorkerPool {
337    fn drop(&mut self) {
338        {
339            let mut state = lock(&self.shared.state);
340            state.closed = true;
341        }
342        self.shared.wake.notify_all();
343        for worker in self.workers.drain(..) {
344            #[expect(
345                clippy::let_underscore_must_use,
346                reason = "cannot propagate result during Drop, better to ignore than panic"
347            )]
348            let _ = worker.join();
349        }
350    }
351}
352
353/// Serializes ordered-stateful commits in increasing frame order among pending frames.
354pub(crate) struct OrderedCommitGate {
355    pending: Mutex<BTreeSet<usize>>,
356    wake: Condvar,
357}
358
359impl OrderedCommitGate {
360    pub(crate) const fn new() -> Self {
361        Self {
362            pending: Mutex::new(BTreeSet::new()),
363            wake: Condvar::new(),
364        }
365    }
366
367    pub(crate) fn register(&self, frame_number: usize) -> OrderedCommitTicket<'_> {
368        let mut pending = lock(&self.pending);
369        pending.insert(frame_number);
370        drop(pending);
371        OrderedCommitTicket {
372            gate: self,
373            frame_number,
374            finished: false,
375        }
376    }
377}
378
379pub(crate) struct OrderedCommitTicket<'a> {
380    gate: &'a OrderedCommitGate,
381    frame_number: usize,
382    finished: bool,
383}
384
385impl OrderedCommitTicket<'_> {
386    pub(crate) fn wait_turn(&self) {
387        let mut pending = lock(&self.gate.pending);
388        while pending.first().copied() != Some(self.frame_number) {
389            pending = wait(&self.gate.wake, pending);
390        }
391    }
392
393    pub(crate) fn finish(mut self) {
394        self.finish_inner();
395        self.finished = true;
396    }
397
398    fn finish_inner(&self) {
399        let mut pending = lock(&self.gate.pending);
400        pending.remove(&self.frame_number);
401        drop(pending);
402        self.gate.wake.notify_all();
403    }
404}
405
406impl Drop for OrderedCommitTicket<'_> {
407    fn drop(&mut self) {
408        if !self.finished {
409            self.finish_inner();
410        }
411    }
412}
413
414/// Limits concurrent work for a source node.
415pub(crate) struct ConcurrencyGate {
416    limit: usize,
417    active: AtomicUsize,
418    // RATIONALE: `Condvar` still needs paired mutex for wait/wake coordination.
419    state: Mutex<()>,
420    wake: Condvar,
421}
422
423impl ConcurrencyGate {
424    pub(crate) fn new(limit: usize) -> Self {
425        Self {
426            limit: limit.max(1),
427            active: AtomicUsize::new(0),
428            state: Mutex::new(()),
429            wake: Condvar::new(),
430        }
431    }
432
433    pub(crate) fn acquire(&self) -> ConcurrencyGuard<'_> {
434        let mut state = lock(&self.state);
435        while self.active.load(Ordering::Relaxed) >= self.limit {
436            state = wait(&self.wake, state);
437        }
438        self.active.fetch_add(1, Ordering::Relaxed);
439        drop(state);
440        ConcurrencyGuard {
441            gate: self,
442            released: false,
443        }
444    }
445}
446
447pub(crate) struct ConcurrencyGuard<'a> {
448    gate: &'a ConcurrencyGate,
449    released: bool,
450}
451
452impl ConcurrencyGuard<'_> {
453    fn release_inner(&self) {
454        let state = lock(&self.gate.state);
455        let active = self.gate.active.load(Ordering::Relaxed);
456        self.gate
457            .active
458            .store(active.saturating_sub(1), Ordering::Relaxed);
459        drop(state);
460        self.gate.wake.notify_one();
461    }
462}
463
464impl Drop for ConcurrencyGuard<'_> {
465    fn drop(&mut self) {
466        if !self.released {
467            self.release_inner();
468            self.released = true;
469        }
470    }
471}
472
/// Reports whether `requested` lies in `[output - before, output + after]`.
///
/// Both ends are computed with saturating arithmetic, so the window is clamped to the
/// `usize` range instead of wrapping around.
const fn in_window(output: usize, requested: usize, before: usize, after: usize) -> bool {
    let earliest = output.saturating_sub(before);
    let latest = output.saturating_add(after);
    earliest <= requested && requested <= latest
}
476
477fn worker_loop(shared: &Arc<PoolShared>) {
478    loop {
479        let job = {
480            let mut state = lock(&shared.state);
481            loop {
482                if let Some(job) = state.queue.pop_front() {
483                    break job;
484                }
485                if state.closed {
486                    return;
487                }
488                state = wait(&shared.wake, state);
489            }
490        };
491        job();
492    }
493}
494
/// Locks `mutex`, recovering the guard even when the mutex is poisoned.
///
/// Poisoning only records that some thread panicked while holding the lock; this helper
/// ignores it and returns the guard anyway, so it never panics.
fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
500
/// Waits on `condvar` with `guard`, recovering the guard even when the mutex is poisoned.
///
/// Returns the re-acquired guard once the wait ends. Callers re-check their condition in a
/// loop, as `Condvar::wait` may wake spuriously.
fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    match condvar.wait(guard) {
        Ok(reacquired) => reacquired,
        Err(poisoned) => poisoned.into_inner(),
    }
}
506
/// Unit tests for the dependency contracts, configuration clamping, and the concurrency gate.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::mpsc;
    use std::time::Duration;

    use super::{
        ConcurrencyClass, ConcurrencyGate, DependencyPattern, DynamicDependencyBounds,
        SourceCapabilities, WorkerPoolConfig,
    };

    #[test]
    fn same_frame_contract_accepts_only_matching_frame() {
        let contract = DependencyPattern::same_frame();

        assert!(contract.allows(10, 10));
        assert!(!contract.allows(10, 9));
        assert!(!contract.allows(10, 11));
    }

    #[test]
    fn window_contract_accepts_declared_relative_bounds() {
        let contract = DependencyPattern::window(2, 1);

        assert!(contract.allows(10, 8));
        assert!(contract.allows(10, 10));
        assert!(contract.allows(10, 11));
        assert!(!contract.allows(10, 7));
        assert!(!contract.allows(10, 12));
    }

    #[test]
    fn dynamic_future_window_rejects_past_and_far_future() {
        let contract = DependencyPattern::dynamic(DynamicDependencyBounds::future_window(3));

        assert!(contract.allows(10, 10));
        assert!(contract.allows(10, 13));
        assert!(!contract.allows(10, 9));
        assert!(!contract.allows(10, 14));
    }

    #[test]
    fn worker_pool_config_clamps_zero_to_one() {
        assert_eq!(WorkerPoolConfig::new(0).worker_threads(), 1);
        assert_eq!(WorkerPoolConfig::new(4).worker_threads(), 4);
    }

    #[test]
    fn source_capabilities_record_concurrency_limit() {
        let caps = SourceCapabilities::random_access().with_concurrency_limit(2);

        assert!(caps.supports_random_access());
        assert_eq!(caps.concurrency_limit(), Some(2));
    }

    #[test]
    fn concurrency_gate_blocks_until_active_work_releases() {
        let gate = Arc::new(ConcurrencyGate::new(1));
        let first = gate.acquire();
        let gate_for_thread = Arc::clone(&gate);
        let (started_tx, started_rx) = mpsc::channel();
        let (acquired_tx, acquired_rx) = mpsc::channel();

        let worker = std::thread::spawn(move || {
            started_tx
                .send(())
                .expect("worker should signal before waiting");
            let _second = gate_for_thread.acquire();
            acquired_tx
                .send(())
                .expect("worker should signal after acquiring gate");
        });

        started_rx.recv().expect("worker should reach acquire call");
        // Negative check: the full timeout is expected to elapse, so it is kept short.
        assert!(
            matches!(
                acquired_rx.recv_timeout(Duration::from_millis(100)),
                Err(mpsc::RecvTimeoutError::Timeout)
            ),
            "second acquisition should stay blocked while first guard is held"
        );

        drop(first);

        // Positive check: returns as soon as the worker acquires the gate, so a generous
        // timeout costs nothing on success and avoids spurious failures on a loaded machine.
        acquired_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("worker should acquire after first guard drops");
        worker.join().expect("worker should join cleanly");
    }

    #[test]
    fn concurrency_classes_are_named_for_diagnostics() {
        assert_eq!(ConcurrencyClass::Stateless.as_str(), "stateless");
        assert_eq!(
            ConcurrencyClass::OrderedStateful.as_str(),
            "ordered_stateful"
        );
        assert_eq!(ConcurrencyClass::Source.as_str(), "source");
    }
}