1use crate::circuit::checkpointer::Checkpointer;
5use crate::circuit::DevTweaks;
6use crate::error::Error as DbspError;
7use crate::operator::communication::Exchange;
8use crate::storage::backend::StorageBackend;
9use crate::storage::file::format::Compression;
10use crate::storage::file::to_bytes;
11use crate::storage::file::writer::Parameters;
12use crate::trace::unaligned_deserialize;
13use crate::SchedulerError;
14use crate::{
15 storage::{backend::StorageError, buffer_cache::BufferCache, dirlock::LockedDirectory},
16 DetailedError,
17};
18use core_affinity::{get_core_ids, CoreId};
19use crossbeam::sync::{Parker, Unparker};
20use enum_map::{enum_map, Enum, EnumMap};
21use feldera_types::config::{StorageCompression, StorageConfig, StorageOptions};
22use indexmap::IndexSet;
23use once_cell::sync::Lazy;
24use serde::Serialize;
25use std::iter::repeat;
26use std::path::Path;
27use std::sync::atomic::AtomicUsize;
28use std::sync::{LazyLock, Mutex};
29use std::thread::Thread;
30use std::time::Duration;
31use std::{
32 backtrace::Backtrace,
33 borrow::Cow,
34 cell::{Cell, RefCell},
35 error::Error as StdError,
36 fmt,
37 fmt::{Debug, Display, Error as FmtError, Formatter},
38 panic::{self, Location, PanicHookInfo},
39 sync::{
40 atomic::{AtomicBool, Ordering},
41 Arc, RwLock, Weak,
42 },
43 thread::{Builder, JoinHandle, Result as ThreadResult},
44};
45use tokio::sync::Notify;
46use tracing::{debug, error, info, warn};
47use typedmap::TypedDashMap;
48
49use super::dbsp_handle::{Layout, Mode};
50use super::CircuitConfig;
51
/// Replay step size used by a runtime until [`Runtime::set_replay_step_size`] changes it,
/// and returned by [`Runtime::replay_step_size`] outside of a runtime.
pub const DEFAULT_REPLAY_STEP_SIZE: usize = 10_000;
54
/// Errors reported by the DBSP runtime.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum Error {
    /// One or more worker threads panicked.
    WorkerPanic {
        /// One entry per panicked thread: the worker index (as reported by
        /// `RuntimeHandle::collect_panic_info`), whether the foreground or background thread
        /// of that worker panicked, and the captured panic details.
        panic_info: Vec<(usize, ThreadType, WorkerPanicInfo)>,
    },
    /// The supplied storage directory does not fit the runtime circuit.
    IncompatibleStorage,
    /// The circuit has been terminated.
    Terminated,
}
67
68impl DetailedError for Error {
69 fn error_code(&self) -> Cow<'static, str> {
70 match self {
71 Self::WorkerPanic { .. } => Cow::from("WorkerPanic"),
72 Self::Terminated => Cow::from("Terminated"),
73 Self::IncompatibleStorage => Cow::from("IncompatibleStorage"),
74 }
75 }
76}
77
78impl Display for Error {
79 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
80 match self {
81 Self::WorkerPanic { panic_info } => {
82 writeln!(f, "One or more worker threads terminated unexpectedly")?;
83
84 for (worker, thread_type, worker_panic_info) in panic_info.iter() {
85 writeln!(f, "{thread_type} worker thread {worker} panicked")?;
86 writeln!(f, "{worker_panic_info}")?;
87 }
88 Ok(())
89 }
90 Self::Terminated => f.write_str("circuit has been terminated"),
91 Self::IncompatibleStorage => {
92 f.write_str("Supplied storage directory does not fit the runtime circuit")
93 }
94 }
95 }
96}
97
// `Error` wraps no underlying source error, so the default `std::error::Error` methods suffice.
impl StdError for Error {}
99
thread_local! {
    // The runtime the current thread belongs to, if any. Installed at thread start by
    // worker threads (`Runtime::run`), background threads (`Runtime::spawn_background_thread`,
    // when spawned from inside a runtime), and auxiliary threads (`Runtime::spawn_aux_thread`).
    static RUNTIME: RefCell<Option<Runtime>> = const { RefCell::new(None) };

    // Global index of the worker the current thread belongs to. Set by worker and background
    // threads; it stays 0 on any other thread, including auxiliary threads.
    static WORKER_INDEX: Cell<usize> = const { Cell::new(0) };
}
111
112mod thread_type {
113 use std::{cell::Cell, fmt::Display};
114
115 #[cfg(doc)]
116 use super::Runtime;
117 use enum_map::Enum;
118 use serde::Serialize;
119
120 thread_local! {
121 static CURRENT: Cell<Option<ThreadType>> = const { Cell::new(None) };
124 }
125
126 #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Enum, Serialize)]
128 #[serde(rename_all = "snake_case")]
129 pub enum ThreadType {
130 Foreground,
132
133 Background,
135 }
136
137 impl ThreadType {
138 pub fn current() -> Option<Self> {
142 CURRENT.get()
143 }
144
145 pub(super) fn set_current(thread_type: Self) {
146 CURRENT.set(Some(thread_type));
147 }
148 }
149
150 impl Display for ThreadType {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 match self {
153 ThreadType::Foreground => write!(f, "foreground"),
154 ThreadType::Background => write!(f, "background"),
155 }
156 }
157 }
158}
159pub use thread_type::ThreadType;
160
/// Marker type that keys the [`LocalStore`] typed map.
pub struct LocalStoreMarker;

/// Type-keyed map owned by a runtime and reachable from every thread that holds that
/// runtime (see [`Runtime::local_store`]). It is cleared by `RuntimeHandle::join`.
pub type LocalStore = TypedDashMap<LocalStoreMarker>;
165
166#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
168pub struct PanicLocation {
169 file: String,
170 line: u32,
171 col: u32,
172}
173
174impl Display for PanicLocation {
175 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
176 write!(f, "{}:{}:{}", self.file, self.line, self.col)
177 }
178}
179
180impl PanicLocation {
181 fn new(loc: &Location) -> Self {
182 Self {
183 file: loc.file().to_string(),
184 line: loc.line(),
185 col: loc.column(),
186 }
187 }
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
192pub struct WorkerPanicInfo {
193 message: Option<String>,
195 location: Option<PanicLocation>,
197 backtrace: String,
199}
200
201impl Display for WorkerPanicInfo {
202 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
203 if let Some(message) = &self.message {
204 writeln!(f, "panic message: {message}")?;
205 } else {
206 writeln!(f, "panic message (none)")?;
207 }
208
209 if let Some(location) = &self.location {
210 writeln!(f, "panic location: {location}")?;
211 } else {
212 writeln!(f, "panic location: unknown")?;
213 }
214 writeln!(f, "stack trace:\n{}", self.backtrace)
215 }
216}
217
218impl WorkerPanicInfo {
219 fn new(panic_info: &PanicHookInfo) -> Self {
220 #[allow(clippy::manual_map)]
221 let message = if let Some(v) = panic_info.payload().downcast_ref::<String>() {
222 Some(v.clone())
223 } else if let Some(v) = panic_info.payload().downcast_ref::<&str>() {
224 Some(v.to_string())
225 } else {
226 None
227 };
228 let backtrace = Backtrace::force_capture().to_string();
229 let location = panic_info
230 .location()
231 .map(|location| PanicLocation::new(location));
232
233 Self {
234 message,
235 location,
236 backtrace,
237 }
238 }
239}
240
/// Storage state owned by a runtime that was configured with storage.
#[derive(derive_more::Debug)]
struct RuntimeStorage {
    /// Storage configuration; provides the storage directory via `path()`.
    pub config: StorageConfig,

    /// Storage options (e.g. cache size, minimum storage thresholds, compression).
    pub options: StorageOptions,

    /// Backend through which storage is accessed. Omitted from `Debug` output.
    #[debug(skip)]
    pub backend: Arc<dyn StorageBackend>,

    /// Lock on the storage directory, acquired in `RuntimeInner::new`. It is never read;
    /// it is kept so that the lock is held for as long as this struct lives (presumably
    /// released on drop — confirm against `LockedDirectory`).
    #[allow(dead_code)]
    locked_directory: LockedDirectory,
}
257
/// State shared by all clones of a [`Runtime`].
struct RuntimeInner {
    /// Worker layout: which workers exist and which of them run on this host.
    layout: Layout,
    /// Runtime mode, returned by `Runtime::mode()`.
    mode: Mode,
    /// Developer tweaks, exposed via `Runtime::with_dev_tweaks`.
    dev_tweaks: DevTweaks,

    /// Storage state, or `None` if storage is disabled.
    storage: Option<RuntimeStorage>,
    /// Type-keyed store shared by the runtime's threads; cleared on join.
    store: LocalStore,
    /// Set by `RuntimeHandle::kill_async`; polled via `Runtime::kill_in_progress`.
    kill_signal: AtomicBool,
    /// Join handles of background threads, joined by `RuntimeHandle::join`.
    background_threads: Mutex<Vec<JoinHandle<()>>>,
    /// Join handles of auxiliary threads, joined by `RuntimeHandle::join`.
    aux_threads: Mutex<Vec<JoinHandle<()>>>,
    /// One buffer cache per local worker and thread type, indexed by local worker offset.
    buffer_caches: Vec<EnumMap<ThreadType, Arc<BufferCache>>>,
    /// CPU to pin each local worker's threads to; empty when CPU pinning is disabled.
    pin_cpus: Vec<EnumMap<ThreadType, CoreId>>,
    /// Per-local-worker counters handed out by `Runtime::sequence_next`.
    worker_sequence_numbers: Vec<AtomicUsize>,
    /// Last panic recorded for each local worker and thread type.
    panic_info: Vec<EnumMap<ThreadType, RwLock<Option<WorkerPanicInfo>>>>,
    /// Set once any worker thread of this runtime has panicked.
    panicked: AtomicBool,
    /// Current replay step size; starts at `DEFAULT_REPLAY_STEP_SIZE`.
    replay_step_size: AtomicUsize,
}
277
impl Drop for RuntimeInner {
    // Runs when the last `Runtime` clone (the last strong `Arc` reference) goes away;
    // the debug log presumably helps track down lingering runtime references.
    fn drop(&mut self) {
        debug!("dropping RuntimeInner");
    }
}
283
impl Debug for RuntimeInner {
    // Shows only `layout` and `storage`; all other fields are omitted from the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("RuntimeInner")
            .field("layout", &self.layout)
            .field("storage", &self.storage)
            .finish()
    }
}
292
293fn display_core_ids<'a>(iter: impl Iterator<Item = &'a CoreId>) -> String {
294 format!(
295 "{:?}",
296 iter.map(|core| core.id).collect::<Vec<_>>().as_slice()
297 )
298}
299
300fn map_pin_cpus(layout: &Layout, pin_cpus: &[usize]) -> Vec<EnumMap<ThreadType, CoreId>> {
301 if layout.is_multihost() {
302 if !pin_cpus.is_empty() {
303 warn!("CPU pinning not yet supported with multihost DBSP");
304 }
305 return Vec::new();
306 }
307
308 let nworkers = layout.n_workers();
309 let pin_cpus = pin_cpus
310 .iter()
311 .copied()
312 .map(|id| CoreId { id })
313 .collect::<IndexSet<_>>();
314 if pin_cpus.len() < 2 * nworkers {
315 if !pin_cpus.is_empty() {
316 warn!("ignoring CPU pinning request because {nworkers} workers require {} pinned CPUs but only {} were specified",
317 2 * nworkers, pin_cpus.len())
318 }
319 return Vec::new();
320 }
321
322 let Some(core_ids) = get_core_ids() else {
323 warn!("ignoring CPU pinning request because this system's core ids list could not be obtained");
324 return Vec::new();
325 };
326 let core_ids = core_ids.iter().copied().collect::<IndexSet<_>>();
327
328 let missing_cpus = pin_cpus.difference(&core_ids).copied().collect::<Vec<_>>();
329 if !missing_cpus.is_empty() {
330 warn!("ignoring CPU pinning request because requested CPUs {missing_cpus:?} are not available (available CPUs are: {})",
331 display_core_ids(core_ids.iter()));
332 return Vec::new();
333 }
334
335 let fg_cpus = &pin_cpus[0..nworkers];
336 let bg_cpus = &pin_cpus[nworkers..nworkers * 2];
337 info!(
338 "pinning foreground workers to CPUs {} and background workers to CPUs {}",
339 display_core_ids(fg_cpus.iter()),
340 display_core_ids(bg_cpus.iter())
341 );
342 (0..nworkers)
343 .map(|i| {
344 enum_map! {
345 ThreadType::Foreground => fg_cpus[i],
346 ThreadType::Background => bg_cpus[i],
347 }
348 })
349 .collect()
350}
351
352impl RuntimeInner {
353 fn new(config: CircuitConfig) -> Result<Self, DbspError> {
354 let nworkers = config.layout.local_workers().len();
355
356 let storage = if let Some(storage) = config.storage {
357 let locked_directory =
358 LockedDirectory::new_blocking(storage.config.path(), Duration::from_secs(60))?;
359 let backend = storage.backend;
360
361 if let Some(init_checkpoint) = storage.init_checkpoint {
362 if !backend
363 .exists(&Checkpointer::checkpoint_dir(init_checkpoint).child("CHECKPOINT"))?
364 {
365 return Err(DbspError::Storage(StorageError::CheckpointNotFound(
366 init_checkpoint,
367 )));
368 }
369 }
370
371 Some(RuntimeStorage {
372 config: storage.config,
373 options: storage.options,
374 backend,
375 locked_directory,
376 })
377 } else {
378 None
379 };
380
381 let cache_size_bytes = if let Some(storage) = &storage {
382 storage
383 .options
384 .cache_mib
385 .map_or(256 * 1024 * 1024, |cache_mib| {
386 cache_mib.saturating_mul(1024 * 1024) / nworkers / ThreadType::LENGTH
387 })
388 } else {
389 1
391 };
392
393 Ok(Self {
394 pin_cpus: map_pin_cpus(&config.layout, &config.pin_cpus),
395 layout: config.layout,
396 mode: config.mode,
397 dev_tweaks: config.dev_tweaks,
398 storage,
399 store: TypedDashMap::new(),
400 kill_signal: AtomicBool::new(false),
401 background_threads: Mutex::new(Vec::new()),
402 aux_threads: Mutex::new(Vec::new()),
403 buffer_caches: (0..nworkers)
404 .map(|_| EnumMap::from_fn(|_| Arc::new(BufferCache::new(cache_size_bytes))))
405 .collect(),
406 worker_sequence_numbers: (0..nworkers).map(|_| AtomicUsize::new(0)).collect(),
407 panic_info: (0..nworkers)
408 .map(|_| EnumMap::from_fn(|_| RwLock::new(None)))
409 .collect(),
410 panicked: AtomicBool::new(false),
411 replay_step_size: AtomicUsize::new(DEFAULT_REPLAY_STEP_SIZE),
412 })
413 }
414
415 fn pin_cpu(&self) {
416 if !self.pin_cpus.is_empty() {
417 let local_worker_offset = Runtime::local_worker_offset();
418 let Some(thread_type) = ThreadType::current() else {
419 panic!("pin_cpu() called outside of a runtime or on an aux thread");
420 };
421 let core = self.pin_cpus[local_worker_offset][thread_type];
422 if !core_affinity::set_for_current(core) {
423 warn!(
424 "failed to pin worker {local_worker_offset} {thread_type} thread to core {}",
425 core.id
426 );
427 }
428 }
429 }
430}
431
432fn panic_hook(panic_info: &PanicHookInfo<'_>, default_panic_hook: &dyn Fn(&PanicHookInfo<'_>)) {
442 default_panic_hook(panic_info);
444
445 RUNTIME.with(|runtime| {
446 if let Ok(runtime) = runtime.try_borrow() {
447 if let Some(runtime) = runtime.as_ref() {
448 runtime.panic(panic_info);
449 }
450 }
451 })
452}
453
/// Handle to a DBSP runtime: the state shared by all threads that run a circuit.
///
/// Cloning is cheap; every clone refers to the same reference-counted state.
#[repr(transparent)]
#[derive(Clone, Debug)]
pub struct Runtime(Arc<RuntimeInner>);
460
/// Non-owning reference to a [`Runtime`] that does not keep the runtime alive.
#[repr(transparent)]
#[derive(Clone, Debug)]
pub struct WeakRuntime(Weak<RuntimeInner>);
465
466impl WeakRuntime {
467 pub fn upgrade(&self) -> Option<Runtime> {
468 self.0.upgrade().map(Runtime)
469 }
470}
471
/// The standard library's default panic hook, captured the first time it is needed.
#[allow(clippy::type_complexity)]
static DEFAULT_PANIC_HOOK: Lazy<Box<dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send>> =
    Lazy::new(|| {
        // The first `take_hook` unregisters whatever hook is currently installed and
        // restores the default one. That hook is discarded, so a hook installed by other
        // code before this point is lost. The second `take_hook` then returns the default
        // hook itself. As a result, the wrapper installed by `Runtime::run` always wraps the
        // default hook and never a previously installed wrapper.
        let _ = panic::take_hook();
        panic::take_hook()
    });
481
482fn default_panic_hook() -> &'static (dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send) {
484 &*DEFAULT_PANIC_HOOK
485}
486
487impl Runtime {
488 pub fn run<F>(config: impl Into<CircuitConfig>, circuit: F) -> Result<RuntimeHandle, DbspError>
527 where
528 F: FnOnce(Parker) + Clone + Send + 'static,
529 {
530 let config: CircuitConfig = config.into();
531
532 let workers = config.layout.local_workers();
533
534 let runtime = Self(Arc::new(RuntimeInner::new(config)?));
535
536 let default_hook = default_panic_hook();
538 panic::set_hook(Box::new(move |panic_info| {
539 panic_hook(panic_info, default_hook)
540 }));
541
542 let workers = workers
543 .map(|worker_index| {
544 let runtime = runtime.clone();
545 let build_circuit = circuit.clone();
546 let parker = Parker::new();
547 let unparker = parker.unparker().clone();
548 let handle = Builder::new()
549 .name(format!("dbsp-worker-{worker_index}"))
550 .spawn(move || {
551 WORKER_INDEX.set(worker_index);
553 ThreadType::set_current(ThreadType::Foreground);
554 runtime.inner().pin_cpu();
555 RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
556
557 build_circuit(parker);
559 })
560 .unwrap_or_else(|error| {
561 panic!("failed to spawn worker thread {worker_index}: {error}");
562 });
563 (handle, unparker)
564 })
565 .collect::<Vec<_>>();
566
567 Ok(RuntimeHandle::new(runtime, workers))
568 }
569
570 pub fn downgrade(&self) -> WeakRuntime {
571 WeakRuntime(Arc::downgrade(&self.0))
572 }
573
574 #[allow(clippy::self_named_constructors)]
584 pub fn runtime() -> Option<Runtime> {
585 RUNTIME.with(|rt| rt.borrow().clone())
586 }
587
588 pub fn storage_backend() -> Result<Arc<dyn StorageBackend>, StorageError> {
594 Runtime::runtime()
595 .unwrap()
596 .inner()
597 .storage
598 .as_ref()
599 .map_or(Err(StorageError::StorageDisabled), |storage| {
600 Ok(storage.backend.clone())
601 })
602 }
603
604 pub fn buffer_cache() -> Arc<BufferCache> {
615 thread_local! {
617 static BUFFER_CACHE: RefCell<Option<Arc<BufferCache>>> = const { RefCell::new(None) };
618 }
619 static NO_RUNTIME_CACHE: LazyLock<Arc<BufferCache>> =
627 LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));
628
629 if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
630 return buffer_cache;
631 }
632
633 let buffer_cache = if let Some(rt) = Runtime::runtime() {
635 if let Some(thread_type) = ThreadType::current() {
636 rt.get_buffer_cache(Runtime::local_worker_offset(), thread_type)
637 } else {
638 NO_RUNTIME_CACHE.clone()
640 }
641 } else {
642 NO_RUNTIME_CACHE.clone()
643 };
644 BUFFER_CACHE.set(Some(buffer_cache.clone()));
645 buffer_cache
646 }
647
648 pub fn spawn_aux_thread<F>(&self, thread_name: &str, f: F)
654 where
655 F: FnOnce() + Send + 'static,
656 {
657 let runtime = self.clone();
658 let handle = Builder::new()
659 .name(thread_name.to_string())
660 .spawn(|| {
661 RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
662 f()
663 })
664 .expect("failed to spawn auxiliary thread");
665
666 self.inner().aux_threads.lock().unwrap().push(handle)
667 }
668
669 pub fn get_buffer_cache(
674 &self,
675 local_worker_offset: usize,
676 thread_type: ThreadType,
677 ) -> Arc<BufferCache> {
678 self.0.buffer_caches[local_worker_offset][thread_type].clone()
679 }
680
681 pub fn cache_occupancy(&self) -> (usize, usize) {
684 if self.0.storage.is_some() {
685 self.0
686 .buffer_caches
687 .iter()
688 .flat_map(|map| map.values())
689 .map(|cache| cache.occupancy())
690 .fold((0, 0), |(a_cur, a_max), (b_cur, b_max)| {
691 (a_cur + b_cur, a_max + b_max)
692 })
693 } else {
694 (0, 0)
695 }
696 }
697
698 pub fn worker_index() -> usize {
702 WORKER_INDEX.get()
703 }
704
705 pub fn local_worker_offset() -> usize {
707 let local_workers_start = RUNTIME
709 .with(|rt| Some(rt.borrow().as_ref()?.layout().local_workers().start))
710 .unwrap_or_default();
711 Self::worker_index() - local_workers_start
712 }
713
714 pub fn mode() -> Mode {
715 RUNTIME
716 .with(|rt| Some(rt.borrow().as_ref()?.get_mode()))
717 .unwrap_or_default()
718 }
719
720 pub fn with_dev_tweaks<F, T>(f: F) -> T
721 where
722 F: Fn(&DevTweaks) -> T,
723 {
724 static DEFAULT: Lazy<DevTweaks> = Lazy::new(DevTweaks::default);
725 RUNTIME
726 .with(|rt| Some(f(&rt.borrow().as_ref()?.inner().dev_tweaks)))
727 .unwrap_or_else(|| f(&DEFAULT))
728 }
729
730 pub fn get_mode(&self) -> Mode {
731 self.inner().mode.clone()
732 }
733
734 pub fn set_replay_step_size(&self, step_size: usize) {
738 self.inner()
739 .replay_step_size
740 .store(step_size, Ordering::Release);
741 }
742
743 pub fn replay_step_size() -> usize {
747 RUNTIME
748 .with(|rt| Some(rt.borrow().as_ref()?.get_replay_step_size()))
749 .unwrap_or(DEFAULT_REPLAY_STEP_SIZE)
750 }
751
752 pub fn get_replay_step_size(&self) -> usize {
754 self.inner().replay_step_size.load(Ordering::Acquire)
755 }
756
757 pub fn worker_index_str() -> &'static str {
761 static WORKER_INDEX_STRS: Lazy<[&'static str; 256]> = Lazy::new(|| {
762 let mut data: [&'static str; 256] = [""; 256];
763 for (i, item) in data.iter_mut().enumerate() {
764 *item = Box::leak(i.to_string().into_boxed_str());
765 }
766 data
767 });
768
769 WORKER_INDEX_STRS
770 .get(WORKER_INDEX.get())
771 .copied()
772 .unwrap_or_else(|| {
773 panic!("Limit workers to less than 256 or increase the limit in the code.")
774 })
775 }
776
777 pub fn min_index_storage_bytes() -> Option<usize> {
782 RUNTIME.with(|rt| {
783 Some(
784 rt.borrow()
785 .as_ref()?
786 .inner()
787 .storage
788 .as_ref()?
789 .options
790 .min_storage_bytes
791 .unwrap_or({
792 10 * 1024 * 1024
795 }),
796 )
797 })
798 }
799
800 pub fn min_step_storage_bytes() -> Option<usize> {
805 RUNTIME.with(|rt| {
806 Some(
807 rt.borrow()
808 .as_ref()?
809 .inner()
810 .storage
811 .as_ref()?
812 .options
813 .min_step_storage_bytes
814 .unwrap_or(usize::MAX),
815 )
816 })
817 }
818
819 pub fn file_writer_parameters() -> Parameters {
820 let compression = Runtime::runtime()
821 .unwrap()
822 .inner()
823 .storage
824 .as_ref()
825 .unwrap()
826 .options
827 .compression;
828 let compression = match compression {
829 StorageCompression::Default | StorageCompression::Snappy => Some(Compression::Snappy),
830 StorageCompression::None => None,
831 };
832 Parameters::default().with_compression(compression)
833 }
834
835 fn inner(&self) -> &RuntimeInner {
836 &self.0
837 }
838
839 pub fn num_workers() -> usize {
844 RUNTIME.with(|rt| {
845 rt.borrow()
846 .as_ref()
847 .map_or(1, |runtime| runtime.layout().n_workers())
848 })
849 }
850
851 pub fn layout(&self) -> &Layout {
853 &self.inner().layout
854 }
855
856 pub fn local_store(&self) -> &LocalStore {
867 &self.inner().store
868 }
869
870 pub fn storage_path(&self) -> Option<&Path> {
872 self.inner()
873 .storage
874 .as_ref()
875 .map(|storage| storage.config.path())
876 }
877
878 pub fn sequence_next(&self) -> usize {
884 self.inner().worker_sequence_numbers[Self::local_worker_offset()]
885 .fetch_add(1, Ordering::Relaxed)
886 }
887
888 pub fn kill_in_progress() -> bool {
892 RUNTIME.with(|runtime| {
896 runtime
897 .borrow()
898 .as_ref()
899 .map(|runtime| runtime.inner().kill_signal.load(Ordering::SeqCst))
900 .unwrap_or(false)
901 })
902 }
903
904 pub fn worker_panic_info(
905 &self,
906 worker: usize,
907 thread_type: ThreadType,
908 ) -> Option<WorkerPanicInfo> {
909 if let Ok(guard) = self.inner().panic_info[worker][thread_type].read() {
910 guard.clone()
911 } else {
912 warn!("poisoned panic_lock lock for {thread_type} worker {worker}");
913 None
914 }
915 }
916
917 fn panic(&self, panic_info: &PanicHookInfo) {
919 let local_worker_offset = Self::local_worker_offset();
920 let Some(thread_type) = ThreadType::current() else {
921 error!("panic hook called outside of a runtime or on an aux thread");
924 return;
925 };
926 let panic_info = WorkerPanicInfo::new(panic_info);
927 let _ = self.inner().panic_info[local_worker_offset][thread_type]
928 .write()
929 .map(|mut guard| *guard = Some(panic_info));
930 self.inner().panicked.store(true, Ordering::Release);
931 }
932
933 pub(crate) fn spawn_background_thread<F>(builder: Builder, f: F) -> (Thread, Unparker)
937 where
938 F: FnOnce(Parker) + Send + 'static,
939 {
940 let runtime = Self::runtime();
941 let worker_index = Self::worker_index();
942 let parker = Parker::new();
943 let unparker = parker.unparker().clone();
944 let join_handle = builder
945 .spawn(move || {
946 WORKER_INDEX.set(worker_index);
947 ThreadType::set_current(ThreadType::Background);
948 if let Some(runtime) = runtime {
949 runtime.inner().pin_cpu();
950 RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime));
951 }
952 f(parker)
953 })
954 .unwrap_or_else(|error| {
955 panic!("failed to spawn background worker thread {worker_index}: {error}");
956 });
957 let thread = join_handle.thread().clone();
958 if let Some(runtime) = Self::runtime() {
959 runtime
960 .inner()
961 .background_threads
962 .lock()
963 .unwrap()
964 .push(join_handle);
965 }
966 (thread, unparker)
967 }
968}
969
/// Agreement among all workers on a boolean vote: [`Consensus::check`] yields `true` only
/// if every worker voted `true`.
pub(crate) enum Consensus {
    /// Used when there is no runtime or only one worker: the local vote is the result.
    SingleThreaded,
    /// Used with more than one worker: votes travel through `exchange`.
    MultiThreaded {
        /// Notified from the exchange's sender callback registered for this worker.
        notify_sender: Arc<Notify>,
        /// Notified from the exchange's receiver callback registered for this worker.
        notify_receiver: Arc<Notify>,
        /// Exchange that carries each worker's vote to every worker.
        exchange: Arc<Exchange<bool>>,
    },
}
980
impl Consensus {
    /// Creates a consensus object for the current worker.
    ///
    /// With a runtime of more than one worker, this allocates an exchange id from the
    /// worker's sequence counter, creates a `bool` exchange, and registers callbacks that
    /// wake `check` when sending or receiving may make progress. Presumably every worker
    /// must construct its `Consensus` at the same point so that the exchange ids match —
    /// confirm with the `Exchange` docs. Otherwise the result is single-threaded.
    pub fn new() -> Self {
        match Runtime::runtime() {
            Some(runtime) if Runtime::num_workers() > 1 => {
                let worker_index = Runtime::worker_index();
                let exchange_id = runtime.sequence_next();
                let exchange = Exchange::with_runtime(
                    &runtime,
                    exchange_id,
                    Box::new(|vote| to_bytes(&vote).unwrap().into_vec()),
                    Box::new(|data| unaligned_deserialize(&data[..])),
                );

                let notify_sender = Arc::new(Notify::new());
                let notify_sender_clone = notify_sender.clone();
                let notify_receiver = Arc::new(Notify::new());
                let notify_receiver_clone = notify_receiver.clone();

                exchange.register_sender_callback(worker_index, move || {
                    notify_sender_clone.notify_one()
                });

                exchange.register_receiver_callback(worker_index, move || {
                    notify_receiver_clone.notify_one()
                });

                Self::MultiThreaded {
                    notify_sender,
                    notify_receiver,
                    exchange,
                }
            }
            _ => Self::SingleThreaded,
        }
    }

    /// Casts `local` as this worker's vote and returns the AND of all workers' votes.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulerError::Killed`] if the runtime is killed while waiting.
    pub async fn check(&self, local: bool) -> Result<bool, SchedulerError> {
        match self {
            Self::SingleThreaded => Ok(local),
            Self::MultiThreaded {
                notify_sender,
                notify_receiver,
                exchange,
            } => {
                // Send our vote to every worker; retry whenever the sender callback fires.
                // `Notify::notify_one` stores a permit when nobody is waiting, so a
                // notification between a failed attempt and the `await` is not lost.
                while !exchange.try_send_all(Runtime::worker_index(), &mut repeat(local)) {
                    if Runtime::kill_in_progress() {
                        return Err(SchedulerError::Killed);
                    }
                    notify_sender.notified().await;
                }
                // Fold every worker's vote into `global`; retry on receiver callbacks.
                let mut global = true;
                while !exchange.try_receive_all(Runtime::worker_index(), |status| global &= status)
                {
                    if Runtime::kill_in_progress() {
                        return Err(SchedulerError::Killed);
                    }
                    notify_receiver.notified().await;
                }
                Ok(global)
            }
        }
    }
}
1052
/// Handle returned by [`Runtime::run`] for waking, killing, and joining the runtime's
/// worker threads and for inspecting worker panics.
#[derive(Debug)]
pub struct RuntimeHandle {
    /// The runtime the workers belong to.
    runtime: Runtime,
    /// One entry per local worker: its join handle and the unparker for its parker.
    workers: Vec<(JoinHandle<()>, Unparker)>,
}
1059
1060impl RuntimeHandle {
1061 fn new(runtime: Runtime, workers: Vec<(JoinHandle<()>, Unparker)>) -> Self {
1062 Self { runtime, workers }
1063 }
1064
1065 pub(super) fn unpark_worker(&self, worker: usize) {
1071 self.workers[worker].1.unpark();
1072 }
1073
1074 pub fn runtime(&self) -> &Runtime {
1076 &self.runtime
1077 }
1078
1079 pub fn kill(self) -> ThreadResult<()> {
1087 self.kill_async();
1088 self.join()
1089 }
1090
1091 pub fn kill_async(&self) {
1094 self.runtime
1095 .inner()
1096 .kill_signal
1097 .store(true, Ordering::SeqCst);
1098 for (_worker, unparker) in self.workers.iter() {
1099 unparker.unpark();
1100 }
1101 }
1102
1103 pub fn join(self) -> ThreadResult<()> {
1107 #[allow(clippy::needless_collect)]
1109 let results: Vec<ThreadResult<()>> = self
1110 .workers
1111 .into_iter()
1112 .map(|(h, _unparker)| h.join())
1113 .collect();
1114
1115 self.runtime
1119 .inner()
1120 .background_threads
1121 .lock()
1122 .unwrap()
1123 .drain(..)
1124 .for_each(|h| {
1125 let _ = h.join();
1126 });
1127
1128 self.runtime
1130 .inner()
1131 .aux_threads
1132 .lock()
1133 .unwrap()
1134 .drain(..)
1135 .for_each(|h| {
1136 let _ = h.join();
1137 });
1138
1139 self.runtime.local_store().clear();
1144
1145 results.into_iter().collect()
1146 }
1147
1148 pub fn worker_panic_info(
1150 &self,
1151 worker: usize,
1152 thread_type: ThreadType,
1153 ) -> Option<WorkerPanicInfo> {
1154 self.runtime.worker_panic_info(worker, thread_type)
1155 }
1156
1157 pub fn collect_panic_info(&self) -> Vec<(usize, ThreadType, WorkerPanicInfo)> {
1159 let mut result = Vec::new();
1160
1161 for worker in 0..self.workers.len() {
1162 for thread_type in [ThreadType::Foreground, ThreadType::Background] {
1163 if let Some(panic_info) = self.worker_panic_info(worker, thread_type) {
1164 result.push((worker, thread_type, panic_info))
1165 }
1166 }
1167 }
1168 result
1169 }
1170
1171 pub fn panicked(&self) -> bool {
1173 self.runtime.inner().panicked.load(Ordering::Acquire)
1174 }
1175}
1176
#[cfg(test)]
mod tests {
    use super::Runtime;
    use crate::{
        circuit::{
            dbsp_handle::{CircuitStorageConfig, DevTweaks, Mode},
            schedule::{DynamicScheduler, Scheduler},
            CircuitConfig, Layout,
        },
        operator::Generator,
        Circuit, RootCircuit,
    };
    use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
    use std::{cell::RefCell, rc::Rc, thread::sleep, time::Duration};

    /// Runs `test_runtime` with the dynamic scheduler.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_runtime_dynamic() {
        test_runtime::<DynamicScheduler>();
    }

    /// Starts a 4-worker runtime with storage enabled and checks two things: each worker
    /// sees the configured storage path, and the directory still exists after the runtime
    /// has been joined.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn storage_no_cleanup() {
        // `keep()` detaches the directory from the temporary-directory guard, so only the
        // runtime could remove it; the final assertion checks that it did not.
        let path = tempfile::tempdir().unwrap().keep();
        let path_clone = path.clone();
        let cconf = CircuitConfig {
            layout: Layout::new_solo(4),
            mode: Mode::Ephemeral,
            pin_cpus: Vec::new(),
            storage: Some(
                CircuitStorageConfig::for_config(
                    StorageConfig {
                        path: path.to_string_lossy().into_owned(),
                        cache: StorageCacheConfig::default(),
                    },
                    StorageOptions::default(),
                )
                .unwrap(),
            ),
            dev_tweaks: DevTweaks::default(),
        };

        let hruntime = Runtime::run(cconf, move |_parker| {
            let runtime = Runtime::runtime().unwrap();
            assert_eq!(runtime.storage_path(), Some(path_clone.as_ref()));
        })
        .expect("failed to start runtime");
        hruntime.join().unwrap();
        assert!(path.exists(), "persistent storage is not cleaned up");
    }

    /// Runs 4 workers. Each builds a circuit whose source emits `runtime.sequence_next()`
    /// and checks that 100 transactions observe the values 1 through 100. Value 0 is
    /// presumably consumed during circuit construction — confirm if this assertion changes.
    fn test_runtime<S>()
    where
        S: Scheduler + 'static,
    {
        let hruntime = Runtime::run(4, |_parker| {
            let data = Rc::new(RefCell::new(vec![]));
            let data_clone = data.clone();
            let root = RootCircuit::build_with_scheduler::<_, _, S>(move |circuit| {
                let runtime = Runtime::runtime().unwrap();
                circuit
                    .add_source(Generator::new(move || runtime.sequence_next()))
                    .inspect(move |n: &usize| data_clone.borrow_mut().push(*n));
                Ok(())
            })
            .unwrap()
            .0;

            for _ in 0..100 {
                root.transaction().unwrap();
            }

            assert_eq!(&*data.borrow(), &(1..101).collect::<Vec<usize>>());
        })
        .expect("failed to start runtime");

        hruntime.join().unwrap();
    }

    /// Runs `test_kill` with the dynamic scheduler.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_kill_dynamic() {
        test_kill::<DynamicScheduler>();
    }

    /// Runs 16 workers whose circuits contain a nested iteration that never terminates
    /// (its termination check always returns `Ok(false)`). After 100 ms the runtime is
    /// killed, and the test checks that `kill()` succeeds. Each worker leaves its loop once
    /// `transaction()` returns an error.
    fn test_kill<S>()
    where
        S: Scheduler + 'static,
    {
        let hruntime = Runtime::run(16, |_parker| {
            let root = RootCircuit::build_with_scheduler::<_, _, S>(move |circuit| {
                circuit
                    .iterate_with_scheduler::<_, _, _, S>(|child| {
                        let mut n: usize = 0;
                        child
                            .add_source(Generator::new(move || {
                                n += 1;
                                n
                            }))
                            .inspect(|_: &usize| {});
                        Ok((async || Ok(false), ()))
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap()
            .0;

            loop {
                if root.transaction().is_err() {
                    return;
                }
            }
        })
        .expect("failed to start runtime");

        sleep(Duration::from_millis(100));
        hruntime.kill().unwrap();
    }
}