1use super::completion::StreamCancellation;
10use super::*;
11use crate::Attributes;
12use crate::stream::timer::TimerDriver;
13use std::cell::RefCell;
14
15thread_local! {
16 static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
17}
18
19pub(super) struct CurrentStreamCancelledGuard {
20 previous: Option<Arc<AtomicBool>>,
21}
22
23impl Drop for CurrentStreamCancelledGuard {
24 fn drop(&mut self) {
25 let previous = self.previous.take();
26 CURRENT_STREAM_CANCELLED.with(|slot| {
27 *slot.borrow_mut() = previous;
28 });
29 }
30}
31
32pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
33 CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
34}
35
36pub(super) fn set_current_stream_cancelled(
37 cancelled: &Arc<AtomicBool>,
38) -> CurrentStreamCancelledGuard {
39 let previous = CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(Some(Arc::clone(cancelled))));
40 CurrentStreamCancelledGuard { previous }
41}
42
43static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
44
45#[derive(Clone)]
46pub struct Runtime {
47 pub(super) inner: Arc<RuntimeInner>,
48 name_prefix: Arc<str>,
49 attributes: Attributes,
50}
51
52#[derive(Debug)]
53pub(super) struct RuntimeInner {
54 pub(super) state: Arc<RuntimeState>,
55 timer: Arc<TimerDriver>,
56}
57
58#[derive(Debug)]
59pub(super) struct RuntimeState {
60 pub(super) shutdown: Arc<AtomicBool>,
61 active_streams: AtomicUsize,
62}
63
64impl RuntimeState {
65 fn new() -> Self {
66 Self {
67 shutdown: Arc::new(AtomicBool::new(false)),
68 active_streams: AtomicUsize::new(0),
69 }
70 }
71}
72
73struct ActiveStreamGuard {
74 state: Arc<RuntimeState>,
75}
76
77impl ActiveStreamGuard {
78 fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
79 Self { state }
80 }
81}
82
83impl Drop for ActiveStreamGuard {
84 fn drop(&mut self) {
85 self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
86 }
87}
88
89pub type Materializer = Runtime;
90
91fn run_stream_task<T, F>(
92 state: &RuntimeState,
93 cancelled: Arc<AtomicBool>,
94 run: F,
95) -> StreamResult<T>
96where
97 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
98{
99 if state.shutdown.load(Ordering::SeqCst) {
100 Err(StreamError::AbruptTermination)
101 } else if cancelled.load(Ordering::SeqCst) {
102 Err(StreamError::Cancelled)
103 } else {
104 catch_unwind(AssertUnwindSafe(|| run(cancelled)))
105 .unwrap_or(Err(StreamError::AbruptTermination))
106 }
107}
108
109impl Runtime {
110 #[must_use]
111 pub fn new() -> Self {
112 let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
113 Self {
114 inner: Arc::new(RuntimeInner {
115 state: Arc::new(RuntimeState::new()),
116 timer: TimerDriver::launch(&format!("datum-tmr-{timer_id:x}")),
117 }),
118 name_prefix: Arc::from("datum-stream"),
119 attributes: Attributes::default(),
120 }
121 }
122
123 #[must_use]
124 pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
125 Self {
126 inner: Arc::clone(&self.inner),
127 name_prefix: name_prefix.into(),
128 attributes: self.attributes.clone(),
129 }
130 }
131
132 #[must_use]
133 pub fn name_prefix(&self) -> &str {
134 &self.name_prefix
135 }
136
137 #[must_use]
138 pub fn with_attributes(&self, attributes: Attributes) -> Self {
139 Self {
140 inner: Arc::clone(&self.inner),
141 name_prefix: Arc::clone(&self.name_prefix),
142 attributes,
143 }
144 }
145
146 #[must_use]
147 pub fn attributes(&self) -> &Attributes {
148 &self.attributes
149 }
150
151 #[must_use]
152 pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
153 self.attributes.clone().and(local.clone())
154 }
155
156 pub fn shutdown(&self) {
157 self.inner.state.shutdown.store(true, Ordering::SeqCst);
158 self.inner.timer.stop();
159 }
160
161 #[must_use]
162 pub fn is_shutdown(&self) -> bool {
163 self.inner.state.shutdown.load(Ordering::SeqCst)
164 }
165
166 #[must_use]
167 pub fn active_streams(&self) -> usize {
168 self.inner.state.active_streams.load(Ordering::SeqCst)
169 }
170
171 pub fn materialize<Mat: Send + 'static>(
172 &self,
173 graph: &RunnableGraph<Mat>,
174 ) -> StreamResult<Mat> {
175 if self.is_shutdown() {
176 return Err(StreamError::AbruptTermination);
177 }
178
179 (graph.runner)(self)
180 }
181
182 pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
183 where
184 F: FnOnce() + Send + 'static,
185 {
186 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
187 self.inner.timer.schedule_once(
188 delay,
189 task,
190 Arc::clone(&self.inner.state.shutdown),
191 keep_alive,
192 )
193 }
194
195 pub fn schedule_with_fixed_delay<F>(
196 &self,
197 initial_delay: Duration,
198 delay: Duration,
199 task: F,
200 ) -> Cancellable
201 where
202 F: Fn() + Send + Sync + 'static,
203 {
204 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
205 self.inner.timer.schedule_with_fixed_delay(
206 initial_delay,
207 delay,
208 task,
209 Arc::clone(&self.inner.state.shutdown),
210 keep_alive,
211 )
212 }
213
214 pub fn schedule_at_fixed_rate<F>(
215 &self,
216 initial_delay: Duration,
217 interval: Duration,
218 task: F,
219 ) -> Cancellable
220 where
221 F: Fn() + Send + Sync + 'static,
222 {
223 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
224 self.inner.timer.schedule_at_fixed_rate(
225 initial_delay,
226 interval,
227 task,
228 Arc::clone(&self.inner.state.shutdown),
229 keep_alive,
230 )
231 }
232
233 pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
234 where
235 T: Send + 'static,
236 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
237 {
238 if self.is_shutdown() {
239 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
240 }
241
242 let (sender, receiver) = oneshot::channel();
243 let state = Arc::clone(&self.inner.state);
244 let cancellation = StreamCancellation::new();
245 let task_cancelled = cancellation.cancelled();
246 let task_cancellation = cancellation.clone();
247 state.active_streams.fetch_add(1, Ordering::SeqCst);
248 default_stream_executor().execute(Box::new(move || {
249 let _worker = task_cancellation.register_current_worker();
250 let result = {
251 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
252 run_stream_task(&state, task_cancelled, run)
253 };
254 let _ = sender.send(result);
255 }));
256
257 StreamCompletion::from_receiver(receiver, Some(cancellation))
258 }
259
260 pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
261 where
262 T: Send + 'static,
263 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
264 {
265 if self.is_shutdown() {
266 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
267 }
268
269 let state = Arc::clone(&self.inner.state);
270 let cancelled = Arc::new(AtomicBool::new(false));
271 state.active_streams.fetch_add(1, Ordering::SeqCst);
272 let result = {
273 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
274 run_stream_task(&state, cancelled, run)
275 };
276 StreamCompletion::ready(result)
277 }
278
279 pub(crate) fn checked_stream<T: Send + 'static>(
280 &self,
281 input: BoxStream<T>,
282 cancelled: Option<Arc<AtomicBool>>,
283 ) -> BoxStream<T> {
284 runtime_checked_stream(input, Arc::clone(&self.inner.state), cancelled)
285 }
286}
287
288type StreamJob = Box<dyn FnOnce() + Send>;
289
290struct StreamExecutor {
300 shared: Arc<ExecutorShared>,
301}
302
303struct ExecutorShared {
304 inner: Mutex<ExecutorInner>,
305 available: Condvar,
306 name_prefix: &'static str,
307}
308
309struct ExecutorInner {
310 queue: VecDeque<StreamJob>,
311 idle: usize,
312 workers: usize,
313}
314
315impl StreamExecutor {
316 fn new(name_prefix: &'static str) -> Self {
317 Self {
318 shared: Arc::new(ExecutorShared {
319 inner: Mutex::new(ExecutorInner {
320 queue: VecDeque::new(),
321 idle: 0,
322 workers: 0,
323 }),
324 available: Condvar::new(),
325 name_prefix,
326 }),
327 }
328 }
329
330 fn execute(&self, job: StreamJob) {
331 let mut inner = self
332 .shared
333 .inner
334 .lock()
335 .unwrap_or_else(|poison| poison.into_inner());
336
337 if inner.idle > inner.queue.len() {
338 inner.queue.push_back(job);
342 drop(inner);
343 self.shared.available.notify_one();
344 return;
345 }
346
347 inner.workers += 1;
351 let worker_index = inner.workers;
352 drop(inner);
353
354 let shared = Arc::clone(&self.shared);
355 let name = format!("{}-{worker_index}", self.shared.name_prefix);
356 match thread::Builder::new()
357 .name(name)
358 .spawn(move || worker_loop(&shared))
359 {
360 Ok(_) => {
361 let mut inner = self
362 .shared
363 .inner
364 .lock()
365 .unwrap_or_else(|poison| poison.into_inner());
366 inner.queue.push_back(job);
367 drop(inner);
368 self.shared.available.notify_one();
369 }
370 Err(_) => {
371 let mut inner = self
374 .shared
375 .inner
376 .lock()
377 .unwrap_or_else(|poison| poison.into_inner());
378 inner.workers -= 1;
379 drop(inner);
380 job();
381 }
382 }
383 }
384}
385
386fn worker_loop(shared: &ExecutorShared) {
387 struct WorkerGuard<'a> {
390 shared: &'a ExecutorShared,
391 }
392 impl Drop for WorkerGuard<'_> {
393 fn drop(&mut self) {
394 let mut inner = self
395 .shared
396 .inner
397 .lock()
398 .unwrap_or_else(|poison| poison.into_inner());
399 inner.workers -= 1;
400 }
401 }
402 let _guard = WorkerGuard { shared };
403
404 const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
407 loop {
408 let job = {
409 let mut inner = shared
410 .inner
411 .lock()
412 .unwrap_or_else(|poison| poison.into_inner());
413 loop {
414 if let Some(job) = inner.queue.pop_front() {
415 break job;
416 }
417 inner.idle += 1;
418 let (next, timeout) = shared
419 .available
420 .wait_timeout(inner, IDLE_TIMEOUT)
421 .unwrap_or_else(|poison| poison.into_inner());
422 inner = next;
423 inner.idle -= 1;
424 if timeout.timed_out() && inner.queue.is_empty() {
425 return;
426 }
427 }
428 };
429 job();
430 }
431}
432
433fn default_stream_executor() -> &'static StreamExecutor {
434 static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
435 EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
436}
437
438pub(super) fn dispatch_stream_job(job: StreamJob) {
439 default_stream_executor().execute(job);
440}
441
442impl Default for Runtime {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl Drop for RuntimeInner {
449 fn drop(&mut self) {
450 self.timer.stop();
451 }
452}
453
454#[cfg(test)]
455impl Runtime {
456 pub(super) fn timer_driver_is_live(&self) -> bool {
457 self.inner.timer.is_live()
458 }
459
460 pub(super) fn timer_thread_name(&self) -> &str {
461 self.inner.timer.thread_name()
462 }
463}
464
465pub(super) fn runtime_checked_stream<T: Send + 'static>(
466 mut input: BoxStream<T>,
467 state: Arc<RuntimeState>,
468 cancelled: Option<Arc<AtomicBool>>,
469) -> BoxStream<T> {
470 let mut terminated = false;
471 Box::new(std::iter::from_fn(move || {
472 if terminated {
473 return None;
474 }
475
476 if state.shutdown.load(Ordering::SeqCst) {
477 terminated = true;
478 return Some(Err(StreamError::AbruptTermination));
479 }
480 if cancelled
481 .as_ref()
482 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
483 {
484 terminated = true;
485 return Some(Err(StreamError::Cancelled));
486 }
487
488 let previous = cancelled
489 .is_some()
490 .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
491 let next = input.next();
492 if let Some(previous) = previous {
493 CURRENT_STREAM_CANCELLED.with(|slot| {
494 *slot.borrow_mut() = previous;
495 });
496 }
497 if next.is_none() {
498 terminated = true;
499 }
500 next
501 }))
502}