1use super::completion::StreamCancellation;
2use super::*;
3use crate::Attributes;
4use crate::stream::timer::TimerDriver;
5use std::cell::RefCell;
6
// Per-thread slot holding an optional shared cancellation flag; it starts out as `None`.
// It is written by `set_current_stream_cancelled` and by `runtime_checked_stream` (around
// each pull from its input) and read by `current_stream_cancelled`.
thread_local! {
    static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
}
10
/// Guard returned by `set_current_stream_cancelled`.
///
/// Dropping it writes `previous` back into the thread-local cancellation slot.
pub(super) struct CurrentStreamCancelledGuard {
    /// Value the thread-local slot held before this guard's flag was installed.
    previous: Option<Arc<AtomicBool>>,
}
14
15impl Drop for CurrentStreamCancelledGuard {
16 fn drop(&mut self) {
17 let previous = self.previous.take();
18 CURRENT_STREAM_CANCELLED.with(|slot| {
19 *slot.borrow_mut() = previous;
20 });
21 }
22}
23
24pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
25 CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
26}
27
28pub(super) fn set_current_stream_cancelled(
29 cancelled: &Arc<AtomicBool>,
30) -> CurrentStreamCancelledGuard {
31 let previous = CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(Some(Arc::clone(cancelled))));
32 CurrentStreamCancelledGuard { previous }
33}
34
/// Process-wide counter; `Runtime::new` takes the next value from it to build the name it
/// passes to `TimerDriver::launch`.
static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
36
/// Handle to a stream runtime.
///
/// Cloning yields another handle that shares the same `inner` (it is an `Arc`), so
/// shutdown and the active-stream count are common to all clones.
#[derive(Clone)]
pub struct Runtime {
    /// Shared internals: runtime state plus the timer driver.
    pub(super) inner: Arc<RuntimeInner>,
    /// Name prefix carried by this handle; `Runtime::new` sets it to `"datum-stream"`.
    name_prefix: Arc<str>,
    /// Attributes carried by this handle.
    attributes: Attributes,
}
43
/// Internals shared by every clone of a `Runtime` handle.
///
/// Its `Drop` implementation stops the timer driver.
#[derive(Debug)]
pub(super) struct RuntimeInner {
    /// Shutdown flag and active-stream counter.
    pub(super) state: Arc<RuntimeState>,
    /// Timer driver used by the `schedule_*` methods.
    timer: Arc<TimerDriver>,
}
49
/// Mutable state of a runtime, shared through `Arc`.
#[derive(Debug)]
pub(super) struct RuntimeState {
    /// Set to `true` by `Runtime::shutdown`; checked before running or pulling from streams.
    pub(super) shutdown: Arc<AtomicBool>,
    /// Incremented by `spawn_stream` / `spawn_stream_inline`, decremented when an
    /// `ActiveStreamGuard` is dropped.
    active_streams: AtomicUsize,
}
55
56impl RuntimeState {
57 fn new() -> Self {
58 Self {
59 shutdown: Arc::new(AtomicBool::new(false)),
60 active_streams: AtomicUsize::new(0),
61 }
62 }
63}
64
/// Guard that subtracts one from `RuntimeState::active_streams` when dropped.
struct ActiveStreamGuard {
    /// State whose counter is decremented on drop.
    state: Arc<RuntimeState>,
}
68
69impl ActiveStreamGuard {
70 fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
71 Self { state }
72 }
73}
74
75impl Drop for ActiveStreamGuard {
76 fn drop(&mut self) {
77 self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
78 }
79}
80
/// Alias for [`Runtime`]; both names refer to the same type.
pub type Materializer = Runtime;
82
83fn run_stream_task<T, F>(
84 state: &RuntimeState,
85 cancelled: Arc<AtomicBool>,
86 run: F,
87) -> StreamResult<T>
88where
89 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
90{
91 if state.shutdown.load(Ordering::SeqCst) {
92 Err(StreamError::AbruptTermination)
93 } else if cancelled.load(Ordering::SeqCst) {
94 Err(StreamError::Cancelled)
95 } else {
96 catch_unwind(AssertUnwindSafe(|| run(cancelled)))
97 .unwrap_or(Err(StreamError::AbruptTermination))
98 }
99}
100
101impl Runtime {
102 #[must_use]
103 pub fn new() -> Self {
104 let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
105 Self {
106 inner: Arc::new(RuntimeInner {
107 state: Arc::new(RuntimeState::new()),
108 timer: TimerDriver::launch(&format!("datum-tmr-{timer_id:x}")),
109 }),
110 name_prefix: Arc::from("datum-stream"),
111 attributes: Attributes::default(),
112 }
113 }
114
115 #[must_use]
116 pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
117 Self {
118 inner: Arc::clone(&self.inner),
119 name_prefix: name_prefix.into(),
120 attributes: self.attributes.clone(),
121 }
122 }
123
124 #[must_use]
125 pub fn name_prefix(&self) -> &str {
126 &self.name_prefix
127 }
128
129 #[must_use]
130 pub fn with_attributes(&self, attributes: Attributes) -> Self {
131 Self {
132 inner: Arc::clone(&self.inner),
133 name_prefix: Arc::clone(&self.name_prefix),
134 attributes,
135 }
136 }
137
138 #[must_use]
139 pub fn attributes(&self) -> &Attributes {
140 &self.attributes
141 }
142
143 #[must_use]
144 pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
145 self.attributes.clone().and(local.clone())
146 }
147
148 pub fn shutdown(&self) {
149 self.inner.state.shutdown.store(true, Ordering::SeqCst);
150 self.inner.timer.stop();
151 }
152
153 #[must_use]
154 pub fn is_shutdown(&self) -> bool {
155 self.inner.state.shutdown.load(Ordering::SeqCst)
156 }
157
158 #[must_use]
159 pub fn active_streams(&self) -> usize {
160 self.inner.state.active_streams.load(Ordering::SeqCst)
161 }
162
163 pub fn materialize<Mat: Send + 'static>(
164 &self,
165 graph: &RunnableGraph<Mat>,
166 ) -> StreamResult<Mat> {
167 if self.is_shutdown() {
168 return Err(StreamError::AbruptTermination);
169 }
170
171 (graph.runner)(self)
172 }
173
174 pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
175 where
176 F: FnOnce() + Send + 'static,
177 {
178 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
179 self.inner.timer.schedule_once(
180 delay,
181 task,
182 Arc::clone(&self.inner.state.shutdown),
183 keep_alive,
184 )
185 }
186
187 pub fn schedule_with_fixed_delay<F>(
188 &self,
189 initial_delay: Duration,
190 delay: Duration,
191 task: F,
192 ) -> Cancellable
193 where
194 F: Fn() + Send + Sync + 'static,
195 {
196 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
197 self.inner.timer.schedule_with_fixed_delay(
198 initial_delay,
199 delay,
200 task,
201 Arc::clone(&self.inner.state.shutdown),
202 keep_alive,
203 )
204 }
205
206 pub fn schedule_at_fixed_rate<F>(
207 &self,
208 initial_delay: Duration,
209 interval: Duration,
210 task: F,
211 ) -> Cancellable
212 where
213 F: Fn() + Send + Sync + 'static,
214 {
215 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
216 self.inner.timer.schedule_at_fixed_rate(
217 initial_delay,
218 interval,
219 task,
220 Arc::clone(&self.inner.state.shutdown),
221 keep_alive,
222 )
223 }
224
225 pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
226 where
227 T: Send + 'static,
228 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
229 {
230 if self.is_shutdown() {
231 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
232 }
233
234 let (sender, receiver) = oneshot::channel();
235 let state = Arc::clone(&self.inner.state);
236 let cancellation = StreamCancellation::new();
237 let task_cancelled = cancellation.cancelled();
238 let task_cancellation = cancellation.clone();
239 state.active_streams.fetch_add(1, Ordering::SeqCst);
240 default_stream_executor().execute(Box::new(move || {
241 let _worker = task_cancellation.register_current_worker();
242 let result = {
243 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
244 run_stream_task(&state, task_cancelled, run)
245 };
246 let _ = sender.send(result);
247 }));
248
249 StreamCompletion::from_receiver(receiver, Some(cancellation))
250 }
251
252 pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
253 where
254 T: Send + 'static,
255 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
256 {
257 if self.is_shutdown() {
258 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
259 }
260
261 let state = Arc::clone(&self.inner.state);
262 let cancelled = Arc::new(AtomicBool::new(false));
263 state.active_streams.fetch_add(1, Ordering::SeqCst);
264 let result = {
265 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
266 run_stream_task(&state, cancelled, run)
267 };
268 StreamCompletion::ready(result)
269 }
270
271 pub(crate) fn checked_stream<T: Send + 'static>(
272 &self,
273 input: BoxStream<T>,
274 cancelled: Option<Arc<AtomicBool>>,
275 ) -> BoxStream<T> {
276 runtime_checked_stream(input, Arc::clone(&self.inner.state), cancelled)
277 }
278}
279
/// A unit of work for the stream executor: a boxed one-shot closure that may be moved to
/// another thread.
type StreamJob = Box<dyn FnOnce() + Send>;

/// Executor that runs jobs on worker threads created on demand.
///
/// A job is handed to an idle worker when one is spare; otherwise a new worker thread is
/// spawned. Workers exit after sitting idle for the timeout in `worker_loop`.
struct StreamExecutor {
    shared: Arc<ExecutorShared>,
}

/// State shared between the executor handle and its worker threads.
struct ExecutorShared {
    /// Queue and worker bookkeeping, guarded by one mutex.
    inner: Mutex<ExecutorInner>,
    /// Signalled when a job is queued for an idle worker.
    available: Condvar,
    /// Prefix of worker thread names (`<prefix>-<index>`).
    name_prefix: &'static str,
}

/// Bookkeeping protected by `ExecutorShared::inner`.
struct ExecutorInner {
    /// Jobs waiting for a worker, in submission order.
    queue: VecDeque<StreamJob>,
    /// Workers currently blocked waiting on `available`.
    idle: usize,
    /// Worker threads that have been requested and have not yet exited.
    workers: usize,
}

impl StreamExecutor {
    /// Creates an executor with no workers and an empty queue.
    fn new(name_prefix: &'static str) -> Self {
        Self {
            shared: Arc::new(ExecutorShared {
                inner: Mutex::new(ExecutorInner {
                    queue: VecDeque::new(),
                    idle: 0,
                    workers: 0,
                }),
                available: Condvar::new(),
                name_prefix,
            }),
        }
    }

    /// Locks the bookkeeping, recovering the data if a previous holder panicked.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, ExecutorInner> {
        self.shared
            .inner
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }

    /// Schedules `job` to run on a worker thread.
    ///
    /// If more workers are idle than jobs are queued, the job is queued and one idle worker
    /// is woken. Otherwise a new worker thread is spawned for it. If the thread cannot be
    /// spawned, a queued job is run inline on the calling thread instead.
    fn execute(&self, job: StreamJob) {
        let mut inner = self.lock_inner();

        // An idle worker is "spare" only if no already-queued job has claimed it.
        let spare_idle_worker = inner.idle > inner.queue.len();

        // Queue the job under the same lock that decides whether to spawn. Queuing only
        // after the spawn would let the new worker go idle first and be handed a different
        // job by a concurrent `execute`, leaving this job without a worker.
        inner.queue.push_back(job);

        if spare_idle_worker {
            drop(inner);
            self.shared.available.notify_one();
            return;
        }

        inner.workers += 1;
        let worker_index = inner.workers;
        drop(inner);

        let shared = Arc::clone(&self.shared);
        let name = format!("{}-{worker_index}", self.shared.name_prefix);
        let spawned = thread::Builder::new()
            .name(name)
            .spawn(move || worker_loop(&shared));

        if spawned.is_err() {
            // No thread was created: undo the reservation and take one job back so the
            // queue does not hold work that no worker was reserved for. If another worker
            // already drained the queue there is nothing left to do.
            let mut inner = self.lock_inner();
            inner.workers -= 1;
            let fallback = inner.queue.pop_back();
            drop(inner);
            if let Some(job) = fallback {
                job();
            }
        }
    }
}

/// Body of a worker thread: runs queued jobs until it has been idle for `IDLE_TIMEOUT`
/// with nothing queued.
fn worker_loop(shared: &ExecutorShared) {
    /// Decrements the live-worker count when the worker exits, including by unwinding
    /// out of a panicking job.
    struct WorkerGuard<'a> {
        shared: &'a ExecutorShared,
    }
    impl Drop for WorkerGuard<'_> {
        fn drop(&mut self) {
            let mut inner = self
                .shared
                .inner
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            inner.workers -= 1;
        }
    }
    let _guard = WorkerGuard { shared };

    const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
    loop {
        let job = {
            let mut inner = shared
                .inner
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            loop {
                if let Some(job) = inner.queue.pop_front() {
                    break job;
                }
                // Count this worker as idle only while it is actually waiting.
                inner.idle += 1;
                let (next, timeout) = shared
                    .available
                    .wait_timeout(inner, IDLE_TIMEOUT)
                    .unwrap_or_else(|poison| poison.into_inner());
                inner = next;
                inner.idle -= 1;
                // A job may have been queued just as the wait timed out; only exit when
                // the queue really is empty.
                if timeout.timed_out() && inner.queue.is_empty() {
                    return;
                }
            }
        };
        // Run the job with the lock released.
        job();
    }
}
424
425fn default_stream_executor() -> &'static StreamExecutor {
426 static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
427 EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
428}
429
430pub(super) fn dispatch_stream_job(job: StreamJob) {
431 default_stream_executor().execute(job);
432}
433
434impl Default for Runtime {
435 fn default() -> Self {
436 Self::new()
437 }
438}
439
440impl Drop for RuntimeInner {
441 fn drop(&mut self) {
442 self.timer.stop();
443 }
444}
445
#[cfg(test)]
impl Runtime {
    /// Test-only probe: forwards to the timer driver's `is_live`.
    pub(super) fn timer_driver_is_live(&self) -> bool {
        let timer = &self.inner.timer;
        timer.is_live()
    }

    /// Test-only probe: forwards to the timer driver's `thread_name`.
    pub(super) fn timer_thread_name(&self) -> &str {
        let timer = &self.inner.timer;
        timer.thread_name()
    }
}
456
457pub(super) fn runtime_checked_stream<T: Send + 'static>(
458 mut input: BoxStream<T>,
459 state: Arc<RuntimeState>,
460 cancelled: Option<Arc<AtomicBool>>,
461) -> BoxStream<T> {
462 let mut terminated = false;
463 Box::new(std::iter::from_fn(move || {
464 if terminated {
465 return None;
466 }
467
468 if state.shutdown.load(Ordering::SeqCst) {
469 terminated = true;
470 return Some(Err(StreamError::AbruptTermination));
471 }
472 if cancelled
473 .as_ref()
474 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
475 {
476 terminated = true;
477 return Some(Err(StreamError::Cancelled));
478 }
479
480 let previous = cancelled
481 .is_some()
482 .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
483 let next = input.next();
484 if let Some(previous) = previous {
485 CURRENT_STREAM_CANCELLED.with(|slot| {
486 *slot.borrow_mut() = previous;
487 });
488 }
489 if next.is_none() {
490 terminated = true;
491 }
492 next
493 }))
494}