1use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::sync::{
5 Arc, Condvar, Mutex, MutexGuard,
6 atomic::{AtomicUsize, Ordering},
7};
8use std::thread::JoinHandle;
9use std::time::Duration;
10
11use crate::{ErrorCategory, ErrorCode, NodeId, PixelFlowError, Result};
12
13#[derive(Clone, Debug, Eq, PartialEq)]
15pub enum DependencyPattern {
16 SameFrame,
18 Window {
20 before: usize,
22 after: usize,
24 },
25 FrameMap(DynamicDependencyBounds),
27 Dynamic(DynamicDependencyBounds),
29}
30
31impl DependencyPattern {
32 #[must_use]
34 pub const fn same_frame() -> Self {
35 Self::SameFrame
36 }
37
38 #[must_use]
40 pub const fn window(before: usize, after: usize) -> Self {
41 Self::Window { before, after }
42 }
43
44 #[must_use]
46 pub const fn frame_map(bounds: DynamicDependencyBounds) -> Self {
47 Self::FrameMap(bounds)
48 }
49
50 #[must_use]
52 pub const fn dynamic(bounds: DynamicDependencyBounds) -> Self {
53 Self::Dynamic(bounds)
54 }
55
56 #[must_use]
58 pub const fn allows(&self, output: usize, requested: usize) -> bool {
59 match self {
60 Self::SameFrame => requested == output,
61 Self::Window { before, after } => in_window(output, requested, *before, *after),
62 Self::FrameMap(bounds) | Self::Dynamic(bounds) => bounds.allows(output, requested),
63 }
64 }
65}
66
67#[derive(Clone, Copy, Debug, Eq, PartialEq)]
69pub enum DynamicDependencyBounds {
70 Any,
72 PastOnly,
74 FutureWindow {
76 after: usize,
78 },
79 Bounded {
81 before: usize,
83 after: usize,
85 },
86}
87
88impl DynamicDependencyBounds {
89 #[must_use]
91 pub const fn any() -> Self {
92 Self::Any
93 }
94
95 #[must_use]
97 pub const fn past_only() -> Self {
98 Self::PastOnly
99 }
100
101 #[must_use]
103 pub const fn future_window(after: usize) -> Self {
104 Self::FutureWindow { after }
105 }
106
107 #[must_use]
109 pub const fn bounded(before: usize, after: usize) -> Self {
110 Self::Bounded { before, after }
111 }
112
113 #[must_use]
115 pub const fn allows(self, output: usize, requested: usize) -> bool {
116 match self {
117 Self::Any => true,
118 Self::PastOnly => requested <= output,
119 Self::FutureWindow { after } => {
120 requested >= output && requested <= output.saturating_add(after)
121 }
122 Self::Bounded { before, after } => in_window(output, requested, before, after),
123 }
124 }
125}
126
127#[derive(Clone, Copy, Debug, Eq, PartialEq)]
129pub enum ConcurrencyClass {
130 Stateless,
132 OrderedStateful,
134 Source,
136}
137
138impl ConcurrencyClass {
139 #[must_use]
141 pub const fn as_str(self) -> &'static str {
142 match self {
143 Self::Stateless => "stateless",
144 Self::OrderedStateful => "ordered_stateful",
145 Self::Source => "source",
146 }
147 }
148}
149
150#[derive(Clone, Copy, Debug, Eq, PartialEq)]
152pub struct SourceCapabilities {
153 random_access: bool,
154 indexing_required: bool,
155 known_frame_count: bool,
156 concurrency_limit: Option<usize>,
157}
158
159impl SourceCapabilities {
160 #[must_use]
162 pub const fn random_access() -> Self {
163 Self {
164 random_access: true,
165 indexing_required: true,
166 known_frame_count: true,
167 concurrency_limit: Some(1),
168 }
169 }
170
171 #[must_use]
173 pub const fn with_concurrency_limit(mut self, limit: usize) -> Self {
174 self.concurrency_limit = Some(if limit == 0 { 1 } else { limit });
175 self
176 }
177
178 #[must_use]
180 pub const fn supports_random_access(self) -> bool {
181 self.random_access
182 }
183
184 #[must_use]
186 pub const fn indexing_required(self) -> bool {
187 self.indexing_required
188 }
189
190 #[must_use]
192 pub const fn known_frame_count(self) -> bool {
193 self.known_frame_count
194 }
195
196 #[must_use]
198 pub const fn concurrency_limit(self) -> Option<usize> {
199 self.concurrency_limit
200 }
201}
202
203#[derive(Clone, Copy, Debug, Eq, PartialEq)]
205pub struct WorkerPoolConfig {
206 worker_threads: usize,
207}
208
209impl WorkerPoolConfig {
210 #[must_use]
212 pub const fn new(worker_threads: usize) -> Self {
213 Self {
214 worker_threads: if worker_threads == 0 {
215 1
216 } else {
217 worker_threads
218 },
219 }
220 }
221
222 #[must_use]
224 pub const fn worker_threads(self) -> usize {
225 self.worker_threads
226 }
227}
228
229#[derive(Clone, Debug, Default, Eq, PartialEq)]
231pub struct FilterTiming {
232 frames: usize,
233 total: Duration,
234}
235
236impl FilterTiming {
237 pub fn record(&mut self, duration: Duration) {
239 self.frames += 1;
240 self.total += duration;
241 }
242
243 #[must_use]
245 pub const fn frames(&self) -> usize {
246 self.frames
247 }
248
249 #[must_use]
251 pub const fn total(&self) -> Duration {
252 self.total
253 }
254}
255
256#[derive(Clone, Debug, Default, Eq, PartialEq)]
258pub struct TimingReport {
259 timings: BTreeMap<NodeId, FilterTiming>,
260}
261
262impl TimingReport {
263 #[must_use]
265 pub fn get(&self, node_id: NodeId) -> Option<&FilterTiming> {
266 self.timings.get(&node_id)
267 }
268
269 pub fn iter(&self) -> impl Iterator<Item = (NodeId, &FilterTiming)> {
271 self.timings
272 .iter()
273 .map(|(node_id, timing)| (*node_id, timing))
274 }
275
276 pub(crate) const fn from_timings(timings: BTreeMap<NodeId, FilterTiming>) -> Self {
277 Self { timings }
278 }
279}
280
281type Job = Box<dyn FnOnce() + Send + 'static>;
282
283struct PoolState {
284 queue: VecDeque<Job>,
285 closed: bool,
286}
287
288struct PoolShared {
289 state: Mutex<PoolState>,
290 wake: Condvar,
291}
292
293pub(crate) struct WorkerPool {
295 shared: Arc<PoolShared>,
296 workers: Vec<JoinHandle<()>>,
297}
298
299impl WorkerPool {
300 pub(crate) fn new(config: WorkerPoolConfig) -> Self {
301 let shared = Arc::new(PoolShared {
302 state: Mutex::new(PoolState {
303 queue: VecDeque::new(),
304 closed: false,
305 }),
306 wake: Condvar::new(),
307 });
308
309 let mut workers = Vec::with_capacity(config.worker_threads());
310 for _ in 0..config.worker_threads() {
311 let shared = Arc::clone(&shared);
312 workers.push(std::thread::spawn(move || worker_loop(&shared)));
313 }
314
315 Self { shared, workers }
316 }
317
318 pub(crate) fn execute<F>(&self, job: F) -> Result<()>
319 where
320 F: FnOnce() + Send + 'static,
321 {
322 let mut state = lock(&self.shared.state);
323 if state.closed {
324 return Err(PixelFlowError::new(
325 ErrorCategory::Core,
326 ErrorCode::new("render.worker_pool_closed"),
327 "render worker pool is already closed",
328 ));
329 }
330 state.queue.push_back(Box::new(job));
331 self.shared.wake.notify_one();
332 Ok(())
333 }
334}
335
336impl Drop for WorkerPool {
337 fn drop(&mut self) {
338 {
339 let mut state = lock(&self.shared.state);
340 state.closed = true;
341 }
342 self.shared.wake.notify_all();
343 for worker in self.workers.drain(..) {
344 #[expect(
345 clippy::let_underscore_must_use,
346 reason = "cannot propagate result during Drop, better to ignore than panic"
347 )]
348 let _ = worker.join();
349 }
350 }
351}
352
353pub(crate) struct OrderedCommitGate {
355 pending: Mutex<BTreeSet<usize>>,
356 wake: Condvar,
357}
358
359impl OrderedCommitGate {
360 pub(crate) const fn new() -> Self {
361 Self {
362 pending: Mutex::new(BTreeSet::new()),
363 wake: Condvar::new(),
364 }
365 }
366
367 pub(crate) fn register(&self, frame_number: usize) -> OrderedCommitTicket<'_> {
368 let mut pending = lock(&self.pending);
369 pending.insert(frame_number);
370 drop(pending);
371 OrderedCommitTicket {
372 gate: self,
373 frame_number,
374 finished: false,
375 }
376 }
377}
378
379pub(crate) struct OrderedCommitTicket<'a> {
380 gate: &'a OrderedCommitGate,
381 frame_number: usize,
382 finished: bool,
383}
384
385impl OrderedCommitTicket<'_> {
386 pub(crate) fn wait_turn(&self) {
387 let mut pending = lock(&self.gate.pending);
388 while pending.first().copied() != Some(self.frame_number) {
389 pending = wait(&self.gate.wake, pending);
390 }
391 }
392
393 pub(crate) fn finish(mut self) {
394 self.finish_inner();
395 self.finished = true;
396 }
397
398 fn finish_inner(&self) {
399 let mut pending = lock(&self.gate.pending);
400 pending.remove(&self.frame_number);
401 drop(pending);
402 self.gate.wake.notify_all();
403 }
404}
405
406impl Drop for OrderedCommitTicket<'_> {
407 fn drop(&mut self) {
408 if !self.finished {
409 self.finish_inner();
410 }
411 }
412}
413
414pub(crate) struct ConcurrencyGate {
416 limit: usize,
417 active: AtomicUsize,
418 state: Mutex<()>,
420 wake: Condvar,
421}
422
423impl ConcurrencyGate {
424 pub(crate) fn new(limit: usize) -> Self {
425 Self {
426 limit: limit.max(1),
427 active: AtomicUsize::new(0),
428 state: Mutex::new(()),
429 wake: Condvar::new(),
430 }
431 }
432
433 pub(crate) fn acquire(&self) -> ConcurrencyGuard<'_> {
434 let mut state = lock(&self.state);
435 while self.active.load(Ordering::Relaxed) >= self.limit {
436 state = wait(&self.wake, state);
437 }
438 self.active.fetch_add(1, Ordering::Relaxed);
439 drop(state);
440 ConcurrencyGuard {
441 gate: self,
442 released: false,
443 }
444 }
445}
446
447pub(crate) struct ConcurrencyGuard<'a> {
448 gate: &'a ConcurrencyGate,
449 released: bool,
450}
451
452impl ConcurrencyGuard<'_> {
453 fn release_inner(&self) {
454 let state = lock(&self.gate.state);
455 let active = self.gate.active.load(Ordering::Relaxed);
456 self.gate
457 .active
458 .store(active.saturating_sub(1), Ordering::Relaxed);
459 drop(state);
460 self.gate.wake.notify_one();
461 }
462}
463
464impl Drop for ConcurrencyGuard<'_> {
465 fn drop(&mut self) {
466 if !self.released {
467 self.release_inner();
468 self.released = true;
469 }
470 }
471}
472
473const fn in_window(output: usize, requested: usize, before: usize, after: usize) -> bool {
474 requested >= output.saturating_sub(before) && requested <= output.saturating_add(after)
475}
476
477fn worker_loop(shared: &Arc<PoolShared>) {
478 loop {
479 let job = {
480 let mut state = lock(&shared.state);
481 loop {
482 if let Some(job) = state.queue.pop_front() {
483 break job;
484 }
485 if state.closed {
486 return;
487 }
488 state = wait(&shared.wake, state);
489 }
490 };
491 job();
492 }
493}
494
495fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
496 mutex
497 .lock()
498 .unwrap_or_else(|poisoned| poisoned.into_inner())
499}
500
501fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
502 condvar
503 .wait(guard)
504 .unwrap_or_else(|poisoned| poisoned.into_inner())
505}
506
507#[cfg(test)]
508mod tests {
509 use std::sync::Arc;
510 use std::sync::mpsc;
511 use std::time::Duration;
512
513 use super::{
514 ConcurrencyClass, ConcurrencyGate, DependencyPattern, DynamicDependencyBounds,
515 SourceCapabilities, WorkerPoolConfig,
516 };
517
518 #[test]
519 fn same_frame_contract_accepts_only_matching_frame() {
520 let contract = DependencyPattern::same_frame();
521
522 assert!(contract.allows(10, 10));
523 assert!(!contract.allows(10, 9));
524 assert!(!contract.allows(10, 11));
525 }
526
527 #[test]
528 fn window_contract_accepts_declared_relative_bounds() {
529 let contract = DependencyPattern::window(2, 1);
530
531 assert!(contract.allows(10, 8));
532 assert!(contract.allows(10, 10));
533 assert!(contract.allows(10, 11));
534 assert!(!contract.allows(10, 7));
535 assert!(!contract.allows(10, 12));
536 }
537
538 #[test]
539 fn dynamic_future_window_rejects_past_and_far_future() {
540 let contract = DependencyPattern::dynamic(DynamicDependencyBounds::future_window(3));
541
542 assert!(contract.allows(10, 10));
543 assert!(contract.allows(10, 13));
544 assert!(!contract.allows(10, 9));
545 assert!(!contract.allows(10, 14));
546 }
547
548 #[test]
549 fn worker_pool_config_clamps_zero_to_one() {
550 assert_eq!(WorkerPoolConfig::new(0).worker_threads(), 1);
551 assert_eq!(WorkerPoolConfig::new(4).worker_threads(), 4);
552 }
553
554 #[test]
555 fn source_capabilities_record_concurrency_limit() {
556 let caps = SourceCapabilities::random_access().with_concurrency_limit(2);
557
558 assert!(caps.supports_random_access());
559 assert_eq!(caps.concurrency_limit(), Some(2));
560 }
561
562 #[test]
563 fn concurrency_gate_blocks_until_active_work_releases() {
564 let gate = Arc::new(ConcurrencyGate::new(1));
565 let first = gate.acquire();
566 let gate_for_thread = Arc::clone(&gate);
567 let (started_tx, started_rx) = mpsc::channel();
568 let (acquired_tx, acquired_rx) = mpsc::channel();
569
570 let worker = std::thread::spawn(move || {
571 started_tx
572 .send(())
573 .expect("worker should signal before waiting");
574 let _second = gate_for_thread.acquire();
575 acquired_tx
576 .send(())
577 .expect("worker should signal after acquiring gate");
578 });
579
580 started_rx.recv().expect("worker should reach acquire call");
581 assert!(
582 matches!(
583 acquired_rx.recv_timeout(Duration::from_millis(100)),
584 Err(mpsc::RecvTimeoutError::Timeout)
585 ),
586 "second acquisition should stay blocked while first guard is held"
587 );
588
589 drop(first);
590
591 acquired_rx
592 .recv_timeout(Duration::from_millis(200))
593 .expect("worker should acquire after first guard drops");
594 worker.join().expect("worker should join cleanly");
595 }
596
597 #[test]
598 fn concurrency_classes_are_named_for_diagnostics() {
599 assert_eq!(ConcurrencyClass::Stateless.as_str(), "stateless");
600 assert_eq!(
601 ConcurrencyClass::OrderedStateful.as_str(),
602 "ordered_stateful"
603 );
604 assert_eq!(ConcurrencyClass::Source.as_str(), "source");
605 }
606}