pub use crate::storage::faulty::Config as FaultConfig;
use crate::{
    network::{
        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
        metered::Network as MeteredNetwork,
    },
    storage::{
        audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
        memory::Storage as MemStorage, metered::Storage as MeteredStorage,
    },
    telemetry::metrics::task::Label,
    utils::{
        add_attribute,
        signal::{Signal, Stopper},
        supervision::Tree,
        MetricEncoder, Panicker,
    },
    validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
    Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
};
#[cfg(feature = "external")]
use crate::{Blocker, Pacer};
use commonware_codec::Encode;
use commonware_macros::select;
use commonware_parallel::ThreadPool;
#[cfg(miri)]
use commonware_utils::NZUsize;
use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
#[cfg(feature = "external")]
use futures::task::noop_waker;
use futures::{
    future::BoxFuture,
    task::{waker, ArcWake},
    Future, FutureExt,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
#[cfg(feature = "external")]
use pin_project::pin_project;
use prometheus_client::{
    encoding::text::encode,
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::{Metric, Registry},
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use rand_core::CryptoRngCore;
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
use sha2::{Digest as _, Sha256};
use std::{
    borrow::Cow,
    collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
    mem::{replace, take},
    net::{IpAddr, SocketAddr},
    num::NonZeroUsize,
    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
    pin::Pin,
    sync::{Arc, Mutex, RwLock, Weak},
    task::{self, Poll, Waker},
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{info_span, trace, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Debug)]
struct Metrics {
    iterations: Counter,
    tasks_spawned: Family<Label, Counter>,
    tasks_running: Family<Label, Gauge>,
    task_polls: Family<Label, Counter>,

    network_bandwidth: Counter,
}

impl Metrics {
    pub fn init(registry: &mut Registry) -> Self {
        let metrics = Self {
            iterations: Counter::default(),
            task_polls: Family::default(),
            tasks_spawned: Family::default(),
            tasks_running: Family::default(),
            network_bandwidth: Counter::default(),
        };
        registry.register(
            "iterations",
            "Total number of iterations",
            metrics.iterations.clone(),
        );
        registry.register(
            "tasks_spawned",
            "Total number of tasks spawned",
            metrics.tasks_spawned.clone(),
        );
        registry.register(
            "tasks_running",
            "Number of tasks currently running",
            metrics.tasks_running.clone(),
        );
        registry.register(
            "task_polls",
            "Total number of task polls",
            metrics.task_polls.clone(),
        );
        registry.register(
            "bandwidth",
            "Total amount of data sent over network",
            metrics.network_bandwidth.clone(),
        );
        metrics
    }
}

type Digest = [u8; 32];

/// Maintains a running hash of runtime events so that two executions can be
/// compared for determinism.
pub struct Auditor {
    digest: Mutex<Digest>,
}

impl Default for Auditor {
    fn default() -> Self {
        Self {
            digest: Digest::default().into(),
        }
    }
}

impl Auditor {
    /// Fold a labeled event (and any payload the caller hashes in) into the
    /// running digest.
    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
    where
        F: FnOnce(&mut Sha256),
    {
        let mut digest = self.digest.lock().unwrap();

        let mut hasher = Sha256::new();
        hasher.update(digest.as_ref());
        hasher.update(label);
        payload(&mut hasher);

        *digest = hasher.finalize().into();
    }

    /// Return the current digest as a hex string.
    pub fn state(&self) -> String {
        let hash = self.digest.lock().unwrap();
        hex(hash.as_ref())
    }
}
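
// Illustrative sketch (not part of the module's API surface): the auditor state
// can be used to assert that two runs with the same seed are identical. Assumes
// the `deterministic` re-exports used by the tests at the bottom of this file.
//
//     let run = |seed| {
//         deterministic::Runner::seeded(seed)
//             .start(|context| async move { context.auditor().state() })
//     };
//     assert_eq!(run(7), run(7));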

/// A boxed, seedable source of randomness shared by the runtime.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;

/// Configuration for the deterministic runtime.
pub struct Config {
    /// Random number generator used for all runtime randomness.
    rng: BoxDynRng,

    /// Amount of simulated time that passes on each iteration of the event loop.
    cycle: Duration,

    /// If set, the runtime panics once simulated time reaches this duration.
    timeout: Option<Duration>,

    /// Whether panics in spawned tasks are caught instead of aborting the run.
    catch_panics: bool,

    /// Fault-injection configuration applied to storage operations.
    storage_faults: FaultConfig,
}

impl Config {
    /// Create a configuration with default values (seed 42, 1ms cycle, no
    /// timeout, panics not caught, no storage faults).
    pub fn new() -> Self {
        Self {
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            timeout: None,
            catch_panics: false,
            storage_faults: FaultConfig::default(),
        }
    }

    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }

    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }

    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }

    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }

    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }

    pub const fn with_storage_faults(mut self, faults: FaultConfig) -> Self {
        self.storage_faults = faults;
        self
    }

    pub const fn cycle(&self) -> Duration {
        self.cycle
    }

    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }

    /// Panic if the configuration is inconsistent.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
    }
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}
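
// Illustrative sketch: building a custom configuration with the builder-style
// setters above. The values shown are arbitrary.
//
//     let cfg = Config::new()
//         .with_seed(7)
//         .with_cycle(Duration::from_millis(5))
//         .with_timeout(Some(Duration::from_secs(30)));
//     let runner = Runner::new(cfg);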

type MetricKey = (String, Vec<(String, String)>);

/// Shared state for the deterministic runtime.
pub struct Executor {
    registry: Mutex<Registry>,
    registered_metrics: Mutex<HashSet<MetricKey>>,
    cycle: Duration,
    deadline: Option<SystemTime>,
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    rng: Arc<Mutex<BoxDynRng>>,
    time: Mutex<SystemTime>,
    tasks: Arc<Tasks>,
    sleeping: Mutex<BinaryHeap<Alarm>>,
    shutdown: Mutex<Stopper>,
    panicker: Panicker,
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}

impl Executor {
    fn advance_time(&self) -> SystemTime {
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);

        let mut time = self.time.lock().unwrap();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }

    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }

        let mut skip_until = None;
        {
            let sleeping = self.sleeping.lock().unwrap();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }

        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock().unwrap();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }

    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock().unwrap();
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                break;
            }
        }
    }

    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }

        panic!("runtime stalled");
    }
}

/// A snapshot of runtime state (time, RNG, synced storage, DNS) captured when a
/// run finishes, which can be used to resume execution in a new [Runner].
pub struct Checkpoint {
    cycle: Duration,
    deadline: Option<SystemTime>,
    auditor: Arc<Auditor>,
    rng: Arc<Mutex<BoxDynRng>>,
    time: Mutex<SystemTime>,
    storage: Arc<Storage>,
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    catch_panics: bool,
}

impl Checkpoint {
    pub fn auditor(&self) -> Arc<Auditor> {
        self.auditor.clone()
    }
}

#[allow(clippy::large_enum_variant)]
enum State {
    Config(Config),
    Checkpoint(Checkpoint),
}

/// A deterministic runner, constructed from either a [Config] or a [Checkpoint].
pub struct Runner {
    state: State,
}

impl From<Config> for Runner {
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}

impl From<Checkpoint> for Runner {
    fn from(checkpoint: Checkpoint) -> Self {
        Self {
            state: State::Checkpoint(checkpoint),
        }
    }
}

impl Runner {
    /// Create a new runner from the given configuration.
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Create a runner seeded with the given value.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Create a runner that panics once simulated time exceeds the given timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Run the given future to completion, returning its output together with a
    /// [Checkpoint] that can be used to resume the runtime state in a later run.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        let storage = context.storage.clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        Tasks::register_root(&executor.tasks);

        let result = catch_unwind(AssertUnwindSafe(|| loop {
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            let mut queue = executor.tasks.drain();

            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        let mut fut_opt = future.lock().unwrap();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            if let Some(output) = output {
                break output;
            }

            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        executor.sleeping.lock().unwrap().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        drop(root);

        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
}
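
// Illustrative sketch: running a future, then resuming the same runtime state
// (time, RNG, synced storage, DNS) in a follow-up run.
//
//     let (value, checkpoint) = Runner::default().start_and_recover(|ctx| async move {
//         // ... do some work ...
//         42
//     });
//     assert_eq!(value, 42);
//     let resumed = Runner::from(checkpoint);
//     resumed.start(|ctx| async move { /* continues with the recovered state */ });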

impl Default for Runner {
    fn default() -> Self {
        Self::new(Config::default())
    }
}

impl crate::Runner for Runner {
    type Context = Context;

    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future,
    {
        let (output, _) = self.start_and_recover(f);
        output
    }
}

enum Mode {
    Root,
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

struct Task {
    id: u128,
    label: Label,

    mode: Mode,
}

struct TaskWaker {
    id: u128,

    tasks: Weak<Tasks>,
}

impl ArcWake for TaskWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        if let Some(tasks) = arc_self.tasks.upgrade() {
            tasks.queue(arc_self.id);
        }
    }
}

struct Tasks {
    counter: Mutex<u128>,
    ready: Mutex<Vec<u128>>,
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}

impl Tasks {
    const fn new() -> Self {
        Self {
            counter: Mutex::new(0),
            ready: Mutex::new(Vec::new()),
            running: Mutex::new(BTreeMap::new()),
        }
    }

    fn increment(&self) -> u128 {
        let mut counter = self.counter.lock().unwrap();
        let old = *counter;
        *counter = counter.checked_add(1).expect("task counter overflow");
        old
    }

    fn register_root(arc_self: &Arc<Self>) {
        let id = arc_self.increment();
        let task = Arc::new(Task {
            id,
            label: Label::root(),
            mode: Mode::Root,
        });
        arc_self.register(id, task);
    }

    fn register_work(
        arc_self: &Arc<Self>,
        label: Label,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let id = arc_self.increment();
        let task = Arc::new(Task {
            id,
            label,
            mode: Mode::Work(Mutex::new(Some(future))),
        });
        arc_self.register(id, task);
    }

    fn register(&self, id: u128, task: Arc<Task>) {
        self.running.lock().unwrap().insert(id, task);

        self.queue(id);
    }

    fn queue(&self, id: u128) {
        let mut ready = self.ready.lock().unwrap();
        ready.push(id);
    }

    fn drain(&self) -> Vec<u128> {
        let mut queue = self.ready.lock().unwrap();
        let len = queue.len();
        replace(&mut *queue, Vec::with_capacity(len))
    }

    fn ready(&self) -> usize {
        self.ready.lock().unwrap().len()
    }

    fn get(&self, id: u128) -> Option<Arc<Task>> {
        let running = self.running.lock().unwrap();
        running.get(&id).cloned()
    }

    fn remove(&self, id: u128) {
        self.running.lock().unwrap().remove(&id);
    }

    fn clear(&self) -> Vec<Arc<Task>> {
        self.ready.lock().unwrap().clear();

        let running: BTreeMap<u128, Arc<Task>> = {
            let mut running = self.running.lock().unwrap();
            take(&mut *running)
        };
        running.into_values().collect()
    }
}

type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;

/// The execution context handed to tasks running on the deterministic runtime.
pub struct Context {
    name: String,
    attributes: Vec<(String, String)>,
    executor: Weak<Executor>,
    network: Arc<Network>,
    storage: Arc<Storage>,
    network_buffer_pool: BufferPool,
    storage_buffer_pool: BufferPool,
    tree: Arc<Tree>,
    execution: Execution,
    instrumented: bool,
}

impl Clone for Context {
    fn clone(&self) -> Self {
        let (child, _) = Tree::child(&self.tree);
        Self {
            name: self.name.clone(),
            attributes: self.attributes.clone(),
            executor: self.executor.clone(),
            network: self.network.clone(),
            storage: self.storage.clone(),
            network_buffer_pool: self.network_buffer_pool.clone(),
            storage_buffer_pool: self.storage_buffer_pool.clone(),

            tree: child,
            execution: Execution::default(),
            instrumented: false,
        }
    }
}

impl Context {
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = UNIX_EPOCH;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        let rng = Arc::new(Mutex::new(cfg.rng));

        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_faults));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(MemStorage::default(), rng.clone(), storage_fault_config),
                auditor.clone(),
            ),
            runtime_registry,
        );

        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
            } else {
                let network_config = BufferPoolConfig::for_network();
                let storage_config = BufferPoolConfig::for_storage();
            }
        }
        let network_buffer_pool = BufferPool::new(
            network_config,
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            storage_config,
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
            } else {
                let network_config = BufferPoolConfig::for_network();
                let storage_config = BufferPoolConfig::for_storage();
            }
        }
        let network_buffer_pool = BufferPool::new(
            network_config,
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            storage_config,
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Get a handle to the runtime's [Auditor].
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Get a digest of all storage operations performed so far.
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }

    /// Get a handle to the storage fault-injection configuration, which can be
    /// modified while the runtime is running.
    pub fn storage_faults(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Register (or, with `None`, remove) a DNS mapping for the given host.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        let mut dns = executor.dns.lock().unwrap();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
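
// Illustrative sketch: wiring a simulated DNS entry and toggling storage faults
// from inside a test.
//
//     deterministic::Runner::default().start(|ctx| async move {
//         ctx.resolver_register("example.com", Some(vec![IpAddr::V4([10, 0, 0, 1].into())]));
//         ctx.storage_faults().write().unwrap().sync_rate = Some(1.0); // every sync now fails
//     });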

impl crate::Spawner for Context {
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let (label, metric) = spawn_metrics!(self);

        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor();
        let future: BoxFuture<'_, T> = if is_instrumented {
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            f(self).instrument(span).boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        let stopped = executor.shutdown.lock().unwrap().stopped();
        stopped
    }
}
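
// Illustrative sketch: spawning a labeled child task and awaiting its handle
// (mirrors the pattern used by the tests at the bottom of this file).
//
//     deterministic::Runner::default().start(|ctx| async move {
//         let handle = ctx.with_label("worker").spawn(|_| async { 1 + 1 });
//         assert_eq!(handle.await.unwrap(), 2);
//     });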

impl crate::ThreadPooler for Context {
    fn create_thread_pool(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<ThreadPool, ThreadPoolBuildError> {
        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());

        if rayon::current_thread_index().is_none() {
            builder = builder.use_current_thread();
        }

        builder
            .spawn_handler(move |thread| {
                self.with_label("rayon_thread")
                    .dedicated()
                    .spawn(move |_| async move { thread.run() });
                Ok(())
            })
            .build()
            .map(Arc::new)
    }
}

impl crate::Metrics for Context {
    fn with_label(&self, label: &str) -> Self {
        validate_label(label);

        let name = {
            let prefix = self.name.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{prefix}_{label}")
            }
        };
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            ..self.clone()
        }
    }

    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
        validate_label(key);

        let mut attributes = self.attributes.clone();
        assert!(
            add_attribute(&mut attributes, key, value),
            "duplicate attribute key: {key}"
        );
        Self {
            attributes,
            ..self.clone()
        }
    }

    fn label(&self) -> String {
        self.name.clone()
    }

    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let name = name.into();
        let help = help.into();

        let executor = self.executor();
        executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
            for (k, v) in &self.attributes {
                hasher.update(k.as_bytes());
                hasher.update(v.as_bytes());
            }
        });
        let prefixed_name = {
            let prefix = &self.name;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };

        let metric_key = (prefixed_name.clone(), self.attributes.clone());
        let is_new = executor
            .registered_metrics
            .lock()
            .unwrap()
            .insert(metric_key);
        assert!(
            is_new,
            "duplicate metric: {} with attributes {:?}",
            prefixed_name, self.attributes
        );

        let mut registry = executor.registry.lock().unwrap();
        let sub_registry = self.attributes.iter().fold(&mut *registry, |reg, (k, v)| {
            reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
        });
        sub_registry.register(prefixed_name, help, metric);
    }

    fn encode(&self) -> String {
        let executor = self.executor();
        executor.auditor.event(b"encode", |_| {});
        let mut encoder = MetricEncoder::new();
        encode(&mut encoder, &executor.registry.lock().unwrap()).expect("encoding failed");
        encoder.into_string()
    }
}

struct Sleeper {
    executor: Weak<Executor>,
    time: SystemTime,
    registered: bool,
}

impl Sleeper {
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}

struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time.eq(&other.time)
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse the comparison so the max-heap `BinaryHeap` pops the earliest
        // alarm first.
        other.time.cmp(&self.time)
    }
}

impl Future for Sleeper {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let executor = self.executor();
        {
            let current_time = *executor.time.lock().unwrap();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        if !self.registered {
            self.registered = true;
            executor.sleeping.lock().unwrap().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}

impl Clock for Context {
    fn current(&self) -> SystemTime {
        *self.executor().time.lock().unwrap()
    }

    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        let deadline = self
            .current()
            .checked_add(duration)
            .expect("overflow when setting wake time");
        self.sleep_until(deadline)
    }

    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
        Sleeper {
            executor: self.executor.clone(),
            time: deadline,
            registered: false,
        }
    }
}
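
// Illustrative sketch: simulated time only advances when the runtime iterates,
// so (without the `external` feature) sleeping a minute of simulated time
// completes almost instantly in wall-clock terms.
//
//     deterministic::Runner::default().start(|ctx| async move {
//         let before = ctx.current();
//         ctx.sleep(Duration::from_secs(60)).await;
//         assert!(ctx.current() >= before + Duration::from_secs(60));
//     });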

#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    executor: Weak<Executor>,
    target: SystemTime,
    #[pin]
    future: F,
    ready: Option<F::Output>,
    started: bool,
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock().unwrap();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}

#[cfg(feature = "external")]
impl Pacer for Context {
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let target = self
            .executor()
            .time
            .lock()
            .unwrap()
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: self.executor.clone(),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}

impl GClock for Context {
    type Instant = SystemTime;

    fn now(&self) -> Self::Instant {
        self.current()
    }
}

impl ReasonablyRealtime for Context {}

impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}

impl crate::Resolver for Context {
    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
        let executor = self.executor();
        let dns = executor.dns.lock().unwrap();
        let result = dns.get(host).cloned();
        drop(dns);

        executor.auditor.event(b"resolve", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(result.encode());
        });
        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
    }
}

impl RngCore for Context {
    fn next_u32(&mut self) -> u32 {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"next_u32");
        });
        let result = executor.rng.lock().unwrap().next_u32();
        result
    }

    fn next_u64(&mut self) -> u64 {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"next_u64");
        });
        let result = executor.rng.lock().unwrap().next_u64();
        result
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"fill_bytes");
        });
        executor.rng.lock().unwrap().fill_bytes(dest);
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"try_fill_bytes");
        });
        let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
        result
    }
}

impl CryptoRng for Context {}

impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }

    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}

impl crate::BufferPooler for Context {
    fn network_buffer_pool(&self) -> &crate::BufferPool {
        &self.network_buffer_pool
    }

    fn storage_buffer_pool(&self) -> &crate::BufferPool {
        &self.storage_buffer_pool
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "external")]
    use crate::FutureExt;
    #[cfg(feature = "external")]
    use crate::Spawner;
    use crate::{
        deterministic, reschedule, Blob, IoBufMut, Metrics, Resolver, Runner as _, Storage,
    };
    use commonware_macros::test_traced;
    #[cfg(feature = "external")]
    use commonware_utils::channel::mpsc;
    use commonware_utils::channel::oneshot;
    #[cfg(not(feature = "external"))]
    use futures::future::pending;
    #[cfg(not(feature = "external"))]
    use futures::stream::StreamExt as _;
    #[cfg(feature = "external")]
    use futures::StreamExt;
    use futures::{stream::FuturesUnordered, task::noop_waker};

    async fn task(i: usize) -> usize {
        for _ in 0..5 {
            reschedule().await;
        }
        i
    }

    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
        runner.start(|context| async move {
            let mut handles = FuturesUnordered::new();
            for i in 0..tasks {
                handles.push(context.clone().spawn(move |_| task(i)));
            }

            let mut outputs = Vec::new();
            while let Some(result) = handles.next().await {
                outputs.push(result.unwrap());
            }
            assert_eq!(outputs.len(), tasks);
            (context.auditor().state(), outputs)
        })
    }

    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    #[test]
    fn test_same_seed_same_order() {
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    #[should_panic(
        expected = "cycle duration must be greater than or equal to system time precision"
    )]
    fn test_bad_cycle() {
        let cfg = Config {
            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(0, data).await.unwrap();
            blob.sync().await.unwrap();
            context.auditor().state()
        });

        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
            assert_eq!(read.coalesce(), data);
        });
    }

    #[test]
    #[should_panic(expected = "goodbye")]
    fn test_recover_panic_handling() {
        let executor1 = deterministic::Runner::default();
        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
            reschedule().await;
        });

        let executor = Runner::from(checkpoint);
        executor.start(|_| async move {
            panic!("goodbye");
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (_, checkpoint) = executor.start_and_recover(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(0, data).await.unwrap();
        });

        let executor = Runner::from(checkpoint);

        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    fn test_recover_dns_mappings_persist() {
        let executor = deterministic::Runner::default();
        let host = "example.com";
        let addrs = vec![
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
        ];

        let (state, checkpoint) = executor.start_and_recover({
            let addrs = addrs.clone();
            |context| async move {
                context.resolver_register(host, Some(addrs));
                context.auditor().state()
            }
        });

        assert_eq!(state, checkpoint.auditor.state());

        let executor = Runner::from(checkpoint);
        executor.start(move |context| async move {
            let resolved = context.resolve(host).await.unwrap();
            assert_eq!(resolved, addrs);
        });
    }

    #[test]
    #[should_panic(expected = "executor still has weak references")]
    fn test_context_return() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move { context });

        drop(context);
    }

    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }

    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_stall() {
        let executor = deterministic::Runner::default();

        executor.start(|_| async move {
            pending::<()>().await;
        });
    }

    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_external_simulated() {
        let executor = deterministic::Runner::default();

        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime() {
        let executor = deterministic::Runner::default();

        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            let (results_tx, mut results_rx) = mpsc::channel(2);

            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            let first = context.clone().spawn({
                let results_tx = results_tx.clone();
                move |context| async move {
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            let second = context.clone().spawn(move |context| async move {
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            second.await.unwrap();
            first.await.unwrap();

            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.recv().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }

    #[cfg(not(feature = "external"))]
    #[test]
    fn test_simulated_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations < 10);
        });
    }

    #[cfg(feature = "external")]
    #[test]
    fn test_realtime_no_skip() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations > 500);
        });
    }

    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("");
        });
    }

    #[test]
    #[should_panic(expected = "label must start with [a-zA-Z]")]
    fn test_metrics_label_invalid_first_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("1invalid");
        });
    }

    #[test]
    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
    fn test_metrics_label_invalid_char() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label("invalid-label");
        });
    }

    #[test]
    #[should_panic(expected = "using runtime label is not allowed")]
    fn test_metrics_label_reserved_prefix() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            context.with_label(METRICS_PREFIX);
        });
    }

    #[test]
    #[should_panic(expected = "duplicate attribute key: epoch")]
    fn test_metrics_duplicate_attribute_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let _ = context
                .with_label("test")
                .with_attribute("epoch", "old")
                .with_attribute("epoch", "new");
        });
    }

    #[test]
    fn test_storage_fault_injection_and_recovery() {
        let cfg = deterministic::Config::default().with_storage_faults(FaultConfig {
            sync_rate: Some(1.0),
            ..Default::default()
        });

        let (result, checkpoint) =
            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
                blob.write_at(0, b"data".to_vec()).await.unwrap();
                blob.sync().await
            });

        assert!(result.is_err());

        deterministic::Runner::from(checkpoint).start(|ctx| async move {
            *ctx.storage_faults().write().unwrap() = FaultConfig::default();

            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
            assert_eq!(len, 0, "unsynced data should be lost after recovery");

            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");

            let read_buf = blob.read_at(0, vec![0u8; 9]).await.unwrap();
            assert_eq!(read_buf.coalesce(), b"recovered");
        });
    }

    #[test]
    fn test_storage_fault_dynamic_config() {
        let executor = deterministic::Runner::default();
        executor.start(|ctx| async move {
            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();

            blob.write_at(0, b"initial".to_vec()).await.unwrap();
            blob.sync().await.expect("initial sync should succeed");

            let faults = ctx.storage_faults();
            faults.write().unwrap().sync_rate = Some(1.0);

            blob.write_at(0, b"updated".to_vec()).await.unwrap();
            let result = blob.sync().await;
            assert!(result.is_err(), "sync should fail with faults enabled");

            faults.write().unwrap().sync_rate = Some(0.0);

            blob.sync()
                .await
                .expect("sync should succeed with faults disabled");
        });
    }

    #[test]
    fn test_storage_fault_determinism() {
        fn run_with_seed(seed: u64) -> Vec<bool> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_faults(FaultConfig {
                    open_rate: Some(0.5),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut results = Vec::new();
                for i in 0..20 {
                    let name = format!("blob{i}");
                    let result = ctx.open("test_determinism", name.as_bytes()).await;
                    results.push(result.is_ok());
                }
                results
            })
        }

        let results1 = run_with_seed(12345);
        let results2 = run_with_seed(12345);
        assert_eq!(
            results1, results2,
            "same seed should produce same failure pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }

    #[test]
    fn test_storage_fault_determinism_multi_task() {
        fn run_with_seed(seed: u64) -> Vec<u32> {
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_faults(FaultConfig {
                    open_rate: Some(0.5),
                    write_rate: Some(0.3),
                    sync_rate: Some(0.2),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                let mut handles = Vec::new();
                for i in 0..5 {
                    let ctx = ctx.clone();
                    handles.push(ctx.spawn(move |ctx| async move {
                        let mut successes = 0u32;
                        for j in 0..4 {
                            let name = format!("task{i}_blob{j}");
                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
                                successes += 1;
                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
                                    successes += 1;
                                }
                                if blob.sync().await.is_ok() {
                                    successes += 1;
                                }
                            }
                        }
                        successes
                    }));
                }

                let mut results = Vec::new();
                for handle in handles {
                    results.push(handle.await.unwrap());
                }
                results
            })
        }

        let results1 = run_with_seed(42);
        let results2 = run_with_seed(42);
        assert_eq!(
            results1, results2,
            "same seed should produce same multi-task pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }
}