1pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47 child_label,
48 network::{
49 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
50 metered::Network as MeteredNetwork,
51 },
52 prefixed_name,
53 storage::{
54 audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
55 memory::Storage as MemStorage, metered::Storage as MeteredStorage,
56 },
57 telemetry::metrics::{
58 add_attribute, raw, task::Label, validate_label, Counter, CounterFamily, GaugeFamily,
59 Metric, Register, Registered, Registry,
60 },
61 utils::{
62 signal::{Signal, Stopper},
63 supervision::Tree,
64 Panicker,
65 },
66 BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf, Name, Panicked,
67 Spawner as _, Supervisor as _, METRICS_PREFIX,
68};
69#[cfg(feature = "external")]
70use crate::{Blocker, Pacer};
71use commonware_codec::Encode;
72use commonware_formatting::hex;
73use commonware_macros::select;
74use commonware_parallel::ThreadPool;
75use commonware_utils::{
76 sync::{Mutex, RwLock},
77 time::SYSTEM_TIME_PRECISION,
78 SystemTimeExt,
79};
80#[cfg(feature = "external")]
81use futures::task::noop_waker;
82use futures::{
83 future::Either,
84 task::{waker, ArcWake},
85 Future,
86};
87use governor::clock::{Clock as GClock, ReasonablyRealtime};
88#[cfg(feature = "external")]
89use pin_project::pin_project;
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95 collections::{BTreeMap, BinaryHeap, HashMap},
96 mem::{replace, take},
97 net::{IpAddr, SocketAddr},
98 num::NonZeroUsize,
99 panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
100 pin::Pin,
101 sync::{Arc, Weak},
102 task::{self, Poll, Waker},
103 time::{Duration, SystemTime, UNIX_EPOCH},
104};
105use tracing::{info_span, trace, Instrument};
106use tracing_opentelemetry::OpenTelemetrySpanExt;
107
/// Handles to the runtime-level metrics maintained by the executor.
#[derive(Debug)]
struct Metrics {
    /// Registered as "Total number of iterations"; bumped once per pass of the executor loop.
    iterations: Counter,
    /// Registered as "Total number of tasks spawned", keyed by task label.
    tasks_spawned: CounterFamily<Label>,
    /// Registered as "Number of tasks currently running", keyed by task label.
    tasks_running: GaugeFamily<Label>,
    /// Registered as "Total number of task polls"; bumped each time a task is polled.
    task_polls: CounterFamily<Label>,
}
115
116impl Metrics {
117 pub fn init(registry: &mut impl Register) -> Self {
118 Self {
119 iterations: registry.register(
120 "iterations",
121 "Total number of iterations",
122 raw::Counter::default(),
123 ),
124 tasks_spawned: registry.register(
125 "tasks_spawned",
126 "Total number of tasks spawned",
127 raw::Family::default(),
128 ),
129 tasks_running: registry.register(
130 "tasks_running",
131 "Number of tasks currently running",
132 raw::Family::default(),
133 ),
134 task_polls: registry.register(
135 "task_polls",
136 "Total number of task polls",
137 raw::Family::default(),
138 ),
139 }
140 }
141}
142
/// A 32-byte digest; the [`Auditor`] keeps its running SHA-256 state in this form.
type Digest = [u8; 32];
145
/// Accumulates a running hash over every event reported to it.
///
/// Two runs that report the same events in the same order end with the same
/// [`Auditor::state`].
pub struct Auditor {
    /// Current digest; replaced on every call to `event`.
    digest: Mutex<Digest>,
}
150
151impl Default for Auditor {
152 fn default() -> Self {
153 Self {
154 digest: Digest::default().into(),
155 }
156 }
157}
158
159impl Auditor {
160 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
164 where
165 F: FnOnce(&mut Sha256),
166 {
167 let mut digest = self.digest.lock();
168
169 let mut hasher = Sha256::new();
170 hasher.update(digest.as_ref());
171 hasher.update(label);
172 payload(&mut hasher);
173
174 *digest = hasher.finalize().into();
175 }
176
177 pub fn state(&self) -> String {
182 let hash = self.digest.lock();
183 hex(hash.as_ref())
184 }
185}
186
/// A boxed, sendable cryptographic RNG; the type accepted by [`Config::with_rng`].
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
189
/// Configuration for the deterministic runtime.
pub struct Config {
    /// Random number generator handed to the executor (also shared with faulty storage).
    rng: BoxDynRng,

    /// Amount the runtime clock advances after each pass of the executor loop.
    cycle: Duration,

    /// Initial value of the runtime clock.
    start_time: SystemTime,

    /// If set, the run panics once the runtime clock reaches `start_time + timeout`.
    timeout: Option<Duration>,

    /// Passed to `Panicker::new` — presumably controls whether task panics are caught
    /// rather than propagated; confirm against `Panicker`.
    catch_panics: bool,

    /// Fault-injection settings handed to the faulty storage layer.
    storage_fault_cfg: FaultConfig,

    /// Configuration for the buffer pool exposed as the network buffer pool.
    network_buffer_pool_cfg: BufferPoolConfig,

    /// Configuration for the buffer pool exposed as the storage buffer pool.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
218
219impl Config {
220 pub fn new() -> Self {
222 cfg_if::cfg_if! {
223 if #[cfg(miri)] {
224 let network_buffer_pool_cfg = BufferPoolConfig::for_network()
226 .with_max_per_class(commonware_utils::NZU32!(32))
227 .with_thread_cache_disabled();
228 let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
229 .with_max_per_class(commonware_utils::NZU32!(32))
230 .with_thread_cache_disabled();
231 } else {
232 let network_buffer_pool_cfg =
233 BufferPoolConfig::for_network().with_thread_cache_disabled();
234 let storage_buffer_pool_cfg =
235 BufferPoolConfig::for_storage().with_thread_cache_disabled();
236 }
237 }
238
239 Self {
240 rng: Box::new(StdRng::seed_from_u64(42)),
241 cycle: Duration::from_millis(1),
242 start_time: UNIX_EPOCH,
243 timeout: None,
244 catch_panics: false,
245 storage_fault_cfg: FaultConfig::default(),
246 network_buffer_pool_cfg,
247 storage_buffer_pool_cfg,
248 }
249 }
250
251 pub fn with_seed(self, seed: u64) -> Self {
254 self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
255 }
256
257 pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
263 self.rng = rng;
264 self
265 }
266
267 pub const fn with_cycle(mut self, cycle: Duration) -> Self {
269 self.cycle = cycle;
270 self
271 }
272 pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
274 self.start_time = start_time;
275 self
276 }
277 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
279 self.timeout = timeout;
280 self
281 }
282 pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
284 self.catch_panics = catch_panics;
285 self
286 }
287 pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
289 self.network_buffer_pool_cfg = cfg;
290 self
291 }
292 pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
294 self.storage_buffer_pool_cfg = cfg;
295 self
296 }
297
298 pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
304 self.storage_fault_cfg = faults;
305 self
306 }
307
308 pub const fn cycle(&self) -> Duration {
311 self.cycle
312 }
313 pub const fn start_time(&self) -> SystemTime {
315 self.start_time
316 }
317 pub const fn timeout(&self) -> Option<Duration> {
319 self.timeout
320 }
321 pub const fn catch_panics(&self) -> bool {
323 self.catch_panics
324 }
325 pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
327 &self.network_buffer_pool_cfg
328 }
329 pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
331 &self.storage_buffer_pool_cfg
332 }
333
334 pub fn assert(&self) {
336 assert!(
337 self.cycle != Duration::default() || self.timeout.is_none(),
338 "cycle duration must be non-zero when timeout is set",
339 );
340 assert!(
341 self.cycle >= SYSTEM_TIME_PRECISION,
342 "cycle duration must be greater than or equal to system time precision"
343 );
344 assert!(
345 self.start_time >= UNIX_EPOCH,
346 "start time must be greater than or equal to unix epoch"
347 );
348 }
349}
350
impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
356
/// Shared state of a single deterministic run.
pub struct Executor {
    /// Metrics registry that `Context::register` and `Context::encode` go through.
    registry: Registry,
    /// Amount the runtime clock advances per loop iteration.
    cycle: Duration,
    /// When set, the run panics with "runtime timeout" once `time` reaches it.
    deadline: Option<SystemTime>,
    /// Runtime-level metrics.
    metrics: Arc<Metrics>,
    /// Running hash of audited events.
    auditor: Arc<Auditor>,
    /// Shared RNG; used to shuffle the ready queue and to back `Context`'s `RngCore`.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Current runtime (virtual) time.
    time: Mutex<SystemTime>,
    /// Registered tasks plus the queue of tasks ready to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms; `Alarm`'s reversed ordering puts the earliest on top.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Backs `Context::stop` / `Context::stopped`.
    shutdown: Mutex<Stopper>,
    /// Cloned into every spawned task's `Handle`.
    panicker: Panicker,
    /// Host name to address table used by `Context::resolve`.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
372
373impl Executor {
374 fn advance_time(&self) -> SystemTime {
379 #[cfg(feature = "external")]
380 std::thread::sleep(self.cycle);
381
382 let mut time = self.time.lock();
383 *time = time
384 .checked_add(self.cycle)
385 .expect("executor time overflowed");
386 let now = *time;
387 trace!(now = now.epoch_millis(), "time advanced");
388 now
389 }
390
391 fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
396 if cfg!(feature = "external") || self.tasks.ready() != 0 {
397 return current;
398 }
399
400 let mut skip_until = None;
401 {
402 let sleeping = self.sleeping.lock();
403 if let Some(next) = sleeping.peek() {
404 if next.time > current {
405 skip_until = Some(next.time);
406 }
407 }
408 }
409
410 skip_until.map_or(current, |deadline| {
411 let mut time = self.time.lock();
412 *time = deadline;
413 let now = *time;
414 trace!(now = now.epoch_millis(), "time skipped");
415 now
416 })
417 }
418
419 fn wake_ready_sleepers(&self, current: SystemTime) {
421 let mut sleeping = self.sleeping.lock();
422 while let Some(next) = sleeping.peek() {
423 if next.time <= current {
424 let sleeper = sleeping.pop().unwrap();
425 sleeper.waker.wake();
426 } else {
427 break;
428 }
429 }
430 }
431
432 fn assert_liveness(&self) {
436 if cfg!(feature = "external") || self.tasks.ready() != 0 {
437 return;
438 }
439
440 panic!("runtime stalled");
441 }
442}
443
/// State captured at the end of a run by `Runner::start_and_recover`.
///
/// Converting a `Checkpoint` into a `Runner` starts a new run that reuses these values.
pub struct Checkpoint {
    /// Cycle duration of the finished run.
    cycle: Duration,
    /// Deadline of the finished run, if any.
    deadline: Option<SystemTime>,
    /// Auditor carried over, so the running digest continues across runs.
    auditor: Arc<Auditor>,
    /// RNG carried over in its current state.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Runtime clock value at the end of the finished run.
    time: Mutex<SystemTime>,
    /// Storage carried over from the finished run.
    storage: Arc<Storage>,
    /// Resolver table carried over from the finished run.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    /// Value reported by `Panicker::catch` at the end of the finished run.
    catch_panics: bool,
    /// Configuration used to rebuild the network buffer pool.
    network_buffer_pool_cfg: BufferPoolConfig,
    /// Configuration used to rebuild the storage buffer pool.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
459
460impl Checkpoint {
461 pub fn auditor(&self) -> Arc<Auditor> {
463 self.auditor.clone()
464 }
465}
466
/// What a [`Runner`] starts from: a fresh configuration or a previous run's checkpoint.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a new run from this configuration.
    Config(Config),
    /// Resume from the state captured by an earlier run.
    Checkpoint(Checkpoint),
}
472
/// Entry point for executing a root task on the deterministic runtime.
pub struct Runner {
    /// Either the configuration or the checkpoint the run is built from.
    state: State,
}
477
impl From<Config> for Runner {
    /// Equivalent to [`Runner::new`], so the configuration is validated here as well.
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}
483
484impl From<Checkpoint> for Runner {
485 fn from(checkpoint: Checkpoint) -> Self {
486 Self {
487 state: State::Checkpoint(checkpoint),
488 }
489 }
490}
491
impl Runner {
    /// Creates a runner from `cfg`.
    ///
    /// # Panics
    ///
    /// Panics if `cfg.assert()` rejects the configuration.
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Creates a runner from the default configuration with the RNG seeded by `seed`.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Creates a runner from the default configuration with `timeout` set.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Runs the future produced by `f` to completion and returns its output together with
    /// a [`Checkpoint`] of the runtime state.
    ///
    /// # Panics
    ///
    /// Panics with "runtime timeout" once the clock reaches the deadline, with
    /// "runtime stalled" when no task is ready after time has advanced, and re-raises any
    /// panic caught while running the loop (after tasks have been cleaned up). Also panics
    /// if weak or strong references to the executor remain once all tasks are dropped.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Build the context and executor, either fresh or from a checkpoint.
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Capture what the checkpoint needs before `context` is moved into `f`.
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task so it is queued for the first iteration.
        Tasks::register_root(&executor.tasks);

        // Run the loop under `catch_unwind` so cleanup below happens even on panic.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Enforce the deadline (if any) before doing more work.
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Release the clock lock before unwinding.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Take the tasks that are ready right now; tasks woken while polling land in
            // a fresh queue and run in a later iteration.
            let mut queue = executor.tasks.drain();

            // Randomize the poll order with the shared RNG.
            if queue.len() > 1 {
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // A queued id may refer to a task that has since been removed.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record the poll in the audit hash and the metrics.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // The waker re-queues this task id; it only holds a weak reference to the
                // task table.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        // The root future lives on this stack frame, not in the task.
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        let mut fut_opt = future.lock();
                        // The slot is `None` once the task has completed.
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Unregister the task and drop its future.
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            // The root task finished: leave the loop with its output.
            if let Some(output) = output {
                break output;
            }

            // Advance the clock by one cycle, then jump ahead if nothing is ready.
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers that are now due; if still nothing is ready, the run is stuck.
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        // Drop pending alarms and every remaining task future before checking that no
        // references to the executor are left.
        executor.sleeping.lock().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock() = None;
        }

        // The root future is dropped here as well, before the reference check.
        drop(root);

        // Every `Context` holds a `Weak<Executor>`; none may outlive the run.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Re-raise a caught panic only after the cleanup above.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Take ownership of the executor so its fields can move into the checkpoint.
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };

        (output, checkpoint)
    }
}
703
704impl Default for Runner {
705 fn default() -> Self {
706 Self::new(Config::default())
707 }
708}
709
710impl crate::Runner for Runner {
711 type Context = Context;
712
713 fn start<F, Fut>(self, f: F) -> Fut::Output
714 where
715 F: FnOnce(Self::Context) -> Fut,
716 Fut: Future,
717 {
718 let (output, _) = self.start_and_recover(f);
719 output
720 }
721}
722
/// How a task's future is stored.
enum Mode {
    /// The root task; its future is held by `Runner::start_and_recover`, not here.
    Root,
    /// A spawned task; the slot becomes `None` once the future completes or is cleared.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
728
/// A task registered with the executor.
struct Task {
    /// Identifier assigned by `Tasks::increment`.
    id: u128,
    /// Label recorded in the audit hash and used as the metrics key.
    label: Label,

    /// Whether this is the root task or spawned work.
    mode: Mode,
}
736
/// Waker that re-queues a task by id.
struct TaskWaker {
    /// Id of the task to queue when woken.
    id: u128,

    /// Weak handle to the task table, so a stored waker does not keep it alive.
    tasks: Weak<Tasks>,
}
743
744impl ArcWake for TaskWaker {
745 fn wake_by_ref(arc_self: &Arc<Self>) {
746 if let Some(tasks) = arc_self.tasks.upgrade() {
751 tasks.queue(arc_self.id);
752 }
753 }
754}
755
/// Task table and ready queue for the executor.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Ids of tasks waiting to be polled, in the order they were queued.
    ready: Mutex<Vec<u128>>,
    /// Every registered task that has not been removed, keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
765
766impl Tasks {
767 const fn new() -> Self {
769 Self {
770 counter: Mutex::new(0),
771 ready: Mutex::new(Vec::new()),
772 running: Mutex::new(BTreeMap::new()),
773 }
774 }
775
776 fn increment(&self) -> u128 {
778 let mut counter = self.counter.lock();
779 let old = *counter;
780 *counter = counter.checked_add(1).expect("task counter overflow");
781 old
782 }
783
784 fn register_root(arc_self: &Arc<Self>) {
788 let id = arc_self.increment();
789 let task = Arc::new(Task {
790 id,
791 label: Label::root(),
792 mode: Mode::Root,
793 });
794 arc_self.register(id, task);
795 }
796
797 fn register_work(
799 arc_self: &Arc<Self>,
800 label: Label,
801 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
802 ) {
803 let id = arc_self.increment();
804 let task = Arc::new(Task {
805 id,
806 label,
807 mode: Mode::Work(Mutex::new(Some(future))),
808 });
809 arc_self.register(id, task);
810 }
811
812 fn register(&self, id: u128, task: Arc<Task>) {
814 self.running.lock().insert(id, task);
816
817 self.queue(id);
819 }
820
821 fn queue(&self, id: u128) {
823 let mut ready = self.ready.lock();
824 ready.push(id);
825 }
826
827 fn drain(&self) -> Vec<u128> {
829 let mut queue = self.ready.lock();
830 let len = queue.len();
831 replace(&mut *queue, Vec::with_capacity(len))
832 }
833
834 fn ready(&self) -> usize {
836 self.ready.lock().len()
837 }
838
839 fn get(&self, id: u128) -> Option<Arc<Task>> {
844 let running = self.running.lock();
845 running.get(&id).cloned()
846 }
847
848 fn remove(&self, id: u128) {
850 self.running.lock().remove(&id);
851 }
852
853 fn clear(&self) -> Vec<Arc<Task>> {
855 self.ready.lock().clear();
857
858 let running: BTreeMap<u128, Arc<Task>> = {
860 let mut running = self.running.lock();
861 take(&mut *running)
862 };
863 running.into_values().collect()
864 }
865}
866
/// Network stack: the deterministic network wrapped by an auditing layer, then a metering layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack: in-memory storage wrapped by fault injection, auditing, then metering.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;
869
/// Handle given to tasks for interacting with the deterministic runtime.
///
/// A `Context` only holds a weak reference to the executor, so methods that need the
/// executor panic once it has been dropped.
pub struct Context {
    /// Label of this context; used as the prefix for metric names it registers.
    name: String,
    /// Key/value attributes attached to registered metrics and task spans.
    attributes: Vec<(String, String)>,
    /// Weak handle to the executor that owns the run.
    executor: Weak<Executor>,
    /// Shared network stack.
    network: Arc<Network>,
    /// Shared storage stack.
    storage: Arc<Storage>,
    /// Buffer pool returned by `network_buffer_pool`.
    network_buffer_pool: BufferPool,
    /// Buffer pool returned by `storage_buffer_pool`.
    storage_buffer_pool: BufferPool,
    /// This context's node in the supervision tree.
    tree: Arc<Tree>,
    /// Execution mode requested for the next spawn; reset to the default by `spawn`.
    execution: Execution,
    /// Whether the next spawned task is wrapped in a tracing span; reset by `spawn`.
    traced: bool,
}
885
impl Context {
    /// Builds a fresh context and executor from `cfg`.
    ///
    /// Returns the root context, the executor, and the `Panicked` half produced by
    /// `Panicker::new`.
    ///
    /// # Panics
    ///
    /// Panics if `start_time + timeout` overflows.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        // Runtime metrics live in a sub-registry under `METRICS_PREFIX`.
        let mut registry = Registry::new();
        let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(&mut runtime_registry));
        let start_time = cfg.start_time;
        // The deadline is an absolute time derived from the configured timeout.
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        // The RNG is shared between the executor and the faulty storage layer.
        let rng = Arc::new(Mutex::new(cfg.rng));

        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            &mut runtime_registry.sub_registry("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            &mut runtime_registry.sub_registry("storage_buffer_pool"),
        );

        // Storage stack, innermost first: memory -> faulty -> audited -> metered.
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            &mut runtime_registry,
        );

        // Network stack, innermost first: deterministic -> audited -> metered.
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, &mut runtime_registry);

        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry,
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Rebuilds a context and executor from `checkpoint`.
    ///
    /// The cycle, deadline, auditor, RNG, clock, resolver table and storage are taken from
    /// the checkpoint. The metrics registry, network, buffer pools, task table, sleeper
    /// heap, stopper and panicker are created fresh.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        // Metrics start over for the new run.
        let mut registry = Registry::new();
        let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(&mut runtime_registry));

        // A new network is created, but it reports to the carried-over auditor.
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, &mut runtime_registry);

        // Buffer pools are rebuilt from the configurations stored in the checkpoint.
        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            &mut runtime_registry.sub_registry("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            &mut runtime_registry.sub_registry("storage_buffer_pool"),
        );

        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Carried over from the previous run.
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // Created fresh for this run.
            registry,
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrades the weak executor handle.
    ///
    /// # Panics
    ///
    /// Panics with "executor already dropped" if the executor no longer exists.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Returns the runtime metrics shared with the executor.
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Returns the auditor shared with the executor.
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Returns the digest reported by the innermost (in-memory) storage layer.
    pub fn storage_audit(&self) -> Digest {
        // Peel metered -> audited -> faulty to reach the memory storage.
        self.storage.inner().inner().inner().audit()
    }

    /// Returns the shared fault configuration of the faulty storage layer.
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        // Peel metered -> audited to reach the faulty storage.
        self.storage.inner().inner().config()
    }

    /// Registers, replaces, or removes the addresses that `host` resolves to.
    ///
    /// `Some(addrs)` stores `addrs` for `host`; `None` removes any entry for `host`. The
    /// call is recorded in the audit hash either way.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
1091
1092impl crate::Spawner for Context {
1093 fn dedicated(mut self) -> Self {
1094 self.execution = Execution::Dedicated;
1095 self
1096 }
1097
1098 fn shared(mut self, blocking: bool) -> Self {
1099 self.execution = Execution::Shared(blocking);
1100 self
1101 }
1102
1103 fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
1104 where
1105 F: FnOnce(Self) -> Fut + Send + 'static,
1106 Fut: Future<Output = T> + Send + 'static,
1107 T: Send + 'static,
1108 {
1109 let (label, metric) = spawn_metrics!(self);
1111
1112 let parent = Arc::clone(&self.tree);
1114 let traced = self.traced;
1115 self.execution = Execution::default();
1116 self.traced = false;
1117 let (child, aborted) = Tree::child(&parent);
1118 if aborted {
1119 return Handle::closed(metric);
1120 }
1121 self.tree = child;
1122
1123 let executor = self.executor();
1125 let future = if traced {
1126 let span = info_span!(parent: None, "task", name = %label.name());
1127 for (key, value) in &self.attributes {
1128 span.set_attribute(key.clone(), value.clone());
1129 }
1130 Either::Left(f(self).instrument(span))
1131 } else {
1132 Either::Right(f(self))
1133 };
1134 let (f, handle) = Handle::init(
1135 future,
1136 metric,
1137 executor.panicker.clone(),
1138 Arc::clone(&parent),
1139 );
1140 Tasks::register_work(&executor.tasks, label, Box::pin(f));
1141
1142 if let Some(aborter) = handle.aborter() {
1144 parent.register(aborter);
1145 }
1146
1147 handle
1148 }
1149
1150 async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
1151 let executor = self.executor();
1152 executor.auditor.event(b"stop", |hasher| {
1153 hasher.update(value.to_be_bytes());
1154 });
1155 let stop_resolved = {
1156 let mut shutdown = executor.shutdown.lock();
1157 shutdown.stop(value)
1158 };
1159
1160 let timeout_future = timeout.map_or_else(
1162 || futures::future::Either::Right(futures::future::pending()),
1163 |duration| futures::future::Either::Left(self.sleep(duration)),
1164 );
1165 select! {
1166 result = stop_resolved => {
1167 result.map_err(|_| Error::Closed)?;
1168 Ok(())
1169 },
1170 _ = timeout_future => Err(Error::Timeout),
1171 }
1172 }
1173
1174 fn stopped(&self) -> Signal {
1175 let executor = self.executor();
1176 executor.auditor.event(b"stopped", |_| {});
1177 let stopped = executor.shutdown.lock().stopped();
1178 stopped
1179 }
1180}
1181
1182impl crate::ThreadPooler for Context {
1183 fn create_thread_pool(
1184 &self,
1185 concurrency: NonZeroUsize,
1186 ) -> Result<ThreadPool, ThreadPoolBuildError> {
1187 let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1188
1189 if rayon::current_thread_index().is_none() {
1190 builder = builder.use_current_thread()
1191 }
1192
1193 builder
1194 .spawn_handler(move |thread| {
1195 self.child("rayon_thread")
1196 .dedicated()
1197 .spawn(move |_| async move { thread.run() });
1198 Ok(())
1199 })
1200 .build()
1201 .map(Arc::new)
1202 }
1203}
1204
1205impl crate::Supervisor for Context {
1206 fn child(&self, label: &'static str) -> Self {
1207 let (tree, _) = Tree::child(&self.tree);
1208 Self {
1209 name: child_label(&self.name, label),
1210 attributes: self.attributes.clone(),
1211 executor: self.executor.clone(),
1212 network: self.network.clone(),
1213 storage: self.storage.clone(),
1214 network_buffer_pool: self.network_buffer_pool.clone(),
1215 storage_buffer_pool: self.storage_buffer_pool.clone(),
1216 tree,
1217 execution: Execution::default(),
1218 traced: false,
1219 }
1220 }
1221
1222 fn with_attribute(mut self, key: &'static str, value: impl std::fmt::Display) -> Self {
1223 validate_label(key);
1225
1226 add_attribute(&mut self.attributes, key, value);
1228 self
1229 }
1230
1231 fn name(&self) -> Name {
1232 Name {
1233 label: self.name.clone(),
1234 attributes: self.attributes.clone(),
1235 }
1236 }
1237}
1238
impl crate::Tracing for Context {
    /// Marks the context so the next spawned task runs inside a "task" tracing span.
    fn with_span(mut self) -> Self {
        self.traced = true;
        self
    }
}
1245
1246impl crate::Metrics for Context {
1247 fn register<N: Into<String>, H: Into<String>, M: Metric>(
1248 &self,
1249 name: N,
1250 help: H,
1251 metric: M,
1252 ) -> Registered<M> {
1253 let name = name.into();
1254 let help = help.into();
1255 let executor = self.executor();
1256 executor.auditor.event(b"register", |hasher| {
1257 hasher.update(name.as_bytes());
1258 hasher.update(help.as_bytes());
1259 for (k, v) in &self.attributes {
1260 hasher.update(k.as_bytes());
1261 hasher.update(v.as_bytes());
1262 }
1263 });
1264 let metric = Arc::new(metric);
1265 executor.registry.register(
1266 prefixed_name(&self.name, &name),
1267 help,
1268 self.attributes.clone(),
1269 metric,
1270 )
1271 }
1272
1273 fn encode(&self) -> String {
1274 let executor = self.executor();
1275 executor.auditor.event(b"encode", |_| {});
1276 executor.registry.encode()
1277 }
1278}
1279
/// Future that completes once the runtime clock reaches `time`.
struct Sleeper {
    /// Weak handle to the executor whose clock is consulted.
    executor: Weak<Executor>,
    /// Runtime time at which the future becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
1285
impl Sleeper {
    /// Upgrades the weak executor handle.
    ///
    /// # Panics
    ///
    /// Panics with "executor already dropped" if the executor no longer exists.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}
1292
/// A pending wake-up: `waker` is woken once the runtime clock reaches `time`.
///
/// Equality and ordering look at `time` only, and the ordering is reversed so that a
/// `BinaryHeap<Alarm>` (a max-heap) yields the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when they fire at the same time; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    /// Earlier alarms compare as greater.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1318
1319impl Future for Sleeper {
1320 type Output = ();
1321
1322 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1323 let executor = self.executor();
1324 {
1325 let current_time = *executor.time.lock();
1326 if current_time >= self.time {
1327 return Poll::Ready(());
1328 }
1329 }
1330 if !self.registered {
1331 self.registered = true;
1332 executor.sleeping.lock().push(Alarm {
1333 time: self.time,
1334 waker: cx.waker().clone(),
1335 });
1336 }
1337 Poll::Pending
1338 }
1339}
1340
1341impl Clock for Context {
1342 fn current(&self) -> SystemTime {
1343 *self.executor().time.lock()
1344 }
1345
1346 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1347 let deadline = self
1348 .current()
1349 .checked_add(duration)
1350 .expect("overflow when setting wake time");
1351 self.sleep_until(deadline)
1352 }
1353
1354 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1355 Sleeper {
1356 executor: self.executor.clone(),
1357
1358 time: deadline,
1359 registered: false,
1360 }
1361 }
1362}
1363
/// Future that holds back the result of `future` until the runtime clock reaches `target`.
///
/// Only compiled with the `external` feature.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Weak handle to the executor whose clock is consulted.
    executor: Weak<Executor>,
    /// Runtime time before which the result is not released.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output captured if the wrapped future completed on the initial poll.
    ready: Option<F::Output>,
    /// Whether the wrapped future has received its initial poll.
    started: bool,
    /// Whether an alarm for `target` has already been pushed.
    registered: bool,
}
1378
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// Polls the wrapped future once up front, waits for the runtime clock to reach
    /// `target`, then returns the output, blocking the thread until the wrapped future
    /// completes if it has not already.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // First poll: kick off the wrapped future with a no-op waker so it can start
        // making progress; stash the output if it finishes immediately.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Until the target time is reached, stay pending behind a single alarm.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Target reached: release an output captured earlier, if any.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise poll the wrapped future to completion, waiting on the blocker between
        // polls instead of returning `Pending`.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1437
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` so its output is released no earlier than `latency` after the
    /// current runtime time.
    ///
    /// # Panics
    ///
    /// Panics if the current time plus `latency` overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let now = *self.executor().time.lock();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: Weak::clone(&self.executor),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1463
impl GClock for Context {
    type Instant = SystemTime;

    /// Reports the runtime (virtual) time as the governor clock's "now".
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
1471
// Marker impl with no methods overridden; governor's `ReasonablyRealtime` defaults apply.
impl ReasonablyRealtime for Context {}
1473
1474impl crate::Network for Context {
1475 type Listener = ListenerOf<Network>;
1476
1477 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1478 self.network.bind(socket).await
1479 }
1480
1481 async fn dial(
1482 &self,
1483 socket: SocketAddr,
1484 ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1485 self.network.dial(socket).await
1486 }
1487}
1488
1489impl crate::Resolver for Context {
1490 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1491 let executor = self.executor();
1493 let dns = executor.dns.lock();
1494 let result = dns.get(host).cloned();
1495 drop(dns);
1496
1497 executor.auditor.event(b"resolve", |hasher| {
1499 hasher.update(host.as_bytes());
1500 hasher.update(result.encode());
1501 });
1502 result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1503 }
1504}
1505
1506impl RngCore for Context {
1507 fn next_u32(&mut self) -> u32 {
1508 let executor = self.executor();
1509 executor.auditor.event(b"rand", |hasher| {
1510 hasher.update(b"next_u32");
1511 });
1512 let result = executor.rng.lock().next_u32();
1513 result
1514 }
1515
1516 fn next_u64(&mut self) -> u64 {
1517 let executor = self.executor();
1518 executor.auditor.event(b"rand", |hasher| {
1519 hasher.update(b"next_u64");
1520 });
1521 let result = executor.rng.lock().next_u64();
1522 result
1523 }
1524
1525 fn fill_bytes(&mut self, dest: &mut [u8]) {
1526 let executor = self.executor();
1527 executor.auditor.event(b"rand", |hasher| {
1528 hasher.update(b"fill_bytes");
1529 });
1530 executor.rng.lock().fill_bytes(dest);
1531 }
1532
1533 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1534 let executor = self.executor();
1535 executor.auditor.event(b"rand", |hasher| {
1536 hasher.update(b"try_fill_bytes");
1537 });
1538 let result = executor.rng.lock().try_fill_bytes(dest);
1539 result
1540 }
1541}
1542
1543impl CryptoRng for Context {}
1544
1545impl crate::Storage for Context {
1546 type Blob = <Storage as crate::Storage>::Blob;
1547
1548 async fn open_versioned(
1549 &self,
1550 partition: &str,
1551 name: &[u8],
1552 versions: std::ops::RangeInclusive<u16>,
1553 ) -> Result<(Self::Blob, u64, u16), Error> {
1554 self.storage.open_versioned(partition, name, versions).await
1555 }
1556
1557 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1558 self.storage.remove(partition, name).await
1559 }
1560
1561 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1562 self.storage.scan(partition).await
1563 }
1564}
1565
1566impl crate::BufferPooler for Context {
1567 fn network_buffer_pool(&self) -> &crate::BufferPool {
1568 &self.network_buffer_pool
1569 }
1570
1571 fn storage_buffer_pool(&self) -> &crate::BufferPool {
1572 &self.storage_buffer_pool
1573 }
1574}
1575
1576#[cfg(test)]
1577mod tests {
1578 use super::*;
1579 #[cfg(feature = "external")]
1580 use crate::FutureExt;
1581 use crate::{deterministic, reschedule, Blob, Metrics as _, Resolver, Runner as _, Storage};
1582 use commonware_macros::test_traced;
1583 #[cfg(feature = "external")]
1584 use commonware_utils::channel::mpsc;
1585 use commonware_utils::channel::oneshot;
1586 #[cfg(not(feature = "external"))]
1587 use futures::future::pending;
1588 #[cfg(not(feature = "external"))]
1589 use futures::stream::StreamExt as _;
1590 #[cfg(feature = "external")]
1591 use futures::StreamExt;
1592 use futures::{stream::FuturesUnordered, task::noop_waker};
1593
1594 async fn task(i: usize) -> usize {
1595 for _ in 0..5 {
1596 reschedule().await;
1597 }
1598 i
1599 }
1600
1601 fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1602 runner.start(|context| async move {
1603 let mut handles = FuturesUnordered::new();
1604 for i in 0..=tasks - 1 {
1605 handles.push(context.child("task").spawn(move |_| task(i)));
1606 }
1607
1608 let mut outputs = Vec::new();
1609 while let Some(result) = handles.next().await {
1610 outputs.push(result.unwrap());
1611 }
1612 assert_eq!(outputs.len(), tasks);
1613 (context.auditor().state(), outputs)
1614 })
1615 }
1616
1617 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1618 let executor = deterministic::Runner::seeded(seed);
1619 run_tasks(5, executor)
1620 }
1621
1622 #[test]
1623 fn test_same_seed_same_order() {
1624 let mut outputs = Vec::new();
1626 for seed in 0..1000 {
1627 let output = run_with_seed(seed);
1628 outputs.push(output);
1629 }
1630
1631 for seed in 0..1000 {
1633 let output = run_with_seed(seed);
1634 assert_eq!(output, outputs[seed as usize]);
1635 }
1636 }
1637
1638 #[test_traced("TRACE")]
1639 fn test_different_seeds_different_order() {
1640 let output1 = run_with_seed(12345);
1641 let output2 = run_with_seed(54321);
1642 assert_ne!(output1, output2);
1643 }
1644
1645 #[test]
1646 fn test_alarm_min_heap() {
1647 let now = SystemTime::now();
1649 let alarms = vec![
1650 Alarm {
1651 time: now + Duration::new(10, 0),
1652 waker: noop_waker(),
1653 },
1654 Alarm {
1655 time: now + Duration::new(5, 0),
1656 waker: noop_waker(),
1657 },
1658 Alarm {
1659 time: now + Duration::new(15, 0),
1660 waker: noop_waker(),
1661 },
1662 Alarm {
1663 time: now + Duration::new(5, 0),
1664 waker: noop_waker(),
1665 },
1666 ];
1667 let mut heap = BinaryHeap::new();
1668 for alarm in alarms {
1669 heap.push(alarm);
1670 }
1671
1672 let mut sorted_times = Vec::new();
1674 while let Some(alarm) = heap.pop() {
1675 sorted_times.push(alarm.time);
1676 }
1677 assert_eq!(
1678 sorted_times,
1679 vec![
1680 now + Duration::new(5, 0),
1681 now + Duration::new(5, 0),
1682 now + Duration::new(10, 0),
1683 now + Duration::new(15, 0),
1684 ]
1685 );
1686 }
1687
1688 #[test]
1689 #[should_panic(expected = "runtime timeout")]
1690 fn test_timeout() {
1691 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1692 executor.start(|context| async move {
1693 loop {
1694 context.sleep(Duration::from_secs(1)).await;
1695 }
1696 });
1697 }
1698
1699 #[test]
1700 #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1701 fn test_bad_timeout() {
1702 let cfg = Config {
1703 timeout: Some(Duration::default()),
1704 cycle: Duration::default(),
1705 ..Config::default()
1706 };
1707 deterministic::Runner::new(cfg);
1708 }
1709
1710 #[test]
1711 #[should_panic(
1712 expected = "cycle duration must be greater than or equal to system time precision"
1713 )]
1714 fn test_bad_cycle() {
1715 let cfg = Config {
1716 cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1717 ..Config::default()
1718 };
1719 deterministic::Runner::new(cfg);
1720 }
1721
1722 #[test]
1723 fn test_recover_synced_storage_persists() {
1724 let executor1 = deterministic::Runner::default();
1726 let partition = "test_partition";
1727 let name = b"test_blob";
1728 let data = b"Hello, world!";
1729
1730 let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1732 let (blob, _) = context.open(partition, name).await.unwrap();
1733 blob.write_at(0, data).await.unwrap();
1734 blob.sync().await.unwrap();
1735 context.auditor().state()
1736 });
1737
1738 assert_eq!(state, checkpoint.auditor.state());
1740
1741 let executor = Runner::from(checkpoint);
1743 executor.start(|context| async move {
1744 let (blob, len) = context.open(partition, name).await.unwrap();
1745 assert_eq!(len, data.len() as u64);
1746 let read = blob.read_at(0, data.len()).await.unwrap();
1747 assert_eq!(read.coalesce(), data);
1748 });
1749 }
1750
1751 #[test]
1752 #[should_panic(expected = "goodbye")]
1753 fn test_recover_panic_handling() {
1754 let executor1 = deterministic::Runner::default();
1756 let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1757 reschedule().await;
1758 });
1759
1760 let executor = Runner::from(checkpoint);
1762 executor.start(|_| async move {
1763 panic!("goodbye");
1764 });
1765 }
1766
1767 #[test]
1768 fn test_recover_unsynced_storage_does_not_persist() {
1769 let executor = deterministic::Runner::default();
1771 let partition = "test_partition";
1772 let name = b"test_blob";
1773 let data = b"Hello, world!";
1774
1775 let (_, checkpoint) = executor.start_and_recover(|context| async move {
1777 let (blob, _) = context.open(partition, name).await.unwrap();
1778 blob.write_at(0, data).await.unwrap();
1779 });
1780
1781 let executor = Runner::from(checkpoint);
1783
1784 executor.start(|context| async move {
1786 let (_, len) = context.open(partition, name).await.unwrap();
1787 assert_eq!(len, 0);
1788 });
1789 }
1790
1791 #[test]
1792 fn test_recover_dns_mappings_persist() {
1793 let executor = deterministic::Runner::default();
1795 let host = "example.com";
1796 let addrs = vec![
1797 IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
1798 IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
1799 ];
1800
1801 let (state, checkpoint) = executor.start_and_recover({
1803 let addrs = addrs.clone();
1804 |context| async move {
1805 context.resolver_register(host, Some(addrs));
1806 context.auditor().state()
1807 }
1808 });
1809
1810 assert_eq!(state, checkpoint.auditor.state());
1812
1813 let executor = Runner::from(checkpoint);
1815 executor.start(move |context| async move {
1816 let resolved = context.resolve(host).await.unwrap();
1817 assert_eq!(resolved, addrs);
1818 });
1819 }
1820
1821 #[test]
1822 fn test_recover_time_persists() {
1823 let executor = deterministic::Runner::default();
1825 let duration_to_sleep = Duration::from_secs(10);
1826
1827 let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
1829 context.sleep(duration_to_sleep).await;
1830 context.current()
1831 });
1832
1833 assert_eq!(
1835 time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
1836 duration_to_sleep
1837 );
1838
1839 let executor2 = Runner::from(checkpoint);
1841 executor2.start(move |context| async move {
1842 assert_eq!(context.current(), time_before_recovery);
1843
1844 context.sleep(duration_to_sleep).await;
1846 assert_eq!(
1847 context.current().duration_since(UNIX_EPOCH).unwrap(),
1848 duration_to_sleep * 2
1849 );
1850 });
1851 }
1852
1853 #[test]
1854 #[should_panic(expected = "executor still has weak references")]
1855 fn test_context_return() {
1856 let executor = deterministic::Runner::default();
1858
1859 let context = executor.start(|context| async move {
1861 context
1863 });
1864
1865 drop(context);
1867 }
1868
1869 #[test]
1870 fn test_default_time_zero() {
1871 let executor = deterministic::Runner::default();
1873
1874 executor.start(|context| async move {
1875 assert_eq!(
1877 context.current().duration_since(UNIX_EPOCH).unwrap(),
1878 Duration::ZERO
1879 );
1880 });
1881 }
1882
1883 #[test]
1884 fn test_start_time() {
1885 let executor_default = deterministic::Runner::default();
1887 executor_default.start(|context| async move {
1888 assert_eq!(context.current(), UNIX_EPOCH);
1889 });
1890
1891 let start_time = UNIX_EPOCH + Duration::from_secs(100);
1893 let cfg = Config::default().with_start_time(start_time);
1894 let executor = deterministic::Runner::new(cfg);
1895
1896 executor.start(move |context| async move {
1897 assert_eq!(context.current(), start_time);
1899 });
1900 }
1901
1902 #[test]
1903 #[should_panic(expected = "start time must be greater than or equal to unix epoch")]
1904 fn test_bad_start_time() {
1905 let cfg = Config::default().with_start_time(UNIX_EPOCH - Duration::from_secs(1));
1906 deterministic::Runner::new(cfg);
1907 }
1908
1909 #[cfg(not(feature = "external"))]
1910 #[test]
1911 #[should_panic(expected = "runtime stalled")]
1912 fn test_stall() {
1913 let executor = deterministic::Runner::default();
1915
1916 executor.start(|_| async move {
1918 pending::<()>().await;
1919 });
1920 }
1921
1922 #[cfg(not(feature = "external"))]
1923 #[test]
1924 #[should_panic(expected = "runtime stalled")]
1925 fn test_external_simulated() {
1926 let executor = deterministic::Runner::default();
1928
1929 let (tx, rx) = oneshot::channel();
1931 std::thread::spawn(move || {
1932 std::thread::sleep(Duration::from_secs(1));
1933 tx.send(()).unwrap();
1934 });
1935
1936 executor.start(|_| async move {
1938 rx.await.unwrap();
1939 });
1940 }
1941
1942 #[cfg(feature = "external")]
1943 #[test]
1944 fn test_external_realtime() {
1945 let executor = deterministic::Runner::default();
1947
1948 let (tx, rx) = oneshot::channel();
1950 std::thread::spawn(move || {
1951 std::thread::sleep(Duration::from_secs(1));
1952 tx.send(()).unwrap();
1953 });
1954
1955 executor.start(|_| async move {
1957 rx.await.unwrap();
1958 });
1959 }
1960
1961 #[cfg(feature = "external")]
1962 #[test]
1963 fn test_external_realtime_variable() {
1964 let executor = deterministic::Runner::default();
1966
1967 executor.start(|context| async move {
1969 let start_real = SystemTime::now();
1971 let start_sim = context.current();
1972 let (first_tx, first_rx) = oneshot::channel();
1973 let (second_tx, second_rx) = oneshot::channel();
1974 let (results_tx, mut results_rx) = mpsc::channel(2);
1975
1976 let first_wait = Duration::from_secs(1);
1978 std::thread::spawn(move || {
1979 std::thread::sleep(first_wait);
1980 first_tx.send(()).unwrap();
1981 });
1982
1983 std::thread::spawn(move || {
1985 std::thread::sleep(Duration::ZERO);
1986 second_tx.send(()).unwrap();
1987 });
1988
1989 let first = context.child("sample_before_send").spawn({
1991 let results_tx = results_tx.clone();
1992 move |context| async move {
1993 first_rx.pace(&context, Duration::ZERO).await.unwrap();
1994 let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1995 assert!(elapsed_real > first_wait);
1996 let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1997 assert!(elapsed_sim < first_wait);
1998 results_tx.send(1).await.unwrap();
1999 }
2000 });
2001
2002 let second = context
2004 .child("sample_after_send")
2005 .spawn(move |context| async move {
2006 second_rx.pace(&context, first_wait).await.unwrap();
2007 let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
2008 assert!(elapsed_real >= first_wait);
2009 let elapsed_sim = context.current().duration_since(start_sim).unwrap();
2010 assert!(elapsed_sim >= first_wait);
2011 results_tx.send(2).await.unwrap();
2012 });
2013
2014 second.await.unwrap();
2016 first.await.unwrap();
2017
2018 let mut results = Vec::new();
2020 for _ in 0..2 {
2021 results.push(results_rx.recv().await.unwrap());
2022 }
2023 assert_eq!(results, vec![1, 2]);
2024 });
2025 }
2026
2027 #[cfg(not(feature = "external"))]
2028 #[test]
2029 fn test_simulated_skip() {
2030 let executor = deterministic::Runner::default();
2032
2033 executor.start(|context| async move {
2035 context.sleep(Duration::from_secs(1)).await;
2036
2037 let metrics = context.encode();
2039 let iterations = metrics
2040 .lines()
2041 .find_map(|line| {
2042 line.strip_prefix("runtime_iterations_total ")
2043 .and_then(|value| value.trim().parse::<u64>().ok())
2044 })
2045 .expect("missing runtime_iterations_total metric");
2046 assert!(iterations < 10);
2047 });
2048 }
2049
2050 #[cfg(feature = "external")]
2051 #[test]
2052 fn test_realtime_no_skip() {
2053 let executor = deterministic::Runner::default();
2055
2056 executor.start(|context| async move {
2058 context.sleep(Duration::from_secs(1)).await;
2059
2060 let metrics = context.encode();
2062 let iterations = metrics
2063 .lines()
2064 .find_map(|line| {
2065 line.strip_prefix("runtime_iterations_total ")
2066 .and_then(|value| value.trim().parse::<u64>().ok())
2067 })
2068 .expect("missing runtime_iterations_total metric");
2069 assert!(iterations > 500);
2070 });
2071 }
2072
2073 #[test]
2074 #[should_panic(expected = "label must start with [a-zA-Z]")]
2075 fn test_metrics_label_empty() {
2076 let executor = deterministic::Runner::default();
2077 executor.start(|context| async move {
2078 let _ = context.child("");
2079 });
2080 }
2081
2082 #[test]
2083 #[should_panic(expected = "label must start with [a-zA-Z]")]
2084 fn test_metrics_label_invalid_first_char() {
2085 let executor = deterministic::Runner::default();
2086 executor.start(|context| async move {
2087 let _ = context.child("1invalid");
2088 });
2089 }
2090
2091 #[test]
2092 #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2093 fn test_metrics_label_invalid_char() {
2094 let executor = deterministic::Runner::default();
2095 executor.start(|context| async move {
2096 let _ = context.child("invalid-label");
2097 });
2098 }
2099
2100 #[test]
2101 #[should_panic(expected = "using runtime label is not allowed")]
2102 fn test_metrics_label_reserved_prefix() {
2103 let executor = deterministic::Runner::default();
2104 executor.start(|context| async move {
2105 let _ = context.child(METRICS_PREFIX);
2106 });
2107 }
2108
2109 #[test]
2110 fn test_metrics_duplicate_attribute_overwrites() {
2111 let executor = deterministic::Runner::default();
2112 executor.start(|context| async move {
2113 let context = context
2114 .child("test")
2115 .with_attribute("epoch", "old")
2116 .with_attribute("epoch", "new");
2117 assert_eq!(
2118 context.name().attributes,
2119 vec![("epoch".to_string(), "new".to_string())]
2120 );
2121 });
2122 }
2123
2124 #[test]
2125 fn test_storage_fault_injection_and_recovery() {
2126 let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
2128 sync_rate: Some(1.0),
2129 ..Default::default()
2130 });
2131
2132 let (result, checkpoint) =
2133 deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
2134 let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
2135 blob.write_at(0, b"data".to_vec()).await.unwrap();
2136 blob.sync().await });
2138
2139 assert!(result.is_err());
2141
2142 deterministic::Runner::from(checkpoint).start(|ctx| async move {
2144 *ctx.storage_fault_config().write() = FaultConfig::default();
2146
2147 let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
2149 assert_eq!(len, 0, "unsynced data should be lost after recovery");
2150
2151 blob.write_at(0, b"recovered".to_vec()).await.unwrap();
2153 blob.sync()
2154 .await
2155 .expect("sync should succeed with faults disabled");
2156
2157 let read_buf = blob.read_at(0, 9).await.unwrap();
2159 assert_eq!(read_buf.coalesce(), b"recovered");
2160 });
2161 }
2162
2163 #[test]
2164 fn test_storage_fault_dynamic_config() {
2165 let executor = deterministic::Runner::default();
2166 executor.start(|ctx| async move {
2167 let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
2168
2169 blob.write_at(0, b"initial".to_vec()).await.unwrap();
2171 blob.sync().await.expect("initial sync should succeed");
2172
2173 let storage_fault_cfg = ctx.storage_fault_config();
2175 storage_fault_cfg.write().sync_rate = Some(1.0);
2176
2177 blob.write_at(0, b"updated".to_vec()).await.unwrap();
2179 let result = blob.sync().await;
2180 assert!(result.is_err(), "sync should fail with faults enabled");
2181
2182 storage_fault_cfg.write().sync_rate = Some(0.0);
2184
2185 blob.sync()
2187 .await
2188 .expect("sync should succeed with faults disabled");
2189 });
2190 }
2191
2192 #[test]
2193 fn test_storage_fault_determinism() {
2194 fn run_with_seed(seed: u64) -> Vec<bool> {
2196 let cfg = deterministic::Config::default()
2197 .with_seed(seed)
2198 .with_storage_fault_config(FaultConfig {
2199 open_rate: Some(0.5),
2200 ..Default::default()
2201 });
2202
2203 let runner = deterministic::Runner::new(cfg);
2204 runner.start(|ctx| async move {
2205 let mut results = Vec::new();
2206 for i in 0..20 {
2207 let name = format!("blob{i}");
2208 let result = ctx.open("test_determinism", name.as_bytes()).await;
2209 results.push(result.is_ok());
2210 }
2211 results
2212 })
2213 }
2214
2215 let results1 = run_with_seed(12345);
2216 let results2 = run_with_seed(12345);
2217 assert_eq!(
2218 results1, results2,
2219 "same seed should produce same failure pattern"
2220 );
2221
2222 let results3 = run_with_seed(99999);
2223 assert_ne!(
2224 results1, results3,
2225 "different seeds should produce different patterns"
2226 );
2227 }
2228
2229 #[test]
2230 fn test_storage_fault_determinism_multi_task() {
2231 fn run_with_seed(seed: u64) -> Vec<u32> {
2234 let cfg = deterministic::Config::default()
2235 .with_seed(seed)
2236 .with_storage_fault_config(FaultConfig {
2237 open_rate: Some(0.5),
2238 write_rate: Some(0.3),
2239 sync_rate: Some(0.2),
2240 ..Default::default()
2241 });
2242
2243 let runner = deterministic::Runner::new(cfg);
2244 runner.start(|ctx| async move {
2245 let mut handles = Vec::new();
2247 for i in 0..5 {
2248 let ctx = ctx.child("task");
2249 handles.push(ctx.spawn(move |ctx| async move {
2250 let mut successes = 0u32;
2251 for j in 0..4 {
2252 let name = format!("task{i}_blob{j}");
2253 if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
2254 successes += 1;
2255 if blob.write_at(0, b"data".to_vec()).await.is_ok() {
2256 successes += 1;
2257 }
2258 if blob.sync().await.is_ok() {
2259 successes += 1;
2260 }
2261 }
2262 }
2263 successes
2264 }));
2265 }
2266
2267 let mut results = Vec::new();
2269 for handle in handles {
2270 results.push(handle.await.unwrap());
2271 }
2272 results
2273 })
2274 }
2275
2276 let results1 = run_with_seed(42);
2277 let results2 = run_with_seed(42);
2278 assert_eq!(
2279 results1, results2,
2280 "same seed should produce same multi-task pattern"
2281 );
2282
2283 let results3 = run_with_seed(99999);
2284 assert_ne!(
2285 results1, results3,
2286 "different seeds should produce different patterns"
2287 );
2288 }
2289}