Skip to main content

datum/stream/
runtime.rs

1use super::completion::StreamCancellation;
2use super::*;
3use crate::Attributes;
4use crate::stream::timer::TimerDriver;
5use std::cell::RefCell;
6
thread_local! {
    // Per-thread slot holding the cancellation flag of the stream whose upstream is
    // currently being pulled on this thread, or `None` when no such stream is active.
    // `runtime_checked_stream` writes the slot around each upstream `next()` call and
    // `current_stream_cancelled` reads it.
    static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
}
10
11pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
12    CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
13}
14
// Process-wide counter that supplies the numeric suffix of each timer driver's name
// (`datum-timer-{id}`); `Runtime::new` bumps it once per call.
static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
16
/// Handle used to materialize graphs, spawn stream tasks and schedule timer tasks.
///
/// Cloning a `Runtime` (or deriving one via `with_name_prefix` /
/// `with_attributes`) shares the same `RuntimeInner` through an `Arc`, so all
/// such handles observe the same shutdown flag, stream counter and timer.
/// The name prefix and attributes are stored per handle.
#[derive(Clone)]
pub struct Runtime {
    // Shared core: runtime state plus the timer driver.
    pub(super) inner: Arc<RuntimeInner>,
    // Name prefix carried by this handle; `"datum-stream"` for `Runtime::new`.
    name_prefix: Arc<str>,
    // Attributes carried by this handle; see `effective_attributes`.
    attributes: Attributes,
}
23
/// Core shared by every handle of one runtime.
///
/// Dropping the last reference stops the timer driver (see the `Drop` impl).
#[derive(Debug)]
pub(super) struct RuntimeInner {
    // Shutdown flag and active-stream counter.
    pub(super) state: Arc<RuntimeState>,
    // Timer driver launched in `Runtime::new`; backs the `schedule_*` methods.
    timer: Arc<TimerDriver>,
}
29
30#[derive(Debug)]
31pub(super) struct RuntimeState {
32    pub(super) shutdown: Arc<AtomicBool>,
33    active_streams: AtomicUsize,
34}
35
36impl RuntimeState {
37    fn new() -> Self {
38        Self {
39            shutdown: Arc::new(AtomicBool::new(false)),
40            active_streams: AtomicUsize::new(0),
41        }
42    }
43}
44
45struct ActiveStreamGuard {
46    state: Arc<RuntimeState>,
47}
48
49impl ActiveStreamGuard {
50    fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
51        Self { state }
52    }
53}
54
55impl Drop for ActiveStreamGuard {
56    fn drop(&mut self) {
57        self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
58    }
59}
60
/// Alias for [`Runtime`]; both names refer to the same type.
pub type Materializer = Runtime;
62
63fn run_stream_task<T, F>(
64    state: &RuntimeState,
65    cancelled: Arc<AtomicBool>,
66    run: F,
67) -> StreamResult<T>
68where
69    F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
70{
71    if state.shutdown.load(Ordering::SeqCst) {
72        Err(StreamError::AbruptTermination)
73    } else if cancelled.load(Ordering::SeqCst) {
74        Err(StreamError::Cancelled)
75    } else {
76        catch_unwind(AssertUnwindSafe(|| run(cancelled)))
77            .unwrap_or(Err(StreamError::AbruptTermination))
78    }
79}
80
81impl Runtime {
82    #[must_use]
83    pub fn new() -> Self {
84        let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
85        Self {
86            inner: Arc::new(RuntimeInner {
87                state: Arc::new(RuntimeState::new()),
88                timer: TimerDriver::launch(&format!("datum-timer-{timer_id}")),
89            }),
90            name_prefix: Arc::from("datum-stream"),
91            attributes: Attributes::default(),
92        }
93    }
94
95    #[must_use]
96    pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
97        Self {
98            inner: Arc::clone(&self.inner),
99            name_prefix: name_prefix.into(),
100            attributes: self.attributes.clone(),
101        }
102    }
103
104    #[must_use]
105    pub fn name_prefix(&self) -> &str {
106        &self.name_prefix
107    }
108
109    #[must_use]
110    pub fn with_attributes(&self, attributes: Attributes) -> Self {
111        Self {
112            inner: Arc::clone(&self.inner),
113            name_prefix: Arc::clone(&self.name_prefix),
114            attributes,
115        }
116    }
117
118    #[must_use]
119    pub fn attributes(&self) -> &Attributes {
120        &self.attributes
121    }
122
123    #[must_use]
124    pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
125        self.attributes.clone().and(local.clone())
126    }
127
128    pub fn shutdown(&self) {
129        self.inner.state.shutdown.store(true, Ordering::SeqCst);
130        self.inner.timer.stop();
131    }
132
133    #[must_use]
134    pub fn is_shutdown(&self) -> bool {
135        self.inner.state.shutdown.load(Ordering::SeqCst)
136    }
137
138    #[must_use]
139    pub fn active_streams(&self) -> usize {
140        self.inner.state.active_streams.load(Ordering::SeqCst)
141    }
142
143    pub fn materialize<Mat: Send + 'static>(
144        &self,
145        graph: &RunnableGraph<Mat>,
146    ) -> StreamResult<Mat> {
147        if self.is_shutdown() {
148            return Err(StreamError::AbruptTermination);
149        }
150
151        (graph.runner)(self)
152    }
153
154    pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
155    where
156        F: FnOnce() + Send + 'static,
157    {
158        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
159        self.inner.timer.schedule_once(
160            delay,
161            task,
162            Arc::clone(&self.inner.state.shutdown),
163            keep_alive,
164        )
165    }
166
167    pub fn schedule_with_fixed_delay<F>(
168        &self,
169        initial_delay: Duration,
170        delay: Duration,
171        task: F,
172    ) -> Cancellable
173    where
174        F: Fn() + Send + Sync + 'static,
175    {
176        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
177        self.inner.timer.schedule_with_fixed_delay(
178            initial_delay,
179            delay,
180            task,
181            Arc::clone(&self.inner.state.shutdown),
182            keep_alive,
183        )
184    }
185
186    pub fn schedule_at_fixed_rate<F>(
187        &self,
188        initial_delay: Duration,
189        interval: Duration,
190        task: F,
191    ) -> Cancellable
192    where
193        F: Fn() + Send + Sync + 'static,
194    {
195        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
196        self.inner.timer.schedule_at_fixed_rate(
197            initial_delay,
198            interval,
199            task,
200            Arc::clone(&self.inner.state.shutdown),
201            keep_alive,
202        )
203    }
204
205    pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
206    where
207        T: Send + 'static,
208        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
209    {
210        if self.is_shutdown() {
211            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
212        }
213
214        let (sender, receiver) = oneshot::channel();
215        let state = Arc::clone(&self.inner.state);
216        let cancellation = StreamCancellation::new();
217        let task_cancelled = cancellation.cancelled();
218        let task_cancellation = cancellation.clone();
219        state.active_streams.fetch_add(1, Ordering::SeqCst);
220        default_stream_executor().execute(Box::new(move || {
221            let _worker = task_cancellation.register_current_worker();
222            let result = {
223                let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
224                run_stream_task(&state, task_cancelled, run)
225            };
226            let _ = sender.send(result);
227        }));
228
229        StreamCompletion::from_receiver(receiver, Some(cancellation))
230    }
231
232    pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
233    where
234        T: Send + 'static,
235        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
236    {
237        if self.is_shutdown() {
238            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
239        }
240
241        let state = Arc::clone(&self.inner.state);
242        let cancelled = Arc::new(AtomicBool::new(false));
243        state.active_streams.fetch_add(1, Ordering::SeqCst);
244        let result = {
245            let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
246            run_stream_task(&state, cancelled, run)
247        };
248        StreamCompletion::ready(result)
249    }
250}
251
/// A unit of work for the stream executor: a boxed, sendable, run-once closure.
type StreamJob = Box<dyn FnOnce() + Send>;

/// Process-wide pool that runs each materialized stream's (synchronous,
/// potentially long-lived or busy-spinning) drain loop on a worker thread.
///
/// Unlike a fixed-size executor, this pool reuses idle workers and spawns a new
/// one whenever a job arrives while every worker is busy. A stream that
/// monopolizes its worker for its whole lifetime (`Sink::never`, an infinite
/// `map_async`/ask flow, etc.) therefore cannot starve other streams: the pool
/// grows to match the number of concurrently running streams instead of
/// dead-locking once `num_cpus` of them are live.
struct StreamExecutor {
    shared: Arc<ExecutorShared>,
}

/// State shared between the executor handle and all of its worker threads.
struct ExecutorShared {
    inner: Mutex<ExecutorInner>,
    /// Signalled once for every job pushed onto the queue.
    available: Condvar,
    /// Prefix of worker thread names (`{name_prefix}-{index}`).
    name_prefix: &'static str,
}

/// Bookkeeping protected by `ExecutorShared::inner`.
struct ExecutorInner {
    /// Jobs waiting for a parked worker. A job is only queued when a parked
    /// worker is available for it; jobs that need a new thread are handed to
    /// that thread directly.
    queue: VecDeque<StreamJob>,
    /// Number of workers currently parked on `available`.
    idle: usize,
    /// Number of live (or reserved, about-to-spawn) workers.
    workers: usize,
}

impl ExecutorShared {
    /// Locks the bookkeeping, recovering the data if a previous holder panicked.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, ExecutorInner> {
        self.inner
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }
}

impl StreamExecutor {
    /// Creates an empty pool; no thread is started until the first `execute`.
    fn new(name_prefix: &'static str) -> Self {
        Self {
            shared: Arc::new(ExecutorShared {
                inner: Mutex::new(ExecutorInner {
                    queue: VecDeque::new(),
                    idle: 0,
                    workers: 0,
                }),
                available: Condvar::new(),
                name_prefix,
            }),
        }
    }

    /// Runs `job` on a pool thread, reusing a parked worker when one is free
    /// and spawning a new worker otherwise.
    ///
    /// If a new thread is needed but cannot be spawned, `job` runs inline on
    /// the calling thread so the stream still makes progress.
    fn execute(&self, job: StreamJob) {
        let mut inner = self.shared.lock_inner();

        if inner.idle > inner.queue.len() {
            // A parked worker can take this immediately. Jobs already in the
            // queue reserve idle workers that have been notified but have not
            // yet woken up.
            inner.queue.push_back(job);
            drop(inner);
            self.shared.available.notify_one();
            return;
        }

        // No worker is free; grow the pool and reserve the worker slot.
        inner.workers += 1;
        let worker_index = inner.workers;
        drop(inner);

        // Hand the job to the new worker through a shared slot instead of the
        // queue. Going through the queue would let the new worker park before
        // the job is pushed; a concurrent `execute` could then give it a
        // different (possibly never-ending) job and leave this one stranded
        // with nobody to run it. The slot also lets us take the job back if
        // the spawn fails.
        let handoff = Arc::new(Mutex::new(Some(job)));
        let worker_handoff = Arc::clone(&handoff);
        let shared = Arc::clone(&self.shared);
        let name = format!("{}-{worker_index}", self.shared.name_prefix);
        let spawned = thread::Builder::new().name(name).spawn(move || {
            let first_job = worker_handoff
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .take();
            drop(worker_handoff);
            worker_loop(&shared, first_job);
        });

        if spawned.is_err() {
            // Spawning failed (e.g. resource exhaustion); undo the reserved
            // slot and run the job inline so the stream still makes progress.
            self.shared.lock_inner().workers -= 1;
            let reclaimed = handoff
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .take();
            if let Some(job) = reclaimed {
                job();
            }
        }
    }
}

/// Body of a pool worker thread.
///
/// Runs `first_job` (the job the worker was spawned for), then keeps taking
/// jobs from the shared queue. The worker exits after being parked for
/// `IDLE_TIMEOUT` with an empty queue, or when a job panics.
fn worker_loop(shared: &ExecutorShared, first_job: Option<StreamJob>) {
    // Decrement the live-worker count on any exit (idle timeout or a job panic).
    struct WorkerGuard<'a> {
        shared: &'a ExecutorShared,
    }
    impl Drop for WorkerGuard<'_> {
        fn drop(&mut self) {
            self.shared.lock_inner().workers -= 1;
        }
    }
    let _guard = WorkerGuard { shared };

    // Reap workers that have been idle for a while so bursts of concurrent
    // streams do not leak threads indefinitely.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(10);

    if let Some(job) = first_job {
        job();
    }

    loop {
        let job = {
            let mut inner = shared.lock_inner();
            loop {
                if let Some(job) = inner.queue.pop_front() {
                    break job;
                }
                // Advertise this worker as parked for the duration of the wait.
                inner.idle += 1;
                let (next, timeout) = shared
                    .available
                    .wait_timeout(inner, IDLE_TIMEOUT)
                    .unwrap_or_else(|poison| poison.into_inner());
                inner = next;
                inner.idle -= 1;
                // A job may have been queued for this worker just as the wait
                // timed out; only exit when there is really nothing to do.
                if timeout.timed_out() && inner.queue.is_empty() {
                    return;
                }
            }
        };
        // Run the job with the lock released.
        job();
    }
}
396
397fn default_stream_executor() -> &'static StreamExecutor {
398    static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
399    EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
400}
401
402pub(super) fn dispatch_stream_job(job: StreamJob) {
403    default_stream_executor().execute(job);
404}
405
406impl Default for Runtime {
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
412impl Drop for RuntimeInner {
413    fn drop(&mut self) {
414        self.timer.stop();
415    }
416}
417
/// Test-only accessors that expose the state of the runtime's timer driver.
#[cfg(test)]
impl Runtime {
    /// Forwards to the timer driver's `is_live`.
    pub(super) fn timer_driver_is_live(&self) -> bool {
        let timer = &self.inner.timer;
        timer.is_live()
    }

    /// Forwards to the timer driver's `thread_name`.
    pub(super) fn timer_thread_name(&self) -> &str {
        let timer = &self.inner.timer;
        timer.thread_name()
    }
}
428
429pub(super) fn runtime_checked_stream<T: Send + 'static>(
430    mut input: BoxStream<T>,
431    state: Arc<RuntimeState>,
432    cancelled: Option<Arc<AtomicBool>>,
433) -> BoxStream<T> {
434    let mut terminated = false;
435    Box::new(std::iter::from_fn(move || {
436        if terminated {
437            return None;
438        }
439
440        if state.shutdown.load(Ordering::SeqCst) {
441            terminated = true;
442            return Some(Err(StreamError::AbruptTermination));
443        }
444        if cancelled
445            .as_ref()
446            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
447        {
448            terminated = true;
449            return Some(Err(StreamError::Cancelled));
450        }
451
452        let previous = cancelled
453            .is_some()
454            .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
455        let next = input.next();
456        if let Some(previous) = previous {
457            CURRENT_STREAM_CANCELLED.with(|slot| {
458                *slot.borrow_mut() = previous;
459            });
460        }
461        if next.is_none() {
462            terminated = true;
463        }
464        next
465    }))
466}