use super::completion::StreamCancellation;
use super::*;
use crate::Attributes;
use crate::stream::timer::TimerDriver;
use std::cell::RefCell;
thread_local! {
// Per-thread slot holding the cancellation flag of the stream whose upstream
// is currently being pulled on this thread; `None` when nothing is installed.
// `runtime_checked_stream` installs a flag around its call to `input.next()`
// and `current_stream_cancelled` reads the slot.
static CURRENT_STREAM_CANCELLED: RefCell<Option<Arc<AtomicBool>>> = const { RefCell::new(None) };
}
/// Returns a clone of the cancellation flag currently installed in this
/// thread's `CURRENT_STREAM_CANCELLED` slot, or `None` when the slot is empty.
pub(super) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
    CURRENT_STREAM_CANCELLED.with(|cell| {
        let installed = cell.borrow();
        installed.as_ref().map(Arc::clone)
    })
}
// Process-wide counter; each `Runtime::new` call takes the next value and uses
// it as the suffix of its timer driver name (`datum-timer-{id}`).
static NEXT_TIMER_DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
/// Handle to a stream runtime.
///
/// Cloning the handle (or deriving one via `with_name_prefix` /
/// `with_attributes`) shares the same `RuntimeInner` through an `Arc`, while
/// the name prefix and attributes are carried per handle.
#[derive(Clone)]
pub struct Runtime {
// Shared internals (runtime state flags and the timer driver).
pub(super) inner: Arc<RuntimeInner>,
// Name prefix carried by this handle; `Runtime::new` sets it to "datum-stream".
name_prefix: Arc<str>,
// Attributes carried by this handle; `Runtime::new` uses `Attributes::default()`.
attributes: Attributes,
}
/// Internals shared by every handle of one runtime.
///
/// The `Drop` impl for this type stops the timer driver.
#[derive(Debug)]
pub(super) struct RuntimeInner {
// Shutdown flag and active-stream counter shared with running stream tasks.
pub(super) state: Arc<RuntimeState>,
// Timer driver used by the `schedule_*` methods; stopped on `shutdown` and on drop.
timer: Arc<TimerDriver>,
}
/// Flags and counters observed by stream tasks of one runtime.
#[derive(Debug)]
pub(super) struct RuntimeState {
// Set to `true` by `Runtime::shutdown`; also handed to the timer driver when
// scheduling tasks.
pub(super) shutdown: Arc<AtomicBool>,
// Number of stream tasks spawned through `spawn_stream` / `spawn_stream_inline`
// that have not finished yet; decremented by `ActiveStreamGuard` on drop.
active_streams: AtomicUsize,
}
impl RuntimeState {
    /// Creates the state of a fresh runtime: not shut down, zero active streams.
    fn new() -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let active_streams = AtomicUsize::new(0);
        Self {
            shutdown,
            active_streams,
        }
    }
}
/// Guard that subtracts one from `RuntimeState::active_streams` when dropped.
struct ActiveStreamGuard {
    /// State whose counter is decremented on drop.
    state: Arc<RuntimeState>,
}

impl ActiveStreamGuard {
    /// Wraps `state` in a guard.
    ///
    /// The constructor does not touch the counter; the matching increment is
    /// performed by the caller before the guard is created.
    fn decrement_on_drop(state: Arc<RuntimeState>) -> Self {
        ActiveStreamGuard { state }
    }
}

impl Drop for ActiveStreamGuard {
    fn drop(&mut self) {
        let counter = &self.state.active_streams;
        counter.fetch_sub(1, Ordering::SeqCst);
    }
}
/// Alias for [`Runtime`]; both names refer to the same type.
pub type Materializer = Runtime;
/// Runs one stream task unless the runtime is shut down or the stream is
/// already cancelled.
///
/// * Returns `Err(StreamError::AbruptTermination)` without calling `run` when
///   `state.shutdown` is set (checked first).
/// * Returns `Err(StreamError::Cancelled)` without calling `run` when
///   `cancelled` is set.
/// * Otherwise calls `run(cancelled)` and returns its result; a panic inside
///   `run` is caught and reported as `Err(StreamError::AbruptTermination)`.
fn run_stream_task<T, F>(
    state: &RuntimeState,
    cancelled: Arc<AtomicBool>,
    run: F,
) -> StreamResult<T>
where
    F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
{
    if state.shutdown.load(Ordering::SeqCst) {
        return Err(StreamError::AbruptTermination);
    }
    if cancelled.load(Ordering::SeqCst) {
        return Err(StreamError::Cancelled);
    }
    // The closure is not required to be unwind safe; any panic is mapped to an
    // error value instead of propagating into the caller.
    match catch_unwind(AssertUnwindSafe(|| run(cancelled))) {
        Ok(outcome) => outcome,
        Err(_panic_payload) => Err(StreamError::AbruptTermination),
    }
}
impl Runtime {
    /// Creates a runtime with fresh shared state and its own timer driver.
    ///
    /// The timer driver is launched under the name `datum-timer-{id}`, where
    /// `id` is taken from a process-wide counter. The handle starts with the
    /// name prefix `"datum-stream"` and default attributes.
    #[must_use]
    pub fn new() -> Self {
        let driver_id = NEXT_TIMER_DRIVER_ID.fetch_add(1, Ordering::SeqCst);
        let driver_name = format!("datum-timer-{driver_id}");
        let inner = RuntimeInner {
            state: Arc::new(RuntimeState::new()),
            timer: TimerDriver::launch(&driver_name),
        };
        Self {
            inner: Arc::new(inner),
            name_prefix: Arc::from("datum-stream"),
            attributes: Attributes::default(),
        }
    }

    /// Returns a handle that shares this runtime's internals and copies its
    /// attributes, but carries `name_prefix` instead of the current prefix.
    #[must_use]
    pub fn with_name_prefix(&self, name_prefix: impl Into<Arc<str>>) -> Self {
        let Self {
            inner, attributes, ..
        } = self;
        Self {
            inner: Arc::clone(inner),
            name_prefix: name_prefix.into(),
            attributes: attributes.clone(),
        }
    }

    /// Returns the name prefix carried by this handle.
    #[must_use]
    pub fn name_prefix(&self) -> &str {
        &self.name_prefix
    }

    /// Returns a handle that shares this runtime's internals and name prefix,
    /// but carries `attributes` instead of the current attributes.
    #[must_use]
    pub fn with_attributes(&self, attributes: Attributes) -> Self {
        let Self {
            inner, name_prefix, ..
        } = self;
        Self {
            inner: Arc::clone(inner),
            name_prefix: Arc::clone(name_prefix),
            attributes,
        }
    }

    /// Returns the attributes carried by this handle.
    #[must_use]
    pub fn attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Combines a copy of this handle's attributes with a copy of `local` via
    /// `Attributes::and` (handle attributes as receiver, `local` as argument).
    #[must_use]
    pub fn effective_attributes(&self, local: &Attributes) -> Attributes {
        let runtime_level = self.attributes.clone();
        runtime_level.and(local.clone())
    }

    /// Marks the runtime as shut down and then stops its timer driver.
    ///
    /// The flag is shared by every handle of this runtime, so all of them
    /// report `is_shutdown() == true` afterwards.
    pub fn shutdown(&self) {
        let inner = &self.inner;
        inner.state.shutdown.store(true, Ordering::SeqCst);
        inner.timer.stop();
    }

    /// Reports whether `shutdown` has been called on any handle of this runtime.
    #[must_use]
    pub fn is_shutdown(&self) -> bool {
        let flag = &self.inner.state.shutdown;
        flag.load(Ordering::SeqCst)
    }

    /// Returns the current value of the active-stream counter.
    #[must_use]
    pub fn active_streams(&self) -> usize {
        let counter = &self.inner.state.active_streams;
        counter.load(Ordering::SeqCst)
    }

    /// Materializes `graph` by invoking its `runner` with this runtime.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::AbruptTermination` without invoking the runner
    /// when the runtime is already shut down; otherwise returns whatever the
    /// runner returns.
    pub fn materialize<Mat: Send + 'static>(
        &self,
        graph: &RunnableGraph<Mat>,
    ) -> StreamResult<Mat> {
        if self.is_shutdown() {
            Err(StreamError::AbruptTermination)
        } else {
            (graph.runner)(self)
        }
    }

    /// Forwards `delay` and `task` to the timer driver's `schedule_once`.
    ///
    /// The driver also receives a clone of the runtime's shutdown flag and a
    /// type-erased clone of the shared internals (`keep_alive`) — presumably so
    /// the internals outlive pending timer work; confirm against `TimerDriver`.
    pub fn schedule_once<F>(&self, delay: Duration, task: F) -> Cancellable
    where
        F: FnOnce() + Send + 'static,
    {
        let shutdown = Arc::clone(&self.inner.state.shutdown);
        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
        self.inner
            .timer
            .schedule_once(delay, task, shutdown, keep_alive)
    }

    /// Forwards `initial_delay`, `delay` and `task` to the timer driver's
    /// `schedule_with_fixed_delay`.
    ///
    /// The driver also receives a clone of the runtime's shutdown flag and a
    /// type-erased clone of the shared internals (`keep_alive`).
    pub fn schedule_with_fixed_delay<F>(
        &self,
        initial_delay: Duration,
        delay: Duration,
        task: F,
    ) -> Cancellable
    where
        F: Fn() + Send + Sync + 'static,
    {
        let shutdown = Arc::clone(&self.inner.state.shutdown);
        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
        self.inner
            .timer
            .schedule_with_fixed_delay(initial_delay, delay, task, shutdown, keep_alive)
    }

    /// Forwards `initial_delay`, `interval` and `task` to the timer driver's
    /// `schedule_at_fixed_rate`.
    ///
    /// The driver also receives a clone of the runtime's shutdown flag and a
    /// type-erased clone of the shared internals (`keep_alive`).
    pub fn schedule_at_fixed_rate<F>(
        &self,
        initial_delay: Duration,
        interval: Duration,
        task: F,
    ) -> Cancellable
    where
        F: Fn() + Send + Sync + 'static,
    {
        let shutdown = Arc::clone(&self.inner.state.shutdown);
        let keep_alive: Arc<dyn Send + Sync> = Arc::clone(&self.inner) as Arc<dyn Send + Sync>;
        self.inner
            .timer
            .schedule_at_fixed_rate(initial_delay, interval, task, shutdown, keep_alive)
    }

    /// Submits `run` to the default stream executor and returns a completion
    /// that receives its result.
    ///
    /// When the runtime is already shut down, nothing is submitted and a ready
    /// completion holding `StreamError::AbruptTermination` is returned. The
    /// active-stream counter is incremented before the job is submitted and
    /// decremented when the task finishes, before the result is sent.
    pub(crate) fn spawn_stream<T, F>(&self, run: F) -> StreamCompletion<T>
    where
        T: Send + 'static,
        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T> + Send + 'static,
    {
        if self.is_shutdown() {
            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
        }
        let (result_tx, result_rx) = oneshot::channel();
        let state = Arc::clone(&self.inner.state);
        let cancellation = StreamCancellation::new();
        let cancelled_flag = cancellation.cancelled();
        let worker_cancellation = cancellation.clone();
        state.active_streams.fetch_add(1, Ordering::SeqCst);
        let job = move || {
            let _worker = worker_cancellation.register_current_worker();
            let active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
            let outcome = run_stream_task(&state, cancelled_flag, run);
            // Release the active-stream slot before the result becomes visible.
            drop(active);
            // The receiving side may already be gone; that is not an error here.
            let _ = result_tx.send(outcome);
        };
        default_stream_executor().execute(Box::new(job));
        StreamCompletion::from_receiver(result_rx, Some(cancellation))
    }

    /// Runs `run` on the calling thread and returns a ready completion with
    /// its result.
    ///
    /// When the runtime is already shut down, `run` is not called and the
    /// completion holds `StreamError::AbruptTermination`. The task gets a fresh,
    /// unset cancellation flag, and the active-stream counter is incremented
    /// for the duration of the call.
    pub(super) fn spawn_stream_inline<T, F>(&self, run: F) -> StreamCompletion<T>
    where
        T: Send + 'static,
        F: FnOnce(Arc<AtomicBool>) -> StreamResult<T>,
    {
        if self.is_shutdown() {
            return StreamCompletion::ready(Err(StreamError::AbruptTermination));
        }
        let state = Arc::clone(&self.inner.state);
        let cancelled = Arc::new(AtomicBool::new(false));
        state.active_streams.fetch_add(1, Ordering::SeqCst);
        let active = ActiveStreamGuard::decrement_on_drop(Arc::clone(&state));
        let outcome = run_stream_task(&state, cancelled, run);
        drop(active);
        StreamCompletion::ready(outcome)
    }
}
// A boxed, run-once job that can be moved to a worker thread.
type StreamJob = Box<dyn FnOnce() + Send>;
// Executor that runs `StreamJob`s on worker threads. `execute` hands a job to
// an idle worker when one is available and otherwise spawns a new worker.
struct StreamExecutor {
// State shared between the executor handle and all of its worker threads.
shared: Arc<ExecutorShared>,
}
// State shared between `StreamExecutor` and its worker threads.
struct ExecutorShared {
// Job queue and worker bookkeeping, guarded by a mutex.
inner: Mutex<ExecutorInner>,
// Notified (one waiter) after a job is pushed; workers wait on it when the
// queue is empty.
available: Condvar,
// Prefix used for worker thread names (`{name_prefix}-{worker_index}`).
name_prefix: &'static str,
}
// Mutex-protected part of the executor state.
struct ExecutorInner {
// Jobs waiting to be picked up by a worker, in submission order.
queue: VecDeque<StreamJob>,
// Number of workers currently waiting on the condition variable.
idle: usize,
// Number of worker threads counted as existing: incremented before a spawn
// attempt, decremented when a spawn fails or a worker thread exits.
workers: usize,
}
impl StreamExecutor {
    /// Creates an executor with an empty queue and no worker threads.
    ///
    /// `name_prefix` is used to name worker threads as
    /// `{name_prefix}-{worker_index}`.
    fn new(name_prefix: &'static str) -> Self {
        let inner = ExecutorInner {
            queue: VecDeque::new(),
            idle: 0,
            workers: 0,
        };
        let shared = ExecutorShared {
            inner: Mutex::new(inner),
            available: Condvar::new(),
            name_prefix,
        };
        Self {
            shared: Arc::new(shared),
        }
    }

    /// Schedules `job` for execution.
    ///
    /// * If more workers are idle than jobs are queued, the job is queued and
    ///   one waiting worker is notified.
    /// * Otherwise a new worker thread is spawned and the job is queued for it.
    /// * If the thread cannot be spawned, the worker count is rolled back and
    ///   the job runs on the calling thread.
    ///
    /// A poisoned mutex is recovered rather than propagated.
    fn execute(&self, job: StreamJob) {
        let shared = &self.shared;
        let lock_inner = || {
            shared
                .inner
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
        };

        let mut pool = lock_inner();
        if pool.idle > pool.queue.len() {
            pool.queue.push_back(job);
            // Release the lock before waking a worker so it can proceed at once.
            drop(pool);
            shared.available.notify_one();
            return;
        }
        // Reserve the worker slot while still holding the lock; the thread
        // itself is spawned without the lock held.
        pool.workers += 1;
        let worker_index = pool.workers;
        drop(pool);

        let thread_name = format!("{}-{worker_index}", shared.name_prefix);
        let worker_shared = Arc::clone(shared);
        let spawned = thread::Builder::new()
            .name(thread_name)
            .spawn(move || worker_loop(&worker_shared));

        let mut pool = lock_inner();
        if spawned.is_ok() {
            pool.queue.push_back(job);
            drop(pool);
            shared.available.notify_one();
        } else {
            // No thread was created: undo the reservation and run inline.
            pool.workers -= 1;
            drop(pool);
            job();
        }
    }
}
/// Body of one executor worker thread.
///
/// Repeatedly takes the oldest queued job and runs it with the mutex released.
/// While the queue is empty the worker counts itself as idle and waits on the
/// condition variable for up to `IDLE_TIMEOUT`; if the wait times out and the
/// queue is still empty the worker returns. On any exit — including a panic
/// inside a job — the worker count is decremented by a drop guard.
fn worker_loop(shared: &ExecutorShared) {
    const IDLE_TIMEOUT: Duration = Duration::from_secs(10);

    /// Decrements `workers` when the worker thread leaves this function.
    struct WorkerGuard<'a> {
        shared: &'a ExecutorShared,
    }

    impl Drop for WorkerGuard<'_> {
        fn drop(&mut self) {
            let mut pool = self
                .shared
                .inner
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            pool.workers -= 1;
        }
    }

    let _guard = WorkerGuard { shared };

    loop {
        // The lock is held only inside this block; the job runs after it ends.
        let job = {
            let mut pool = shared
                .inner
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            loop {
                match pool.queue.pop_front() {
                    Some(job) => break job,
                    None => {
                        pool.idle += 1;
                        let (reacquired, wait) = shared
                            .available
                            .wait_timeout(pool, IDLE_TIMEOUT)
                            .unwrap_or_else(|poison| poison.into_inner());
                        pool = reacquired;
                        pool.idle -= 1;
                        // A job may have been queued right as the wait timed
                        // out; only exit when there is truly nothing to do.
                        if wait.timed_out() && pool.queue.is_empty() {
                            return;
                        }
                    }
                }
            }
        };
        job();
    }
}
/// Returns the process-wide stream executor, creating it on first use with the
/// worker name prefix `"datum-stream-runtime"`.
fn default_stream_executor() -> &'static StreamExecutor {
    static SHARED_EXECUTOR: OnceLock<StreamExecutor> = OnceLock::new();
    let build = || StreamExecutor::new("datum-stream-runtime");
    SHARED_EXECUTOR.get_or_init(build)
}
/// Submits `job` to the process-wide default stream executor.
pub(super) fn dispatch_stream_job(job: StreamJob) {
    let executor = default_stream_executor();
    executor.execute(job);
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Drop for RuntimeInner {
    /// Stops the timer driver when the shared internals are dropped.
    fn drop(&mut self) {
        let timer = &self.timer;
        timer.stop();
    }
}
#[cfg(test)]
impl Runtime {
    /// Test-only: returns what the timer driver's `is_live` reports.
    pub(super) fn timer_driver_is_live(&self) -> bool {
        let timer = &self.inner.timer;
        timer.is_live()
    }

    /// Test-only: returns what the timer driver's `thread_name` reports.
    pub(super) fn timer_thread_name(&self) -> &str {
        let timer = &self.inner.timer;
        timer.thread_name()
    }
}
/// Wraps `input` so that every pull first checks for runtime shutdown and
/// stream cancellation.
///
/// On each call to `next` the returned stream:
///
/// 1. yields `None` if it has already terminated;
/// 2. yields `Err(StreamError::AbruptTermination)` once, then terminates, if
///    `state.shutdown` is set;
/// 3. yields `Err(StreamError::Cancelled)` once, then terminates, if
///    `cancelled` is present and set;
/// 4. otherwise pulls from `input`. While `input.next()` runs, `cancelled`
///    (when present) is installed in the thread-local slot read by
///    `current_stream_cancelled`, and the previous slot content is put back
///    afterwards. An upstream `None` terminates the wrapper.
///
/// The previous slot content is restored by a drop guard, so it is also put
/// back when `input.next()` panics. Without that, a panic caught further up
/// (e.g. by `run_stream_task`) would leave this stream's flag installed on a
/// thread that goes on to run other work.
pub(super) fn runtime_checked_stream<T: Send + 'static>(
    mut input: BoxStream<T>,
    state: Arc<RuntimeState>,
    cancelled: Option<Arc<AtomicBool>>,
) -> BoxStream<T> {
    /// Puts the previously installed flag back into the thread-local slot
    /// when dropped, on both normal return and unwinding.
    struct RestorePrevious(Option<Arc<AtomicBool>>);

    impl Drop for RestorePrevious {
        fn drop(&mut self) {
            let previous = self.0.take();
            // `try_with` instead of `with`: a destructor must not panic, even
            // if the thread-local has already been torn down.
            let _ = CURRENT_STREAM_CANCELLED.try_with(|slot| {
                *slot.borrow_mut() = previous;
            });
        }
    }

    let mut terminated = false;
    Box::new(std::iter::from_fn(move || {
        if terminated {
            return None;
        }
        if state.shutdown.load(Ordering::SeqCst) {
            terminated = true;
            return Some(Err(StreamError::AbruptTermination));
        }
        if cancelled
            .as_ref()
            .is_some_and(|flag| flag.load(Ordering::SeqCst))
        {
            terminated = true;
            return Some(Err(StreamError::Cancelled));
        }
        let next = {
            // Only touch the slot when this stream actually has a flag.
            let _restore = cancelled.as_ref().map(|flag| {
                let previous =
                    CURRENT_STREAM_CANCELLED.with(|slot| slot.replace(Some(Arc::clone(flag))));
                RestorePrevious(previous)
            });
            input.next()
        };
        if next.is_none() {
            terminated = true;
        }
        next
    }))
}