use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::{
Arc, Condvar, Mutex, MutexGuard,
atomic::{AtomicUsize, Ordering},
};
use std::thread::JoinHandle;
use std::time::Duration;
use crate::{ErrorCategory, ErrorCode, NodeId, PixelFlowError, Result};
/// Rule describing which frame indices may be requested while a given output
/// frame is being produced; [`DependencyPattern::allows`] performs the check.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DependencyPattern {
    /// Only the output frame itself may be requested.
    SameFrame,
    /// Frames from `before` positions behind the output up to `after`
    /// positions ahead of it may be requested (both ends inclusive).
    Window {
        /// How far behind the output frame a request may reach.
        before: usize,
        /// How far ahead of the output frame a request may reach.
        after: usize,
    },
    /// Requests are validated against the wrapped bounds.
    FrameMap(DynamicDependencyBounds),
    /// Requests are validated against the wrapped bounds; `allows` treats
    /// this variant exactly like [`DependencyPattern::FrameMap`].
    Dynamic(DynamicDependencyBounds),
}

impl DependencyPattern {
    /// Builds the [`DependencyPattern::SameFrame`] pattern.
    #[must_use]
    pub const fn same_frame() -> Self {
        DependencyPattern::SameFrame
    }

    /// Builds a [`DependencyPattern::Window`] reaching `before` frames back
    /// and `after` frames forward.
    #[must_use]
    pub const fn window(before: usize, after: usize) -> Self {
        DependencyPattern::Window { before, after }
    }

    /// Builds a [`DependencyPattern::FrameMap`] constrained by `bounds`.
    #[must_use]
    pub const fn frame_map(bounds: DynamicDependencyBounds) -> Self {
        DependencyPattern::FrameMap(bounds)
    }

    /// Builds a [`DependencyPattern::Dynamic`] constrained by `bounds`.
    #[must_use]
    pub const fn dynamic(bounds: DynamicDependencyBounds) -> Self {
        DependencyPattern::Dynamic(bounds)
    }

    /// Returns `true` when frame `requested` may be asked for while frame
    /// `output` is being produced under this pattern.
    #[must_use]
    pub const fn allows(&self, output: usize, requested: usize) -> bool {
        match *self {
            Self::FrameMap(bounds) | Self::Dynamic(bounds) => bounds.allows(output, requested),
            Self::Window { before, after } => in_window(output, requested, before, after),
            Self::SameFrame => output == requested,
        }
    }
}
/// Limits on which frame indices may be requested relative to the output
/// frame; [`DynamicDependencyBounds::allows`] performs the check.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DynamicDependencyBounds {
    /// Every requested frame is accepted.
    Any,
    /// Only the output frame or an earlier one is accepted.
    PastOnly,
    /// The output frame and up to `after` frames beyond it are accepted.
    FutureWindow {
        /// How far ahead of the output frame a request may reach.
        after: usize,
    },
    /// Frames from `before` behind to `after` ahead of the output frame are
    /// accepted (both ends inclusive).
    Bounded {
        /// How far behind the output frame a request may reach.
        before: usize,
        /// How far ahead of the output frame a request may reach.
        after: usize,
    },
}

impl DynamicDependencyBounds {
    /// Builds [`DynamicDependencyBounds::Any`].
    #[must_use]
    pub const fn any() -> Self {
        DynamicDependencyBounds::Any
    }

    /// Builds [`DynamicDependencyBounds::PastOnly`].
    #[must_use]
    pub const fn past_only() -> Self {
        DynamicDependencyBounds::PastOnly
    }

    /// Builds a [`DynamicDependencyBounds::FutureWindow`] reaching `after`
    /// frames ahead.
    #[must_use]
    pub const fn future_window(after: usize) -> Self {
        DynamicDependencyBounds::FutureWindow { after }
    }

    /// Builds a [`DynamicDependencyBounds::Bounded`] reaching `before` frames
    /// back and `after` frames ahead.
    #[must_use]
    pub const fn bounded(before: usize, after: usize) -> Self {
        DynamicDependencyBounds::Bounded { before, after }
    }

    /// Returns `true` when frame `requested` lies inside these bounds for
    /// output frame `output`.
    #[must_use]
    pub const fn allows(self, output: usize, requested: usize) -> bool {
        match self {
            Self::Bounded { before, after } => in_window(output, requested, before, after),
            Self::FutureWindow { after } => {
                // Saturating keeps the upper edge valid near `usize::MAX`.
                let latest = output.saturating_add(after);
                output <= requested && requested <= latest
            }
            Self::PastOnly => output >= requested,
            Self::Any => true,
        }
    }
}
/// Concurrency label attached to a piece of work. This file defines only the
/// labels and their diagnostic names; how each class is scheduled is decided
/// elsewhere.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConcurrencyClass {
    /// Reported as `"stateless"`.
    Stateless,
    /// Reported as `"ordered_stateful"`.
    OrderedStateful,
    /// Reported as `"source"`.
    Source,
}

impl ConcurrencyClass {
    /// Returns the fixed snake_case name of this class.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            ConcurrencyClass::Source => "source",
            ConcurrencyClass::OrderedStateful => "ordered_stateful",
            ConcurrencyClass::Stateless => "stateless",
        }
    }
}
/// Capability flags and an optional concurrency limit describing a source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SourceCapabilities {
    random_access: bool,
    indexing_required: bool,
    known_frame_count: bool,
    concurrency_limit: Option<usize>,
}

impl SourceCapabilities {
    /// Preset with every flag enabled and a concurrency limit of one.
    #[must_use]
    pub const fn random_access() -> Self {
        SourceCapabilities {
            concurrency_limit: Some(1),
            known_frame_count: true,
            indexing_required: true,
            random_access: true,
        }
    }

    /// Returns a copy whose concurrency limit is `limit`; a `limit` of zero
    /// is raised to one.
    #[must_use]
    pub const fn with_concurrency_limit(mut self, limit: usize) -> Self {
        let effective = match limit {
            0 => 1,
            nonzero => nonzero,
        };
        self.concurrency_limit = Some(effective);
        self
    }

    /// Value of the random-access flag.
    #[must_use]
    pub const fn supports_random_access(self) -> bool {
        let Self { random_access, .. } = self;
        random_access
    }

    /// Value of the indexing-required flag.
    #[must_use]
    pub const fn indexing_required(self) -> bool {
        let Self {
            indexing_required, ..
        } = self;
        indexing_required
    }

    /// Value of the known-frame-count flag.
    #[must_use]
    pub const fn known_frame_count(self) -> bool {
        let Self {
            known_frame_count, ..
        } = self;
        known_frame_count
    }

    /// The configured concurrency limit, if any.
    #[must_use]
    pub const fn concurrency_limit(self) -> Option<usize> {
        let Self {
            concurrency_limit, ..
        } = self;
        concurrency_limit
    }
}
/// Sizing for a worker pool: the number of worker threads, never zero.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WorkerPoolConfig {
    worker_threads: usize,
}

impl WorkerPoolConfig {
    /// Creates a configuration for `worker_threads` threads; zero is replaced
    /// by one.
    #[must_use]
    pub const fn new(worker_threads: usize) -> Self {
        let worker_threads = match worker_threads {
            0 => 1,
            requested => requested,
        };
        WorkerPoolConfig { worker_threads }
    }

    /// Number of worker threads this configuration asks for.
    #[must_use]
    pub const fn worker_threads(self) -> usize {
        let Self { worker_threads } = self;
        worker_threads
    }
}
/// Running tally of recorded samples: how many were recorded and the sum of
/// their durations.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FilterTiming {
    frames: usize,
    total: Duration,
}

impl FilterTiming {
    /// Records one more sample: bumps the frame count by one and adds
    /// `duration` to the running total.
    pub fn record(&mut self, duration: Duration) {
        let Self { frames, total } = self;
        *frames += 1;
        *total += duration;
    }

    /// Number of samples recorded so far.
    #[must_use]
    pub const fn frames(&self) -> usize {
        let Self { frames, .. } = self;
        *frames
    }

    /// Sum of all recorded durations.
    #[must_use]
    pub const fn total(&self) -> Duration {
        let Self { total, .. } = self;
        *total
    }
}
/// Collection of [`FilterTiming`] entries keyed by [`NodeId`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TimingReport {
    timings: BTreeMap<NodeId, FilterTiming>,
}

impl TimingReport {
    /// Looks up the timing stored for `node_id`, if any.
    #[must_use]
    pub fn get(&self, node_id: NodeId) -> Option<&FilterTiming> {
        let Self { timings } = self;
        timings.get(&node_id)
    }

    /// Iterates over every `(node, timing)` pair in ascending key order (the
    /// order of the underlying `BTreeMap`).
    pub fn iter(&self) -> impl Iterator<Item = (NodeId, &FilterTiming)> {
        let Self { timings } = self;
        timings.iter().map(|(&node_id, timing)| (node_id, timing))
    }

    /// Wraps an already-built map of timings.
    pub(crate) const fn from_timings(timings: BTreeMap<NodeId, FilterTiming>) -> Self {
        TimingReport { timings }
    }
}
/// A queued unit of work: a boxed closure run once on a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Mutable pool state guarded by [`PoolShared::state`].
struct PoolState {
    /// Jobs waiting to be picked up, oldest first.
    queue: VecDeque<Job>,
    /// Set when the pool is shutting down; `execute` rejects jobs once set.
    closed: bool,
}

/// State shared between the pool handle and its worker threads.
struct PoolShared {
    state: Mutex<PoolState>,
    /// Signalled when a job is queued or the pool is closed.
    wake: Condvar,
}

/// Fixed-size pool of threads that run queued closures.
pub(crate) struct WorkerPool {
    shared: Arc<PoolShared>,
    workers: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    /// Spawns `config.worker_threads()` threads, each running `worker_loop`
    /// against the shared queue.
    pub(crate) fn new(config: WorkerPoolConfig) -> Self {
        let thread_count = config.worker_threads();
        let shared = Arc::new(PoolShared {
            state: Mutex::new(PoolState {
                queue: VecDeque::new(),
                closed: false,
            }),
            wake: Condvar::new(),
        });
        let workers: Vec<JoinHandle<()>> = (0..thread_count)
            .map(|_| {
                let worker_shared = Arc::clone(&shared);
                std::thread::spawn(move || worker_loop(&worker_shared))
            })
            .collect();
        Self { shared, workers }
    }

    /// Appends `job` to the queue and wakes one waiting worker.
    ///
    /// # Errors
    ///
    /// Returns an error with code `render.worker_pool_closed` when the pool
    /// has already been marked closed.
    pub(crate) fn execute<F>(&self, job: F) -> Result<()>
    where
        F: FnOnce() + Send + 'static,
    {
        let mut state = lock(&self.shared.state);
        if !state.closed {
            state.queue.push_back(Box::new(job));
            self.shared.wake.notify_one();
            return Ok(());
        }
        Err(PixelFlowError::new(
            ErrorCategory::Core,
            ErrorCode::new("render.worker_pool_closed"),
            "render worker pool is already closed",
        ))
    }
}

impl Drop for WorkerPool {
    /// Marks the pool closed, wakes every worker, and joins them all.
    fn drop(&mut self) {
        // The temporary guard is released at the end of this statement, so
        // woken workers can take the lock immediately.
        lock(&self.shared.state).closed = true;
        self.shared.wake.notify_all();
        for worker in self.workers.drain(..) {
            #[expect(
                clippy::let_underscore_must_use,
                reason = "cannot propagate result during Drop, better to ignore than panic"
            )]
            let _ = worker.join();
        }
    }
}
/// Gate that lets registered frames proceed one at a time, lowest pending
/// frame number first.
pub(crate) struct OrderedCommitGate {
    /// Frame numbers that have been registered and not yet finished.
    pending: Mutex<BTreeSet<usize>>,
    /// Signalled every time a frame number leaves `pending`.
    wake: Condvar,
}

impl OrderedCommitGate {
    /// Creates a gate with no pending frames.
    pub(crate) const fn new() -> Self {
        Self {
            wake: Condvar::new(),
            pending: Mutex::new(BTreeSet::new()),
        }
    }

    /// Adds `frame_number` to the pending set and returns the ticket that
    /// removes it again when finished or dropped.
    pub(crate) fn register(&self, frame_number: usize) -> OrderedCommitTicket<'_> {
        // The temporary guard is released at the end of this statement.
        lock(&self.pending).insert(frame_number);
        OrderedCommitTicket {
            frame_number,
            finished: false,
            gate: self,
        }
    }
}

/// Handle for one registered frame; its frame number stays in the gate's
/// pending set until the ticket is finished or dropped.
pub(crate) struct OrderedCommitTicket<'a> {
    gate: &'a OrderedCommitGate,
    frame_number: usize,
    /// `true` once `finish` has removed the frame, so `Drop` does not repeat it.
    finished: bool,
}

impl OrderedCommitTicket<'_> {
    /// Blocks until this ticket's frame number is the smallest pending one.
    pub(crate) fn wait_turn(&self) {
        let mut pending = lock(&self.gate.pending);
        loop {
            if pending.first() == Some(&self.frame_number) {
                return;
            }
            pending = wait(&self.gate.wake, pending);
        }
    }

    /// Removes this frame from the pending set and wakes all waiters.
    pub(crate) fn finish(mut self) {
        self.finish_inner();
        // Recorded so the `Drop` that runs next does not remove and notify again.
        self.finished = true;
    }

    /// Shared removal path used by both `finish` and `Drop`.
    fn finish_inner(&self) {
        // The guard is released before notifying so woken waiters can lock.
        lock(&self.gate.pending).remove(&self.frame_number);
        self.gate.wake.notify_all();
    }
}

impl Drop for OrderedCommitTicket<'_> {
    /// Releases the frame if `finish` was never called.
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        self.finish_inner();
    }
}
/// Counting gate that lets at most `limit` guards exist at the same time.
pub(crate) struct ConcurrencyGate {
    /// Maximum number of simultaneously held guards; at least one.
    limit: usize,
    /// Number of guards currently held; only modified while `state` is locked.
    active: AtomicUsize,
    /// Lock paired with `wake`; it carries no data of its own.
    state: Mutex<()>,
    /// Signalled each time a guard is released.
    wake: Condvar,
}

impl ConcurrencyGate {
    /// Creates a gate admitting `limit` concurrent holders; zero is raised to
    /// one.
    pub(crate) fn new(limit: usize) -> Self {
        let limit = match limit {
            0 => 1,
            requested => requested,
        };
        Self {
            limit,
            active: AtomicUsize::new(0),
            state: Mutex::new(()),
            wake: Condvar::new(),
        }
    }

    /// Blocks until fewer than `limit` guards are held, then claims a slot
    /// and returns the guard that gives it back on drop.
    pub(crate) fn acquire(&self) -> ConcurrencyGuard<'_> {
        let mut permit_lock = lock(&self.state);
        loop {
            if self.active.load(Ordering::Relaxed) < self.limit {
                break;
            }
            permit_lock = wait(&self.wake, permit_lock);
        }
        // Still under the lock, so the check above and this increment cannot
        // interleave with another acquire or release.
        self.active.fetch_add(1, Ordering::Relaxed);
        drop(permit_lock);
        ConcurrencyGuard {
            released: false,
            gate: self,
        }
    }
}

/// Slot claimed from a [`ConcurrencyGate`]; the slot is returned on drop.
pub(crate) struct ConcurrencyGuard<'a> {
    gate: &'a ConcurrencyGate,
    /// `true` once the slot has been handed back.
    released: bool,
}

impl ConcurrencyGuard<'_> {
    /// Decrements the active count under the gate lock and wakes one waiter.
    fn release_inner(&self) {
        let gate = self.gate;
        let permit_lock = lock(&gate.state);
        let remaining = gate.active.load(Ordering::Relaxed).saturating_sub(1);
        gate.active.store(remaining, Ordering::Relaxed);
        drop(permit_lock);
        gate.wake.notify_one();
    }
}

impl Drop for ConcurrencyGuard<'_> {
    /// Returns the slot unless it has already been released.
    fn drop(&mut self) {
        if self.released {
            return;
        }
        self.release_inner();
        self.released = true;
    }
}
/// Returns `true` when `requested` lies in the inclusive range from
/// `output - before` to `output + after`; both edges saturate at the limits
/// of `usize` instead of wrapping.
const fn in_window(output: usize, requested: usize, before: usize, after: usize) -> bool {
    let earliest = output.saturating_sub(before);
    let latest = output.saturating_add(after);
    earliest <= requested && requested <= latest
}
/// Body of every pool worker thread: repeatedly takes the oldest queued job
/// and runs it, sleeping on the condition variable while the queue is empty.
///
/// The loop returns only once the queue is empty *and* the pool has been
/// marked closed, so jobs queued before shutdown are still run.
///
/// A job that panics is contained: the unwind is caught here so the worker
/// keeps serving the queue. Without this, each panicking job would
/// permanently remove one thread from the pool, and once all threads were
/// gone, queued jobs would never run.
fn worker_loop(shared: &Arc<PoolShared>) {
    loop {
        let job = {
            let mut state = lock(&shared.state);
            loop {
                // Drain the queue first so pending work survives shutdown.
                if let Some(job) = state.queue.pop_front() {
                    break job;
                }
                if state.closed {
                    return;
                }
                state = wait(&shared.wake, state);
            }
            // The state guard is dropped here, so the job runs unlocked.
        };
        // `AssertUnwindSafe` is acceptable because the job is consumed by the
        // call and nothing it captured is observed here afterwards.
        if let Err(payload) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
            // The panic hook has already run by the time the unwind is
            // caught; the payload carries nothing further to act on.
            drop(payload);
        }
    }
}
/// Locks `mutex` and returns its guard, ignoring poisoning: if an earlier
/// holder panicked, the guard is recovered from the poison error instead of
/// failing.
fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Waits on `condvar` with `guard` and returns the re-acquired guard,
/// ignoring poisoning the same way [`lock`] does.
fn wait<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    match condvar.wait(guard) {
        Ok(reacquired) => reacquired,
        Err(poisoned) => poisoned.into_inner(),
    }
}
// Unit tests for the dependency contracts, the clamping constructors, the
// diagnostic names, and the blocking behaviour of `ConcurrencyGate`.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
use super::{
ConcurrencyClass, ConcurrencyGate, DependencyPattern, DynamicDependencyBounds,
SourceCapabilities, WorkerPoolConfig,
};
// `SameFrame` accepts the output frame and rejects both neighbours.
#[test]
fn same_frame_contract_accepts_only_matching_frame() {
let contract = DependencyPattern::same_frame();
assert!(contract.allows(10, 10));
assert!(!contract.allows(10, 9));
assert!(!contract.allows(10, 11));
}
// `window(2, 1)` accepts frames 8 through 11 for output 10 and rejects the
// frames just outside either edge.
#[test]
fn window_contract_accepts_declared_relative_bounds() {
let contract = DependencyPattern::window(2, 1);
assert!(contract.allows(10, 8));
assert!(contract.allows(10, 10));
assert!(contract.allows(10, 11));
assert!(!contract.allows(10, 7));
assert!(!contract.allows(10, 12));
}
// `future_window(3)` accepts frames 10 through 13 for output 10; earlier
// frames and frames past the window are rejected.
#[test]
fn dynamic_future_window_rejects_past_and_far_future() {
let contract = DependencyPattern::dynamic(DynamicDependencyBounds::future_window(3));
assert!(contract.allows(10, 10));
assert!(contract.allows(10, 13));
assert!(!contract.allows(10, 9));
assert!(!contract.allows(10, 14));
}
// A request for zero worker threads is replaced by one; non-zero values pass
// through unchanged.
#[test]
fn worker_pool_config_clamps_zero_to_one() {
assert_eq!(WorkerPoolConfig::new(0).worker_threads(), 1);
assert_eq!(WorkerPoolConfig::new(4).worker_threads(), 4);
}
// `with_concurrency_limit` overrides the preset limit and keeps the
// random-access flag.
#[test]
fn source_capabilities_record_concurrency_limit() {
let caps = SourceCapabilities::random_access().with_concurrency_limit(2);
assert!(caps.supports_random_access());
assert_eq!(caps.concurrency_limit(), Some(2));
}
// With a limit of one, a second `acquire` on another thread must stay blocked
// while the first guard is alive and must complete after it is dropped.
#[test]
fn concurrency_gate_blocks_until_active_work_releases() {
let gate = Arc::new(ConcurrencyGate::new(1));
// Hold the only slot on the test thread.
let first = gate.acquire();
let gate_for_thread = Arc::clone(&gate);
// `started` reports that the worker is about to call `acquire`;
// `acquired` reports that the call has returned.
let (started_tx, started_rx) = mpsc::channel();
let (acquired_tx, acquired_rx) = mpsc::channel();
let worker = std::thread::spawn(move || {
started_tx
.send(())
.expect("worker should signal before waiting");
let _second = gate_for_thread.acquire();
acquired_tx
.send(())
.expect("worker should signal after acquiring gate");
});
started_rx.recv().expect("worker should reach acquire call");
// No `acquired` message may arrive within the timeout while `first` is held.
assert!(
matches!(
acquired_rx.recv_timeout(Duration::from_millis(100)),
Err(mpsc::RecvTimeoutError::Timeout)
),
"second acquisition should stay blocked while first guard is held"
);
// Releasing the slot must let the worker through within the timeout.
drop(first);
acquired_rx
.recv_timeout(Duration::from_millis(200))
.expect("worker should acquire after first guard drops");
worker.join().expect("worker should join cleanly");
}
// Each concurrency class maps to its fixed snake_case name.
#[test]
fn concurrency_classes_are_named_for_diagnostics() {
assert_eq!(ConcurrencyClass::Stateless.as_str(), "stateless");
assert_eq!(
ConcurrencyClass::OrderedStateful.as_str(),
"ordered_stateful"
);
assert_eq!(ConcurrencyClass::Source.as_str(), "source");
}
}