Skip to main content

datum/stream/
runtime.rs

//! `Runtime` (alias `Materializer`) and the `StreamExecutor` thread pool.
//!
//! The executor grows on demand and reaps idle workers after a 10s timeout — it
//! must NOT become fixed-size, which would deadlock once `num_cpus` long-lived
//! streams (`Sink::never`, infinite `map_async`/ask) are live. A spawn failure
//! falls back to running the job inline. `runtime_checked_stream` wraps a
//! `BoxStream` with per-element shutdown and cancellation checks.
8
9use super::completion::StreamCancellation;
10use super::*;
11use crate::Attributes;
12use crate::stream::timer::TimerDriver;
13use std::cell::RefCell;
14
thread_local! {
    /// Cancellation flag installed for whatever stream is being driven on this
    /// thread right now; `None` while nothing has installed one.
    static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
}
18
19pub(super) struct CurrentStreamCancelledGuard {
20    previous: Option<Arc<AtomicBool>>,
21}
22
23impl Drop for CurrentStreamCancelledGuard {
24    fn drop(&mut self) {
25        let previous = self.previous.take();
26        CURRENT_STREAM_CANCELLED.with(|slot| {
27            *slot.borrow_mut() = previous;
28        });
29    }
30}
31
32pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
33    CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
34}
35
36pub(super) fn set_current_stream_cancelled(
37    cancelled: &Arc<AtomicBool>,
38) -> CurrentStreamCancelledGuard {
39    let previous = CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(Some(Arc::clone(cancelled))));
40    CurrentStreamCancelledGuard { previous }
41}
42
/// Process-wide counter; each `Runtime::new` takes the next value and formats
/// it into the name handed to `TimerDriver::launch`.
static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
44
45#[derive(Clone)]
46pub struct Runtime {
47    pub(super) inner: Arc<RuntimeInner>,
48    name_prefix: Arc<str>,
49    attributes: Attributes,
50}
51
52#[derive(Debug)]
53pub(super) struct RuntimeInner {
54    pub(super) state: Arc<RuntimeState>,
55    timer: Arc<TimerDriver>,
56}
57
58#[derive(Debug)]
59pub(super) struct RuntimeState {
60    pub(super) shutdown: Arc<AtomicBool>,
61    active_streams: AtomicUsize,
62}
63
64impl RuntimeState {
65    fn new() -> Self {
66        Self {
67            shutdown: Arc::new(AtomicBool::new(false)),
68            active_streams: AtomicUsize::new(0),
69        }
70    }
71}
72
73struct ActiveStreamGuard {
74    state: Arc<RuntimeState>,
75}
76
77impl ActiveStreamGuard {
78    fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
79        Self { state }
80    }
81}
82
83impl Drop for ActiveStreamGuard {
84    fn drop(&mut self) {
85        self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
86    }
87}
88
89pub type Materializer = Runtime;
90
91fn run_stream_task<T, F>(
92    state: &RuntimeState,
93    cancelled: Arc<AtomicBool>,
94    run: F,
95) -> StreamResult<T>
96where
97    F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
98{
99    if state.shutdown.load(Ordering::SeqCst) {
100        Err(StreamError::AbruptTermination)
101    } else if cancelled.load(Ordering::SeqCst) {
102        Err(StreamError::Cancelled)
103    } else {
104        catch_unwind(AssertUnwindSafe(|| run(cancelled)))
105            .unwrap_or(Err(StreamError::AbruptTermination))
106    }
107}
108
109impl Runtime {
110    #[must_use]
111    pub fn new() -> Self {
112        let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
113        Self {
114            inner: Arc::new(RuntimeInner {
115                state: Arc::new(RuntimeState::new()),
116                timer: TimerDriver::launch(&format!("datum-tmr-{timer_id:x}")),
117            }),
118            name_prefix: Arc::from("datum-stream"),
119            attributes: Attributes::default(),
120        }
121    }
122
123    #[must_use]
124    pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
125        Self {
126            inner: Arc::clone(&self.inner),
127            name_prefix: name_prefix.into(),
128            attributes: self.attributes.clone(),
129        }
130    }
131
132    #[must_use]
133    pub fn name_prefix(&self) -> &str {
134        &self.name_prefix
135    }
136
137    #[must_use]
138    pub fn with_attributes(&self, attributes: Attributes) -> Self {
139        Self {
140            inner: Arc::clone(&self.inner),
141            name_prefix: Arc::clone(&self.name_prefix),
142            attributes,
143        }
144    }
145
146    #[must_use]
147    pub fn attributes(&self) -> &Attributes {
148        &self.attributes
149    }
150
151    #[must_use]
152    pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
153        self.attributes.clone().and(local.clone())
154    }
155
156    pub fn shutdown(&self) {
157        self.inner.state.shutdown.store(true, Ordering::SeqCst);
158        self.inner.timer.stop();
159    }
160
161    #[must_use]
162    pub fn is_shutdown(&self) -> bool {
163        self.inner.state.shutdown.load(Ordering::SeqCst)
164    }
165
166    #[must_use]
167    pub fn active_streams(&self) -> usize {
168        self.inner.state.active_streams.load(Ordering::SeqCst)
169    }
170
171    pub fn materialize<Mat: Send + 'static>(
172        &self,
173        graph: &RunnableGraph<Mat>,
174    ) -> StreamResult<Mat> {
175        if self.is_shutdown() {
176            return Err(StreamError::AbruptTermination);
177        }
178
179        (graph.runner)(self)
180    }
181
182    pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
183    where
184        F: FnOnce() + Send + 'static,
185    {
186        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
187        self.inner.timer.schedule_once(
188            delay,
189            task,
190            Arc::clone(&self.inner.state.shutdown),
191            keep_alive,
192        )
193    }
194
195    pub fn schedule_with_fixed_delay<F>(
196        &self,
197        initial_delay: Duration,
198        delay: Duration,
199        task: F,
200    ) -> Cancellable
201    where
202        F: Fn() + Send + Sync + 'static,
203    {
204        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
205        self.inner.timer.schedule_with_fixed_delay(
206            initial_delay,
207            delay,
208            task,
209            Arc::clone(&self.inner.state.shutdown),
210            keep_alive,
211        )
212    }
213
214    pub fn schedule_at_fixed_rate<F>(
215        &self,
216        initial_delay: Duration,
217        interval: Duration,
218        task: F,
219    ) -> Cancellable
220    where
221        F: Fn() + Send + Sync + 'static,
222    {
223        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
224        self.inner.timer.schedule_at_fixed_rate(
225            initial_delay,
226            interval,
227            task,
228            Arc::clone(&self.inner.state.shutdown),
229            keep_alive,
230        )
231    }
232
233    pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
234    where
235        T: Send + 'static,
236        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
237    {
238        if self.is_shutdown() {
239            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
240        }
241
242        let (sender, receiver) = oneshot::channel();
243        let state = Arc::clone(&self.inner.state);
244        let cancellation = StreamCancellation::new();
245        let task_cancelled = cancellation.cancelled();
246        let task_cancellation = cancellation.clone();
247        state.active_streams.fetch_add(1, Ordering::SeqCst);
248        default_stream_executor().execute(Box::new(move || {
249            let _worker = task_cancellation.register_current_worker();
250            let result = {
251                let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
252                run_stream_task(&state, task_cancelled, run)
253            };
254            let _ = sender.send(result);
255        }));
256
257        StreamCompletion::from_receiver(receiver, Some(cancellation))
258    }
259
260    pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
261    where
262        T: Send + 'static,
263        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
264    {
265        if self.is_shutdown() {
266            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
267        }
268
269        let state = Arc::clone(&self.inner.state);
270        let cancelled = Arc::new(AtomicBool::new(false));
271        state.active_streams.fetch_add(1, Ordering::SeqCst);
272        let result = {
273            let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
274            run_stream_task(&state, cancelled, run)
275        };
276        StreamCompletion::ready(result)
277    }
278
279    pub(crate) fn checked_stream<T: Send + 'static>(
280        &self,
281        input: BoxStream<T>,
282        cancelled: Option<Arc<AtomicBool>>,
283    ) -> BoxStream<T> {
284        runtime_checked_stream(input, Arc::clone(&self.inner.state), cancelled)
285    }
286}
287
/// Boxed unit of work submitted to the stream executor.
type StreamJob = Box<dyn FnOnce() + Send>;

/// Process-wide pool that runs each materialized stream's (synchronous,
/// potentially long-lived or busy-spinning) drain loop on a worker thread.
///
/// Unlike a fixed-size executor, this pool reuses idle workers and spawns a new
/// one whenever a job arrives while every worker is busy. A stream that
/// monopolizes its worker for its whole lifetime (`Sink::never`, an infinite
/// `map_async`/ask flow, etc.) therefore cannot starve other streams: the pool
/// grows to match the number of concurrently running streams instead of
/// dead-locking once `num_cpus` of them are live.
struct StreamExecutor {
    shared: Arc<ExecutorShared>,
}

/// State shared between the executor handle and its worker threads.
struct ExecutorShared {
    inner: Mutex<ExecutorInner>,
    /// Signalled once per job pushed onto the shared queue.
    available: Condvar,
    /// Prefix used when naming worker threads.
    name_prefix: &'static str,
}

struct ExecutorInner {
    /// Jobs waiting for a parked worker to pick them up.
    queue: VecDeque<StreamJob>,
    /// Workers currently parked in `wait_timeout`.
    idle: usize,
    /// Worker threads that are running or have a slot reserved for spawning.
    workers: usize,
}

impl ExecutorShared {
    /// Locks the pool state, recovering the guard if the mutex was poisoned.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, ExecutorInner> {
        self.inner
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }
}

impl StreamExecutor {
    /// Creates an empty pool; no thread is spawned until the first `execute`.
    fn new(name_prefix: &'static str) -> Self {
        Self {
            shared: Arc::new(ExecutorShared {
                inner: Mutex::new(ExecutorInner {
                    queue: VecDeque::new(),
                    idle: 0,
                    workers: 0,
                }),
                available: Condvar::new(),
                name_prefix,
            }),
        }
    }

    /// Runs `job` on a pool thread: a parked worker if one is free, otherwise a
    /// freshly spawned one. If the thread cannot be spawned the job runs inline
    /// on the calling thread.
    fn execute(&self, job: StreamJob) {
        let mut inner = self.shared.lock_inner();

        if inner.idle > inner.queue.len() {
            // A parked worker can take this immediately. Jobs already in the
            // queue reserve idle workers that have been notified but have not
            // yet woken up.
            inner.queue.push_back(job);
            drop(inner);
            self.shared.available.notify_one();
            return;
        }

        // No worker is free; grow the pool and reserve the slot up front.
        inner.workers += 1;
        let worker_index = inner.workers;
        drop(inner);

        // Hand the job to the new worker through a private slot instead of the
        // shared queue. Queueing it after the spawn is racy: the new worker
        // parks as idle first, so a concurrent `execute` can give it a
        // different job, leaving this one queued with nobody parked to run it.
        // The slot also lets us take the job back when spawning fails, since a
        // failed spawn drops the closure and everything it owns.
        let handoff = Arc::new(Mutex::new(Some(job)));
        let worker_handoff = Arc::clone(&handoff);
        let shared = Arc::clone(&self.shared);
        let name = format!("{}-{worker_index}", self.shared.name_prefix);
        let spawned = thread::Builder::new().name(name).spawn(move || {
            let first = worker_handoff
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .take();
            drop(worker_handoff);
            worker_loop(&shared, first);
        });

        if spawned.is_err() {
            // Spawning failed (e.g. resource exhaustion); undo the reserved
            // slot and run the job inline so the stream still makes progress.
            self.shared.lock_inner().workers -= 1;
            let reclaimed = handoff
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .take();
            if let Some(job) = reclaimed {
                job();
            }
        }
    }
}

/// Body of a pool worker thread.
///
/// Runs `first` (the job the worker was spawned for, if any), then serves the
/// shared queue until it has been idle for `IDLE_TIMEOUT` with nothing queued.
fn worker_loop(shared: &ExecutorShared, first: Option<StreamJob>) {
    // Decrement the live-worker count on any exit (idle timeout or a job panic)
    // so the pool's bookkeeping stays in step with the threads that exist.
    struct WorkerGuard<'a> {
        shared: &'a ExecutorShared,
    }
    impl Drop for WorkerGuard<'_> {
        fn drop(&mut self) {
            self.shared.lock_inner().workers -= 1;
        }
    }
    let _guard = WorkerGuard { shared };

    if let Some(job) = first {
        job();
    }

    // Reap workers that have been idle for a while so bursts of concurrent
    // streams do not leak threads indefinitely.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
    loop {
        let job = {
            let mut inner = shared.lock_inner();
            loop {
                if let Some(job) = inner.queue.pop_front() {
                    break job;
                }
                // Count ourselves as parked only while actually waiting.
                inner.idle += 1;
                let (next, timeout) = shared
                    .available
                    .wait_timeout(inner, IDLE_TIMEOUT)
                    .unwrap_or_else(|poison| poison.into_inner());
                inner = next;
                inner.idle -= 1;
                // A job may have been queued right as the wait expired; only
                // exit when there is truly nothing to do.
                if timeout.timed_out() && inner.queue.is_empty() {
                    return;
                }
            }
        };
        // Run outside the lock so other workers and submitters are not blocked.
        job();
    }
}
432
433fn default_stream_executor() -> &'static StreamExecutor {
434    static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
435    EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
436}
437
438pub(super) fn dispatch_stream_job(job: StreamJob) {
439    default_stream_executor().execute(job);
440}
441
442impl Default for Runtime {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl Drop for RuntimeInner {
449    fn drop(&mut self) {
450        self.timer.stop();
451    }
452}
453
/// Test-only accessors that expose the timer driver's state.
#[cfg(test)]
impl Runtime {
    /// Forwards to the timer driver's `is_live`.
    pub(super) fn timer_driver_is_live(&self) -> bool {
        let timer = &self.inner.timer;
        timer.is_live()
    }

    /// Forwards to the timer driver's `thread_name`.
    pub(super) fn timer_thread_name(&self) -> &str {
        let timer = &self.inner.timer;
        timer.thread_name()
    }
}
464
465pub(super) fn runtime_checked_stream<T: Send + 'static>(
466    mut input: BoxStream<T>,
467    state: Arc<RuntimeState>,
468    cancelled: Option<Arc<AtomicBool>>,
469) -> BoxStream<T> {
470    let mut terminated = false;
471    Box::new(std::iter::from_fn(move || {
472        if terminated {
473            return None;
474        }
475
476        if state.shutdown.load(Ordering::SeqCst) {
477            terminated = true;
478            return Some(Err(StreamError::AbruptTermination));
479        }
480        if cancelled
481            .as_ref()
482            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
483        {
484            terminated = true;
485            return Some(Err(StreamError::Cancelled));
486        }
487
488        let previous = cancelled
489            .is_some()
490            .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
491        let next = input.next();
492        if let Some(previous) = previous {
493            CURRENT_STREAM_CANCELLED.with(|slot| {
494                *slot.borrow_mut() = previous;
495            });
496        }
497        if next.is_none() {
498            terminated = true;
499        }
500        next
501    }))
502}