datum-core 0.3.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    cmp::{Ordering as CmpOrdering, Reverse},
    collections::BinaryHeap,
    panic::{AssertUnwindSafe, catch_unwind},
    sync::{
        Arc, Condvar, Mutex,
        atomic::{AtomicBool, Ordering},
    },
    time::{Duration, Instant},
};

use crate::Cancellable;
use crate::stream::runtime::dispatch_stream_job;

/// A single-threaded timer driver backed by a min-heap of deadlines.
///
/// Replaces the v0.1.0 thread-per-timer pattern. All `schedule_*` calls
/// register entries in the heap; one dedicated thread drains them in
/// deadline order.
pub(super) struct TimerDriver {
    /// Pending timer entries plus the stop flag, guarded by one mutex.
    inner: Mutex<TimerInner>,
    /// Notified whenever an entry is pushed or the driver is stopped, so the
    /// driver thread can re-check the heap and how long it should sleep.
    condvar: Condvar,
    /// Test-only flag: set to `true` once the driver thread starts running and
    /// reset to `false` when that thread exits.
    #[cfg(test)]
    live: Arc<AtomicBool>,
    /// Test-only copy of the name passed to `launch`.
    #[cfg(test)]
    thread_name: Arc<str>,
}

/// Opaque `Debug` output: only the type name is printed, none of the fields.
impl std::fmt::Debug for TimerDriver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TimerDriver");
        repr.finish_non_exhaustive()
    }
}

/// Mutable driver state, stored behind the `Mutex` in `TimerDriver::inner`.
struct TimerInner {
    /// Pending entries. `BinaryHeap` is a max-heap, so entries are wrapped in
    /// `Reverse` to make the earliest deadline surface first.
    heap: BinaryHeap<Reverse<TimerEntry>>,
    /// Identifier handed to the next scheduled entry; advanced with wrapping
    /// arithmetic on every push.
    next_id: u64,
    /// Set by `stop`; the driver thread returns once it observes this flag.
    stopped: bool,
}

/// One pending timer in the heap.
struct TimerEntry {
    /// Instant at or after which the entry is due to fire.
    deadline: Instant,
    /// Per-driver sequence number; used as the ordering tie-breaker between
    /// entries that share a deadline.
    id: u64,
    /// What to run and how (or whether) to re-arm afterwards.
    kind: TimerKind,
}

/// The flavour of a timer entry together with the state it needs to fire.
///
/// Every variant carries two flags that are checked before the task runs:
/// `cancelled` (shared with the `Cancellable` returned to the caller) and
/// `shutdown` (supplied by the caller of the `schedule_*` method).
enum TimerKind {
    /// Runs `task` a single time.
    Once {
        /// The job to run; `schedule_once` always stores `Some`.
        task: Option<Box<dyn FnOnce() + Send>>,
        cancelled: Arc<AtomicBool>,
        shutdown: Arc<AtomicBool>,
    },
    /// Runs `task` repeatedly; the next deadline is computed as "now + `delay`"
    /// after each run finishes.
    FixedDelay {
        task: Arc<dyn Fn() + Send + Sync>,
        delay: Duration,
        cancelled: Arc<AtomicBool>,
        shutdown: Arc<AtomicBool>,
    },
    /// Runs `task` repeatedly; the next deadline is the previous deadline plus
    /// `interval`, clamped so it is never earlier than "now".
    FixedRate {
        task: Arc<dyn Fn() + Send + Sync>,
        interval: Duration,
        cancelled: Arc<AtomicBool>,
        shutdown: Arc<AtomicBool>,
    },
}

// Equality and ordering look only at `(deadline, id)`; the `kind` payload is
// never compared.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        (self.deadline, self.id) == (other.deadline, other.id)
    }
}
impl Eq for TimerEntry {}
impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
        Some(self.cmp(other))
    }
}
impl Ord for TimerEntry {
    /// Earlier deadlines sort first; equal deadlines fall back to the id.
    fn cmp(&self, other: &Self) -> CmpOrdering {
        match self.deadline.cmp(&other.deadline) {
            CmpOrdering::Equal => self.id.cmp(&other.id),
            by_deadline => by_deadline,
        }
    }
}

impl TimerDriver {
    /// Creates a driver with an empty heap and starts its dedicated thread,
    /// which is given the name `name`.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    pub(super) fn launch(name: &str) -> Arc<Self> {
        // Test-only guard that clears the `live` flag when the driver thread
        // exits, whichever way it exits.
        #[cfg(test)]
        struct LiveGuard {
            live: Arc<AtomicBool>,
        }
        #[cfg(test)]
        impl Drop for LiveGuard {
            fn drop(&mut self) {
                self.live.store(false, Ordering::SeqCst);
            }
        }

        let initial_state = TimerInner {
            heap: BinaryHeap::new(),
            next_id: 0,
            stopped: false,
        };
        let driver = Arc::new(Self {
            inner: Mutex::new(initial_state),
            condvar: Condvar::new(),
            #[cfg(test)]
            live: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            thread_name: Arc::from(name),
        });

        let worker = Arc::clone(&driver);
        let thread_body = move || {
            #[cfg(test)]
            let _guard = LiveGuard {
                live: Arc::clone(&worker.live),
            };

            #[cfg(test)]
            worker.live.store(true, Ordering::SeqCst);
            worker.run()
        };
        std::thread::Builder::new()
            .name(name.to_owned())
            .spawn(thread_body)
            .expect("timer driver thread spawns");
        driver
    }

    /// Body of the driver thread: repeatedly waits for the earliest entry to
    /// become due, pops it, and fires it with the lock released.
    ///
    /// Returns as soon as `stopped` is observed. A poisoned mutex is recovered
    /// rather than propagated.
    fn run(self: Arc<Self>) {
        loop {
            let due = {
                let mut state = self.inner.lock().unwrap_or_else(|p| p.into_inner());
                loop {
                    if state.stopped {
                        return;
                    }
                    let head_deadline = state.heap.peek().map(|Reverse(head)| head.deadline);
                    match head_deadline {
                        // Nothing scheduled: sleep until a push or stop wakes us.
                        None => {
                            state = self.condvar.wait(state).unwrap_or_else(|p| p.into_inner());
                        }
                        Some(deadline) => {
                            let now = Instant::now();
                            if deadline <= now {
                                let Reverse(head) = state.heap.pop().expect("just peeked");
                                break head;
                            }
                            // Sleep until the head is due or until a notification
                            // arrives; either way the loop re-evaluates the heap.
                            let (reacquired, _timed_out) = self
                                .condvar
                                .wait_timeout(state, deadline - now)
                                .unwrap_or_else(|p| p.into_inner());
                            state = reacquired;
                        }
                    }
                }
            };
            self.fire(due);
        }
    }

    fn fire(self: &Arc<Self>, entry: TimerEntry) {
        let shutdown = match &entry.kind {
            TimerKind::Once { shutdown, .. }
            | TimerKind::FixedDelay { shutdown, .. }
            | TimerKind::FixedRate { shutdown, .. } => shutdown,
        };
        if shutdown.load(Ordering::SeqCst) {
            return;
        }

        match entry.kind {
            TimerKind::Once {
                task, cancelled, ..
            } => {
                if cancelled.load(Ordering::SeqCst) {
                    return;
                }
                if let Some(task) = task {
                    dispatch_stream_job(Box::new(move || {
                        let _ = catch_unwind(AssertUnwindSafe(task));
                    }));
                }
            }
            TimerKind::FixedDelay {
                task,
                delay,
                cancelled,
                shutdown,
            } => {
                if cancelled.load(Ordering::SeqCst) {
                    return;
                }
                let driver = Arc::clone(self);
                dispatch_stream_job(Box::new(move || {
                    if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
                        return;
                    }
                    let completed = catch_unwind(AssertUnwindSafe(|| task())).is_ok();
                    if !completed
                        || cancelled.load(Ordering::SeqCst)
                        || shutdown.load(Ordering::SeqCst)
                    {
                        return;
                    }
                    driver.schedule_kind(
                        Instant::now() + delay,
                        TimerKind::FixedDelay {
                            task,
                            delay,
                            cancelled,
                            shutdown,
                        },
                    );
                }));
            }
            TimerKind::FixedRate {
                task,
                interval,
                cancelled,
                shutdown,
            } => {
                if cancelled.load(Ordering::SeqCst) {
                    return;
                }
                let driver = Arc::clone(self);
                dispatch_stream_job(Box::new(move || {
                    if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
                        return;
                    }
                    let completed = catch_unwind(AssertUnwindSafe(|| task())).is_ok();
                    if !completed
                        || cancelled.load(Ordering::SeqCst)
                        || shutdown.load(Ordering::SeqCst)
                    {
                        return;
                    }
                    driver.schedule_kind(
                        (entry.deadline + interval).max(Instant::now()),
                        TimerKind::FixedRate {
                            task,
                            interval,
                            cancelled,
                            shutdown,
                        },
                    );
                }));
            }
        }
    }

    /// Pushes a new entry with the given `deadline` onto the heap and wakes the
    /// driver thread so it can recompute how long to sleep.
    ///
    /// Once the driver has been stopped the entry is dropped instead of being
    /// queued: the driver thread has exited, so a queued entry would never
    /// fire and would only keep its task alive for as long as the driver
    /// itself exists.
    fn schedule_kind(&self, deadline: Instant, kind: TimerKind) {
        let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
        if inner.stopped {
            // Release the lock first so `kind` (which owns a user closure) is
            // dropped at function exit without the mutex held.
            drop(inner);
            return;
        }
        let id = inner.next_id;
        inner.next_id = id.wrapping_add(1);
        inner.heap.push(Reverse(TimerEntry { deadline, id, kind }));
        drop(inner);
        self.condvar.notify_one();
    }

    /// Schedules `task` to be fired once, `delay` from now.
    ///
    /// `shutdown` is checked when the entry comes due; if it is set, the task
    /// is not run. `keep_alive` is handed to the returned [`Cancellable`] via
    /// `new_with_keep_alive`.
    ///
    /// Returns the handle whose `cancelled` flag the entry observes.
    pub(super) fn schedule_once(
        self: &Arc<Self>,
        delay: Duration,
        task: impl FnOnce() + Send + 'static,
        shutdown: Arc<AtomicBool>,
        keep_alive: Arc<dyn Send + Sync>,
    ) -> Cancellable {
        let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
        let cancel_flag = Arc::clone(&handle.cancelled);

        let fire_at = Instant::now() + delay;
        let one_shot = TimerKind::Once {
            task: Some(Box::new(task)),
            cancelled: cancel_flag,
            shutdown,
        };
        self.schedule_kind(fire_at, one_shot);

        handle
    }

    /// Schedules `task` to fire repeatedly: first after `initial_delay`, then
    /// `delay` after each run finishes.
    ///
    /// `shutdown` is checked around every run. `keep_alive` is handed to the
    /// returned [`Cancellable`] via `new_with_keep_alive`.
    ///
    /// # Panics
    ///
    /// Panics if `delay` is zero.
    pub(super) fn schedule_with_fixed_delay(
        self: &Arc<Self>,
        initial_delay: Duration,
        delay: Duration,
        task: impl Fn() + Send + Sync + 'static,
        shutdown: Arc<AtomicBool>,
        keep_alive: Arc<dyn Send + Sync>,
    ) -> Cancellable {
        assert!(delay > Duration::ZERO);

        let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
        let cancel_flag = Arc::clone(&handle.cancelled);

        let first_deadline = Instant::now() + initial_delay;
        let recurring = TimerKind::FixedDelay {
            task: Arc::new(task),
            delay,
            cancelled: cancel_flag,
            shutdown,
        };
        self.schedule_kind(first_deadline, recurring);

        handle
    }

    /// Schedules `task` to fire repeatedly: first after `initial_delay`, then
    /// at deadlines spaced `interval` apart (a late tick is moved up to "now"
    /// rather than scheduled in the past).
    ///
    /// `shutdown` is checked around every run. `keep_alive` is handed to the
    /// returned [`Cancellable`] via `new_with_keep_alive`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is zero.
    pub(super) fn schedule_at_fixed_rate(
        self: &Arc<Self>,
        initial_delay: Duration,
        interval: Duration,
        task: impl Fn() + Send + Sync + 'static,
        shutdown: Arc<AtomicBool>,
        keep_alive: Arc<dyn Send + Sync>,
    ) -> Cancellable {
        assert!(interval > Duration::ZERO);

        let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
        let cancel_flag = Arc::clone(&handle.cancelled);

        let first_deadline = Instant::now() + initial_delay;
        let recurring = TimerKind::FixedRate {
            task: Arc::new(task),
            interval,
            cancelled: cancel_flag,
            shutdown,
        };
        self.schedule_kind(first_deadline, recurring);

        handle
    }

    /// Marks the driver as stopped, discards every pending entry, and wakes the
    /// driver thread so it can exit.
    ///
    /// The pending entries are moved out of the heap under the lock but
    /// dropped only after the lock is released. Entries own user-supplied
    /// closures; running their destructors while holding the mutex would
    /// deadlock if any captured value re-enters this driver on drop.
    pub(super) fn stop(&self) {
        let abandoned = {
            let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
            inner.stopped = true;
            std::mem::take(&mut inner.heap)
        };
        self.condvar.notify_all();
        drop(abandoned);
    }
}

#[cfg(test)]
impl TimerDriver {
    /// Reports whether the driver thread is currently running, as tracked by
    /// the test-only `live` flag.
    pub(super) fn is_live(&self) -> bool {
        let flag: &AtomicBool = &self.live;
        flag.load(Ordering::SeqCst)
    }

    /// Returns the name the driver thread was launched with.
    pub(super) fn thread_name(&self) -> &str {
        &*self.thread_name
    }
}