1use super::completion::StreamCancellation;
2use super::*;
3use crate::Attributes;
4use crate::stream::timer::TimerDriver;
5use std::cell::RefCell;
6
7thread_local! {
8 static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
9}
10
11pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
12 CURRENT_STREAM_CANCELLED.with(|slot| slot.borrow().clone())
13}
14
15static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
16
17#[derive(Clone)]
18pub struct Runtime {
19 pub(super) inner: Arc<RuntimeInner>,
20 name_prefix: Arc<str>,
21 attributes: Attributes,
22}
23
24#[derive(Debug)]
25pub(super) struct RuntimeInner {
26 pub(super) state: Arc<RuntimeState>,
27 timer: Arc<TimerDriver>,
28}
29
30#[derive(Debug)]
31pub(super) struct RuntimeState {
32 pub(super) shutdown: Arc<AtomicBool>,
33 active_streams: AtomicUsize,
34}
35
36impl RuntimeState {
37 fn new() -> Self {
38 Self {
39 shutdown: Arc::new(AtomicBool::new(false)),
40 active_streams: AtomicUsize::new(0),
41 }
42 }
43}
44
45struct ActiveStreamGuard {
46 state: Arc<RuntimeState>,
47}
48
49impl ActiveStreamGuard {
50 fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
51 Self { state }
52 }
53}
54
55impl Drop for ActiveStreamGuard {
56 fn drop(&mut self) {
57 self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
58 }
59}
60
61pub type Materializer = Runtime;
62
63fn run_stream_task<T, F>(
64 state: &RuntimeState,
65 cancelled: Arc<AtomicBool>,
66 run: F,
67) -> StreamResult<T>
68where
69 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
70{
71 if state.shutdown.load(Ordering::SeqCst) {
72 Err(StreamError::AbruptTermination)
73 } else if cancelled.load(Ordering::SeqCst) {
74 Err(StreamError::Cancelled)
75 } else {
76 catch_unwind(AssertUnwindSafe(|| run(cancelled)))
77 .unwrap_or(Err(StreamError::AbruptTermination))
78 }
79}
80
81impl Runtime {
82 #[must_use]
83 pub fn new() -> Self {
84 let timer_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
85 Self {
86 inner: Arc::new(RuntimeInner {
87 state: Arc::new(RuntimeState::new()),
88 timer: TimerDriver::launch(&format!("datum-timer-{timer_id}")),
89 }),
90 name_prefix: Arc::from("datum-stream"),
91 attributes: Attributes::default(),
92 }
93 }
94
95 #[must_use]
96 pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
97 Self {
98 inner: Arc::clone(&self.inner),
99 name_prefix: name_prefix.into(),
100 attributes: self.attributes.clone(),
101 }
102 }
103
104 #[must_use]
105 pub fn name_prefix(&self) -> &str {
106 &self.name_prefix
107 }
108
109 #[must_use]
110 pub fn with_attributes(&self, attributes: Attributes) -> Self {
111 Self {
112 inner: Arc::clone(&self.inner),
113 name_prefix: Arc::clone(&self.name_prefix),
114 attributes,
115 }
116 }
117
118 #[must_use]
119 pub fn attributes(&self) -> &Attributes {
120 &self.attributes
121 }
122
123 #[must_use]
124 pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
125 self.attributes.clone().and(local.clone())
126 }
127
128 pub fn shutdown(&self) {
129 self.inner.state.shutdown.store(true, Ordering::SeqCst);
130 self.inner.timer.stop();
131 }
132
133 #[must_use]
134 pub fn is_shutdown(&self) -> bool {
135 self.inner.state.shutdown.load(Ordering::SeqCst)
136 }
137
138 #[must_use]
139 pub fn active_streams(&self) -> usize {
140 self.inner.state.active_streams.load(Ordering::SeqCst)
141 }
142
143 pub fn materialize<Mat: Send + 'static>(
144 &self,
145 graph: &RunnableGraph<Mat>,
146 ) -> StreamResult<Mat> {
147 if self.is_shutdown() {
148 return Err(StreamError::AbruptTermination);
149 }
150
151 (graph.runner)(self)
152 }
153
154 pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
155 where
156 F: FnOnce() + Send + 'static,
157 {
158 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
159 self.inner.timer.schedule_once(
160 delay,
161 task,
162 Arc::clone(&self.inner.state.shutdown),
163 keep_alive,
164 )
165 }
166
167 pub fn schedule_with_fixed_delay<F>(
168 &self,
169 initial_delay: Duration,
170 delay: Duration,
171 task: F,
172 ) -> Cancellable
173 where
174 F: Fn() + Send + Sync + 'static,
175 {
176 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
177 self.inner.timer.schedule_with_fixed_delay(
178 initial_delay,
179 delay,
180 task,
181 Arc::clone(&self.inner.state.shutdown),
182 keep_alive,
183 )
184 }
185
186 pub fn schedule_at_fixed_rate<F>(
187 &self,
188 initial_delay: Duration,
189 interval: Duration,
190 task: F,
191 ) -> Cancellable
192 where
193 F: Fn() + Send + Sync + 'static,
194 {
195 let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
196 self.inner.timer.schedule_at_fixed_rate(
197 initial_delay,
198 interval,
199 task,
200 Arc::clone(&self.inner.state.shutdown),
201 keep_alive,
202 )
203 }
204
205 pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
206 where
207 T: Send + 'static,
208 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
209 {
210 if self.is_shutdown() {
211 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
212 }
213
214 let (sender, receiver) = oneshot::channel();
215 let state = Arc::clone(&self.inner.state);
216 let cancellation = StreamCancellation::new();
217 let task_cancelled = cancellation.cancelled();
218 let task_cancellation = cancellation.clone();
219 state.active_streams.fetch_add(1, Ordering::SeqCst);
220 default_stream_executor().execute(Box::new(move || {
221 let _worker = task_cancellation.register_current_worker();
222 let result = {
223 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
224 run_stream_task(&state, task_cancelled, run)
225 };
226 let _ = sender.send(result);
227 }));
228
229 StreamCompletion::from_receiver(receiver, Some(cancellation))
230 }
231
232 pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
233 where
234 T: Send + 'static,
235 F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
236 {
237 if self.is_shutdown() {
238 return StreamCompletion::ready(Err(StreamError::AbruptTermination));
239 }
240
241 let state = Arc::clone(&self.inner.state);
242 let cancelled = Arc::new(AtomicBool::new(false));
243 state.active_streams.fetch_add(1, Ordering::SeqCst);
244 let result = {
245 let _active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
246 run_stream_task(&state, cancelled, run)
247 };
248 StreamCompletion::ready(result)
249 }
250}
251
252type StreamJob = Box<dyn FnOnce() + Send>;
253
254struct StreamExecutor {
264 shared: Arc<ExecutorShared>,
265}
266
267struct ExecutorShared {
268 inner: Mutex<ExecutorInner>,
269 available: Condvar,
270 name_prefix: &'static str,
271}
272
273struct ExecutorInner {
274 queue: VecDeque<StreamJob>,
275 idle: usize,
276 workers: usize,
277}
278
279impl StreamExecutor {
280 fn new(name_prefix: &'static str) -> Self {
281 Self {
282 shared: Arc::new(ExecutorShared {
283 inner: Mutex::new(ExecutorInner {
284 queue: VecDeque::new(),
285 idle: 0,
286 workers: 0,
287 }),
288 available: Condvar::new(),
289 name_prefix,
290 }),
291 }
292 }
293
294 fn execute(&self, job: StreamJob) {
295 let mut inner = self
296 .shared
297 .inner
298 .lock()
299 .unwrap_or_else(|poison| poison.into_inner());
300
301 if inner.idle > inner.queue.len() {
302 inner.queue.push_back(job);
306 drop(inner);
307 self.shared.available.notify_one();
308 return;
309 }
310
311 inner.workers += 1;
315 let worker_index = inner.workers;
316 drop(inner);
317
318 let shared = Arc::clone(&self.shared);
319 let name = format!("{}-{worker_index}", self.shared.name_prefix);
320 match thread::Builder::new()
321 .name(name)
322 .spawn(move || worker_loop(&shared))
323 {
324 Ok(_) => {
325 let mut inner = self
326 .shared
327 .inner
328 .lock()
329 .unwrap_or_else(|poison| poison.into_inner());
330 inner.queue.push_back(job);
331 drop(inner);
332 self.shared.available.notify_one();
333 }
334 Err(_) => {
335 let mut inner = self
338 .shared
339 .inner
340 .lock()
341 .unwrap_or_else(|poison| poison.into_inner());
342 inner.workers -= 1;
343 drop(inner);
344 job();
345 }
346 }
347 }
348}
349
350fn worker_loop(shared: &ExecutorShared) {
351 struct WorkerGuard<'a> {
354 shared: &'a ExecutorShared,
355 }
356 impl Drop for WorkerGuard<'_> {
357 fn drop(&mut self) {
358 let mut inner = self
359 .shared
360 .inner
361 .lock()
362 .unwrap_or_else(|poison| poison.into_inner());
363 inner.workers -= 1;
364 }
365 }
366 let _guard = WorkerGuard { shared };
367
368 const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
371 loop {
372 let job = {
373 let mut inner = shared
374 .inner
375 .lock()
376 .unwrap_or_else(|poison| poison.into_inner());
377 loop {
378 if let Some(job) = inner.queue.pop_front() {
379 break job;
380 }
381 inner.idle += 1;
382 let (next, timeout) = shared
383 .available
384 .wait_timeout(inner, IDLE_TIMEOUT)
385 .unwrap_or_else(|poison| poison.into_inner());
386 inner = next;
387 inner.idle -= 1;
388 if timeout.timed_out() && inner.queue.is_empty() {
389 return;
390 }
391 }
392 };
393 job();
394 }
395}
396
397fn default_stream_executor() -> &'static StreamExecutor {
398 static EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
399 EXECUTOR.get_or_init(|| StreamExecutor::new("datum-stream-runtime"))
400}
401
402pub(super) fn dispatch_stream_job(job: StreamJob) {
403 default_stream_executor().execute(job);
404}
405
406impl Default for Runtime {
407 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl Drop for RuntimeInner {
413 fn drop(&mut self) {
414 self.timer.stop();
415 }
416}
417
418#[cfg(test)]
419impl Runtime {
420 pub(super) fn timer_driver_is_live(&self) -> bool {
421 self.inner.timer.is_live()
422 }
423
424 pub(super) fn timer_thread_name(&self) -> &str {
425 self.inner.timer.thread_name()
426 }
427}
428
429pub(super) fn runtime_checked_stream<T: Send + 'static>(
430 mut input: BoxStream<T>,
431 state: Arc<RuntimeState>,
432 cancelled: Option<Arc<AtomicBool>>,
433) -> BoxStream<T> {
434 let mut terminated = false;
435 Box::new(std::iter::from_fn(move || {
436 if terminated {
437 return None;
438 }
439
440 if state.shutdown.load(Ordering::SeqCst) {
441 terminated = true;
442 return Some(Err(StreamError::AbruptTermination));
443 }
444 if cancelled
445 .as_ref()
446 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
447 {
448 terminated = true;
449 return Some(Err(StreamError::Cancelled));
450 }
451
452 let previous = cancelled
453 .is_some()
454 .then(|| CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(cancelled.clone())));
455 let next = input.next();
456 if let Some(previous) = previous {
457 CURRENT_STREAM_CANCELLED.with(|slot| {
458 *slot.borrow_mut() = previous;
459 });
460 }
461 if next.is_none() {
462 terminated = true;
463 }
464 next
465 }))
466}