1pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47 network::{
48 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
49 metered::Network as MeteredNetwork,
50 },
51 storage::{
52 audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
53 memory::Storage as MemStorage, metered::Storage as MeteredStorage,
54 },
55 telemetry::metrics::task::Label,
56 utils::{
57 add_attribute,
58 signal::{Signal, Stopper},
59 supervision::Tree,
60 Panicker, Registry, ScopeGuard,
61 },
62 validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
63 Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
64};
65#[cfg(feature = "external")]
66use crate::{Blocker, Pacer};
67use commonware_codec::Encode;
68use commonware_macros::select;
69use commonware_parallel::ThreadPool;
70use commonware_utils::{
71 hex,
72 sync::{Mutex, RwLock},
73 time::SYSTEM_TIME_PRECISION,
74 SystemTimeExt,
75};
76#[cfg(feature = "external")]
77use futures::task::noop_waker;
78use futures::{
79 future::BoxFuture,
80 task::{waker, ArcWake},
81 Future, FutureExt,
82};
83use governor::clock::{Clock as GClock, ReasonablyRealtime};
84#[cfg(feature = "external")]
85use pin_project::pin_project;
86use prometheus_client::{
87 metrics::{counter::Counter, family::Family, gauge::Gauge},
88 registry::{Metric, Registry as PrometheusRegistry},
89};
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95 borrow::Cow,
96 collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
97 mem::{replace, take},
98 net::{IpAddr, SocketAddr},
99 num::NonZeroUsize,
100 panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
101 pin::Pin,
102 sync::{Arc, Weak},
103 task::{self, Poll, Waker},
104 time::{Duration, SystemTime, UNIX_EPOCH},
105};
106use tracing::{info_span, trace, Instrument};
107use tracing_opentelemetry::OpenTelemetrySpanExt;
108
/// Runtime-level Prometheus metrics tracked by the deterministic executor.
#[derive(Debug)]
struct Metrics {
    // Scheduler loop iterations completed.
    iterations: Counter,
    // Tasks spawned, partitioned by task label.
    tasks_spawned: Family<Label, Counter>,
    // Tasks currently running, partitioned by task label.
    tasks_running: Family<Label, Gauge>,
    // Task polls performed, partitioned by task label.
    task_polls: Family<Label, Counter>,

    // Total amount of data sent over the simulated network (registered as
    // "bandwidth").
    network_bandwidth: Counter,
}
118
impl Metrics {
    /// Creates the runtime metrics and registers each one under `registry`.
    ///
    /// Note: registration order determines the order metrics appear in the
    /// encoded output.
    pub fn init(registry: &mut PrometheusRegistry) -> Self {
        let metrics = Self {
            iterations: Counter::default(),
            task_polls: Family::default(),
            tasks_spawned: Family::default(),
            tasks_running: Family::default(),
            network_bandwidth: Counter::default(),
        };
        registry.register(
            "iterations",
            "Total number of iterations",
            metrics.iterations.clone(),
        );
        registry.register(
            "tasks_spawned",
            "Total number of tasks spawned",
            metrics.tasks_spawned.clone(),
        );
        registry.register(
            "tasks_running",
            "Number of tasks currently running",
            metrics.tasks_running.clone(),
        );
        registry.register(
            "task_polls",
            "Total number of task polls",
            metrics.task_polls.clone(),
        );
        registry.register(
            "bandwidth",
            "Total amount of data sent over network",
            metrics.network_bandwidth.clone(),
        );
        metrics
    }
}
156
// A SHA-256 output; the running audit digest is a 32-byte hash.
type Digest = [u8; 32];

/// Maintains a hash chain over runtime events so separate executions can be
/// compared for determinism: equal event streams yield equal digests.
pub struct Auditor {
    // Current head of the event hash chain.
    digest: Mutex<Digest>,
}
164
165impl Default for Auditor {
166 fn default() -> Self {
167 Self {
168 digest: Digest::default().into(),
169 }
170 }
171}
172
173impl Auditor {
174 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
178 where
179 F: FnOnce(&mut Sha256),
180 {
181 let mut digest = self.digest.lock();
182
183 let mut hasher = Sha256::new();
184 hasher.update(digest.as_ref());
185 hasher.update(label);
186 payload(&mut hasher);
187
188 *digest = hasher.finalize().into();
189 }
190
191 pub fn state(&self) -> String {
196 let hash = self.digest.lock();
197 hex(hash.as_ref())
198 }
199}
200
// A boxed, sendable cryptographically-secure RNG that drives all randomness
// in the deterministic runtime.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;

/// Configuration for the deterministic runtime.
pub struct Config {
    // Source of all runtime randomness (seeded for reproducibility).
    rng: BoxDynRng,

    // Simulated time advanced on each scheduler iteration.
    cycle: Duration,

    // Simulated wall-clock time at startup.
    start_time: SystemTime,

    // Optional cap on total simulated run time; exceeding it panics with
    // "runtime timeout".
    timeout: Option<Duration>,

    // Whether spawned-task panics are caught (see `Panicker`).
    catch_panics: bool,

    // Fault-injection configuration for the faulty storage layer.
    storage_fault_cfg: FaultConfig,

    // Buffer pool configuration for the simulated network.
    network_buffer_pool_cfg: BufferPoolConfig,

    // Buffer pool configuration for storage.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
232
impl Config {
    /// Returns the default configuration: RNG seeded with 42, 1ms cycle,
    /// start at the Unix epoch, no timeout, panics uncaught, and no injected
    /// storage faults.
    pub fn new() -> Self {
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Under Miri, cap per-class buffer counts to keep memory and
                // runtime manageable.
                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
                    .with_max_per_class(commonware_utils::NZUsize!(32));
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
                    .with_max_per_class(commonware_utils::NZUsize!(32));
            } else {
                let network_buffer_pool_cfg = BufferPoolConfig::for_network();
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage();
            }
        }

        Self {
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            start_time: UNIX_EPOCH,
            timeout: None,
            catch_panics: false,
            storage_fault_cfg: FaultConfig::default(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        }
    }

    /// Replaces the RNG with a `StdRng` seeded from `seed`.
    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }

    /// Replaces the runtime RNG.
    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }

    /// Sets the simulated time advanced per scheduler iteration.
    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Sets the simulated wall-clock time at startup.
    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = start_time;
        self
    }
    /// Sets (or clears) the cap on total simulated run time.
    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }
    /// Sets whether spawned-task panics are caught.
    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }
    /// Overrides the network buffer pool configuration.
    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.network_buffer_pool_cfg = cfg;
        self
    }
    /// Overrides the storage buffer pool configuration.
    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.storage_buffer_pool_cfg = cfg;
        self
    }

    /// Overrides the storage fault-injection configuration.
    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
        self.storage_fault_cfg = faults;
        self
    }

    /// Simulated time advanced per scheduler iteration.
    pub const fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Simulated wall-clock time at startup.
    pub const fn start_time(&self) -> SystemTime {
        self.start_time
    }
    /// Cap on total simulated run time, if any.
    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }
    /// Whether spawned-task panics are caught.
    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }
    /// Network buffer pool configuration.
    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.network_buffer_pool_cfg
    }
    /// Storage buffer pool configuration.
    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.storage_buffer_pool_cfg
    }

    /// Panics if the configuration is internally inconsistent.
    pub fn assert(&self) {
        // A zero-length cycle could never reach a configured deadline.
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        // Advancing by less than the clock's precision would not be
        // observable through SystemTime.
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
    }
}
356
357impl Default for Config {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
// Identity of a registered metric: fully-prefixed name plus its
// (key, value) attribute pairs.
type MetricKey = (String, Vec<(String, String)>);

/// Shared state of the deterministic executor.
pub struct Executor {
    // Prometheus registry backing all metrics.
    registry: Mutex<Registry>,
    // Metric keys already registered; used to reject duplicates.
    registered_metrics: Mutex<HashSet<MetricKey>>,
    // Simulated time advanced per scheduler iteration.
    cycle: Duration,
    // Absolute simulated time at which the run panics ("runtime timeout").
    deadline: Option<SystemTime>,
    // Runtime-level metrics.
    metrics: Arc<Metrics>,
    // Hash chain of runtime events for determinism auditing.
    auditor: Arc<Auditor>,
    // Source of all runtime randomness.
    rng: Arc<Mutex<BoxDynRng>>,
    // Current simulated time.
    time: Mutex<SystemTime>,
    // Task registry plus ready queue.
    tasks: Arc<Tasks>,
    // Pending sleepers; `Alarm`'s reversed `Ord` makes this a min-heap on
    // deadline.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    // Shutdown signal shared by `stop`/`stopped`.
    shutdown: Mutex<Stopper>,
    // Panic-handling policy for spawned tasks.
    panicker: Panicker,
    // Host -> addresses table backing the simulated resolver.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
382
impl Executor {
    /// Advances simulated time by one `cycle` and returns the new time.
    ///
    /// Panics if the simulated clock overflows.
    fn advance_time(&self) -> SystemTime {
        // When external events can arrive, pace iterations in real time so
        // the outside world gets a chance to make progress.
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);

        let mut time = self.time.lock();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }

    /// When no task is ready, jumps the clock forward to the earliest
    /// sleeper's deadline (skipping idle cycles) and returns the possibly
    /// updated time. Disabled under the `external` feature, where events may
    /// arrive at any real time.
    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }

        let mut skip_until = None;
        {
            // Peek the soonest alarm; release the lock before touching the
            // clock.
            let sleeping = self.sleeping.lock();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }

        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }

    /// Wakes every sleeper whose deadline is at or before `current`.
    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock();
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                // Deadline reached: pop and wake.
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                // Heap yields soonest first, so everything else is later.
                break;
            }
        }
    }

    /// Panics with "runtime stalled" when no task is ready after time
    /// advancement and sleeper wakeups, i.e. the runtime can make no further
    /// progress. Skipped under the `external` feature, where progress may
    /// come from outside.
    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }

        panic!("runtime stalled");
    }
}
453
/// State carried across a runtime restart by [`Runner::start_and_recover`]:
/// everything needed to resume execution deterministically.
pub struct Checkpoint {
    // Simulated time advanced per scheduler iteration.
    cycle: Duration,
    // Absolute timeout deadline, if any.
    deadline: Option<SystemTime>,
    // Audit hash chain, continued across the restart.
    auditor: Arc<Auditor>,
    // RNG state, continued across the restart.
    rng: Arc<Mutex<BoxDynRng>>,
    // Simulated time at shutdown.
    time: Mutex<SystemTime>,
    // Storage contents preserved across the restart.
    storage: Arc<Storage>,
    // Registered DNS entries.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    // Whether spawned-task panics are caught.
    catch_panics: bool,
    // Buffer pool configurations used to recreate the pools.
    network_buffer_pool_cfg: BufferPoolConfig,
    storage_buffer_pool_cfg: BufferPoolConfig,
}
469
470impl Checkpoint {
471 pub fn auditor(&self) -> Arc<Auditor> {
473 self.auditor.clone()
474 }
475}
476
// What a `Runner` starts from: a fresh configuration or a recovered
// checkpoint.
#[allow(clippy::large_enum_variant)]
enum State {
    Config(Config),
    Checkpoint(Checkpoint),
}

/// Drives the deterministic executor to completion.
pub struct Runner {
    // Where execution starts from.
    state: State,
}
487
488impl From<Config> for Runner {
489 fn from(cfg: Config) -> Self {
490 Self::new(cfg)
491 }
492}
493
494impl From<Checkpoint> for Runner {
495 fn from(checkpoint: Checkpoint) -> Self {
496 Self {
497 state: State::Checkpoint(checkpoint),
498 }
499 }
500}
501
impl Runner {
    /// Creates a runner from `cfg`, panicking if the configuration is
    /// inconsistent (see [`Config::assert`]).
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// A runner with the default configuration reseeded to `seed`.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// A runner with the default configuration and a simulated-time
    /// `timeout`.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Runs the root future produced by `f` to completion and returns its
    /// output together with a [`Checkpoint`] from which execution can later
    /// be resumed deterministically.
    ///
    /// # Panics
    ///
    /// Re-raises panics from the root task, and panics on deadlock
    /// ("runtime stalled") or when the configured timeout elapses
    /// ("runtime timeout").
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Build a fresh executor or resume one from a checkpoint.
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Keep what is needed to assemble the final checkpoint.
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
        // The root future is wrapped by the panic listener; see
        // `Panicked::interrupt`.
        let mut root = Box::pin(panicked.interrupt(f(context)));

        Tasks::register_root(&executor.tasks);

        // Scheduler loop: poll ready tasks, advance simulated time, repeat.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Enforce the simulated-time deadline, if configured.
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            let mut queue = executor.tasks.drain();

            // Shuffle the polling order (deterministically, via the seeded
            // RNG) to surface order-dependent bugs.
            if queue.len() > 1 {
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // A queued task may have been removed since it was queued.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // The waker holds only a weak handle to the task set, so a
                // waker surviving past teardown becomes a no-op.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // `None` marks a future that already completed.
                        let mut fut_opt = future.lock();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Drop the future eagerly so its resources are
                            // released immediately.
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            // Root completion ends the scheduler loop.
            if let Some(output) = output {
                break output;
            }

            // Advance the clock, fast-forward over idle stretches, wake due
            // sleepers, then verify the runtime can still make progress.
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        // Teardown: clear alarms and drop every pending future so all
        // remaining references into the executor are released.
        executor.sleeping.lock().clear(); let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock() = None;
        }

        drop(root);

        // After teardown nothing may still reference the executor weakly.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Re-raise a captured panic only after cleanup has completed.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Package everything needed to resume this run deterministically.
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };

        (output, checkpoint)
    }
}
713
714impl Default for Runner {
715 fn default() -> Self {
716 Self::new(Config::default())
717 }
718}
719
720impl crate::Runner for Runner {
721 type Context = Context;
722
723 fn start<F, Fut>(self, f: F) -> Fut::Output
724 where
725 F: FnOnce(Self::Context) -> Fut,
726 Fut: Future,
727 {
728 let (output, _) = self.start_and_recover(f);
729 output
730 }
731}
732
/// How the scheduler polls a task.
enum Mode {
    /// The root future handed to the runner (polled directly by the loop).
    Root,
    /// A spawned future; set to `None` once it has completed.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

/// A scheduled unit of work.
struct Task {
    // Unique, monotonically assigned id.
    id: u128,
    // Metrics/tracing label.
    label: Label,

    // How the scheduler polls this task.
    mode: Mode,
}

/// Waker that re-queues its task on the executor's ready list.
struct TaskWaker {
    // Id of the task to re-queue.
    id: u128,

    // Weak handle to the task set; waking after teardown is a no-op.
    tasks: Weak<Tasks>,
}
753
754impl ArcWake for TaskWaker {
755 fn wake_by_ref(arc_self: &Arc<Self>) {
756 if let Some(tasks) = arc_self.tasks.upgrade() {
761 tasks.queue(arc_self.id);
762 }
763 }
764}
765
/// Registry of live tasks plus the queue of tasks ready to be polled.
struct Tasks {
    // Next task id to assign.
    counter: Mutex<u128>,
    // Ids queued for polling on the next scheduler iteration.
    ready: Mutex<Vec<u128>>,
    // All live tasks, keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
775
776impl Tasks {
777 const fn new() -> Self {
779 Self {
780 counter: Mutex::new(0),
781 ready: Mutex::new(Vec::new()),
782 running: Mutex::new(BTreeMap::new()),
783 }
784 }
785
786 fn increment(&self) -> u128 {
788 let mut counter = self.counter.lock();
789 let old = *counter;
790 *counter = counter.checked_add(1).expect("task counter overflow");
791 old
792 }
793
794 fn register_root(arc_self: &Arc<Self>) {
798 let id = arc_self.increment();
799 let task = Arc::new(Task {
800 id,
801 label: Label::root(),
802 mode: Mode::Root,
803 });
804 arc_self.register(id, task);
805 }
806
807 fn register_work(
809 arc_self: &Arc<Self>,
810 label: Label,
811 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
812 ) {
813 let id = arc_self.increment();
814 let task = Arc::new(Task {
815 id,
816 label,
817 mode: Mode::Work(Mutex::new(Some(future))),
818 });
819 arc_self.register(id, task);
820 }
821
822 fn register(&self, id: u128, task: Arc<Task>) {
824 self.running.lock().insert(id, task);
826
827 self.queue(id);
829 }
830
831 fn queue(&self, id: u128) {
833 let mut ready = self.ready.lock();
834 ready.push(id);
835 }
836
837 fn drain(&self) -> Vec<u128> {
839 let mut queue = self.ready.lock();
840 let len = queue.len();
841 replace(&mut *queue, Vec::with_capacity(len))
842 }
843
844 fn ready(&self) -> usize {
846 self.ready.lock().len()
847 }
848
849 fn get(&self, id: u128) -> Option<Arc<Task>> {
854 let running = self.running.lock();
855 running.get(&id).cloned()
856 }
857
858 fn remove(&self, id: u128) {
860 self.running.lock().remove(&id);
861 }
862
863 fn clear(&self) -> Vec<Arc<Task>> {
865 self.ready.lock().clear();
867
868 let running: BTreeMap<u128, Arc<Task>> = {
870 let mut running = self.running.lock();
871 take(&mut *running)
872 };
873 running.into_values().collect()
874 }
875}
876
// Concrete network stack: metering over auditing over the deterministic
// simulated network.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
// Concrete storage stack: metering over auditing over fault injection over
// in-memory storage.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;

/// Handle given to tasks for interacting with the deterministic runtime:
/// spawning, clock, network, storage, metrics, and randomness.
pub struct Context {
    // Metric name prefix accumulated via `with_label`.
    name: String,
    // Metric attributes accumulated via `with_attribute`.
    attributes: Vec<(String, String)>,
    // Metrics scope guard, if `with_scope` was used.
    scope: Option<Arc<ScopeGuard>>,
    // Weak handle to the shared executor state.
    executor: Weak<Executor>,
    // Simulated network stack.
    network: Arc<Network>,
    // Simulated storage stack.
    storage: Arc<Storage>,
    // Buffer pools for network and storage I/O.
    network_buffer_pool: BufferPool,
    storage_buffer_pool: BufferPool,
    // Position in the supervision tree (for abort propagation).
    tree: Arc<Tree>,
    // Execution mode for the next spawn (reset after each spawn/clone).
    execution: Execution,
    // Whether the next spawn gets a tracing span (reset after spawn/clone).
    instrumented: bool,
}
896
897impl Clone for Context {
898 fn clone(&self) -> Self {
899 let (child, _) = Tree::child(&self.tree);
900 Self {
901 name: self.name.clone(),
902 attributes: self.attributes.clone(),
903 scope: self.scope.clone(),
904 executor: self.executor.clone(),
905 network: self.network.clone(),
906 storage: self.storage.clone(),
907 network_buffer_pool: self.network_buffer_pool.clone(),
908 storage_buffer_pool: self.storage_buffer_pool.clone(),
909
910 tree: child,
911 execution: Execution::default(),
912 instrumented: false,
913 }
914 }
915}
916
impl Context {
    /// Builds a fresh runtime from `cfg`, returning the root context, the
    /// shared executor, and the panic listener for the root future.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::new();
        // Runtime-internal metrics live under a reserved prefix.
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = cfg.start_time;
        // Convert the relative timeout into an absolute deadline.
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        let rng = Arc::new(Mutex::new(cfg.rng));

        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Storage stack, inner to outer: in-memory -> fault injection
        // (driven by the shared RNG) -> auditing -> metering.
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            runtime_registry,
        );

        // Network stack: deterministic simulation -> auditing -> metering.
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                // The context holds only a weak handle so teardown can
                // prove all references were released.
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Rebuilds a runtime from a [`Checkpoint`]: clock, RNG, auditor,
    /// storage, and DNS state carry over; metrics, tasks, network, and
    /// shutdown state are created fresh.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Fresh network stack; the auditor (and its hash chain) continues
        // from the checkpoint.
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Carried over from the checkpoint.
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // Recreated fresh.
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrades the weak executor handle, panicking if the runtime has
    /// already shut down.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Runtime metrics handle.
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Handle to the determinism auditor.
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Digest of all operations observed by the audited storage layer.
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }

    /// Shared handle to the storage fault-injection configuration.
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Registers (`Some`) or removes (`None`) the addresses for `host` in
    /// the simulated DNS table, recording the change in the auditor.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
1126
impl crate::Spawner for Context {
    /// Sets the execution mode for the next spawn to `Dedicated`.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Sets the execution mode for the next spawn to `Shared` with the
    /// given blocking hint.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Attaches a tracing span to the next spawned task.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawns `f` as a new task supervised under this context's tree and
    /// returns a handle to its output. Returns a closed handle without
    /// running `f` if the subtree was already aborted.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Derive the task label and spawn/running metrics for this context.
        let (label, metric) = spawn_metrics!(self);

        // Spawn settings apply only to this task; reset them before handing
        // the context to `f`.
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor();
        // Optionally wrap the future in a tracing span carrying this
        // context's attributes.
        let future: BoxFuture<'_, T> = if is_instrumented {
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            f(self).instrument(span).boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Let the parent abort this task when its subtree is aborted.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Signals shutdown with `value` and waits — up to `timeout` of
    /// simulated time, or forever if `None` — for shutdown to resolve.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Release the shutdown lock before awaiting resolution.
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock();
            shutdown.stop(value)
        };

        // No timeout means wait forever.
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    /// Returns a signal that resolves once shutdown has been requested.
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        let stopped = executor.shutdown.lock().stopped();
        stopped
    }
}
1221
impl crate::ThreadPooler for Context {
    /// Builds a rayon thread pool whose workers run as dedicated runtime
    /// tasks, so their execution is supervised by (and visible to) the
    /// runtime.
    fn create_thread_pool(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<ThreadPool, ThreadPoolBuildError> {
        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());

        // Reuse the current thread as a worker unless we are already inside
        // a rayon pool.
        if rayon::current_thread_index().is_none() {
            builder = builder.use_current_thread()
        }

        builder
            .spawn_handler(move |thread| {
                // Each rayon worker becomes a dedicated runtime task.
                self.with_label("rayon_thread")
                    .dedicated()
                    .spawn(move |_| async move { thread.run() });
                Ok(())
            })
            .build()
            .map(Arc::new)
    }
}
1244
impl crate::Metrics for Context {
    /// Returns a child context whose metric names are prefixed with `label`
    /// (joined to the existing prefix with `_`).
    ///
    /// # Panics
    ///
    /// Panics if `label` is invalid or the resulting name would start with
    /// the reserved runtime prefix.
    fn with_label(&self, label: &str) -> Self {
        validate_label(label);

        let name = {
            let prefix = self.name.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{prefix}_{label}")
            }
        };
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            ..self.clone()
        }
    }

    /// Returns a child context with an additional metric attribute.
    ///
    /// # Panics
    ///
    /// Panics if `key` is invalid or already present.
    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
        validate_label(key);

        let mut attributes = self.attributes.clone();
        assert!(
            add_attribute(&mut attributes, key, value),
            "duplicate attribute key: {key}"
        );
        Self {
            attributes,
            ..self.clone()
        }
    }

    /// The accumulated metric name prefix of this context.
    fn label(&self) -> String {
        self.name.clone()
    }

    /// Registers a metric under this context's prefix, attributes, and
    /// scope, recording the registration in the auditor.
    ///
    /// # Panics
    ///
    /// Panics if the same (name, attributes) pair was already registered.
    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let name = name.into();
        let help = help.into();

        let executor = self.executor();
        executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
            for (k, v) in &self.attributes {
                hasher.update(k.as_bytes());
                hasher.update(v.as_bytes());
            }
        });
        let prefixed_name = {
            let prefix = &self.name;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };

        // Reject duplicate (name, attributes) registrations up front.
        let metric_key = (prefixed_name.clone(), self.attributes.clone());
        let is_new = executor.registered_metrics.lock().insert(metric_key);
        assert!(
            is_new,
            "duplicate metric: {} with attributes {:?}",
            prefixed_name, self.attributes
        );

        // Descend into the scoped registry, then one sub-registry per
        // attribute label.
        let mut registry = executor.registry.lock();
        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
        let sub_registry = self
            .attributes
            .iter()
            .fold(scoped, |reg, (k, v): &(String, String)| {
                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
            });
        sub_registry.register(prefixed_name, help, metric);
    }

    /// Encodes all registered metrics, recording the read in the auditor.
    fn encode(&self) -> String {
        let executor = self.executor();
        executor.auditor.event(b"encode", |_| {});
        let encoded = executor.registry.lock().encode();
        encoded
    }

    /// Returns a context bound to a fresh metrics scope, backed by a guard
    /// that removes the scope from the registry when dropped (if the
    /// executor is still alive). Idempotent: a context that already has a
    /// scope is returned unchanged.
    fn with_scope(&self) -> Self {
        let executor = self.executor();
        executor.auditor.event(b"with_scope", |_| {});

        if self.scope.is_some() {
            return self.clone();
        }

        let weak = self.executor.clone();
        let scope_id = executor.registry.lock().create_scope();
        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
            if let Some(exec) = weak.upgrade() {
                exec.registry.lock().remove_scope(id);
            }
        }));
        Self {
            scope: Some(guard),
            ..self.clone()
        }
    }
}
1364
/// Future that completes once the simulated clock reaches `time`.
struct Sleeper {
    // Weak handle to the executor that owns the clock.
    executor: Weak<Executor>,
    // Deadline in simulated time.
    time: SystemTime,
    // Whether an `Alarm` has already been pushed for this sleeper.
    registered: bool,
}

impl Sleeper {
    /// Upgrades the weak executor handle, panicking if the runtime has
    /// already shut down.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}

/// Entry in the executor's sleep heap: wake `waker` once the clock reaches
/// `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}
1382
1383impl PartialEq for Alarm {
1384 fn eq(&self, other: &Self) -> bool {
1385 self.time.eq(&other.time)
1386 }
1387}
1388
1389impl Eq for Alarm {}
1390
1391impl PartialOrd for Alarm {
1392 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1393 Some(self.cmp(other))
1394 }
1395}
1396
1397impl Ord for Alarm {
1398 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1399 other.time.cmp(&self.time)
1401 }
1402}
1403
impl Future for Sleeper {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let executor = self.executor();
        // Complete immediately if the deadline has already passed.
        {
            let current_time = *executor.time.lock();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        // Register an alarm exactly once; the executor wakes it when the
        // simulated clock reaches `time`.
        if !self.registered {
            self.registered = true;
            executor.sleeping.lock().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}
1425
1426impl Clock for Context {
1427 fn current(&self) -> SystemTime {
1428 *self.executor().time.lock()
1429 }
1430
1431 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1432 let deadline = self
1433 .current()
1434 .checked_add(duration)
1435 .expect("overflow when setting wake time");
1436 self.sleep_until(deadline)
1437 }
1438
1439 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1440 Sleeper {
1441 executor: self.executor.clone(),
1442
1443 time: deadline,
1444 registered: false,
1445 }
1446 }
1447}
1448
/// Future adapter used by [`Pacer::pace`] (external runtime only): the
/// wrapped future's result is withheld until the simulated clock reaches
/// `target`.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    // Weak handle to the executor that owns the clock.
    executor: Weak<Executor>,
    // Simulated time before which the result is withheld.
    target: SystemTime,
    #[pin]
    future: F,
    // Result captured by the eager first poll, if it completed early.
    ready: Option<F::Output>,
    // Whether the initial eager poll has happened.
    started: bool,
    // Whether an `Alarm` has been pushed for `target`.
    registered: bool,
}
1463
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Eagerly poll the inner future once (with a no-op waker) so an
        // already-ready result can be captured and withheld until `target`.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Before `target`, park on the executor's sleep heap (register the
        // alarm only once).
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Target reached: hand back the eagerly captured result, if any.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise block this thread polling the inner future until it
        // completes, parking on the `Blocker` between polls.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1522
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` in a [`Waiter`] so its result is withheld until at
    /// least `latency` of simulated time has elapsed from now.
    ///
    /// # Panics
    ///
    /// Panics if adding `latency` to the current time overflows `SystemTime`.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let now = *self.executor().time.lock();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: self.executor.clone(),
            target,
            ready: None,
            future,
            started: false,
            registered: false,
        }
    }
}
1548
impl GClock for Context {
    type Instant = SystemTime;

    /// Governor clock source: reports the runtime's simulated time.
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
1556
/// Marker impl required by `governor` rate limiters; relies on the `GClock`
/// implementation above.
impl ReasonablyRealtime for Context {}
1558
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener at `socket`, delegating to the underlying network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket`, delegating to the underlying network; returns a
    /// sink/stream pair on success.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1573
1574impl crate::Resolver for Context {
1575 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1576 let executor = self.executor();
1578 let dns = executor.dns.lock();
1579 let result = dns.get(host).cloned();
1580 drop(dns);
1581
1582 executor.auditor.event(b"resolve", |hasher| {
1584 hasher.update(host.as_bytes());
1585 hasher.update(result.encode());
1586 });
1587 result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1588 }
1589}
1590
1591impl RngCore for Context {
1592 fn next_u32(&mut self) -> u32 {
1593 let executor = self.executor();
1594 executor.auditor.event(b"rand", |hasher| {
1595 hasher.update(b"next_u32");
1596 });
1597 let result = executor.rng.lock().next_u32();
1598 result
1599 }
1600
1601 fn next_u64(&mut self) -> u64 {
1602 let executor = self.executor();
1603 executor.auditor.event(b"rand", |hasher| {
1604 hasher.update(b"next_u64");
1605 });
1606 let result = executor.rng.lock().next_u64();
1607 result
1608 }
1609
1610 fn fill_bytes(&mut self, dest: &mut [u8]) {
1611 let executor = self.executor();
1612 executor.auditor.event(b"rand", |hasher| {
1613 hasher.update(b"fill_bytes");
1614 });
1615 executor.rng.lock().fill_bytes(dest);
1616 }
1617
1618 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1619 let executor = self.executor();
1620 executor.auditor.event(b"rand", |hasher| {
1621 hasher.update(b"try_fill_bytes");
1622 });
1623 let result = executor.rng.lock().try_fill_bytes(dest);
1624 result
1625 }
1626}
1627
/// Marker: the underlying RNG is treated as cryptographically secure.
impl CryptoRng for Context {}
1629
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    /// Opens (or creates) a versioned blob, delegating to the underlying
    /// storage; returns the blob, its length, and the resolved version.
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }

    /// Removes a named blob (or the whole partition when `name` is `None`),
    /// delegating to the underlying storage.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Lists blob names in `partition`, delegating to the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1650
impl crate::BufferPooler for Context {
    /// Returns the buffer pool used for network I/O.
    fn network_buffer_pool(&self) -> &crate::BufferPool {
        &self.network_buffer_pool
    }

    /// Returns the buffer pool used for storage I/O.
    fn storage_buffer_pool(&self) -> &crate::BufferPool {
        &self.storage_buffer_pool
    }
}
1660
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "external")]
    use crate::FutureExt;
    #[cfg(feature = "external")]
    use crate::Spawner;
    use crate::{deterministic, reschedule, Blob, Metrics, Resolver, Runner as _, Storage};
    use commonware_macros::test_traced;
    #[cfg(feature = "external")]
    use commonware_utils::channel::mpsc;
    use commonware_utils::channel::oneshot;
    #[cfg(not(feature = "external"))]
    use futures::future::pending;
    #[cfg(not(feature = "external"))]
    use futures::stream::StreamExt as _;
    #[cfg(feature = "external")]
    use futures::StreamExt;
    use futures::{stream::FuturesUnordered, task::noop_waker};

    /// Yields to the scheduler five times, then returns its input.
    async fn task(i: usize) -> usize {
        for _ in 0..5 {
            reschedule().await;
        }
        i
    }

    /// Spawns `tasks` copies of [`task`] and returns the auditor state plus
    /// the completion order of their outputs.
    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
        runner.start(|context| async move {
            let mut handles = FuturesUnordered::new();
            for i in 0..=tasks - 1 {
                handles.push(context.clone().spawn(move |_| task(i)));
            }

            let mut outputs = Vec::new();
            while let Some(result) = handles.next().await {
                outputs.push(result.unwrap());
            }
            assert_eq!(outputs.len(), tasks);
            (context.auditor().state(), outputs)
        })
    }

    /// Runs five tasks on a runner seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// A given seed must reproduce the same auditor state and task ordering.
    #[test]
    fn test_same_seed_same_order() {
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Different seeds should yield different schedules.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// Alarms must pop from the heap in ascending time order (min-heap).
    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A timed runner must panic once simulated time exceeds the timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A zero cycle duration with a timeout set is rejected at construction.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Cycle durations below system time precision are rejected.
    #[test]
    #[should_panic(
        expected = "cycle duration must be greater than or equal to system time precision"
    )]
    fn test_bad_cycle() {
        let cfg = Config {
            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Synced blob data must survive recovery from a checkpoint.
    #[test]
    fn test_recover_synced_storage_persists() {
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(0, data).await.unwrap();
            blob.sync().await.unwrap();
            context.auditor().state()
        });

        // Auditor state must be identical before and after recovery.
        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read.coalesce(), data);
        });
    }

    /// Panics inside a recovered runtime must propagate to the caller.
    #[test]
    #[should_panic(expected = "goodbye")]
    fn test_recover_panic_handling() {
        let executor1 = deterministic::Runner::default();
        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
            reschedule().await;
        });

        let executor = Runner::from(checkpoint);
        executor.start(|_| async move {
            panic!("goodbye");
        });
    }

    /// Unsynced writes must be discarded when recovering from a checkpoint.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (_, checkpoint) = executor.start_and_recover(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            // Written but never synced: should not survive recovery.
            blob.write_at(0, data).await.unwrap();
        });

        let executor = Runner::from(checkpoint);

        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Registered DNS mappings must survive recovery.
    #[test]
    fn test_recover_dns_mappings_persist() {
        let executor = deterministic::Runner::default();
        let host = "example.com";
        let addrs = vec![
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
        ];

        let (state, checkpoint) = executor.start_and_recover({
            let addrs = addrs.clone();
            |context| async move {
                context.resolver_register(host, Some(addrs));
                context.auditor().state()
            }
        });

        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(move |context| async move {
            let resolved = context.resolve(host).await.unwrap();
            assert_eq!(resolved, addrs);
        });
    }

    /// Simulated time must carry over across recovery and keep advancing.
    #[test]
    fn test_recover_time_persists() {
        let executor = deterministic::Runner::default();
        let duration_to_sleep = Duration::from_secs(10);

        let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
            context.sleep(duration_to_sleep).await;
            context.current()
        });

        assert_eq!(
            time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
            duration_to_sleep
        );

        let executor2 = Runner::from(checkpoint);
        executor2.start(move |context| async move {
            assert_eq!(context.current(), time_before_recovery);

            context.sleep(duration_to_sleep).await;
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                duration_to_sleep * 2
            );
        });
    }

    /// Leaking a context out of the runtime must be detected at shutdown.
    #[test]
    #[should_panic(expected = "executor still has weak references")]
    fn test_context_return() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move {
            context
        });

        drop(context);
    }

    /// With default config, simulated time starts at the Unix epoch.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }

    /// A configured start time must be honored.
    #[test]
    fn test_start_time() {
        let executor_default = deterministic::Runner::default();
        executor_default.start(|context| async move {
            assert_eq!(context.current(), UNIX_EPOCH);
        });

        let start_time = UNIX_EPOCH + Duration::from_secs(100);
        let cfg = Config::default().with_start_time(start_time);
        let executor = deterministic::Runner::new(cfg);

        executor.start(move |context| async move {
            assert_eq!(context.current(), start_time);
        });
    }

    /// Without the "external" feature, a never-ready future stalls the runtime.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_stall() {
        let executor = deterministic::Runner::default();

        executor.start(|_| async move {
            pending::<()>().await;
        });
    }

    /// Without the "external" feature, waiting on an out-of-runtime thread
    /// is treated as a stall.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_external_simulated() {
        let executor = deterministic::Runner::default();

        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    /// With the "external" feature, waiting on an out-of-runtime thread
    /// completes instead of stalling.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime() {
        let executor = deterministic::Runner::default();

        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    /// Paced external events: real time elapses while simulated time is held
    /// back (or advanced) according to the pace latency.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            let (results_tx, mut results_rx) = mpsc::channel(2);

            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            let first = context.clone().spawn({
                let results_tx = results_tx.clone();
                move |context| async move {
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            let second = context.clone().spawn(move |context| async move {
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            second.await.unwrap();
            first.await.unwrap();

            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.recv().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }

    /// Without the "external" feature, sleeps skip ahead: few iterations.
    #[cfg(not(feature = "external"))]
    #[test]
    fn test_simulated_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations < 10);
        });
    }

    /// With the "external" feature, sleeps run in real time: many iterations.
    #[cfg(feature = "external")]
    #[test]
    fn test_realtime_no_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations > 500);
        });
    }

    /// Empty metric labels are rejected.
    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("");
        });
    }

    /// Labels must begin with a letter.
    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_invalid_first_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("1invalid");
        });
    }

    /// Labels may only contain alphanumerics and underscores.
    #[test]
    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
    fn test_metrics_label_invalid_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("invalid-label");
        });
    }

    /// The runtime's reserved metrics prefix cannot be used as a label.
    #[test]
    #[should_panic(expected = "using runtime label is not allowed")]
    fn test_metrics_label_reserved_prefix() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label(METRICS_PREFIX);
        });
    }

    /// Re-using an attribute key on the same context must panic.
    #[test]
    #[should_panic(expected = "duplicate attribute key: epoch")]
    fn test_metrics_duplicate_attribute_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let _ = context
                .with_label("test")
                .with_attribute("epoch", "old")
                .with_attribute("epoch", "new");
        });
    }

    /// Injected sync faults fail writes; disabling faults after recovery
    /// restores normal storage behavior (and the unsynced data is lost).
    #[test]
    fn test_storage_fault_injection_and_recovery() {
        let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
            sync_rate: Some(1.0),
            ..Default::default()
        });

        let (result, checkpoint) =
            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
                blob.write_at(0, b"data".to_vec()).await.unwrap();
                blob.sync().await
            });

        // sync_rate of 1.0 means every sync fails.
        assert!(result.is_err());

        deterministic::Runner::from(checkpoint).start(|ctx| async move {
            *ctx.storage_fault_config().write() = FaultConfig::default();

            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
            assert_eq!(len, 0, "unsynced data should be lost after recovery");

            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");

            let read_buf = blob.read_at(0, 9).await.unwrap();
            assert_eq!(read_buf.coalesce(), b"recovered");
        });
    }

    /// Fault rates can be toggled at runtime via the shared config handle.
    #[test]
    fn test_storage_fault_dynamic_config() {
        let executor = deterministic::Runner::default();
        executor.start(|ctx| async move {
            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();

            blob.write_at(0, b"initial".to_vec()).await.unwrap();
            blob.sync().await.expect("initial sync should succeed");

            let storage_fault_cfg = ctx.storage_fault_config();
            storage_fault_cfg.write().sync_rate = Some(1.0);

            blob.write_at(0, b"updated".to_vec()).await.unwrap();
            let result = blob.sync().await;
            assert!(result.is_err(), "sync should fail with faults enabled");

            storage_fault_cfg.write().sync_rate = Some(0.0);

            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");
        });
    }

    /// Fault injection must be deterministic per seed.
    #[test]
    fn test_storage_fault_determinism() {
        fn run_with_seed(seed: u64) -> Vec<bool> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_fault_config(FaultConfig {
                    open_rate: Some(0.5),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut results = Vec::new();
                for i in 0..20 {
                    let name = format!("blob{i}");
                    let result = ctx.open("test_determinism", name.as_bytes()).await;
                    results.push(result.is_ok());
                }
                results
            })
        }

        let results1 = run_with_seed(12345);
        let results2 = run_with_seed(12345);
        assert_eq!(
            results1, results2,
            "same seed should produce same failure pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }

    /// Fault injection stays deterministic even with concurrent tasks.
    #[test]
    fn test_storage_fault_determinism_multi_task() {
        fn run_with_seed(seed: u64) -> Vec<u32> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_fault_config(FaultConfig {
                    open_rate: Some(0.5),
                    write_rate: Some(0.3),
                    sync_rate: Some(0.2),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut handles = Vec::new();
                for i in 0..5 {
                    let ctx = ctx.clone();
                    handles.push(ctx.spawn(move |ctx| async move {
                        let mut successes = 0u32;
                        for j in 0..4 {
                            let name = format!("task{i}_blob{j}");
                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
                                successes += 1;
                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
                                    successes += 1;
                                }
                                if blob.sync().await.is_ok() {
                                    successes += 1;
                                }
                            }
                        }
                        successes
                    }));
                }

                let mut results = Vec::new();
                for handle in handles {
                    results.push(handle.await.unwrap());
                }
                results
            })
        }

        let results1 = run_with_seed(42);
        let results2 = run_with_seed(42);
        assert_eq!(
            results1, results2,
            "same seed should produce same multi-task pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }
}