1pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47 network::{
48 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
49 metered::Network as MeteredNetwork,
50 },
51 storage::{
52 audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
53 memory::Storage as MemStorage, metered::Storage as MeteredStorage,
54 },
55 telemetry::metrics::task::Label,
56 utils::{
57 add_attribute,
58 signal::{Signal, Stopper},
59 supervision::Tree,
60 Panicker, Registry, ScopeGuard,
61 },
62 validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
63 Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
64};
65#[cfg(feature = "external")]
66use crate::{Blocker, Pacer};
67use commonware_codec::Encode;
68use commonware_macros::select;
69use commonware_parallel::ThreadPool;
70use commonware_utils::{
71 hex,
72 sync::{Mutex, RwLock},
73 time::SYSTEM_TIME_PRECISION,
74 SystemTimeExt,
75};
76#[cfg(feature = "external")]
77use futures::task::noop_waker;
78use futures::{
79 future::Either,
80 task::{waker, ArcWake},
81 Future,
82};
83use governor::clock::{Clock as GClock, ReasonablyRealtime};
84#[cfg(feature = "external")]
85use pin_project::pin_project;
86use prometheus_client::{
87 metrics::{counter::Counter, family::Family, gauge::Gauge},
88 registry::{Metric, Registry as PrometheusRegistry},
89};
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95 borrow::Cow,
96 collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
97 mem::{replace, take},
98 net::{IpAddr, SocketAddr},
99 num::NonZeroUsize,
100 panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
101 pin::Pin,
102 sync::{Arc, Weak},
103 task::{self, Poll, Waker},
104 time::{Duration, SystemTime, UNIX_EPOCH},
105};
106use tracing::{info_span, trace, Instrument};
107use tracing_opentelemetry::OpenTelemetrySpanExt;
108
/// Runtime-level Prometheus metrics for the deterministic executor.
#[derive(Debug)]
struct Metrics {
    /// Total number of scheduler loop iterations.
    iterations: Counter,
    /// Tasks spawned, labeled by task name.
    tasks_spawned: Family<Label, Counter>,
    /// Tasks currently running, labeled by task name.
    tasks_running: Family<Label, Gauge>,
    /// Total number of task polls, labeled by task name.
    task_polls: Family<Label, Counter>,

    /// Total amount of data sent over the simulated network.
    network_bandwidth: Counter,
}
118
119impl Metrics {
120 pub fn init(registry: &mut PrometheusRegistry) -> Self {
121 let metrics = Self {
122 iterations: Counter::default(),
123 task_polls: Family::default(),
124 tasks_spawned: Family::default(),
125 tasks_running: Family::default(),
126 network_bandwidth: Counter::default(),
127 };
128 registry.register(
129 "iterations",
130 "Total number of iterations",
131 metrics.iterations.clone(),
132 );
133 registry.register(
134 "tasks_spawned",
135 "Total number of tasks spawned",
136 metrics.tasks_spawned.clone(),
137 );
138 registry.register(
139 "tasks_running",
140 "Number of tasks currently running",
141 metrics.tasks_running.clone(),
142 );
143 registry.register(
144 "task_polls",
145 "Total number of task polls",
146 metrics.task_polls.clone(),
147 );
148 registry.register(
149 "bandwidth",
150 "Total amount of data sent over network",
151 metrics.network_bandwidth.clone(),
152 );
153 metrics
154 }
155}
156
/// SHA-256 output used as the auditor's running digest.
type Digest = [u8; 32];

/// Maintains a running hash chain of observed runtime events.
pub struct Auditor {
    /// Running digest; each recorded event folds into the previous value.
    digest: Mutex<Digest>,
}
164
165impl Default for Auditor {
166 fn default() -> Self {
167 Self {
168 digest: Digest::default().into(),
169 }
170 }
171}
172
173impl Auditor {
174 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
178 where
179 F: FnOnce(&mut Sha256),
180 {
181 let mut digest = self.digest.lock();
182
183 let mut hasher = Sha256::new();
184 hasher.update(digest.as_ref());
185 hasher.update(label);
186 payload(&mut hasher);
187
188 *digest = hasher.finalize().into();
189 }
190
191 pub fn state(&self) -> String {
196 let hash = self.digest.lock();
197 hex(hash.as_ref())
198 }
199}
200
/// Boxed cryptographically-secure RNG driving all runtime randomness.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;

/// Configuration for the deterministic runtime.
pub struct Config {
    /// Source of randomness (a fixed-seed RNG by default, for reproducibility).
    rng: BoxDynRng,

    /// Amount simulated time advances on each scheduler iteration.
    cycle: Duration,

    /// Simulated time at startup.
    start_time: SystemTime,

    /// Optional bound on simulated runtime; exceeding it panics ("runtime timeout").
    timeout: Option<Duration>,

    /// Whether spawned-task panics are caught instead of propagated.
    catch_panics: bool,

    /// Fault-injection configuration for the storage layer.
    storage_fault_cfg: FaultConfig,

    /// Buffer pool configuration for the network stack.
    network_buffer_pool_cfg: BufferPoolConfig,

    /// Buffer pool configuration for the storage stack.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
232
impl Config {
    /// Creates a configuration with deterministic defaults: seed 42, 1ms
    /// cycle, start at the unix epoch, no timeout, panics propagated.
    pub fn new() -> Self {
        // Under Miri, cap buffer-pool class sizes to keep memory use low;
        // thread caches are disabled in both cases for determinism.
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
                    .with_max_per_class(commonware_utils::NZUsize!(32))
                    .with_thread_cache_disabled();
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
                    .with_max_per_class(commonware_utils::NZUsize!(32))
                    .with_thread_cache_disabled();
            } else {
                let network_buffer_pool_cfg =
                    BufferPoolConfig::for_network().with_thread_cache_disabled();
                let storage_buffer_pool_cfg =
                    BufferPoolConfig::for_storage().with_thread_cache_disabled();
            }
        }

        Self {
            // Fixed seed so runs are reproducible by default.
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            start_time: UNIX_EPOCH,
            timeout: None,
            catch_panics: false,
            storage_fault_cfg: FaultConfig::default(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        }
    }

    /// Replaces the RNG with one seeded from `seed`.
    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }

    /// Replaces the RNG wholesale.
    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }

    /// Sets the per-iteration simulated time step.
    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Sets the simulated start time.
    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = start_time;
        self
    }
    /// Sets (or clears) the simulated runtime timeout.
    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }
    /// Sets whether spawned-task panics are caught.
    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }
    /// Sets the network buffer pool configuration.
    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.network_buffer_pool_cfg = cfg;
        self
    }
    /// Sets the storage buffer pool configuration.
    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.storage_buffer_pool_cfg = cfg;
        self
    }

    /// Sets the storage fault-injection configuration.
    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
        self.storage_fault_cfg = faults;
        self
    }

    /// Per-iteration simulated time step.
    pub const fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Simulated start time.
    pub const fn start_time(&self) -> SystemTime {
        self.start_time
    }
    /// Simulated runtime timeout, if any.
    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }
    /// Whether spawned-task panics are caught.
    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }
    /// Network buffer pool configuration.
    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.network_buffer_pool_cfg
    }
    /// Storage buffer pool configuration.
    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.storage_buffer_pool_cfg
    }

    /// Panics if the configuration is internally inconsistent.
    pub fn assert(&self) {
        // A zero cycle with a timeout set would never reach the deadline.
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
        assert!(
            self.start_time >= UNIX_EPOCH,
            "start time must be greater than or equal to unix epoch"
        );
    }
}
364
365impl Default for Config {
366 fn default() -> Self {
367 Self::new()
368 }
369}
370
/// Key identifying a registered metric: prefixed name plus attribute pairs.
type MetricKey = (String, Vec<(String, String)>);

/// Shared state of the deterministic runtime.
pub struct Executor {
    /// Metrics registry for runtime and user metrics.
    registry: Mutex<Registry>,
    /// Tracks (name, attributes) pairs to reject duplicate registrations.
    registered_metrics: Mutex<HashSet<MetricKey>>,
    /// Simulated time step applied each scheduler iteration.
    cycle: Duration,
    /// Simulated time at which the runtime panics with "runtime timeout".
    deadline: Option<SystemTime>,
    /// Runtime metrics.
    metrics: Arc<Metrics>,
    /// Event auditor used to fingerprint execution.
    auditor: Arc<Auditor>,
    /// Runtime RNG (also used to shuffle the ready queue each iteration).
    rng: Arc<Mutex<BoxDynRng>>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Task registry plus ready queue.
    tasks: Arc<Tasks>,
    /// Pending timers, ordered so the earliest deadline pops first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Shutdown signal shared with `stop`/`stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic-handling policy for spawned tasks.
    panicker: Panicker,
    /// In-memory DNS table backing the simulated resolver.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
390
impl Executor {
    /// Advances simulated time by one `cycle` and returns the new time.
    ///
    /// With the `external` feature enabled, also sleeps the host thread for
    /// `cycle` so simulated time roughly tracks wall-clock time.
    fn advance_time(&self) -> SystemTime {
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);

        let mut time = self.time.lock();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }

    /// If no task is ready, jumps simulated time forward to the earliest
    /// pending sleep deadline (skipping idle cycles). Disabled under the
    /// `external` feature, where time must track the host clock.
    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }

        // Peek the earliest alarm while holding only the sleeping lock.
        let mut skip_until = None;
        {
            let sleeping = self.sleeping.lock();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }

        // Take the time lock separately (never both locks at once here).
        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }

    /// Wakes every sleeper whose deadline is at or before `current`.
    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock();
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                break;
            }
        }
    }

    /// Panics if no task is ready to run (the runtime would otherwise spin
    /// forever). Skipped under the `external` feature, where external events
    /// may still wake tasks.
    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }

        panic!("runtime stalled");
    }
}
461
/// State captured at the end of a run, sufficient to resume execution with
/// [`Runner::from`] while preserving time, randomness, storage, and the
/// audit digest.
pub struct Checkpoint {
    /// Simulated time step to resume with.
    cycle: Duration,
    /// Remaining absolute deadline, if a timeout was configured.
    deadline: Option<SystemTime>,
    /// Auditor carried across runs (hash chain continues).
    auditor: Arc<Auditor>,
    /// RNG carried across runs (stream continues).
    rng: Arc<Mutex<BoxDynRng>>,
    /// Simulated time at capture.
    time: Mutex<SystemTime>,
    /// Storage stack carried across runs.
    storage: Arc<Storage>,
    /// DNS table carried across runs.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    /// Whether spawned-task panics were caught.
    catch_panics: bool,
    /// Network buffer pool configuration to recreate on resume.
    network_buffer_pool_cfg: BufferPoolConfig,
    /// Storage buffer pool configuration to recreate on resume.
    storage_buffer_pool_cfg: BufferPoolConfig,
}

impl Checkpoint {
    /// Returns the auditor captured in this checkpoint.
    pub fn auditor(&self) -> Arc<Auditor> {
        self.auditor.clone()
    }
}
484
/// Where a run begins: a fresh configuration or a prior checkpoint.
#[allow(clippy::large_enum_variant)]
enum State {
    Config(Config),
    Checkpoint(Checkpoint),
}

/// Entry point for executing a future on the deterministic runtime.
pub struct Runner {
    /// Starting state for the next run.
    state: State,
}
495
496impl From<Config> for Runner {
497 fn from(cfg: Config) -> Self {
498 Self::new(cfg)
499 }
500}
501
502impl From<Checkpoint> for Runner {
503 fn from(checkpoint: Checkpoint) -> Self {
504 Self {
505 state: State::Checkpoint(checkpoint),
506 }
507 }
508}
509
impl Runner {
    /// Creates a runner from `cfg`, panicking if the configuration is
    /// internally inconsistent (see [`Config::assert`]).
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Creates a runner with default configuration seeded from `seed`.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Creates a runner with default configuration plus a runtime timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Runs `f` to completion and returns its output together with a
    /// [`Checkpoint`] from which execution can later be resumed.
    ///
    /// # Panics
    ///
    /// Panics with "runtime timeout" if the configured deadline is reached,
    /// or "runtime stalled" if no task can make progress (see
    /// `Executor::assert_liveness`). Panics escaping the root future are
    /// re-raised after cleanup.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Build (or restore) the executor and the root context.
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Capture handles needed for the checkpoint before the context is
        // consumed by `f`.
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        Tasks::register_root(&executor.tasks);

        // Main scheduler loop; catch panics so cleanup below always runs.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Enforce the configured deadline, if any.
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            let mut queue = executor.tasks.drain();

            // Shuffle with the seeded RNG: random but reproducible order.
            if queue.len() > 1 {
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // A task may have been removed after being queued.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Every poll is folded into the audit digest so execution
                // order is part of the determinism fingerprint.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Waker re-queues the task by id; it holds only a weak
                // reference so stray wakers can't keep the registry alive.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        let mut fut_opt = future.lock();
                        // A queued task may have completed on an earlier
                        // poll in this same iteration.
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");
                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");
                            // Drop the future eagerly to release resources.
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            // Root finished: exit with its output.
            if let Some(output) = output {
                break output;
            }

            // Advance (and possibly fast-forward) simulated time, then wake
            // any sleepers that are now due.
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        // Tear down: drop all pending alarms and task futures so nothing
        // keeps a reference to the executor.
        executor.sleeping.lock().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock() = None;
        }

        drop(root);

        // All weak handles (contexts, sleepers, wakers) must be gone before
        // the executor can be unwrapped below.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Re-raise any panic only after cleanup completed.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Package the surviving state so the run can be resumed later.
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };

        (output, checkpoint)
    }
}
721
722impl Default for Runner {
723 fn default() -> Self {
724 Self::new(Config::default())
725 }
726}
727
728impl crate::Runner for Runner {
729 type Context = Context;
730
731 fn start<F, Fut>(self, f: F) -> Fut::Output
732 where
733 F: FnOnce(Self::Context) -> Fut,
734 Fut: Future,
735 {
736 let (output, _) = self.start_and_recover(f);
737 output
738 }
739}
740
/// How a task is polled by the scheduler.
enum Mode {
    /// The root future passed to the runner (held by the scheduler itself).
    Root,
    /// A spawned task; the slot is set to `None` once the future completes.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

/// A scheduled unit of work.
struct Task {
    /// Monotonically increasing identifier (see `Tasks::increment`).
    id: u128,
    /// Metrics/tracing label for this task.
    label: Label,

    /// Polling mode (root vs. spawned work).
    mode: Mode,
}

/// Waker that re-queues a task by id when woken.
struct TaskWaker {
    /// Identifier of the task to re-queue.
    id: u128,

    /// Weak reference so outstanding wakers don't keep the registry alive.
    tasks: Weak<Tasks>,
}
761
762impl ArcWake for TaskWaker {
763 fn wake_by_ref(arc_self: &Arc<Self>) {
764 if let Some(tasks) = arc_self.tasks.upgrade() {
769 tasks.queue(arc_self.id);
770 }
771 }
772}
773
/// Registry of live tasks plus the queue of task ids ready to be polled.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Ids queued for the next scheduler iteration (may contain duplicates
    /// or ids of since-removed tasks; the scheduler tolerates both).
    ready: Mutex<Vec<u128>>,
    /// All live tasks keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
783
784impl Tasks {
785 const fn new() -> Self {
787 Self {
788 counter: Mutex::new(0),
789 ready: Mutex::new(Vec::new()),
790 running: Mutex::new(BTreeMap::new()),
791 }
792 }
793
794 fn increment(&self) -> u128 {
796 let mut counter = self.counter.lock();
797 let old = *counter;
798 *counter = counter.checked_add(1).expect("task counter overflow");
799 old
800 }
801
802 fn register_root(arc_self: &Arc<Self>) {
806 let id = arc_self.increment();
807 let task = Arc::new(Task {
808 id,
809 label: Label::root(),
810 mode: Mode::Root,
811 });
812 arc_self.register(id, task);
813 }
814
815 fn register_work(
817 arc_self: &Arc<Self>,
818 label: Label,
819 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
820 ) {
821 let id = arc_self.increment();
822 let task = Arc::new(Task {
823 id,
824 label,
825 mode: Mode::Work(Mutex::new(Some(future))),
826 });
827 arc_self.register(id, task);
828 }
829
830 fn register(&self, id: u128, task: Arc<Task>) {
832 self.running.lock().insert(id, task);
834
835 self.queue(id);
837 }
838
839 fn queue(&self, id: u128) {
841 let mut ready = self.ready.lock();
842 ready.push(id);
843 }
844
845 fn drain(&self) -> Vec<u128> {
847 let mut queue = self.ready.lock();
848 let len = queue.len();
849 replace(&mut *queue, Vec::with_capacity(len))
850 }
851
852 fn ready(&self) -> usize {
854 self.ready.lock().len()
855 }
856
857 fn get(&self, id: u128) -> Option<Arc<Task>> {
862 let running = self.running.lock();
863 running.get(&id).cloned()
864 }
865
866 fn remove(&self, id: u128) {
868 self.running.lock().remove(&id);
869 }
870
871 fn clear(&self) -> Vec<Arc<Task>> {
873 self.ready.lock().clear();
875
876 let running: BTreeMap<u128, Arc<Task>> = {
878 let mut running = self.running.lock();
879 take(&mut *running)
880 };
881 running.into_values().collect()
882 }
883}
884
/// Network stack: metering over auditing over the deterministic network.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack: metering over auditing over fault-injecting memory storage.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;

/// Handle given to tasks for interacting with the deterministic runtime
/// (spawning, clock, network, storage, metrics).
pub struct Context {
    /// Metrics label prefix for this context.
    name: String,
    /// Attribute pairs attached to metrics registered via this context.
    attributes: Vec<(String, String)>,
    /// Metrics scope guard, if `with_scope` was used.
    scope: Option<Arc<ScopeGuard>>,
    /// Weak executor handle; contexts must not keep the runtime alive.
    executor: Weak<Executor>,
    /// Simulated network stack.
    network: Arc<Network>,
    /// Simulated storage stack.
    storage: Arc<Storage>,
    /// Buffer pool used by the network stack.
    network_buffer_pool: BufferPool,
    /// Buffer pool used by the storage stack.
    storage_buffer_pool: BufferPool,
    /// Position in the supervision tree (for cascading aborts).
    tree: Arc<Tree>,
    /// Execution mode (dedicated/shared) applied to the next spawn.
    execution: Execution,
    /// Whether the next spawn is wrapped in a tracing span.
    traced: bool,
}
904
905impl Clone for Context {
906 fn clone(&self) -> Self {
907 let (child, _) = Tree::child(&self.tree);
908 Self {
909 name: self.name.clone(),
910 attributes: self.attributes.clone(),
911 scope: self.scope.clone(),
912 executor: self.executor.clone(),
913 network: self.network.clone(),
914 storage: self.storage.clone(),
915 network_buffer_pool: self.network_buffer_pool.clone(),
916 storage_buffer_pool: self.storage_buffer_pool.clone(),
917
918 tree: child,
919 execution: Execution::default(),
920 traced: false,
921 }
922 }
923}
924
impl Context {
    /// Builds a fresh executor and root context from `cfg`, returning the
    /// panic signal used to interrupt the root future.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        // Runtime metrics live under a dedicated prefix in the registry.
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = cfg.start_time;
        // Deadline is absolute: start time plus the configured timeout.
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        let rng = Arc::new(Mutex::new(cfg.rng));

        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Storage stack, innermost first: in-memory backing, fault
        // injection (driven by the shared RNG), auditing, then metering.
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            runtime_registry,
        );

        // Network stack: deterministic core, audited, then metered.
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                // Weak reference: contexts must not keep the executor alive
                // (the runner asserts weak_count == 0 at shutdown).
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Rebuilds an executor and root context from a prior run's checkpoint.
    /// Time, RNG, auditor, storage, and DNS carry over; metrics, tasks,
    /// sleepers, shutdown state, and the network are created fresh.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Buffer pools are recreated from their checkpointed configurations.
        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Restored from the checkpoint.
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // Created fresh for this run.
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrades the weak executor handle, panicking if the runtime is gone.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Returns the runtime metric set.
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Returns the runtime's event auditor.
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Returns the storage layer's audit digest (unwraps the metered and
    /// audited wrappers to reach the audited layer).
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }

    /// Returns the live storage fault configuration; mutations take effect
    /// on subsequent storage operations.
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Registers (`Some`) or removes (`None`) a DNS entry for `host` in the
    /// simulated resolver. The change is folded into the audit digest.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
1134
impl crate::Spawner for Context {
    /// Marks the next spawn to run on a dedicated thread/slot.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Marks the next spawn for shared execution (`blocking` hints that the
    /// task may block).
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Spawns `f` as a child task in the supervision tree and returns a
    /// handle to its output. If this context's subtree was already aborted,
    /// returns a closed handle without running `f`.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let (label, metric) = spawn_metrics!(self);

        // The spawned task becomes a child of this context's tree node;
        // per-spawn settings are consumed (reset) before `self` is moved
        // into the task.
        let parent = Arc::clone(&self.tree);
        let traced = self.traced;
        self.execution = Execution::default();
        self.traced = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor();
        // Optionally wrap the future in a detached tracing span carrying
        // this context's attributes.
        let future = if traced {
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            Either::Left(f(self).instrument(span))
        } else {
            Either::Right(f(self))
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Register the aborter with the parent so subtree aborts cascade.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Signals shutdown with `value` and waits for it to resolve, bounded by
    /// `timeout` (simulated time) if provided.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock();
            shutdown.stop(value)
        };

        // No timeout means wait forever (pending future).
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    /// Returns a signal that resolves when shutdown is initiated.
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        let stopped = executor.shutdown.lock().stopped();
        stopped
    }
}
1224
impl crate::ThreadPooler for Context {
    /// Builds a rayon thread pool whose worker threads are spawned as
    /// dedicated runtime tasks (so the deterministic runtime stays aware of
    /// them).
    fn create_thread_pool(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<ThreadPool, ThreadPoolBuildError> {
        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());

        // Reuse the current thread as a worker unless we are already inside
        // a rayon pool (in which case that would deadlock/nest).
        if rayon::current_thread_index().is_none() {
            builder = builder.use_current_thread()
        }

        builder
            .spawn_handler(move |thread| {
                self.with_label("rayon_thread")
                    .dedicated()
                    .spawn(move |_| async move { thread.run() });
                Ok(())
            })
            .build()
            .map(Arc::new)
    }
}
1247
impl crate::Metrics for Context {
    /// Returns this context's metrics label prefix.
    fn label(&self) -> String {
        self.name.clone()
    }

    /// Returns a context whose metric names are prefixed with
    /// `current_label`. Panics if `label` is invalid or the resulting name
    /// would collide with the reserved runtime prefix.
    fn with_label(&self, label: &str) -> Self {
        validate_label(label);

        let name = {
            let prefix = self.name.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{prefix}_{label}")
            }
        };
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            ..self.clone()
        }
    }

    /// Returns a context with an additional metric attribute. Panics on an
    /// invalid or duplicate key.
    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
        validate_label(key);

        let mut attributes = self.attributes.clone();
        assert!(
            add_attribute(&mut attributes, key, value),
            "duplicate attribute key: {key}"
        );
        Self {
            attributes,
            ..self.clone()
        }
    }

    /// Returns a context with its own metrics scope; the scope is removed
    /// from the registry when the last guard is dropped (if the executor is
    /// still alive). Idempotent: a scoped context is returned unchanged.
    fn with_scope(&self) -> Self {
        let executor = self.executor();
        executor.auditor.event(b"with_scope", |_| {});

        if self.scope.is_some() {
            return self.clone();
        }

        // The guard holds only a weak executor reference so a lingering
        // scope cannot keep the runtime alive.
        let weak = self.executor.clone();
        let scope_id = executor.registry.lock().create_scope();
        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
            if let Some(exec) = weak.upgrade() {
                exec.registry.lock().remove_scope(id);
            }
        }));
        Self {
            scope: Some(guard),
            ..self.clone()
        }
    }

    /// Returns a context whose next spawn is wrapped in a tracing span.
    fn with_span(&self) -> Self {
        Self {
            traced: true,
            ..self.clone()
        }
    }

    /// Registers `metric` under this context's prefixed name and attributes.
    /// Panics if the same (name, attributes) pair was already registered.
    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let name = name.into();
        let help = help.into();

        // Registration is folded into the audit digest.
        let executor = self.executor();
        executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
            for (k, v) in &self.attributes {
                hasher.update(k.as_bytes());
                hasher.update(v.as_bytes());
            }
        });
        let prefixed_name = {
            let prefix = &self.name;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };

        // Reject duplicate registrations up front.
        let metric_key = (prefixed_name.clone(), self.attributes.clone());
        let is_new = executor.registered_metrics.lock().insert(metric_key);
        assert!(
            is_new,
            "duplicate metric: {} with attributes {:?}",
            prefixed_name, self.attributes
        );

        // Descend into the scope (if any), then into one sub-registry per
        // attribute label, and register there.
        let mut registry = executor.registry.lock();
        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
        let sub_registry = self
            .attributes
            .iter()
            .fold(scoped, |reg, (k, v): &(String, String)| {
                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
            });
        sub_registry.register(prefixed_name, help, metric);
    }

    /// Encodes the full metrics registry to its text form.
    fn encode(&self) -> String {
        let executor = self.executor();
        executor.auditor.event(b"encode", |_| {});
        let encoded = executor.registry.lock().encode();
        encoded
    }
}
1374
/// Future that completes once simulated time reaches `time`.
struct Sleeper {
    /// Weak handle to the executor (sleepers must not keep it alive).
    executor: Weak<Executor>,
    /// Simulated wake time.
    time: SystemTime,
    /// Whether an `Alarm` has already been pushed for this sleeper.
    registered: bool,
}

impl Sleeper {
    /// Upgrades the weak executor handle, panicking if the runtime is gone.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}
1387
/// Entry in the executor's sleep queue: wake `waker` once simulated time
/// reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

// Alarms are compared solely by wake time (the waker is ignored).
impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time.eq(&other.time)
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reversed comparison so `BinaryHeap` (a max-heap) pops the
        // earliest alarm first.
        other.time.cmp(&self.time)
    }
}
1413
impl Future for Sleeper {
    type Output = ();

    /// Ready once simulated time has reached the wake time; otherwise
    /// registers an alarm (at most once) so the executor wakes this task.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let executor = self.executor();
        {
            let current_time = *executor.time.lock();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        // Register only once; re-polls before the deadline are no-ops.
        if !self.registered {
            self.registered = true;
            executor.sleeping.lock().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}
1435
1436impl Clock for Context {
1437 fn current(&self) -> SystemTime {
1438 *self.executor().time.lock()
1439 }
1440
1441 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1442 let deadline = self
1443 .current()
1444 .checked_add(duration)
1445 .expect("overflow when setting wake time");
1446 self.sleep_until(deadline)
1447 }
1448
1449 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1450 Sleeper {
1451 executor: self.executor.clone(),
1452
1453 time: deadline,
1454 registered: false,
1455 }
1456 }
1457}
1458
/// Future adapter (only with the `external` feature) that delays delivery of
/// `future`'s output until simulated time reaches `target`.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Weak handle to the executor.
    executor: Weak<Executor>,
    /// Simulated time before which the output must not be delivered.
    target: SystemTime,
    #[pin]
    future: F,
    /// Output captured by the eager first poll, if it completed early.
    ready: Option<F::Output>,
    /// Whether the eager first poll has happened.
    started: bool,
    /// Whether an `Alarm` for `target` has been registered.
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // On the first poll, eagerly poll the inner future once with a
        // no-op waker and buffer its output if it is already complete.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Before the target time: register an alarm (once) and stay pending.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Target reached: deliver the buffered output if the eager poll
        // already completed.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise block this thread, re-polling the inner future until it
        // completes (the Blocker waker wakes the wait below).
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1532
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` so it cannot resolve before `latency` of simulated time
    /// has elapsed, even if the future itself completes sooner.
    ///
    /// # Panics
    /// Panics if adding `latency` to the current simulated time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Snapshot simulated "now" and derive the earliest completion time.
        let now = *self.executor().time.lock();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: self.executor.clone(),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1558
1559impl GClock for Context {
1560 type Instant = SystemTime;
1561
1562 fn now(&self) -> Self::Instant {
1563 self.current()
1564 }
1565}
1566
// Marker impl allowing `governor` rate limiters to treat the simulated clock
// as a usable time source (default trait methods only).
impl ReasonablyRealtime for Context {}
1568
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener on `socket`, delegating to the context's network stack.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket`, delegating to the context's network stack and returning
    /// the connection's sink/stream pair.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1583
1584impl crate::Resolver for Context {
1585 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1586 let executor = self.executor();
1588 let dns = executor.dns.lock();
1589 let result = dns.get(host).cloned();
1590 drop(dns);
1591
1592 executor.auditor.event(b"resolve", |hasher| {
1594 hasher.update(host.as_bytes());
1595 hasher.update(result.encode());
1596 });
1597 result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1598 }
1599}
1600
1601impl RngCore for Context {
1602 fn next_u32(&mut self) -> u32 {
1603 let executor = self.executor();
1604 executor.auditor.event(b"rand", |hasher| {
1605 hasher.update(b"next_u32");
1606 });
1607 let result = executor.rng.lock().next_u32();
1608 result
1609 }
1610
1611 fn next_u64(&mut self) -> u64 {
1612 let executor = self.executor();
1613 executor.auditor.event(b"rand", |hasher| {
1614 hasher.update(b"next_u64");
1615 });
1616 let result = executor.rng.lock().next_u64();
1617 result
1618 }
1619
1620 fn fill_bytes(&mut self, dest: &mut [u8]) {
1621 let executor = self.executor();
1622 executor.auditor.event(b"rand", |hasher| {
1623 hasher.update(b"fill_bytes");
1624 });
1625 executor.rng.lock().fill_bytes(dest);
1626 }
1627
1628 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1629 let executor = self.executor();
1630 executor.auditor.event(b"rand", |hasher| {
1631 hasher.update(b"try_fill_bytes");
1632 });
1633 let result = executor.rng.lock().try_fill_bytes(dest);
1634 result
1635 }
1636}
1637
// Marks the context's RNG as suitable for APIs requiring `CryptoRng`.
// NOTE(review): the underlying RNG is deterministically seeded — confirm this
// is acceptable wherever callers assume real entropy.
impl CryptoRng for Context {}
1639
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    /// Opens (or creates) a blob in `partition`, delegating to the context's
    /// storage stack; returns the blob, its length, and its resolved version.
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }

    /// Removes a named blob (or, when `name` is `None`, the whole partition).
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Lists the blob names stored in `partition`.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1660
impl crate::BufferPooler for Context {
    /// Shared buffer pool used by the networking layer.
    fn network_buffer_pool(&self) -> &crate::BufferPool {
        &self.network_buffer_pool
    }

    /// Shared buffer pool used by the storage layer.
    fn storage_buffer_pool(&self) -> &crate::BufferPool {
        &self.storage_buffer_pool
    }
}
1670
1671#[cfg(test)]
1672mod tests {
1673 use super::*;
1674 #[cfg(feature = "external")]
1675 use crate::FutureExt;
1676 #[cfg(feature = "external")]
1677 use crate::Spawner;
1678 use crate::{deterministic, reschedule, Blob, Metrics, Resolver, Runner as _, Storage};
1679 use commonware_macros::test_traced;
1680 #[cfg(feature = "external")]
1681 use commonware_utils::channel::mpsc;
1682 use commonware_utils::channel::oneshot;
1683 #[cfg(not(feature = "external"))]
1684 use futures::future::pending;
1685 #[cfg(not(feature = "external"))]
1686 use futures::stream::StreamExt as _;
1687 #[cfg(feature = "external")]
1688 use futures::StreamExt;
1689 use futures::{stream::FuturesUnordered, task::noop_waker};
1690
1691 async fn task(i: usize) -> usize {
1692 for _ in 0..5 {
1693 reschedule().await;
1694 }
1695 i
1696 }
1697
1698 fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1699 runner.start(|context| async move {
1700 let mut handles = FuturesUnordered::new();
1701 for i in 0..=tasks - 1 {
1702 handles.push(context.clone().spawn(move |_| task(i)));
1703 }
1704
1705 let mut outputs = Vec::new();
1706 while let Some(result) = handles.next().await {
1707 outputs.push(result.unwrap());
1708 }
1709 assert_eq!(outputs.len(), tasks);
1710 (context.auditor().state(), outputs)
1711 })
1712 }
1713
1714 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1715 let executor = deterministic::Runner::seeded(seed);
1716 run_tasks(5, executor)
1717 }
1718
1719 #[test]
1720 fn test_same_seed_same_order() {
1721 let mut outputs = Vec::new();
1723 for seed in 0..1000 {
1724 let output = run_with_seed(seed);
1725 outputs.push(output);
1726 }
1727
1728 for seed in 0..1000 {
1730 let output = run_with_seed(seed);
1731 assert_eq!(output, outputs[seed as usize]);
1732 }
1733 }
1734
    // Different seeds should yield observably different runs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }
1741
1742 #[test]
1743 fn test_alarm_min_heap() {
1744 let now = SystemTime::now();
1746 let alarms = vec![
1747 Alarm {
1748 time: now + Duration::new(10, 0),
1749 waker: noop_waker(),
1750 },
1751 Alarm {
1752 time: now + Duration::new(5, 0),
1753 waker: noop_waker(),
1754 },
1755 Alarm {
1756 time: now + Duration::new(15, 0),
1757 waker: noop_waker(),
1758 },
1759 Alarm {
1760 time: now + Duration::new(5, 0),
1761 waker: noop_waker(),
1762 },
1763 ];
1764 let mut heap = BinaryHeap::new();
1765 for alarm in alarms {
1766 heap.push(alarm);
1767 }
1768
1769 let mut sorted_times = Vec::new();
1771 while let Some(alarm) = heap.pop() {
1772 sorted_times.push(alarm.time);
1773 }
1774 assert_eq!(
1775 sorted_times,
1776 vec![
1777 now + Duration::new(5, 0),
1778 now + Duration::new(5, 0),
1779 now + Duration::new(10, 0),
1780 now + Duration::new(15, 0),
1781 ]
1782 );
1783 }
1784
    // A timed runner panics with "runtime timeout" when the root task never
    // finishes within the configured window.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
1795
    // A zero cycle combined with a timeout is rejected at construction.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }
1806
    // A cycle finer than the system clock precision is rejected at construction.
    #[test]
    #[should_panic(
        expected = "cycle duration must be greater than or equal to system time precision"
    )]
    fn test_bad_cycle() {
        let cfg = Config {
            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }
1818
    // Synced blob data must survive `start_and_recover` -> `Runner::from`.
    #[test]
    fn test_recover_synced_storage_persists() {
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(0, data).await.unwrap();
            // Sync so the write is durable across recovery.
            blob.sync().await.unwrap();
            context.auditor().state()
        });

        // Recovery must not perturb the audit trail.
        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(|context| async move {
            // The synced contents are visible in the recovered runtime.
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read.coalesce(), data);
        });
    }
1847
    // Panics inside a recovered runner propagate just as in a fresh runner.
    #[test]
    #[should_panic(expected = "goodbye")]
    fn test_recover_panic_handling() {
        let executor1 = deterministic::Runner::default();
        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
            reschedule().await;
        });

        let executor = Runner::from(checkpoint);
        executor.start(|_| async move {
            panic!("goodbye");
        });
    }
1863
1864 #[test]
1865 fn test_recover_unsynced_storage_does_not_persist() {
1866 let executor = deterministic::Runner::default();
1868 let partition = "test_partition";
1869 let name = b"test_blob";
1870 let data = b"Hello, world!";
1871
1872 let (_, checkpoint) = executor.start_and_recover(|context| async move {
1874 let context = context.clone();
1875 let (blob, _) = context.open(partition, name).await.unwrap();
1876 blob.write_at(0, data).await.unwrap();
1877 });
1878
1879 let executor = Runner::from(checkpoint);
1881
1882 executor.start(|context| async move {
1884 let (_, len) = context.open(partition, name).await.unwrap();
1885 assert_eq!(len, 0);
1886 });
1887 }
1888
    // DNS registrations made before recovery remain resolvable afterwards.
    #[test]
    fn test_recover_dns_mappings_persist() {
        let executor = deterministic::Runner::default();
        let host = "example.com";
        let addrs = vec![
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
        ];

        let (state, checkpoint) = executor.start_and_recover({
            // Clone so the outer `addrs` remains available for the final check.
            let addrs = addrs.clone();
            |context| async move {
                context.resolver_register(host, Some(addrs));
                context.auditor().state()
            }
        });

        // Recovery must not perturb the audit trail.
        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(move |context| async move {
            let resolved = context.resolve(host).await.unwrap();
            assert_eq!(resolved, addrs);
        });
    }
1918
    // Simulated time carries over across recovery and keeps advancing.
    #[test]
    fn test_recover_time_persists() {
        let executor = deterministic::Runner::default();
        let duration_to_sleep = Duration::from_secs(10);

        let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
            context.sleep(duration_to_sleep).await;
            context.current()
        });

        // Default start time is the epoch, so elapsed == slept duration.
        assert_eq!(
            time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
            duration_to_sleep
        );

        let executor2 = Runner::from(checkpoint);
        executor2.start(move |context| async move {
            // The recovered runtime resumes at the checkpointed time...
            assert_eq!(context.current(), time_before_recovery);

            // ...and continues to advance from there.
            context.sleep(duration_to_sleep).await;
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                duration_to_sleep * 2
            );
        });
    }
1950
    // Smuggling the `Context` out of the root task leaves references to the
    // executor alive, which the runtime detects and panics on.
    #[test]
    #[should_panic(expected = "executor still has weak references")]
    fn test_context_return() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move {
            context
        });

        drop(context);
    }
1966
    // With the default config, simulated time starts at the Unix epoch.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
1980
    // `with_start_time` overrides the default epoch start.
    #[test]
    fn test_start_time() {
        let executor_default = deterministic::Runner::default();
        executor_default.start(|context| async move {
            assert_eq!(context.current(), UNIX_EPOCH);
        });

        let start_time = UNIX_EPOCH + Duration::from_secs(100);
        let cfg = Config::default().with_start_time(start_time);
        let executor = deterministic::Runner::new(cfg);

        executor.start(move |context| async move {
            assert_eq!(context.current(), start_time);
        });
    }
1999
    // Start times before the Unix epoch are rejected at construction.
    #[test]
    #[should_panic(expected = "start time must be greater than or equal to unix epoch")]
    fn test_bad_start_time() {
        let cfg = Config::default().with_start_time(UNIX_EPOCH - Duration::from_secs(1));
        deterministic::Runner::new(cfg);
    }
2006
    // Without the "external" feature, a task that can never be woken again
    // stalls the runtime, which panics rather than spinning forever.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_stall() {
        let executor = deterministic::Runner::default();

        executor.start(|_| async move {
            pending::<()>().await;
        });
    }
2019
    // Without the "external" feature, wakeups originating outside the runtime
    // are not observed, so waiting on a real thread's send reports a stall.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_external_simulated() {
        let executor = deterministic::Runner::default();

        // External event delivered from a real OS thread after 1s.
        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }
2039
    // With the "external" feature, a wakeup from a real OS thread is observed
    // and the root task completes instead of stalling.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime() {
        let executor = deterministic::Runner::default();

        // External event delivered from a real OS thread after 1s.
        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }
2058
    // With the "external" feature, paced waits interleave real-time external
    // events with simulated time: the first task completes after a real 1s
    // wait but minimal simulated time, while the second is paced a full
    // simulated second despite its external event firing immediately.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            let (results_tx, mut results_rx) = mpsc::channel(2);

            // External event that arrives after a real 1s delay.
            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            // External event that arrives (essentially) immediately.
            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            let first = context.clone().spawn({
                let results_tx = results_tx.clone();
                move |context| async move {
                    // Zero pacing: completion is gated only by the real event.
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            let second = context.clone().spawn(move |context| async move {
                // Paced by `first_wait` of simulated time even though the
                // external event is already available.
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            second.await.unwrap();
            first.await.unwrap();

            // The unpaced task must have completed before the paced one.
            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.recv().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }
2122
    // When no work is pending, the simulated runtime skips ahead to the next
    // alarm instead of iterating cycle-by-cycle: a 1s sleep should take only
    // a handful of scheduler iterations.
    #[cfg(not(feature = "external"))]
    #[test]
    fn test_simulated_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            // Scrape the iteration counter from the encoded metrics output.
            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations < 10);
        });
    }
2145
    // With the "external" feature, the runtime cannot skip ahead (external
    // wakeups may arrive at any time), so a 1s sleep requires many iterations.
    #[cfg(feature = "external")]
    #[test]
    fn test_realtime_no_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            // Scrape the iteration counter from the encoded metrics output.
            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations > 500);
        });
    }
2168
    // Empty metric labels are rejected.
    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("");
        });
    }
2177
    // Labels must not begin with a digit.
    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_invalid_first_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("1invalid");
        });
    }
2186
    // Labels may only contain alphanumerics and underscores (no hyphens).
    #[test]
    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
    fn test_metrics_label_invalid_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("invalid-label");
        });
    }
2195
    // The runtime's own metrics prefix is reserved and rejected as a label.
    #[test]
    #[should_panic(expected = "using runtime label is not allowed")]
    fn test_metrics_label_reserved_prefix() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label(METRICS_PREFIX);
        });
    }
2204
    // Registering the same attribute key twice on a context panics.
    #[test]
    #[should_panic(expected = "duplicate attribute key: epoch")]
    fn test_metrics_duplicate_attribute_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let _ = context
                .with_label("test")
                .with_attribute("epoch", "old")
                .with_attribute("epoch", "new");
        });
    }
2216
    // With a 100% sync fault rate, `sync` fails; after recovery with faults
    // disabled, the unsynced data is gone and normal operation resumes.
    #[test]
    fn test_storage_fault_injection_and_recovery() {
        let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
            sync_rate: Some(1.0),
            ..Default::default()
        });

        let (result, checkpoint) =
            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
                blob.write_at(0, b"data".to_vec()).await.unwrap();
                blob.sync().await
            });

        // Every sync fails under a 1.0 fault rate.
        assert!(result.is_err());

        deterministic::Runner::from(checkpoint).start(|ctx| async move {
            // Disable fault injection for the recovered runtime.
            *ctx.storage_fault_config().write() = FaultConfig::default();

            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
            assert_eq!(len, 0, "unsynced data should be lost after recovery");

            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");

            let read_buf = blob.read_at(0, 9).await.unwrap();
            assert_eq!(read_buf.coalesce(), b"recovered");
        });
    }
2255
    // Fault configuration can be flipped at runtime via the shared handle.
    #[test]
    fn test_storage_fault_dynamic_config() {
        let executor = deterministic::Runner::default();
        executor.start(|ctx| async move {
            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();

            blob.write_at(0, b"initial".to_vec()).await.unwrap();
            blob.sync().await.expect("initial sync should succeed");

            // Turn sync faults on...
            let storage_fault_cfg = ctx.storage_fault_config();
            storage_fault_cfg.write().sync_rate = Some(1.0);

            blob.write_at(0, b"updated".to_vec()).await.unwrap();
            let result = blob.sync().await;
            assert!(result.is_err(), "sync should fail with faults enabled");

            // ...and off again.
            storage_fault_cfg.write().sync_rate = Some(0.0);

            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");
        });
    }
2284
    // Fault injection is driven by the seeded RNG: identical seeds produce an
    // identical pattern of open failures, different seeds a different one.
    #[test]
    fn test_storage_fault_determinism() {
        // Opens 20 blobs under a 50% open-fault rate; records success/failure.
        fn run_with_seed(seed: u64) -> Vec<bool> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_fault_config(FaultConfig {
                    open_rate: Some(0.5),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut results = Vec::new();
                for i in 0..20 {
                    let name = format!("blob{i}");
                    let result = ctx.open("test_determinism", name.as_bytes()).await;
                    results.push(result.is_ok());
                }
                results
            })
        }

        let results1 = run_with_seed(12345);
        let results2 = run_with_seed(12345);
        assert_eq!(
            results1, results2,
            "same seed should produce same failure pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }
2321
    // Determinism of fault injection holds even across concurrently spawned
    // tasks performing open/write/sync operations.
    #[test]
    fn test_storage_fault_determinism_multi_task() {
        // Runs five tasks, each attempting 4 open/write/sync sequences under
        // mixed fault rates; returns each task's success count.
        fn run_with_seed(seed: u64) -> Vec<u32> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_fault_config(FaultConfig {
                    open_rate: Some(0.5),
                    write_rate: Some(0.3),
                    sync_rate: Some(0.2),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut handles = Vec::new();
                for i in 0..5 {
                    let ctx = ctx.clone();
                    handles.push(ctx.spawn(move |ctx| async move {
                        let mut successes = 0u32;
                        for j in 0..4 {
                            let name = format!("task{i}_blob{j}");
                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
                                successes += 1;
                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
                                    successes += 1;
                                }
                                if blob.sync().await.is_ok() {
                                    successes += 1;
                                }
                            }
                        }
                        successes
                    }));
                }

                // Await tasks in spawn order so the result vector is stable.
                let mut results = Vec::new();
                for handle in handles {
                    results.push(handle.await.unwrap());
                }
                results
            })
        }

        let results1 = run_with_seed(42);
        let results2 = run_with_seed(42);
        assert_eq!(
            results1, results2,
            "same seed should produce same multi-task pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }
2382}