Skip to main content

datum/stream/
runtime.rs

1use super::completion::StreamCancellation;
2use super::*;
3use crate::Attributes;
4use crate::stream::timer::TimerDriver;
5use std::cell::RefCell;
6
thread_local! {
    /// Per-thread slot holding the cancellation flag of the stream that is
    /// currently being driven on this thread, or `None` when no flag is
    /// installed.
    static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
}
10
11pub(super) struct CurrentStreamCancelledGuard {
12    previous: Option<Arc<AtomicBool>>,
13}
14
15impl Drop for CurrentStreamCancelledGuard {
16    fn drop(&mut self) {
17        let previous = self.previous.take();
18        CURRENT_STREAM_CANCELLED.with(|slot| {
19            *slot.borrow_mut() = previous;
20        });
21    }
22}
23
24pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
25    CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
26}
27
28pub(super) fn set_current_stream_cancelled(
29    cancelled: &Arc<AtomicBool>,
30) -> CurrentStreamCancelledGuard {
31    let previous = CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(Some(Arc::clone(cancelled))));
32    CurrentStreamCancelledGuard { previous }
33}
34
/// Process-wide counter used by `Runtime::new` to give each timer driver a
/// distinct id (rendered in hex in the driver's name).
static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
36
37#[derive(Clone)]
38pub struct Runtime {
39    pub(super) inner: Arc<RuntimeInner>,
40    name_prefix: Arc<str>,
41    attributes: Attributes,
42}
43
44#[derive(Debug)]
45pub(super) struct RuntimeInner {
46    pub(super) state: Arc<RuntimeState>,
47    timer: Arc<TimerDriver>,
48}
49
50#[derive(Debug)]
51pub(super) struct RuntimeState {
52    pub(super) shutdown: Arc<AtomicBool>,
53    active_streams: AtomicUsize,
54}
55
56impl RuntimeState {
57    fn new() -> Self {
58        Self {
59            shutdown: Arc::new(AtomicBool::new(false)),
60            active_streams: AtomicUsize::new(0),
61        }
62    }
63}
64
65struct ActiveStreamGuard {
66    state: Arc<RuntimeState>,
67}
68
69impl ActiveStreamGuard {
70    fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
71        Self { state }
72    }
73}
74
75impl Drop for ActiveStreamGuard {
76    fn drop(&mut self) {
77        self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
78    }
79}
80
81pub type Materializer = Runtime;
82
83fn run_stream_task<T, F>(
84    state: &RuntimeState,
85    cancelled: Arc<AtomicBool>,
86    run: F,
87) -> StreamResult<T>
88where
89    F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
90{
91    if state.shutdown.load(Ordering::SeqCst) {
92        Err(StreamError::AbruptTermination)
93    } else if cancelled.load(Ordering::SeqCst) {
94        Err(StreamError::Cancelled)
95    } else {
96        catch_unwind(AssertUnwindSafe(|| run(cancelled)))
97            .unwrap_or(Err(StreamError::AbruptTermination))
98    }
99}
100
101impl Runtime {
102    #[must_use]
103    pub fn new() -> Self {
104        let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
105        Self {
106            inner: Arc::new(RuntimeInner {
107                state: Arc::new(RuntimeState::new()),
108                timer: TimerDriver::launch(&format!("datum-tmr-{timer_id:x}")),
109            }),
110            name_prefix: Arc::from("datum-stream"),
111            attributes: Attributes::default(),
112        }
113    }
114
115    #[must_use]
116    pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
117        Self {
118            inner: Arc::clone(&self.inner),
119            name_prefix: name_prefix.into(),
120            attributes: self.attributes.clone(),
121        }
122    }
123
124    #[must_use]
125    pub fn name_prefix(&self) -> &str {
126        &self.name_prefix
127    }
128
129    #[must_use]
130    pub fn with_attributes(&self, attributes: Attributes) -> Self {
131        Self {
132            inner: Arc::clone(&self.inner),
133            name_prefix: Arc::clone(&self.name_prefix),
134            attributes,
135        }
136    }
137
138    #[must_use]
139    pub fn attributes(&self) -> &Attributes {
140        &self.attributes
141    }
142
143    #[must_use]
144    pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
145        self.attributes.clone().and(local.clone())
146    }
147
148    pub fn shutdown(&self) {
149        self.inner.state.shutdown.store(true, Ordering::SeqCst);
150        self.inner.timer.stop();
151    }
152
153    #[must_use]
154    pub fn is_shutdown(&self) -> bool {
155        self.inner.state.shutdown.load(Ordering::SeqCst)
156    }
157
158    #[must_use]
159    pub fn active_streams(&self) -> usize {
160        self.inner.state.active_streams.load(Ordering::SeqCst)
161    }
162
163    pub fn materialize<Mat: Send + 'static>(
164        &self,
165        graph: &RunnableGraph<Mat>,
166    ) -> StreamResult<Mat> {
167        if self.is_shutdown() {
168            return Err(StreamError::AbruptTermination);
169        }
170
171        (graph.runner)(self)
172    }
173
174    pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
175    where
176        F: FnOnce() + Send + 'static,
177    {
178        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
179        self.inner.timer.schedule_once(
180            delay,
181            task,
182            Arc::clone(&self.inner.state.shutdown),
183            keep_alive,
184        )
185    }
186
187    pub fn schedule_with_fixed_delay<F>(
188        &self,
189        initial_delay: Duration,
190        delay: Duration,
191        task: F,
192    ) -> Cancellable
193    where
194        F: Fn() + Send + Sync + 'static,
195    {
196        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
197        self.inner.timer.schedule_with_fixed_delay(
198            initial_delay,
199            delay,
200            task,
201            Arc::clone(&self.inner.state.shutdown),
202            keep_alive,
203        )
204    }
205
206    pub fn schedule_at_fixed_rate<F>(
207        &self,
208        initial_delay: Duration,
209        interval: Duration,
210        task: F,
211    ) -> Cancellable
212    where
213        F: Fn() + Send + Sync + 'static,
214    {
215        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
216        self.inner.timer.schedule_at_fixed_rate(
217            initial_delay,
218            interval,
219            task,
220            Arc::clone(&self.inner.state.shutdown),
221            keep_alive,
222        )
223    }
224
225    pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
226    where
227        T: Send + 'static,
228        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
229    {
230        if self.is_shutdown() {
231            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
232        }
233
234        let (sender, receiver) = oneshot::channel();
235        let state = Arc::clone(&self.inner.state);
236        let cancellation = StreamCancellation::new();
237        let task_cancelled = cancellation.cancelled();
238        let task_cancellation = cancellation.clone();
239        state.active_streams.fetch_add(1, Ordering::SeqCst);
240        default_stream_executor().execute(Box::new(move || {
241            let _worker = task_cancellation.register_current_worker();
242            let result = {
243                let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
244                run_stream_task(&state, task_cancelled, run)
245            };
246            let _ = sender.send(result);
247        }));
248
249        StreamCompletion::from_receiver(receiver, Some(cancellation))
250    }
251
252    pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
253    where
254        T: Send + 'static,
255        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
256    {
257        if self.is_shutdown() {
258            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
259        }
260
261        let state = Arc::clone(&self.inner.state);
262        let cancelled = Arc::new(AtomicBool::new(false));
263        state.active_streams.fetch_add(1, Ordering::SeqCst);
264        let result = {
265            let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
266            run_stream_task(&state, cancelled, run)
267        };
268        StreamCompletion::ready(result)
269    }
270
271    pub(crate) fn checked_stream<T: Send + 'static>(
272        &self,
273        input: BoxStream<T>,
274        cancelled: Option<Arc<AtomicBool>>,
275    ) -> BoxStream<T> {
276        runtime_checked_stream(input, Arc::clone(&self.inner.state), cancelled)
277    }
278}
279
/// A unit of work handed to the stream executor: a boxed closure that is run
/// exactly once and may be sent to another thread.
type StreamJob = Box<dyn FnOnce() + Send>;
281
282/// Process-wide pool that runs each materialized stream's (synchronous,
283/// potentially long-lived or busy-spinning) drain loop on a worker thread.
284///
285/// Unlike a fixed-size executor, this pool reuses idle workers and spawns a new
286/// one whenever a job arrives while every worker is busy. A stream that
287/// monopolizes its worker for its whole lifetime (`Sink::never`, an infinite
288/// `map_async`/ask flow, etc.) therefore cannot starve other streams: the pool
289/// grows to match the number of concurrently running streams instead of
290/// dead-locking once `num_cpus` of them are live.
291struct StreamExecutor {
292    shared: Arc<ExecutorShared>,
293}
294
295struct ExecutorShared {
296    inner: Mutex<ExecutorInner>,
297    available: Condvar,
298    name_prefix: &'static str,
299}
300
301struct ExecutorInner {
302    queue: VecDeque<StreamJob>,
303    idle: usize,
304    workers: usize,
305}
306
307impl StreamExecutor {
308    fn new(name_prefix: &'static str) -> Self {
309        Self {
310            shared: Arc::new(ExecutorShared {
311                inner: Mutex::new(ExecutorInner {
312                    queue: VecDeque::new(),
313                    idle: 0,
314                    workers: 0,
315                }),
316                available: Condvar::new(),
317                name_prefix,
318            }),
319        }
320    }
321
322    fn execute(&self, job: StreamJob) {
323        let mut inner = self
324            .shared
325            .inner
326            .lock()
327            .unwrap_or_else(|poison| poison.into_inner());
328
329        if inner.idle > inner.queue.len() {
330            // A parked worker can take this immediately. Jobs already in the
331            // queue reserve idle workers that have been notified but have not
332            // yet woken up.
333            inner.queue.push_back(job);
334            drop(inner);
335            self.shared.available.notify_one();
336            return;
337        }
338
339        // No worker is parked; grow the pool. Reserve the worker slot, then only
340        // enqueue the job once the thread actually spawned, so a spawn failure
341        // cannot strand a queued job or pop an unrelated one.
342        inner.workers += 1;
343        let worker_index = inner.workers;
344        drop(inner);
345
346        let shared = Arc::clone(&self.shared);
347        let name = format!("{}-{worker_index}", self.shared.name_prefix);
348        match thread::Builder::new()
349            .name(name)
350            .spawn(move || worker_loop(&shared))
351        {
352            Ok(_) => {
353                let mut inner = self
354                    .shared
355                    .inner
356                    .lock()
357                    .unwrap_or_else(|poison| poison.into_inner());
358                inner.queue.push_back(job);
359                drop(inner);
360                self.shared.available.notify_one();
361            }
362            Err(_) => {
363                // Spawning failed (e.g. resource exhaustion); undo the reserved
364                // slot and run the job inline so the stream still makes progress.
365                let mut inner = self
366                    .shared
367                    .inner
368                    .lock()
369                    .unwrap_or_else(|poison| poison.into_inner());
370                inner.workers -= 1;
371                drop(inner);
372                job();
373            }
374        }
375    }
376}
377
378fn worker_loop(shared: &ExecutorShared) {
379    // Decrement the live-worker count on any exit (idle timeout or a job panic)
380    // so `execute` knows to spawn a replacement.
381    struct WorkerGuard<'a> {
382        shared: &'a ExecutorShared,
383    }
384    impl Drop for WorkerGuard<'_> {
385        fn drop(&mut self) {
386            let mut inner = self
387                .shared
388                .inner
389                .lock()
390                .unwrap_or_else(|poison| poison.into_inner());
391            inner.workers -= 1;
392        }
393    }
394    let _guard = WorkerGuard { shared };
395
396    // Reap workers that have been idle for a while so bursts of concurrent
397    // streams do not leak threads indefinitely.
398    const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
399    loop {
400        let job = {
401            let mut inner = shared
402                .inner
403                .lock()
404                .unwrap_or_else(|poison| poison.into_inner());
405            loop {
406                if let Some(job) = inner.queue.pop_front() {
407                    break job;
408                }
409                inner.idle += 1;
410                let (next, timeout) = shared
411                    .available
412                    .wait_timeout(inner, IDLE_TIMEOUT)
413                    .unwrap_or_else(|poison| poison.into_inner());
414                inner = next;
415                inner.idle -= 1;
416                if timeout.timed_out() && inner.queue.is_empty() {
417                    return;
418                }
419            }
420        };
421        job();
422    }
423}
424
425fn default_stream_executor() -> &'static StreamExecutor {
426    static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
427    EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
428}
429
430pub(super) fn dispatch_stream_job(job: StreamJob) {
431    default_stream_executor().execute(job);
432}
433
434impl Default for Runtime {
435    fn default() -> Self {
436        Self::new()
437    }
438}
439
440impl Drop for RuntimeInner {
441    fn drop(&mut self) {
442        self.timer.stop();
443    }
444}
445
/// Test-only accessors for inspecting the timer driver.
#[cfg(test)]
impl Runtime {
    /// Whether the timer driver reports itself as live.
    pub(super) fn timer_driver_is_live(&self) -> bool {
        let driver = &self.inner.timer;
        driver.is_live()
    }

    /// Name of the timer driver's thread, as reported by the driver.
    pub(super) fn timer_thread_name(&self) -> &str {
        let driver = &self.inner.timer;
        driver.thread_name()
    }
}
456
457pub(super) fn runtime_checked_stream<T: Send + 'static>(
458    mut input: BoxStream<T>,
459    state: Arc<RuntimeState>,
460    cancelled: Option<Arc<AtomicBool>>,
461) -> BoxStream<T> {
462    let mut terminated = false;
463    Box::new(std::iter::from_fn(move || {
464        if terminated {
465            return None;
466        }
467
468        if state.shutdown.load(Ordering::SeqCst) {
469            terminated = true;
470            return Some(Err(StreamError::AbruptTermination));
471        }
472        if cancelled
473            .as_ref()
474            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
475        {
476            terminated = true;
477            return Some(Err(StreamError::Cancelled));
478        }
479
480        let previous = cancelled
481            .is_some()
482            .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
483        let next = input.next();
484        if let Some(previous) = previous {
485            CURRENT_STREAM_CANCELLED.with(|slot| {
486                *slot.borrow_mut() = previous;
487            });
488        }
489        if next.is_none() {
490            terminated = true;
491        }
492        next
493    }))
494}