dbsp/circuit/
runtime.rs

1//! A multithreaded runtime for evaluating DBSP circuits in a data-parallel
2//! fashion.
3
4use crate::circuit::checkpointer::Checkpointer;
5use crate::circuit::DevTweaks;
6use crate::error::Error as DbspError;
7use crate::operator::communication::Exchange;
8use crate::storage::backend::StorageBackend;
9use crate::storage::file::format::Compression;
10use crate::storage::file::to_bytes;
11use crate::storage::file::writer::Parameters;
12use crate::trace::unaligned_deserialize;
13use crate::SchedulerError;
14use crate::{
15    storage::{backend::StorageError, buffer_cache::BufferCache, dirlock::LockedDirectory},
16    DetailedError,
17};
18use core_affinity::{get_core_ids, CoreId};
19use crossbeam::sync::{Parker, Unparker};
20use enum_map::{enum_map, Enum, EnumMap};
21use feldera_types::config::{StorageCompression, StorageConfig, StorageOptions};
22use indexmap::IndexSet;
23use once_cell::sync::Lazy;
24use serde::Serialize;
25use std::iter::repeat;
26use std::path::Path;
27use std::sync::atomic::AtomicUsize;
28use std::sync::{LazyLock, Mutex};
29use std::thread::Thread;
30use std::time::Duration;
31use std::{
32    backtrace::Backtrace,
33    borrow::Cow,
34    cell::{Cell, RefCell},
35    error::Error as StdError,
36    fmt,
37    fmt::{Debug, Display, Error as FmtError, Formatter},
38    panic::{self, Location, PanicHookInfo},
39    sync::{
40        atomic::{AtomicBool, Ordering},
41        Arc, RwLock, Weak,
42    },
43    thread::{Builder, JoinHandle, Result as ThreadResult},
44};
45use tokio::sync::Notify;
46use tracing::{debug, error, info, warn};
47use typedmap::TypedDashMap;
48
49use super::dbsp_handle::{Layout, Mode};
50use super::CircuitConfig;
51
/// The number of tuples a stateful operator outputs per step during replay.
///
/// This is the value a newly created runtime starts with, and the value
/// reported by `Runtime::replay_step_size` on threads that have no runtime.
pub const DEFAULT_REPLAY_STEP_SIZE: usize = 10000;
54
/// Errors reported by the multithreaded runtime.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum Error {
    /// One of the worker threads terminated unexpectedly.
    WorkerPanic {
        // Detailed panic information from all threads that
        // reported panics.  Each entry is
        // `(worker index, thread type, panic details)`.
        panic_info: Vec<(usize, ThreadType, WorkerPanicInfo)>,
    },
    /// The storage directory supplied does not match the runtime circuit.
    IncompatibleStorage,
    /// The circuit has been terminated.
    Terminated,
}
67
68impl DetailedError for Error {
69    fn error_code(&self) -> Cow<'static, str> {
70        match self {
71            Self::WorkerPanic { .. } => Cow::from("WorkerPanic"),
72            Self::Terminated => Cow::from("Terminated"),
73            Self::IncompatibleStorage => Cow::from("IncompatibleStorage"),
74        }
75    }
76}
77
78impl Display for Error {
79    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
80        match self {
81            Self::WorkerPanic { panic_info } => {
82                writeln!(f, "One or more worker threads terminated unexpectedly")?;
83
84                for (worker, thread_type, worker_panic_info) in panic_info.iter() {
85                    writeln!(f, "{thread_type} worker thread {worker} panicked")?;
86                    writeln!(f, "{worker_panic_info}")?;
87                }
88                Ok(())
89            }
90            Self::Terminated => f.write_str("circuit has been terminated"),
91            Self::IncompatibleStorage => {
92                f.write_str("Supplied storage directory does not fit the runtime circuit")
93            }
94        }
95    }
96}
97
// Relies on the default method implementations of `std::error::Error`.
impl StdError for Error {}
99
// Thread-local variables used to store per-worker context.
thread_local! {
    // Reference to the `Runtime` that manages this worker thread or `None`
    // if the current thread is not running in a multithreaded runtime.
    static RUNTIME: RefCell<Option<Runtime>> = const { RefCell::new(None) };

    // 0-based index of the current worker thread within its runtime.
    // Holds `0` if the current thread is not running in a multithreaded
    // runtime.
    static WORKER_INDEX: Cell<usize> = const { Cell::new(0) };
}
111
112mod thread_type {
113    use std::{cell::Cell, fmt::Display};
114
115    #[cfg(doc)]
116    use super::Runtime;
117    use enum_map::Enum;
118    use serde::Serialize;
119
120    thread_local! {
121        /// `None` means that this is an auxiliary thread that runs inside the runtime
122        /// but is neither a DBSP foreground nor a background thread.
123        static CURRENT: Cell<Option<ThreadType>> = const { Cell::new(None) };
124    }
125
126    /// Type of a thread running in a [Runtime].
127    #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Enum, Serialize)]
128    #[serde(rename_all = "snake_case")]
129    pub enum ThreadType {
130        /// Circuit thread.
131        Foreground,
132
133        /// Merger thread.
134        Background,
135    }
136
137    impl ThreadType {
138        /// Returns the kind of thread we're currently running in, if we're in a
139        /// [Runtime].  Outside of a [Runtime], this returns
140        /// [ThreadType::Foreground].
141        pub fn current() -> Option<Self> {
142            CURRENT.get()
143        }
144
145        pub(super) fn set_current(thread_type: Self) {
146            CURRENT.set(Some(thread_type));
147        }
148    }
149
150    impl Display for ThreadType {
151        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152            match self {
153                ThreadType::Foreground => write!(f, "foreground"),
154                ThreadType::Background => write!(f, "background"),
155            }
156        }
157    }
158}
159pub use thread_type::ThreadType;
160
/// Marker type that parameterizes the [`TypedDashMap`] behind [`LocalStore`].
pub struct LocalStoreMarker;

/// Local data store shared by all workers in a runtime.
pub type LocalStore = TypedDashMap<LocalStoreMarker>;
165
// Rust source code location of a panic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct PanicLocation {
    // Source file in which the panic occurred.
    file: String,
    // Line number within `file`.
    line: u32,
    // Column number within `line`.
    col: u32,
}
173
174impl Display for PanicLocation {
175    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
176        write!(f, "{}:{}:{}", self.file, self.line, self.col)
177    }
178}
179
180impl PanicLocation {
181    fn new(loc: &Location) -> Self {
182        Self {
183            file: loc.file().to_string(),
184            line: loc.line(),
185            col: loc.column(),
186        }
187    }
188}
189
/// Information about a panic in a worker thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct WorkerPanicInfo {
    // Panic message, if any.  `None` when the panic payload is neither a
    // `String` nor a `&str`.
    message: Option<String>,
    // Panic location, if the panic reported one.
    location: Option<PanicLocation>,
    // Backtrace, rendered to text when the panic info was recorded.
    backtrace: String,
}
200
201impl Display for WorkerPanicInfo {
202    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
203        if let Some(message) = &self.message {
204            writeln!(f, "panic message: {message}")?;
205        } else {
206            writeln!(f, "panic message (none)")?;
207        }
208
209        if let Some(location) = &self.location {
210            writeln!(f, "panic location: {location}")?;
211        } else {
212            writeln!(f, "panic location: unknown")?;
213        }
214        writeln!(f, "stack trace:\n{}", self.backtrace)
215    }
216}
217
218impl WorkerPanicInfo {
219    fn new(panic_info: &PanicHookInfo) -> Self {
220        #[allow(clippy::manual_map)]
221        let message = if let Some(v) = panic_info.payload().downcast_ref::<String>() {
222            Some(v.clone())
223        } else if let Some(v) = panic_info.payload().downcast_ref::<&str>() {
224            Some(v.to_string())
225        } else {
226            None
227        };
228        let backtrace = Backtrace::force_capture().to_string();
229        let location = panic_info
230            .location()
231            .map(|location| PanicLocation::new(location));
232
233        Self {
234            message,
235            location,
236            backtrace,
237        }
238    }
239}
240
/// Storage-related state of a runtime; present only when storage is
/// configured.
#[derive(derive_more::Debug)]
struct RuntimeStorage {
    /// Runner configuration.
    pub config: StorageConfig,

    /// User options.
    pub options: StorageOptions,

    /// Backend.
    #[debug(skip)]
    pub backend: Arc<dyn StorageBackend>,

    // This is just here for the `Drop` behavior.
    // NOTE(review): presumably dropping it releases the lock on the storage
    // directory - confirm against `LockedDirectory`.
    #[allow(dead_code)]
    locked_directory: LockedDirectory,
}
257
// State shared by all handles to a runtime (see `Runtime`, which wraps it in
// an `Arc`).
struct RuntimeInner {
    // Worker layout; exposed through `Runtime::layout`.
    layout: Layout,
    // Runtime mode; exposed through `Runtime::get_mode`.
    mode: Mode,
    // Developer tweaks; exposed through `Runtime::with_dev_tweaks`.
    dev_tweaks: DevTweaks,

    // `None` when the runtime was created without storage.
    storage: Option<RuntimeStorage>,
    // Data store shared by the workers; exposed through `Runtime::local_store`.
    store: LocalStore,
    // Read by `Runtime::kill_in_progress`.
    kill_signal: AtomicBool,
    // Background threads spawned by this runtime, including for aux threads, in no specific order.
    background_threads: Mutex<Vec<JoinHandle<()>>>,
    // Join handles of threads started with `Runtime::spawn_aux_thread`.
    aux_threads: Mutex<Vec<JoinHandle<()>>>,
    // One buffer cache per local worker and thread type.
    buffer_caches: Vec<EnumMap<ThreadType, Arc<BufferCache>>>,
    // CPU cores to pin each local worker's threads to; empty when pinning
    // is not in effect.
    pin_cpus: Vec<EnumMap<ThreadType, CoreId>>,
    // Per-local-worker counters backing `Runtime::sequence_next`.
    worker_sequence_numbers: Vec<AtomicUsize>,
    // Panic info collected from failed worker threads.
    panic_info: Vec<EnumMap<ThreadType, RwLock<Option<WorkerPanicInfo>>>>,
    // NOTE(review): presumably set once any thread has panicked; the code
    // that updates it is not visible here - confirm.
    panicked: AtomicBool,
    // Current replay step size; starts at `DEFAULT_REPLAY_STEP_SIZE`.
    replay_step_size: AtomicUsize,
}
277
impl Drop for RuntimeInner {
    // Only logs the drop; the fields are released by their own destructors.
    fn drop(&mut self) {
        debug!("dropping RuntimeInner");
    }
}
283
284impl Debug for RuntimeInner {
285    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
286        f.debug_struct("RuntimeInner")
287            .field("layout", &self.layout)
288            .field("storage", &self.storage)
289            .finish()
290    }
291}
292
293fn display_core_ids<'a>(iter: impl Iterator<Item = &'a CoreId>) -> String {
294    format!(
295        "{:?}",
296        iter.map(|core| core.id).collect::<Vec<_>>().as_slice()
297    )
298}
299
300fn map_pin_cpus(layout: &Layout, pin_cpus: &[usize]) -> Vec<EnumMap<ThreadType, CoreId>> {
301    if layout.is_multihost() {
302        if !pin_cpus.is_empty() {
303            warn!("CPU pinning not yet supported with multihost DBSP");
304        }
305        return Vec::new();
306    }
307
308    let nworkers = layout.n_workers();
309    let pin_cpus = pin_cpus
310        .iter()
311        .copied()
312        .map(|id| CoreId { id })
313        .collect::<IndexSet<_>>();
314    if pin_cpus.len() < 2 * nworkers {
315        if !pin_cpus.is_empty() {
316            warn!("ignoring CPU pinning request because {nworkers} workers require {} pinned CPUs but only {} were specified",
317                      2 * nworkers, pin_cpus.len())
318        }
319        return Vec::new();
320    }
321
322    let Some(core_ids) = get_core_ids() else {
323        warn!("ignoring CPU pinning request because this system's core ids list could not be obtained");
324        return Vec::new();
325    };
326    let core_ids = core_ids.iter().copied().collect::<IndexSet<_>>();
327
328    let missing_cpus = pin_cpus.difference(&core_ids).copied().collect::<Vec<_>>();
329    if !missing_cpus.is_empty() {
330        warn!("ignoring CPU pinning request because requested CPUs {missing_cpus:?} are not available (available CPUs are: {})",
331              display_core_ids(core_ids.iter()));
332        return Vec::new();
333    }
334
335    let fg_cpus = &pin_cpus[0..nworkers];
336    let bg_cpus = &pin_cpus[nworkers..nworkers * 2];
337    info!(
338        "pinning foreground workers to CPUs {} and background workers to CPUs {}",
339        display_core_ids(fg_cpus.iter()),
340        display_core_ids(bg_cpus.iter())
341    );
342    (0..nworkers)
343        .map(|i| {
344            enum_map! {
345                ThreadType::Foreground => fg_cpus[i],
346                ThreadType::Background => bg_cpus[i],
347            }
348        })
349        .collect()
350}
351
352impl RuntimeInner {
353    fn new(config: CircuitConfig) -> Result<Self, DbspError> {
354        let nworkers = config.layout.local_workers().len();
355
356        let storage = if let Some(storage) = config.storage {
357            let locked_directory =
358                LockedDirectory::new_blocking(storage.config.path(), Duration::from_secs(60))?;
359            let backend = storage.backend;
360
361            if let Some(init_checkpoint) = storage.init_checkpoint {
362                if !backend
363                    .exists(&Checkpointer::checkpoint_dir(init_checkpoint).child("CHECKPOINT"))?
364                {
365                    return Err(DbspError::Storage(StorageError::CheckpointNotFound(
366                        init_checkpoint,
367                    )));
368                }
369            }
370
371            Some(RuntimeStorage {
372                config: storage.config,
373                options: storage.options,
374                backend,
375                locked_directory,
376            })
377        } else {
378            None
379        };
380
381        let cache_size_bytes = if let Some(storage) = &storage {
382            storage
383                .options
384                .cache_mib
385                .map_or(256 * 1024 * 1024, |cache_mib| {
386                    cache_mib.saturating_mul(1024 * 1024) / nworkers / ThreadType::LENGTH
387                })
388        } else {
389            // Dummy buffer cache.
390            1
391        };
392
393        Ok(Self {
394            pin_cpus: map_pin_cpus(&config.layout, &config.pin_cpus),
395            layout: config.layout,
396            mode: config.mode,
397            dev_tweaks: config.dev_tweaks,
398            storage,
399            store: TypedDashMap::new(),
400            kill_signal: AtomicBool::new(false),
401            background_threads: Mutex::new(Vec::new()),
402            aux_threads: Mutex::new(Vec::new()),
403            buffer_caches: (0..nworkers)
404                .map(|_| EnumMap::from_fn(|_| Arc::new(BufferCache::new(cache_size_bytes))))
405                .collect(),
406            worker_sequence_numbers: (0..nworkers).map(|_| AtomicUsize::new(0)).collect(),
407            panic_info: (0..nworkers)
408                .map(|_| EnumMap::from_fn(|_| RwLock::new(None)))
409                .collect(),
410            panicked: AtomicBool::new(false),
411            replay_step_size: AtomicUsize::new(DEFAULT_REPLAY_STEP_SIZE),
412        })
413    }
414
415    fn pin_cpu(&self) {
416        if !self.pin_cpus.is_empty() {
417            let local_worker_offset = Runtime::local_worker_offset();
418            let Some(thread_type) = ThreadType::current() else {
419                panic!("pin_cpu() called outside of a runtime or on an aux thread");
420            };
421            let core = self.pin_cpus[local_worker_offset][thread_type];
422            if !core_affinity::set_for_current(core) {
423                warn!(
424                    "failed to pin worker {local_worker_offset} {thread_type} thread to core {}",
425                    core.id
426                );
427            }
428        }
429    }
430}
431
432// Panic callback used to record worker thread panic information
433// in the runtime.
434//
435// Note: this is a global hook shared by all threads in the process.
436// It is installed when a new DBSP runtime starts, possibly overriding
437// a hook installed by another DBSP instance.  However it should work
438// correctly for threads from different DBSP runtimes or for threads
439// that don't belong to any DBSP runtime since it uses `RUNTIME`
440// thread-local variable to detect a DBSP runtime.
441fn panic_hook(panic_info: &PanicHookInfo<'_>, default_panic_hook: &dyn Fn(&PanicHookInfo<'_>)) {
442    // Call the default panic hook first.
443    default_panic_hook(panic_info);
444
445    RUNTIME.with(|runtime| {
446        if let Ok(runtime) = runtime.try_borrow() {
447            if let Some(runtime) = runtime.as_ref() {
448                runtime.panic(panic_info);
449            }
450        }
451    })
452}
453
/// A multithreaded runtime that hosts `N` circuits running in parallel worker
/// threads. Typically, all `N` circuits are identical, but this is not required
/// or enforced.
///
/// This is a reference-counted handle: cloning it yields another handle to
/// the same runtime state.
#[repr(transparent)]
#[derive(Clone, Debug)]
pub struct Runtime(Arc<RuntimeInner>);
460
/// A weak reference to a [Runtime].
///
/// Obtained from `Runtime::downgrade`; use [`WeakRuntime::upgrade`] to get a
/// [Runtime] back while the runtime state is still alive.
#[repr(transparent)]
#[derive(Clone, Debug)]
pub struct WeakRuntime(Weak<RuntimeInner>);
465
466impl WeakRuntime {
467    pub fn upgrade(&self) -> Option<Runtime> {
468        self.0.upgrade().map(Runtime)
469    }
470}
471
/// Stores the default Rust panic hook, so we can invoke it as part of
/// the DBSP custom hook.
///
/// Initialized lazily on first access.
#[allow(clippy::type_complexity)]
static DEFAULT_PANIC_HOOK: Lazy<Box<dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send>> =
    Lazy::new(|| {
        // Clear any hooks installed by other libraries.  `take_hook` returns
        // the currently registered hook and puts the default hook back in
        // its place, so this first call discards whatever was installed...
        let _ = panic::take_hook();
        // ...and this second call returns the default hook itself.
        panic::take_hook()
    });
481
482/// Returns the default Rust panic hook.
483fn default_panic_hook() -> &'static (dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send) {
484    &*DEFAULT_PANIC_HOOK
485}
486
487impl Runtime {
    /// Creates a new runtime with the specified `layout` and runs user-provided
    /// closure `circuit` in each thread, and returns a handle to the runtime. The closure
    /// takes a [`Parker`]; the runtime keeps the matching unparker and uses it to wake up
    /// the thread when terminating the circuit.
492    ///
493    /// The `layout` may be specified as a number of worker threads or as a
494    /// [`Layout`].
495    ///
496    /// # Examples
497    /// ```
498    /// # #[cfg(all(windows, miri))]
499    /// # fn main() {}
500    ///
501    /// # #[cfg(not(all(windows, miri)))]
502    /// # fn main() {
503    /// use dbsp::circuit::{Circuit, RootCircuit, Runtime};
504    ///
505    /// // Create a runtime with 4 worker threads.
506    /// let hruntime = Runtime::run(4, |_parker| {
507    ///     // This closure runs within each worker thread.
508    ///     let root = RootCircuit::build(move |circuit| {
509    ///         // Populate `circuit` with operators.
510    ///         Ok(())
511    ///     })
512    ///     .unwrap()
513    ///     .0;
514    ///
515    ///     // Run circuit for 100 clock cycles.
516    ///     for _ in 0..100 {
517    ///         root.transaction().unwrap();
518    ///     }
519    /// })
520    /// .unwrap();
521    ///
522    /// // Wait for all worker threads to terminate.
523    /// hruntime.join().unwrap();
524    /// # }
525    /// ```
526    pub fn run<F>(config: impl Into<CircuitConfig>, circuit: F) -> Result<RuntimeHandle, DbspError>
527    where
528        F: FnOnce(Parker) + Clone + Send + 'static,
529    {
530        let config: CircuitConfig = config.into();
531
532        let workers = config.layout.local_workers();
533
534        let runtime = Self(Arc::new(RuntimeInner::new(config)?));
535
536        // Install custom panic hook.
537        let default_hook = default_panic_hook();
538        panic::set_hook(Box::new(move |panic_info| {
539            panic_hook(panic_info, default_hook)
540        }));
541
542        let workers = workers
543            .map(|worker_index| {
544                let runtime = runtime.clone();
545                let build_circuit = circuit.clone();
546                let parker = Parker::new();
547                let unparker = parker.unparker().clone();
548                let handle = Builder::new()
549                    .name(format!("dbsp-worker-{worker_index}"))
550                    .spawn(move || {
551                        // Set the worker's runtime handle and index
552                        WORKER_INDEX.set(worker_index);
553                        ThreadType::set_current(ThreadType::Foreground);
554                        runtime.inner().pin_cpu();
555                        RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
556
557                        // Build the worker's circuit
558                        build_circuit(parker);
559                    })
560                    .unwrap_or_else(|error| {
561                        panic!("failed to spawn worker thread {worker_index}: {error}");
562                    });
563                (handle, unparker)
564            })
565            .collect::<Vec<_>>();
566
567        Ok(RuntimeHandle::new(runtime, workers))
568    }
569
570    pub fn downgrade(&self) -> WeakRuntime {
571        WeakRuntime(Arc::downgrade(&self.0))
572    }
573
574    /// Returns a reference to the multithreaded runtime that
575    /// manages the current worker thread, or `None` if the thread
576    /// runs without a runtime.
577    ///
578    /// Worker threads created by the [`Runtime::run`] method can access
579    /// the services provided by this API via an instance of `struct Runtime`,
580    /// which they can obtain by calling `Runtime::runtime()`.  DBSP circuits
581    /// created without a managed runtime run in the context of the client
582    /// thread.  When invoked by such a thread, this method returns `None`.
583    #[allow(clippy::self_named_constructors)]
584    pub fn runtime() -> Option<Runtime> {
585        RUNTIME.with(|rt| rt.borrow().clone())
586    }
587
588    /// Returns this runtime's storage backend, if storage is configured.
589    ///
590    /// # Panic
591    ///
592    /// Panics if this thread is not in a [Runtime].
593    pub fn storage_backend() -> Result<Arc<dyn StorageBackend>, StorageError> {
594        Runtime::runtime()
595            .unwrap()
596            .inner()
597            .storage
598            .as_ref()
599            .map_or(Err(StorageError::StorageDisabled), |storage| {
600                Ok(storage.backend.clone())
601            })
602    }
603
604    /// Returns this thread's buffer cache.  Every thread has a buffer cache,
605    /// but:
606    ///
607    /// - If the thread's [Runtime] does not have storage configured, the cache
608    ///   size is trivially small.
609    ///
610    /// - If the thread is not in a [Runtime], then the cache is shared among
611    ///   all such threads. (Such a thread might be in a circuit that uses
612    ///   storage, but there's no way to know because only [Runtime] makes that
613    ///   available at a thread level.)
614    pub fn buffer_cache() -> Arc<BufferCache> {
615        // Fast path, look up from TLS
616        thread_local! {
617            static BUFFER_CACHE: RefCell<Option<Arc<BufferCache>>> = const { RefCell::new(None) };
618        }
619        // No `Runtime` means there's only a single worker, so use a single
620        // global cache.
621        // This cache is also used by all auxiliary threads in the runtime.
622        // FIXME: We may need a tunable strategy for aux threads. We cannot simply give each of them the
623        // same cache as DBSP worker threads, as there can be dozens of aux threads (currently one per
624        // output connector), which do not necessarily need a large cache. OTOH, sharing the same cache
625        // across all of them may potentially cause performance issues.
626        static NO_RUNTIME_CACHE: LazyLock<Arc<BufferCache>> =
627            LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));
628
629        if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
630            return buffer_cache;
631        }
632
633        // Slow path for initializing the thread-local.
634        let buffer_cache = if let Some(rt) = Runtime::runtime() {
635            if let Some(thread_type) = ThreadType::current() {
636                rt.get_buffer_cache(Runtime::local_worker_offset(), thread_type)
637            } else {
638                // Aux thread: use the global cache.
639                NO_RUNTIME_CACHE.clone()
640            }
641        } else {
642            NO_RUNTIME_CACHE.clone()
643        };
644        BUFFER_CACHE.set(Some(buffer_cache.clone()));
645        buffer_cache
646    }
647
648    /// Spawn an auxiliary thread inside the runtime.
649    ///
650    /// The auxiliary thread will have access to the runtime's resources, including the
651    /// storage backend. The current use case for this is to be able to use spines outside
652    /// of the DBSP worker threads, e.g., to maintain output buffers.
653    pub fn spawn_aux_thread<F>(&self, thread_name: &str, f: F)
654    where
655        F: FnOnce() + Send + 'static,
656    {
657        let runtime = self.clone();
658        let handle = Builder::new()
659            .name(thread_name.to_string())
660            .spawn(|| {
661                RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
662                f()
663            })
664            .expect("failed to spawn auxiliary thread");
665
666        self.inner().aux_threads.lock().unwrap().push(handle)
667    }
668
669    /// Returns this runtime's buffer cache for thread type `thread_type` in
670    /// worker with local offset `local_worker_offset`.
671    ///
672    /// Usually it's easier and faster to call [Runtime::buffer_cache] instead.
673    pub fn get_buffer_cache(
674        &self,
675        local_worker_offset: usize,
676        thread_type: ThreadType,
677    ) -> Arc<BufferCache> {
678        self.0.buffer_caches[local_worker_offset][thread_type].clone()
679    }
680
681    /// Returns `(current, max)`, reporting the amount of the buffer cache
682    /// that is currently used and its maximum size, both in bytes.
683    pub fn cache_occupancy(&self) -> (usize, usize) {
684        if self.0.storage.is_some() {
685            self.0
686                .buffer_caches
687                .iter()
688                .flat_map(|map| map.values())
689                .map(|cache| cache.occupancy())
690                .fold((0, 0), |(a_cur, a_max), (b_cur, b_max)| {
691                    (a_cur + b_cur, a_max + b_max)
692                })
693        } else {
694            (0, 0)
695        }
696    }
697
698    /// Returns 0-based index of the current worker thread within its runtime.
699    /// For threads that run without a runtime, this method returns `0`.  In a
700    /// multihost runtime, this is a global index across all hosts.
701    pub fn worker_index() -> usize {
702        WORKER_INDEX.get()
703    }
704
705    /// Returns the 0-based index of the current worker within its local host.
706    pub fn local_worker_offset() -> usize {
707        // Find the lowest-numbered local worker.
708        let local_workers_start = RUNTIME
709            .with(|rt| Some(rt.borrow().as_ref()?.layout().local_workers().start))
710            .unwrap_or_default();
711        Self::worker_index() - local_workers_start
712    }
713
714    pub fn mode() -> Mode {
715        RUNTIME
716            .with(|rt| Some(rt.borrow().as_ref()?.get_mode()))
717            .unwrap_or_default()
718    }
719
720    pub fn with_dev_tweaks<F, T>(f: F) -> T
721    where
722        F: Fn(&DevTweaks) -> T,
723    {
724        static DEFAULT: Lazy<DevTweaks> = Lazy::new(DevTweaks::default);
725        RUNTIME
726            .with(|rt| Some(f(&rt.borrow().as_ref()?.inner().dev_tweaks)))
727            .unwrap_or_else(|| f(&DEFAULT))
728    }
729
730    pub fn get_mode(&self) -> Mode {
731        self.inner().mode.clone()
732    }
733
734    /// Configure the number of tuples a stateful operator outputs per step during replay.
735    ///
736    /// The default is `DEFAULT_REPLAY_STEP_SIZE`.
737    pub fn set_replay_step_size(&self, step_size: usize) {
738        self.inner()
739            .replay_step_size
740            .store(step_size, Ordering::Release);
741    }
742
743    /// Get currently configured replay step size.
744    ///
745    /// Returns `DEFAULT_REPLAY_STEP_SIZE` if the current thread doesn't have a runtime.
746    pub fn replay_step_size() -> usize {
747        RUNTIME
748            .with(|rt| Some(rt.borrow().as_ref()?.get_replay_step_size()))
749            .unwrap_or(DEFAULT_REPLAY_STEP_SIZE)
750    }
751
752    /// Get currently configured replay step size.
753    pub fn get_replay_step_size(&self) -> usize {
754        self.inner().replay_step_size.load(Ordering::Acquire)
755    }
756
757    /// Returns the worker index as a string.
758    ///
759    /// This is useful for metric labels.
760    pub fn worker_index_str() -> &'static str {
761        static WORKER_INDEX_STRS: Lazy<[&'static str; 256]> = Lazy::new(|| {
762            let mut data: [&'static str; 256] = [""; 256];
763            for (i, item) in data.iter_mut().enumerate() {
764                *item = Box::leak(i.to_string().into_boxed_str());
765            }
766            data
767        });
768
769        WORKER_INDEX_STRS
770            .get(WORKER_INDEX.get())
771            .copied()
772            .unwrap_or_else(|| {
773                panic!("Limit workers to less than 256 or increase the limit in the code.")
774            })
775    }
776
777    /// Returns the minimum number of bytes in a batch (one that persists from
778    /// step to step) to spill it to storage. Returns `None` if this thread doesn't
779    /// have storage configured or a default value (1MiB) if it runs without a
780    /// [Runtime].
781    pub fn min_index_storage_bytes() -> Option<usize> {
782        RUNTIME.with(|rt| {
783            Some(
784                rt.borrow()
785                    .as_ref()?
786                    .inner()
787                    .storage
788                    .as_ref()?
789                    .options
790                    .min_storage_bytes
791                    .unwrap_or({
792                        // This reduces the files stored on disk to a reasonable number.
793
794                        10 * 1024 * 1024
795                    }),
796            )
797        })
798    }
799
800    /// Returns the minimum number of bytes in a batch (one that does not
801    /// persist from step to step) to spill it to storage, or `None` if this
802    /// thread doesn't have a [Runtime] or if it doesn't have storage
803    /// configured.
804    pub fn min_step_storage_bytes() -> Option<usize> {
805        RUNTIME.with(|rt| {
806            Some(
807                rt.borrow()
808                    .as_ref()?
809                    .inner()
810                    .storage
811                    .as_ref()?
812                    .options
813                    .min_step_storage_bytes
814                    .unwrap_or(usize::MAX),
815            )
816        })
817    }
818
819    pub fn file_writer_parameters() -> Parameters {
820        let compression = Runtime::runtime()
821            .unwrap()
822            .inner()
823            .storage
824            .as_ref()
825            .unwrap()
826            .options
827            .compression;
828        let compression = match compression {
829            StorageCompression::Default | StorageCompression::Snappy => Some(Compression::Snappy),
830            StorageCompression::None => None,
831        };
832        Parameters::default().with_compression(compression)
833    }
834
    // Borrows the shared runtime state behind the `Arc`.
    fn inner(&self) -> &RuntimeInner {
        &self.0
    }
838
839    /// Returns the number of workers in the runtime's [`Layout`].  In a
840    /// multihost runtime, this is the total number of workers across all hosts.
841    ///
842    /// If this thread is not in a [Runtime], returns 1.
843    pub fn num_workers() -> usize {
844        RUNTIME.with(|rt| {
845            rt.borrow()
846                .as_ref()
847                .map_or(1, |runtime| runtime.layout().n_workers())
848        })
849    }
850
851    /// Returns the [`Layout`] for this runtime.
852    pub fn layout(&self) -> &Layout {
853        &self.inner().layout
854    }
855
856    /// Returns reference to the data store shared by all workers within the
857    /// runtime.  In a multihost runtime, this data store is local to this
858    /// particular host.
859    ///
860    /// This low-level mechanism can be used by various services that
861    /// require common state shared across all workers on a host.
862    ///
863    /// The [`LocalStore`] type is an alias to [`TypedDashMap`], a
864    /// concurrent map type that can store key/value pairs of different
865    /// types.  See `typedmap` crate documentation for details.
866    pub fn local_store(&self) -> &LocalStore {
867        &self.inner().store
868    }
869
870    /// Returns the path to the storage directory for this runtime.
871    pub fn storage_path(&self) -> Option<&Path> {
872        self.inner()
873            .storage
874            .as_ref()
875            .map(|storage| storage.config.path())
876    }
877
878    /// A per-worker sequential counter.
879    ///
880    /// This method can be used to generate unique identifiers that will be the
881    /// same across all worker threads.  Repeated calls to this function
882    /// from the same worker generate numbers 0, 1, 2, ...
883    pub fn sequence_next(&self) -> usize {
884        self.inner().worker_sequence_numbers[Self::local_worker_offset()]
885            .fetch_add(1, Ordering::Relaxed)
886    }
887
888    /// `true` if the current worker thread has received a kill signal
889    /// and should exit asap.  Schedulers should use this method before
890    /// scheduling the next operator and after parking.
891    pub fn kill_in_progress() -> bool {
892        // Only a circuit with a `Runtime` can receive a kill signal, which is
893        // OK because a kill request can only be sent via a `RuntimeHandle`
894        // anyway.
895        RUNTIME.with(|runtime| {
896            runtime
897                .borrow()
898                .as_ref()
899                .map(|runtime| runtime.inner().kill_signal.load(Ordering::SeqCst))
900                .unwrap_or(false)
901        })
902    }
903
904    pub fn worker_panic_info(
905        &self,
906        worker: usize,
907        thread_type: ThreadType,
908    ) -> Option<WorkerPanicInfo> {
909        if let Ok(guard) = self.inner().panic_info[worker][thread_type].read() {
910            guard.clone()
911        } else {
912            warn!("poisoned panic_lock lock for {thread_type} worker {worker}");
913            None
914        }
915    }
916
917    // Record information about a worker thread panic in `panic_info`
918    fn panic(&self, panic_info: &PanicHookInfo) {
919        let local_worker_offset = Self::local_worker_offset();
920        let Some(thread_type) = ThreadType::current() else {
921            // We only install panic hooks on foreground and background threads,
922            // so this shouldn't happen, but we cannot panic here.
923            error!("panic hook called outside of a runtime or on an aux thread");
924            return;
925        };
926        let panic_info = WorkerPanicInfo::new(panic_info);
927        let _ = self.inner().panic_info[local_worker_offset][thread_type]
928            .write()
929            .map(|mut guard| *guard = Some(panic_info));
930        self.inner().panicked.store(true, Ordering::Release);
931    }
932
933    /// Spawn a new thread using `builder` and `f`. If the current thread is
934    /// associated with a runtime, then the new thread will also be associated
935    /// with the same runtime and worker index.
936    pub(crate) fn spawn_background_thread<F>(builder: Builder, f: F) -> (Thread, Unparker)
937    where
938        F: FnOnce(Parker) + Send + 'static,
939    {
940        let runtime = Self::runtime();
941        let worker_index = Self::worker_index();
942        let parker = Parker::new();
943        let unparker = parker.unparker().clone();
944        let join_handle = builder
945            .spawn(move || {
946                WORKER_INDEX.set(worker_index);
947                ThreadType::set_current(ThreadType::Background);
948                if let Some(runtime) = runtime {
949                    runtime.inner().pin_cpu();
950                    RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
951                }
952                f(parker)
953            })
954            .unwrap_or_else(|error| {
955                panic!("failed to spawn background worker thread {worker_index}: {error}");
956            });
957        let thread = join_handle.thread().clone();
958        if let Some(runtime) = Self::runtime() {
959            runtime
960                .inner()
961                .background_threads
962                .lock()
963                .unwrap()
964                .push(join_handle);
965        }
966        (thread, unparker)
967    }
968}
969
/// A synchronization primitive that allows multiple threads within a runtime to agree
/// when a condition is satisfied.
pub(crate) enum Consensus {
    /// Used when there is no runtime or the runtime has a single worker:
    /// `check` returns the local vote unchanged.
    SingleThreaded,
    /// Used when the runtime has more than one worker: votes are exchanged
    /// with the other workers through an [`Exchange`].
    MultiThreaded {
        /// Signalled by the sender callback registered with `exchange` for
        /// this worker; `check` awaits it before retrying a send.
        notify_sender: Arc<Notify>,
        /// Signalled by the receiver callback registered with `exchange` for
        /// this worker; `check` awaits it before retrying a receive.
        notify_receiver: Arc<Notify>,
        /// Exchange used to send the local vote and receive the peers' votes.
        exchange: Arc<Exchange<bool>>,
    },
}
980
981impl Consensus {
982    pub fn new() -> Self {
983        match Runtime::runtime() {
984            Some(runtime) if Runtime::num_workers() > 1 => {
985                let worker_index = Runtime::worker_index();
986                let exchange_id = runtime.sequence_next();
987                let exchange = Exchange::with_runtime(
988                    &runtime,
989                    exchange_id,
990                    Box::new(|vote| to_bytes(&vote).unwrap().into_vec()),
991                    Box::new(|data| unaligned_deserialize(&data[..])),
992                );
993
994                let notify_sender = Arc::new(Notify::new());
995                let notify_sender_clone = notify_sender.clone();
996                let notify_receiver = Arc::new(Notify::new());
997                let notify_receiver_clone = notify_receiver.clone();
998
999                exchange.register_sender_callback(worker_index, move || {
1000                    notify_sender_clone.notify_one()
1001                });
1002
1003                exchange.register_receiver_callback(worker_index, move || {
1004                    notify_receiver_clone.notify_one()
1005                });
1006
1007                Self::MultiThreaded {
1008                    notify_sender,
1009                    notify_receiver,
1010                    exchange,
1011                }
1012            }
1013            _ => Self::SingleThreaded,
1014        }
1015    }
1016
1017    /// Returns `true` if all workers vote `true`.
1018    ///
1019    /// # Arguments
1020    ///
1021    /// * `local` - Local vote by the current worker.
1022    pub async fn check(&self, local: bool) -> Result<bool, SchedulerError> {
1023        match self {
1024            Self::SingleThreaded => Ok(local),
1025            Self::MultiThreaded {
1026                notify_sender,
1027                notify_receiver,
1028                exchange,
1029            } => {
1030                while !exchange.try_send_all(Runtime::worker_index(), &mut repeat(local)) {
1031                    if Runtime::kill_in_progress() {
1032                        return Err(SchedulerError::Killed);
1033                    }
1034                    notify_sender.notified().await;
1035                }
1036                // Receive the status of each peer, compute global result
1037                // as a logical and of all peer statuses.
1038                let mut global = true;
1039                while !exchange.try_receive_all(Runtime::worker_index(), |status| global &= status)
1040                {
1041                    if Runtime::kill_in_progress() {
1042                        return Err(SchedulerError::Killed);
1043                    }
1044                    // Sleep if other threads are still working.
1045                    notify_receiver.notified().await;
1046                }
1047                Ok(global)
1048            }
1049        }
1050    }
1051}
1052
/// Handle returned by `Runtime::run`.
///
/// Holds the runtime together with the join handles of its worker threads,
/// and is used to unpark, kill, and join them.
#[derive(Debug)]
pub struct RuntimeHandle {
    /// The runtime controlled through this handle.
    runtime: Runtime,
    /// Join handle and unparker of each worker thread, indexed by worker
    /// number (see `unpark_worker`).
    workers: Vec<(JoinHandle<()>, Unparker)>,
}
1059
1060impl RuntimeHandle {
1061    fn new(runtime: Runtime, workers: Vec<(JoinHandle<()>, Unparker)>) -> Self {
1062        Self { runtime, workers }
1063    }
1064
1065    /// Unpark worker thread.
1066    ///
1067    /// Workers release the CPU by parking when they have no work to do.
1068    /// This method unparks a thread after sending a command to it or
1069    /// when killing a circuit.
1070    pub(super) fn unpark_worker(&self, worker: usize) {
1071        self.workers[worker].1.unpark();
1072    }
1073
1074    /// Returns reference to the runtime.
1075    pub fn runtime(&self) -> &Runtime {
1076        &self.runtime
1077    }
1078
1079    /// Terminate the runtime and all worker threads without waiting for any
1080    /// in-progress computation to complete.
1081    ///
1082    /// Signals all workers to exit.  Any operators already running are
1083    /// evaluated to completion, after which the worker thread terminates
1084    /// even if the circuit has not been fully evaluated for the current
1085    /// clock cycle.
1086    pub fn kill(self) -> ThreadResult<()> {
1087        self.kill_async();
1088        self.join()
1089    }
1090
1091    // Signals all worker threads to exit, and returns immediately without
1092    // waiting for them to exit.
1093    pub fn kill_async(&self) {
1094        self.runtime
1095            .inner()
1096            .kill_signal
1097            .store(true, Ordering::SeqCst);
1098        for (_worker, unparker) in self.workers.iter() {
1099            unparker.unpark();
1100        }
1101    }
1102
1103    /// Wait for all workers in the runtime to terminate.
1104    ///
1105    /// The calling thread blocks until all worker threads have terminated.
1106    pub fn join(self) -> ThreadResult<()> {
1107        // Insist on joining all threads even if some of them fail.
1108        #[allow(clippy::needless_collect)]
1109        let results: Vec<ThreadResult<()>> = self
1110            .workers
1111            .into_iter()
1112            .map(|(h, _unparker)| h.join())
1113            .collect();
1114
1115        // Wait for the background threads. They will exit automatically without
1116        // explicit signaling from us because the worker threads removed all of
1117        // their background work.
1118        self.runtime
1119            .inner()
1120            .background_threads
1121            .lock()
1122            .unwrap()
1123            .drain(..)
1124            .for_each(|h| {
1125                let _ = h.join();
1126            });
1127
1128        // Wait for aux threads.
1129        self.runtime
1130            .inner()
1131            .aux_threads
1132            .lock()
1133            .unwrap()
1134            .drain(..)
1135            .for_each(|h| {
1136                let _ = h.join();
1137            });
1138
1139        // This must happen after we wait for the background threads, because
1140        // they might try to initiate another merge before they exit, which
1141        // would require them to have access to storage, which is kept in the
1142        // local store.
1143        self.runtime.local_store().clear();
1144
1145        results.into_iter().collect()
1146    }
1147
1148    /// Retrieve panic info for a specific worker.
1149    pub fn worker_panic_info(
1150        &self,
1151        worker: usize,
1152        thread_type: ThreadType,
1153    ) -> Option<WorkerPanicInfo> {
1154        self.runtime.worker_panic_info(worker, thread_type)
1155    }
1156
1157    /// Retrieve panic info for all workers.
1158    pub fn collect_panic_info(&self) -> Vec<(usize, ThreadType, WorkerPanicInfo)> {
1159        let mut result = Vec::new();
1160
1161        for worker in 0..self.workers.len() {
1162            for thread_type in [ThreadType::Foreground, ThreadType::Background] {
1163                if let Some(panic_info) = self.worker_panic_info(worker, thread_type) {
1164                    result.push((worker, thread_type, panic_info))
1165                }
1166            }
1167        }
1168        result
1169    }
1170
1171    /// Returns true if any worker has panicked.
1172    pub fn panicked(&self) -> bool {
1173        self.runtime.inner().panicked.load(Ordering::Acquire)
1174    }
1175}
1176
#[cfg(test)]
mod tests {
    use super::Runtime;
    use crate::{
        circuit::{
            dbsp_handle::{CircuitStorageConfig, DevTweaks, Mode},
            schedule::{DynamicScheduler, Scheduler},
            CircuitConfig, Layout,
        },
        operator::Generator,
        Circuit, RootCircuit,
    };
    use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
    use std::{cell::RefCell, rc::Rc, thread::sleep, time::Duration};

    /// Runs [`test_runtime`] with the dynamic scheduler.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_runtime_dynamic() {
        test_runtime::<DynamicScheduler>();
    }

    /// Checks that a runtime reports the storage directory it was configured
    /// with and leaves that directory in place when it exits.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn storage_no_cleanup() {
        // Case 1: storage specified, runtime should not clean up storage when exiting
        //
        // Keep the `TempDir` guard alive for the whole test instead of
        // disarming it: the directory still exists when the assertion below
        // runs, and the guard removes it afterwards so that test runs do not
        // accumulate stray temporary directories.
        let tempdir = tempfile::tempdir().unwrap();
        let path = tempdir.path().to_path_buf();
        let path_clone = path.clone();
        let cconf = CircuitConfig {
            layout: Layout::new_solo(4),
            mode: Mode::Ephemeral,
            pin_cpus: Vec::new(),
            storage: Some(
                CircuitStorageConfig::for_config(
                    StorageConfig {
                        path: path.to_string_lossy().into_owned(),
                        cache: StorageCacheConfig::default(),
                    },
                    StorageOptions::default(),
                )
                .unwrap(),
            ),
            dev_tweaks: DevTweaks::default(),
        };

        let hruntime = Runtime::run(cconf, move |_parker| {
            let runtime = Runtime::runtime().unwrap();
            assert_eq!(runtime.storage_path(), Some(path_clone.as_ref()));
        })
        .expect("failed to start runtime");
        hruntime.join().unwrap();
        assert!(path.exists(), "persistent storage is not cleaned up");
        // `tempdir` is dropped here, deleting the directory.
    }

    /// Runs a 4-worker runtime in which every worker builds a circuit whose
    /// source draws values from `Runtime::sequence_next`, executes 100
    /// transactions, and checks the values the worker observed.
    fn test_runtime<S>()
    where
        S: Scheduler + 'static,
    {
        let hruntime = Runtime::run(4, |_parker| {
            let data = Rc::new(RefCell::new(vec![]));
            let data_clone = data.clone();
            let root = RootCircuit::build_with_scheduler::<_, _, S>(move |circuit| {
                let runtime = Runtime::runtime().unwrap();
                // Generator that produces values using `sequence_next`.
                circuit
                    .add_source(Generator::new(move || runtime.sequence_next()))
                    .inspect(move |n: &usize| data_clone.borrow_mut().push(*n));
                Ok(())
            })
            .unwrap()
            .0;

            for _ in 0..100 {
                root.transaction().unwrap();
            }

            // NOTE(review): the expected values start at 1 rather than 0,
            // presumably because something else draws 0 from this worker's
            // sequence counter first -- confirm.
            assert_eq!(&*data.borrow(), &(1..101).collect::<Vec<usize>>());
        })
        .expect("failed to start runtime");

        hruntime.join().unwrap();
    }

    /// Runs [`test_kill`] with the dynamic scheduler.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_kill_dynamic() {
        test_kill::<DynamicScheduler>();
    }

    /// Test `RuntimeHandle::kill`: starts 16 workers that each run a circuit
    /// that never terminates on its own, then kills the runtime after a short
    /// delay and expects all workers to be joined successfully.
    fn test_kill<S>()
    where
        S: Scheduler + 'static,
    {
        let hruntime = Runtime::run(16, |_parker| {
            // Create a nested circuit that iterates forever.
            let root = RootCircuit::build_with_scheduler::<_, _, S>(move |circuit| {
                circuit
                    .iterate_with_scheduler::<_, _, _, S>(|child| {
                        let mut n: usize = 0;
                        child
                            .add_source(Generator::new(move || {
                                n += 1;
                                n
                            }))
                            .inspect(|_: &usize| {});
                        // The termination check always answers `false`.
                        Ok((async || Ok(false), ()))
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap()
            .0;

            // Keep running transactions until one fails, then leave the
            // worker closure.
            loop {
                if root.transaction().is_err() {
                    return;
                }
            }
        })
        .expect("failed to start runtime");

        sleep(Duration::from_millis(100));
        hruntime.kill().unwrap();
    }
}
1299        hruntime.kill().unwrap();
1300    }
1301}