1use crate::{
40 network::{
41 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42 metered::Network as MeteredNetwork,
43 },
44 storage::{
45 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46 metered::Storage as MeteredStorage,
47 },
48 telemetry::metrics::task::Label,
49 utils::{
50 signal::{Signal, Stopper},
51 supervision::Tree,
52 Panicker,
53 },
54 validate_label, Clock, Error, Execution, Handle, ListenerOf, Panicked, METRICS_PREFIX,
55};
56#[cfg(feature = "external")]
57use crate::{Blocker, Pacer};
58use commonware_codec::Encode;
59use commonware_macros::select;
60use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
61#[cfg(feature = "external")]
62use futures::task::noop_waker;
63use futures::{
64 future::BoxFuture,
65 task::{waker, ArcWake},
66 Future, FutureExt,
67};
68use governor::clock::{Clock as GClock, ReasonablyRealtime};
69#[cfg(feature = "external")]
70use pin_project::pin_project;
71use prometheus_client::{
72 encoding::text::encode,
73 metrics::{counter::Counter, family::Family, gauge::Gauge},
74 registry::{Metric, Registry},
75};
76use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
77use sha2::{Digest as _, Sha256};
78use std::{
79 collections::{BTreeMap, BinaryHeap, HashMap},
80 mem::{replace, take},
81 net::{IpAddr, SocketAddr},
82 panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
83 pin::Pin,
84 sync::{Arc, Mutex, Weak},
85 task::{self, Poll, Waker},
86 time::{Duration, SystemTime, UNIX_EPOCH},
87};
88use tracing::{info_span, trace, Instrument};
89
90#[derive(Debug)]
91struct Metrics {
92 iterations: Counter,
93 tasks_spawned: Family<Label, Counter>,
94 tasks_running: Family<Label, Gauge>,
95 task_polls: Family<Label, Counter>,
96
97 network_bandwidth: Counter,
98}
99
100impl Metrics {
101 pub fn init(registry: &mut Registry) -> Self {
102 let metrics = Self {
103 iterations: Counter::default(),
104 task_polls: Family::default(),
105 tasks_spawned: Family::default(),
106 tasks_running: Family::default(),
107 network_bandwidth: Counter::default(),
108 };
109 registry.register(
110 "iterations",
111 "Total number of iterations",
112 metrics.iterations.clone(),
113 );
114 registry.register(
115 "tasks_spawned",
116 "Total number of tasks spawned",
117 metrics.tasks_spawned.clone(),
118 );
119 registry.register(
120 "tasks_running",
121 "Number of tasks currently running",
122 metrics.tasks_running.clone(),
123 );
124 registry.register(
125 "task_polls",
126 "Total number of task polls",
127 metrics.task_polls.clone(),
128 );
129 registry.register(
130 "bandwidth",
131 "Total amount of data sent over network",
132 metrics.network_bandwidth.clone(),
133 );
134 metrics
135 }
136}
137
138type Digest = [u8; 32];
140
141pub struct Auditor {
143 digest: Mutex<Digest>,
144}
145
146impl Default for Auditor {
147 fn default() -> Self {
148 Self {
149 digest: Digest::default().into(),
150 }
151 }
152}
153
154impl Auditor {
155 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
159 where
160 F: FnOnce(&mut Sha256),
161 {
162 let mut digest = self.digest.lock().unwrap();
163
164 let mut hasher = Sha256::new();
165 hasher.update(digest.as_ref());
166 hasher.update(label);
167 payload(&mut hasher);
168
169 *digest = hasher.finalize().into();
170 }
171
172 pub fn state(&self) -> String {
177 let hash = self.digest.lock().unwrap();
178 hex(hash.as_ref())
179 }
180}
181
182#[derive(Clone)]
184pub struct Config {
185 seed: u64,
187
188 cycle: Duration,
191
192 timeout: Option<Duration>,
194
195 catch_panics: bool,
197}
198
199impl Config {
200 pub const fn new() -> Self {
202 Self {
203 seed: 42,
204 cycle: Duration::from_millis(1),
205 timeout: None,
206 catch_panics: false,
207 }
208 }
209
210 pub const fn with_seed(mut self, seed: u64) -> Self {
213 self.seed = seed;
214 self
215 }
216 pub const fn with_cycle(mut self, cycle: Duration) -> Self {
218 self.cycle = cycle;
219 self
220 }
221 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
223 self.timeout = timeout;
224 self
225 }
226 pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
228 self.catch_panics = catch_panics;
229 self
230 }
231
232 pub const fn seed(&self) -> u64 {
235 self.seed
236 }
237 pub const fn cycle(&self) -> Duration {
239 self.cycle
240 }
241 pub const fn timeout(&self) -> Option<Duration> {
243 self.timeout
244 }
245 pub const fn catch_panics(&self) -> bool {
247 self.catch_panics
248 }
249
250 pub fn assert(&self) {
252 assert!(
253 self.cycle != Duration::default() || self.timeout.is_none(),
254 "cycle duration must be non-zero when timeout is set",
255 );
256 assert!(
257 self.cycle >= SYSTEM_TIME_PRECISION,
258 "cycle duration must be greater than or equal to system time precision"
259 );
260 }
261}
262
263impl Default for Config {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269pub struct Executor {
271 registry: Mutex<Registry>,
272 cycle: Duration,
273 deadline: Option<SystemTime>,
274 metrics: Arc<Metrics>,
275 auditor: Arc<Auditor>,
276 rng: Mutex<StdRng>,
277 time: Mutex<SystemTime>,
278 tasks: Arc<Tasks>,
279 sleeping: Mutex<BinaryHeap<Alarm>>,
280 shutdown: Mutex<Stopper>,
281 panicker: Panicker,
282 dns: Mutex<HashMap<String, Vec<IpAddr>>>,
283}
284
285impl Executor {
286 fn advance_time(&self) -> SystemTime {
291 #[cfg(feature = "external")]
292 std::thread::sleep(self.cycle);
293
294 let mut time = self.time.lock().unwrap();
295 *time = time
296 .checked_add(self.cycle)
297 .expect("executor time overflowed");
298 let now = *time;
299 trace!(now = now.epoch_millis(), "time advanced");
300 now
301 }
302
303 fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
308 if cfg!(feature = "external") || self.tasks.ready() != 0 {
309 return current;
310 }
311
312 let mut skip_until = None;
313 {
314 let sleeping = self.sleeping.lock().unwrap();
315 if let Some(next) = sleeping.peek() {
316 if next.time > current {
317 skip_until = Some(next.time);
318 }
319 }
320 }
321
322 skip_until.map_or(current, |deadline| {
323 let mut time = self.time.lock().unwrap();
324 *time = deadline;
325 let now = *time;
326 trace!(now = now.epoch_millis(), "time skipped");
327 now
328 })
329 }
330
331 fn wake_ready_sleepers(&self, current: SystemTime) {
333 let mut sleeping = self.sleeping.lock().unwrap();
334 while let Some(next) = sleeping.peek() {
335 if next.time <= current {
336 let sleeper = sleeping.pop().unwrap();
337 sleeper.waker.wake();
338 } else {
339 break;
340 }
341 }
342 }
343
344 fn assert_liveness(&self) {
348 if cfg!(feature = "external") || self.tasks.ready() != 0 {
349 return;
350 }
351
352 panic!("runtime stalled");
353 }
354}
355
356pub struct Checkpoint {
360 cycle: Duration,
361 deadline: Option<SystemTime>,
362 auditor: Arc<Auditor>,
363 rng: Mutex<StdRng>,
364 time: Mutex<SystemTime>,
365 storage: Arc<Storage>,
366 dns: Mutex<HashMap<String, Vec<IpAddr>>>,
367 catch_panics: bool,
368}
369
370impl Checkpoint {
371 pub fn auditor(&self) -> Arc<Auditor> {
373 self.auditor.clone()
374 }
375}
376
377#[allow(clippy::large_enum_variant)]
378enum State {
379 Config(Config),
380 Checkpoint(Checkpoint),
381}
382
383pub struct Runner {
385 state: State,
386}
387
388impl From<Config> for Runner {
389 fn from(cfg: Config) -> Self {
390 Self::new(cfg)
391 }
392}
393
394impl From<Checkpoint> for Runner {
395 fn from(checkpoint: Checkpoint) -> Self {
396 Self {
397 state: State::Checkpoint(checkpoint),
398 }
399 }
400}
401
impl Runner {
    /// Creates a runner that starts a fresh run from `cfg`.
    ///
    /// # Panics
    ///
    /// Panics if `cfg` fails [`Config::assert`].
    pub fn new(cfg: Config) -> Self {
        // Validate eagerly so misconfiguration surfaces at construction time.
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Creates a runner from the default configuration with RNG seed `seed`.
    pub fn seeded(seed: u64) -> Self {
        let cfg = Config {
            seed,
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Creates a runner from the default configuration that panics with
    /// "runtime timeout" once `timeout` of simulated time has elapsed.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Runs `f` as the root task until it completes.
    ///
    /// Returns the root task's output and a [`Checkpoint`] that can seed a new
    /// runner via `Runner::from(checkpoint)`.
    ///
    /// Each loop iteration:
    /// 1. Checks the deadline.
    /// 2. Drains the ready queue and shuffles it with the seeded RNG. This is
    ///    what makes task order depend on the seed.
    /// 3. Polls each queued task once.
    /// 4. Advances simulated time by `cycle`, then skips ahead to the next alarm
    ///    if nothing is ready.
    /// 5. Wakes due sleepers and checks liveness.
    ///
    /// # Panics
    ///
    /// - "runtime timeout" once simulated time reaches the configured deadline.
    /// - "runtime stalled" when no task is ready after waking sleepers (not with
    ///   the `external` feature).
    /// - "executor still has weak references" if a `Context` or other weak
    ///   executor handle outlives the run. This is checked before any loop
    ///   panic is re-raised.
    /// - "executor still has strong references" if the executor `Arc` escaped.
    /// - Any other panic raised inside the loop is re-raised with
    ///   `resume_unwind` after tasks have been cleaned up.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Build the executor either fresh or from a previous run's state.
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Keep a storage handle for the checkpoint; `context` is moved into `f`.
        let storage = context.storage.clone();
        // NOTE(review): `interrupt` presumably lets the root future be cut short
        // when a caught task panic occurs. Confirm in `Panicked`.
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root under a task id and queue it for the first iteration.
        Tasks::register_root(&executor.tasks);

        // Catch panics so the cleanup below always runs before re-raising.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            {
                // Enforce the deadline before doing any work this iteration.
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Release the guard first so the panic does not poison `time`.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Take every task woken since the last iteration.
            let mut queue = executor.tasks.drain();

            // Seed-driven shuffle: the source of schedule variation across seeds.
            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // A queued id may refer to a task that already finished, for
                // example one woken more than once.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record the poll in the audit trail and in metrics.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Fresh waker that re-queues this task id when woken.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        // Root completion ends the run. Remaining queued ids are
                        // not polled.
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // The future's lock is held for the duration of the poll.
                        let mut fut_opt = future.lock().unwrap();
                        // Defensive: the future was already taken but the task is
                        // still tracked.
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");
                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");
                            // Untrack the task, then drop its future.
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            if let Some(output) = output {
                break output;
            }

            // Move time forward, possibly jumping to the next alarm when idle.
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        // Teardown. Drop all pending alarms and every spawned future (and the root
        // below) before the weak-reference check: they own `Context`s, whose
        // `Weak<Executor>` handles must be gone for the check and for
        // `Arc::into_inner` to succeed.
        executor.sleeping.lock().unwrap().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        drop(root);

        // A context escaping the run would keep a weak handle alive.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Re-raise any panic from the loop only after cleanup.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Carry over only the state that should survive into the next run.
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
}
614
615impl Default for Runner {
616 fn default() -> Self {
617 Self::new(Config::default())
618 }
619}
620
621impl crate::Runner for Runner {
622 type Context = Context;
623
624 fn start<F, Fut>(self, f: F) -> Fut::Output
625 where
626 F: FnOnce(Self::Context) -> Fut,
627 Fut: Future,
628 {
629 let (output, _) = self.start_and_recover(f);
630 output
631 }
632}
633
634enum Mode {
636 Root,
637 Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
638}
639
640struct Task {
642 id: u128,
643 label: Label,
644
645 mode: Mode,
646}
647
648struct TaskWaker {
650 id: u128,
651
652 tasks: Weak<Tasks>,
653}
654
655impl ArcWake for TaskWaker {
656 fn wake_by_ref(arc_self: &Arc<Self>) {
657 if let Some(tasks) = arc_self.tasks.upgrade() {
662 tasks.queue(arc_self.id);
663 }
664 }
665}
666
667struct Tasks {
669 counter: Mutex<u128>,
671 ready: Mutex<Vec<u128>>,
673 running: Mutex<BTreeMap<u128, Arc<Task>>>,
675}
676
677impl Tasks {
678 const fn new() -> Self {
680 Self {
681 counter: Mutex::new(0),
682 ready: Mutex::new(Vec::new()),
683 running: Mutex::new(BTreeMap::new()),
684 }
685 }
686
687 fn increment(&self) -> u128 {
689 let mut counter = self.counter.lock().unwrap();
690 let old = *counter;
691 *counter = counter.checked_add(1).expect("task counter overflow");
692 old
693 }
694
695 fn register_root(arc_self: &Arc<Self>) {
699 let id = arc_self.increment();
700 let task = Arc::new(Task {
701 id,
702 label: Label::root(),
703 mode: Mode::Root,
704 });
705 arc_self.register(id, task);
706 }
707
708 fn register_work(
710 arc_self: &Arc<Self>,
711 label: Label,
712 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
713 ) {
714 let id = arc_self.increment();
715 let task = Arc::new(Task {
716 id,
717 label,
718 mode: Mode::Work(Mutex::new(Some(future))),
719 });
720 arc_self.register(id, task);
721 }
722
723 fn register(&self, id: u128, task: Arc<Task>) {
725 self.running.lock().unwrap().insert(id, task);
727
728 self.queue(id);
730 }
731
732 fn queue(&self, id: u128) {
734 let mut ready = self.ready.lock().unwrap();
735 ready.push(id);
736 }
737
738 fn drain(&self) -> Vec<u128> {
740 let mut queue = self.ready.lock().unwrap();
741 let len = queue.len();
742 replace(&mut *queue, Vec::with_capacity(len))
743 }
744
745 fn ready(&self) -> usize {
747 self.ready.lock().unwrap().len()
748 }
749
750 fn get(&self, id: u128) -> Option<Arc<Task>> {
755 let running = self.running.lock().unwrap();
756 running.get(&id).cloned()
757 }
758
759 fn remove(&self, id: u128) {
761 self.running.lock().unwrap().remove(&id);
762 }
763
764 fn clear(&self) -> Vec<Arc<Task>> {
766 self.ready.lock().unwrap().clear();
768
769 let running: BTreeMap<u128, Arc<Task>> = {
771 let mut running = self.running.lock().unwrap();
772 take(&mut *running)
773 };
774 running.into_values().collect()
775 }
776}
777
778type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
779type Storage = MeteredStorage<AuditedStorage<MemStorage>>;
780
781pub struct Context {
785 name: String,
786 executor: Weak<Executor>,
787 network: Arc<Network>,
788 storage: Arc<Storage>,
789 tree: Arc<Tree>,
790 execution: Execution,
791 instrumented: bool,
792}
793
794impl Clone for Context {
795 fn clone(&self) -> Self {
796 let (child, _) = Tree::child(&self.tree);
797 Self {
798 name: self.name.clone(),
799 executor: self.executor.clone(),
800 network: self.network.clone(),
801 storage: self.storage.clone(),
802
803 tree: child,
804 execution: Execution::default(),
805 instrumented: false,
806 }
807 }
808}
809
810impl Context {
811 fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
812 let mut registry = Registry::default();
814 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
815
816 let metrics = Arc::new(Metrics::init(runtime_registry));
818 let start_time = UNIX_EPOCH;
819 let deadline = cfg
820 .timeout
821 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
822 let auditor = Arc::new(Auditor::default());
823 let storage = MeteredStorage::new(
824 AuditedStorage::new(MemStorage::default(), auditor.clone()),
825 runtime_registry,
826 );
827 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
828 let network = MeteredNetwork::new(network, runtime_registry);
829
830 let (panicker, panicked) = Panicker::new(cfg.catch_panics);
832
833 let executor = Arc::new(Executor {
834 registry: Mutex::new(registry),
835 cycle: cfg.cycle,
836 deadline,
837 metrics,
838 auditor,
839 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
840 time: Mutex::new(start_time),
841 tasks: Arc::new(Tasks::new()),
842 sleeping: Mutex::new(BinaryHeap::new()),
843 shutdown: Mutex::new(Stopper::default()),
844 panicker,
845 dns: Mutex::new(HashMap::new()),
846 });
847
848 (
849 Self {
850 name: String::new(),
851 executor: Arc::downgrade(&executor),
852 network: Arc::new(network),
853 storage: Arc::new(storage),
854 tree: Tree::root(),
855 execution: Execution::default(),
856 instrumented: false,
857 },
858 executor,
859 panicked,
860 )
861 }
862
863 fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
875 let mut registry = Registry::default();
877 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
878 let metrics = Arc::new(Metrics::init(runtime_registry));
879
880 let network =
882 AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
883 let network = MeteredNetwork::new(network, runtime_registry);
884
885 let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
887
888 let executor = Arc::new(Executor {
889 cycle: checkpoint.cycle,
891 deadline: checkpoint.deadline,
892 auditor: checkpoint.auditor,
893 rng: checkpoint.rng,
894 time: checkpoint.time,
895 dns: checkpoint.dns,
896
897 registry: Mutex::new(registry),
899 metrics,
900 tasks: Arc::new(Tasks::new()),
901 sleeping: Mutex::new(BinaryHeap::new()),
902 shutdown: Mutex::new(Stopper::default()),
903 panicker,
904 });
905 (
906 Self {
907 name: String::new(),
908 executor: Arc::downgrade(&executor),
909 network: Arc::new(network),
910 storage: checkpoint.storage,
911 tree: Tree::root(),
912 execution: Execution::default(),
913 instrumented: false,
914 },
915 executor,
916 panicked,
917 )
918 }
919
920 fn executor(&self) -> Arc<Executor> {
922 self.executor.upgrade().expect("executor already dropped")
923 }
924
925 fn metrics(&self) -> Arc<Metrics> {
927 self.executor().metrics.clone()
928 }
929
930 pub fn auditor(&self) -> Arc<Auditor> {
932 self.executor().auditor.clone()
933 }
934
935 pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
940 let executor = self.executor();
942 let host = host.into();
943 executor.auditor.event(b"resolver_register", |hasher| {
944 hasher.update(host.as_bytes());
945 hasher.update(addrs.encode());
946 });
947
948 let mut dns = executor.dns.lock().unwrap();
950 match addrs {
951 Some(addrs) => {
952 dns.insert(host, addrs);
953 }
954 None => {
955 dns.remove(&host);
956 }
957 }
958 }
959}
960
impl crate::Spawner for Context {
    /// Sets the execution hint to `Execution::Dedicated` for the next `spawn`.
    // NOTE(review): this executor polls every task on one loop. The hint appears
    // to be consumed only by `spawn_metrics!` (confirm there).
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Sets the execution hint to `Execution::Shared(blocking)` for the next `spawn`.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Wraps the next spawned task in a `"task"` tracing span.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawns `f(child_context)` as a new task and returns a handle to its output.
    ///
    /// The task gets a new child node in the supervision tree. If the parent
    /// node is already aborted, no task is registered and a closed handle is
    /// returned.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Must run before the execution hint is reset below; the macro may read it.
        let (label, metric) = spawn_metrics!(self);

        // Capture the spawn options, then reset them so the child context starts clean.
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor();
        // `parent: None` detaches the task's span from the spawner's span.
        let future: BoxFuture<'_, T> = if is_instrumented {
            f(self)
                .instrument(info_span!(parent: None, "task", name = %label.name()))
                .boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        // Registration also queues the task, so it is polled on the next iteration.
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Let the parent tree node abort this task.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Signals a stop with `value` and waits for `shutdown.stop(value)` to resolve.
    ///
    /// If `timeout` is set, returns `Err(Error::Timeout)` when that much
    /// simulated time passes first. Returns `Err(Error::Closed)` if the stop
    /// future yields an error.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // The shutdown lock is released before awaiting.
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // No timeout: race against a future that never completes.
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Returns a `Signal` from the executor's `Stopper` (see `Stopper::stopped`).
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Bound to a local so the guard is dropped before `executor`.
        let stopped = executor.shutdown.lock().unwrap().stopped();
        stopped
    }
}
1055
1056impl crate::Metrics for Context {
1057 fn with_label(&self, label: &str) -> Self {
1058 validate_label(label);
1060
1061 let name = {
1063 let prefix = self.name.clone();
1064 if prefix.is_empty() {
1065 label.to_string()
1066 } else {
1067 format!("{prefix}_{label}")
1068 }
1069 };
1070 assert!(
1071 !name.starts_with(METRICS_PREFIX),
1072 "using runtime label is not allowed"
1073 );
1074 Self {
1075 name,
1076 ..self.clone()
1077 }
1078 }
1079
1080 fn label(&self) -> String {
1081 self.name.clone()
1082 }
1083
1084 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1085 let name = name.into();
1087 let help = help.into();
1088
1089 let executor = self.executor();
1091 executor.auditor.event(b"register", |hasher| {
1092 hasher.update(name.as_bytes());
1093 hasher.update(help.as_bytes());
1094 });
1095 let prefixed_name = {
1096 let prefix = &self.name;
1097 if prefix.is_empty() {
1098 name
1099 } else {
1100 format!("{}_{}", *prefix, name)
1101 }
1102 };
1103 executor
1104 .registry
1105 .lock()
1106 .unwrap()
1107 .register(prefixed_name, help, metric);
1108 }
1109
1110 fn encode(&self) -> String {
1111 let executor = self.executor();
1112 executor.auditor.event(b"encode", |_| {});
1113 let mut buffer = String::new();
1114 encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1115 buffer
1116 }
1117}
1118
1119struct Sleeper {
1120 executor: Weak<Executor>,
1121 time: SystemTime,
1122 registered: bool,
1123}
1124
1125impl Sleeper {
1126 fn executor(&self) -> Arc<Executor> {
1128 self.executor.upgrade().expect("executor already dropped")
1129 }
1130}
1131
/// A pending wake-up: `waker` is woken once simulated time reaches `time`.
///
/// Equality and ordering consider only `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Reverse chronological order: `BinaryHeap` is a max-heap, so this makes it
    /// yield the earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1157
1158impl Future for Sleeper {
1159 type Output = ();
1160
1161 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1162 let executor = self.executor();
1163 {
1164 let current_time = *executor.time.lock().unwrap();
1165 if current_time >= self.time {
1166 return Poll::Ready(());
1167 }
1168 }
1169 if !self.registered {
1170 self.registered = true;
1171 executor.sleeping.lock().unwrap().push(Alarm {
1172 time: self.time,
1173 waker: cx.waker().clone(),
1174 });
1175 }
1176 Poll::Pending
1177 }
1178}
1179
1180impl Clock for Context {
1181 fn current(&self) -> SystemTime {
1182 *self.executor().time.lock().unwrap()
1183 }
1184
1185 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1186 let deadline = self
1187 .current()
1188 .checked_add(duration)
1189 .expect("overflow when setting wake time");
1190 self.sleep_until(deadline)
1191 }
1192
1193 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1194 Sleeper {
1195 executor: self.executor.clone(),
1196
1197 time: deadline,
1198 registered: false,
1199 }
1200 }
1201}
1202
#[cfg(feature = "external")]
/// Future returned by `Pacer::pace`. It withholds `future`'s output until
/// simulated time reaches `target`.
#[pin_project]
struct Waiter<F: Future> {
    executor: Weak<Executor>,
    /// Simulated time before which the output is not returned.
    target: SystemTime,
    #[pin]
    future: F,
    /// Output captured by the initial eager poll, if it completed immediately.
    ready: Option<F::Output>,
    /// Whether the initial eager poll has happened.
    started: bool,
    /// Whether an alarm for `target` has been pushed onto `Executor::sleeping`.
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// The first call polls `future` once with a no-op waker so its work can start
    /// immediately. Until `target` is reached, it returns `Pending`, having
    /// registered an alarm once.
    ///
    /// After `target`:
    /// - an output captured by the eager poll is returned;
    /// - otherwise `future` is driven to completion on this thread, waiting on a
    ///   `Blocker` between polls.
    ///
    /// NOTE(review): `Blocker::wait` presumably blocks the thread until the
    /// blocker's waker fires. Confirm in `Blocker`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if !*this.started {
            // Eager first poll. Its wake-up is discarded because `future` is
            // driven explicitly below once `target` is reached.
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock().unwrap();
        if current_time < *this.target {
            if !*this.registered {
                // Register once; the alarm wakes this task at `target`.
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // The future already completed during the eager poll.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise drive the future to completion, blocking between polls.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1276
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` so its output is not returned before `latency` of
    /// simulated time has elapsed from now.
    ///
    /// # Panics
    ///
    /// Panics with "overflow when setting wake time" if the target time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // The time lock is released at the end of this statement.
        let now = *self.executor().time.lock().unwrap();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: Weak::clone(&self.executor),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1303
1304impl GClock for Context {
1305 type Instant = SystemTime;
1306
1307 fn now(&self) -> Self::Instant {
1308 self.current()
1309 }
1310}
1311
1312impl ReasonablyRealtime for Context {}
1313
1314impl crate::Network for Context {
1315 type Listener = ListenerOf<Network>;
1316
1317 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1318 self.network.bind(socket).await
1319 }
1320
1321 async fn dial(
1322 &self,
1323 socket: SocketAddr,
1324 ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1325 self.network.dial(socket).await
1326 }
1327}
1328
1329impl crate::Resolver for Context {
1330 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1331 let executor = self.executor();
1333 let dns = executor.dns.lock().unwrap();
1334 let result = dns.get(host).cloned();
1335 drop(dns);
1336
1337 executor.auditor.event(b"resolve", |hasher| {
1339 hasher.update(host.as_bytes());
1340 hasher.update(result.encode());
1341 });
1342 result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1343 }
1344}
1345
1346impl RngCore for Context {
1347 fn next_u32(&mut self) -> u32 {
1348 let executor = self.executor();
1349 executor.auditor.event(b"rand", |hasher| {
1350 hasher.update(b"next_u32");
1351 });
1352 let result = executor.rng.lock().unwrap().next_u32();
1353 result
1354 }
1355
1356 fn next_u64(&mut self) -> u64 {
1357 let executor = self.executor();
1358 executor.auditor.event(b"rand", |hasher| {
1359 hasher.update(b"next_u64");
1360 });
1361 let result = executor.rng.lock().unwrap().next_u64();
1362 result
1363 }
1364
1365 fn fill_bytes(&mut self, dest: &mut [u8]) {
1366 let executor = self.executor();
1367 executor.auditor.event(b"rand", |hasher| {
1368 hasher.update(b"fill_bytes");
1369 });
1370 executor.rng.lock().unwrap().fill_bytes(dest);
1371 }
1372
1373 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1374 let executor = self.executor();
1375 executor.auditor.event(b"rand", |hasher| {
1376 hasher.update(b"try_fill_bytes");
1377 });
1378 let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1379 result
1380 }
1381}
1382
1383impl CryptoRng for Context {}
1384
1385impl crate::Storage for Context {
1386 type Blob = <Storage as crate::Storage>::Blob;
1387
1388 async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1389 self.storage.open(partition, name).await
1390 }
1391
1392 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1393 self.storage.remove(partition, name).await
1394 }
1395
1396 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1397 self.storage.scan(partition).await
1398 }
1399}
1400
1401#[cfg(test)]
1402mod tests {
1403 use super::*;
1404 #[cfg(feature = "external")]
1405 use crate::FutureExt;
1406 #[cfg(feature = "external")]
1407 use crate::Spawner;
1408 use crate::{
1409 deterministic, reschedule, utils::run_tasks, Blob, Metrics, Resolver, Runner as _, Storage,
1410 };
1411 use commonware_macros::test_traced;
1412 #[cfg(not(feature = "external"))]
1413 use futures::future::pending;
1414 #[cfg(feature = "external")]
1415 use futures::{channel::mpsc, SinkExt, StreamExt};
1416 use futures::{channel::oneshot, task::noop_waker};
1417
1418 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1419 let executor = deterministic::Runner::seeded(seed);
1420 run_tasks(5, executor)
1421 }
1422
1423 #[test]
1424 fn test_same_seed_same_order() {
1425 let mut outputs = Vec::new();
1427 for seed in 0..1000 {
1428 let output = run_with_seed(seed);
1429 outputs.push(output);
1430 }
1431
1432 for seed in 0..1000 {
1434 let output = run_with_seed(seed);
1435 assert_eq!(output, outputs[seed as usize]);
1436 }
1437 }
1438
1439 #[test_traced("TRACE")]
1440 fn test_different_seeds_different_order() {
1441 let output1 = run_with_seed(12345);
1442 let output2 = run_with_seed(54321);
1443 assert_ne!(output1, output2);
1444 }
1445
1446 #[test]
1447 fn test_alarm_min_heap() {
1448 let now = SystemTime::now();
1450 let alarms = vec![
1451 Alarm {
1452 time: now + Duration::new(10, 0),
1453 waker: noop_waker(),
1454 },
1455 Alarm {
1456 time: now + Duration::new(5, 0),
1457 waker: noop_waker(),
1458 },
1459 Alarm {
1460 time: now + Duration::new(15, 0),
1461 waker: noop_waker(),
1462 },
1463 Alarm {
1464 time: now + Duration::new(5, 0),
1465 waker: noop_waker(),
1466 },
1467 ];
1468 let mut heap = BinaryHeap::new();
1469 for alarm in alarms {
1470 heap.push(alarm);
1471 }
1472
1473 let mut sorted_times = Vec::new();
1475 while let Some(alarm) = heap.pop() {
1476 sorted_times.push(alarm.time);
1477 }
1478 assert_eq!(
1479 sorted_times,
1480 vec![
1481 now + Duration::new(5, 0),
1482 now + Duration::new(5, 0),
1483 now + Duration::new(10, 0),
1484 now + Duration::new(15, 0),
1485 ]
1486 );
1487 }
1488
1489 #[test]
1490 #[should_panic(expected = "runtime timeout")]
1491 fn test_timeout() {
1492 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1493 executor.start(|context| async move {
1494 loop {
1495 context.sleep(Duration::from_secs(1)).await;
1496 }
1497 });
1498 }
1499
1500 #[test]
1501 #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1502 fn test_bad_timeout() {
1503 let cfg = Config {
1504 timeout: Some(Duration::default()),
1505 cycle: Duration::default(),
1506 ..Config::default()
1507 };
1508 deterministic::Runner::new(cfg);
1509 }
1510
/// A `cycle` one nanosecond shorter than `SYSTEM_TIME_PRECISION` must be
/// rejected when the runner is constructed.
#[test]
#[should_panic(
    expected = "cycle duration must be greater than or equal to system time precision"
)]
fn test_bad_cycle() {
    deterministic::Runner::new(Config {
        cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
        ..Config::default()
    });
}
1522
/// Blob data that was written and `sync`ed during `start_and_recover` must be
/// readable from a runner restored from the resulting checkpoint.
#[test]
fn test_recover_synced_storage_persists() {
    let partition = "test_partition";
    let name = b"test_blob";
    let data = b"Hello, world!";

    // First run: write and sync the blob, then report the auditor state.
    let first_run = deterministic::Runner::default();
    let (state, checkpoint) = first_run.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(Vec::from(data), 0).await.unwrap();
        blob.sync().await.unwrap();
        context.auditor().state()
    });

    // The checkpoint's auditor must agree with the state seen inside the run.
    assert_eq!(state, checkpoint.auditor.state());

    // Second run: restore from the checkpoint and read the blob back.
    let resumed = Runner::from(checkpoint);
    resumed.start(|context| async move {
        let (blob, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, data.len() as u64);
        let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
        assert_eq!(read.as_ref(), data);
    });
}
1551
/// A panic raised by the root task of a runner restored from a checkpoint
/// must propagate out of `start` with its original message.
#[test]
#[should_panic(expected = "goodbye")]
fn test_recover_panic_handling() {
    // Produce a checkpoint from a run whose root task just yields once.
    let (_, checkpoint) = deterministic::Runner::default().start_and_recover(|_| async move {
        reschedule().await;
    });

    // Resume from the checkpoint with a root task that panics immediately.
    Runner::from(checkpoint).start(|_| async move {
        panic!("goodbye");
    });
}
1567
/// Blob data that was written but never `sync`ed must not survive recovery:
/// a runner restored from the checkpoint sees the blob with length 0.
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
    let executor = deterministic::Runner::default();
    let partition = "test_partition";
    let name = b"test_blob";
    let data = Vec::from("Hello, world!");

    // Write to the blob but deliberately skip `sync()` before recovering.
    // The closure already owns `context`, so no clone is needed here.
    let (_, checkpoint) = executor.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(data, 0).await.unwrap();
    });

    // The restored runner must observe the blob as empty.
    let executor = Runner::from(checkpoint);
    executor.start(|context| async move {
        let (_, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, 0);
    });
}
1592
/// DNS mappings registered with `resolver_register` before recovery must
/// still resolve to the same addresses in a runner restored from the
/// checkpoint.
#[test]
fn test_recover_dns_mappings_persist() {
    let host = "example.com";
    // 192.168.1.1 and 192.168.1.2, in that order.
    let addrs: Vec<IpAddr> = (1..=2)
        .map(|last| IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, last)))
        .collect();

    // First run: register the mapping (using a copy so `addrs` stays
    // available for the final comparison) and capture the auditor state.
    let runner = deterministic::Runner::default();
    let registered = addrs.clone();
    let (state, checkpoint) = runner.start_and_recover(move |context| async move {
        context.resolver_register(host, Some(registered));
        context.auditor().state()
    });

    assert_eq!(state, checkpoint.auditor.state());

    // Second run: the restored resolver must return the same addresses.
    Runner::from(checkpoint).start(move |context| async move {
        let resolved = context.resolve(host).await.unwrap();
        assert_eq!(resolved, addrs);
    });
}
1622
/// Returning the `Context` out of the root future (thereby keeping it alive
/// past `start`) is expected to panic with "executor still has weak
/// references". Whether the panic fires inside `start` or on the explicit
/// drop below is not visible from this test.
#[test]
#[should_panic(expected = "executor still has weak references")]
fn test_context_return() {
    let runner = deterministic::Runner::default();

    // Smuggle the context out as the root task's output.
    let leaked = runner.start(|context| async move { context });

    drop(leaked);
}
1638
/// A default-configured runner reports `UNIX_EPOCH` as its current time.
#[test]
fn test_default_time_zero() {
    deterministic::Runner::default().start(|context| async move {
        let since_epoch = context.current().duration_since(UNIX_EPOCH).unwrap();
        assert_eq!(since_epoch, Duration::ZERO);
    });
}
1652
/// Without the `external` feature, a root task awaiting a future that never
/// resolves must make the runner panic with "runtime stalled".
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_stall() {
    deterministic::Runner::default().start(|_| async move {
        pending::<()>().await;
    });
}
1665
/// Without the `external` feature, the root task awaits a channel that a real
/// OS thread completes after one second of wall-clock time. The test expects
/// the runner to panic with "runtime stalled" instead of waiting for that
/// send.
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_external_simulated() {
    let runner = deterministic::Runner::default();

    let (tx, rx) = oneshot::channel();
    // Complete the channel from a real thread after one wall-clock second.
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });

    runner.start(|_| async move {
        rx.await.unwrap();
    });
}
1685
/// With the `external` feature, the root task awaits a channel that a real OS
/// thread completes after one second of wall-clock time. The test expects the
/// runner to wait for it and finish normally, with no stall panic.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime() {
    let runner = deterministic::Runner::default();

    let (tx, rx) = oneshot::channel();
    // Complete the channel from a real thread after one wall-clock second.
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });

    runner.start(|_| async move {
        rx.await.unwrap();
    });
}
1704
/// With the `external` feature, two oneshot channels are completed by real
/// OS threads and awaited through `pace`. One thread sends after one
/// wall-clock second; the other sends immediately.
///
/// Expectations checked below:
/// - The task pacing the slow channel with `Duration::ZERO` observes more than
///   `delay` of real time but less than `delay` of simulated time.
/// - The task pacing the fast channel with `delay` observes at least `delay`
///   of both real and simulated time.
/// - Their reports arrive in the order `[1, 2]`.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime_variable() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        // Reference points for real (wall-clock) and simulated elapsed time.
        let real_start = SystemTime::now();
        let sim_start = context.current();
        let (slow_tx, slow_rx) = oneshot::channel();
        let (fast_tx, fast_rx) = oneshot::channel();
        let (mut done_tx, mut done_rx) = mpsc::channel(2);

        let delay = Duration::from_secs(1);
        // Slow producer: signals after `delay` of wall-clock time.
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            slow_tx.send(()).unwrap();
        });

        // Fast producer: signals without any meaningful delay.
        std::thread::spawn(move || {
            std::thread::sleep(Duration::ZERO);
            fast_tx.send(()).unwrap();
        });

        let slow = context.clone().spawn({
            let mut done_tx = done_tx.clone();
            move |context| async move {
                slow_rx.pace(&context, Duration::ZERO).await.unwrap();
                let real_elapsed = SystemTime::now().duration_since(real_start).unwrap();
                assert!(real_elapsed > delay);
                let sim_elapsed = context.current().duration_since(sim_start).unwrap();
                assert!(sim_elapsed < delay);
                done_tx.send(1).await.unwrap();
            }
        });

        let fast = context.clone().spawn(move |context| async move {
            fast_rx.pace(&context, delay).await.unwrap();
            let real_elapsed = SystemTime::now().duration_since(real_start).unwrap();
            assert!(real_elapsed >= delay);
            let sim_elapsed = context.current().duration_since(sim_start).unwrap();
            assert!(sim_elapsed >= delay);
            done_tx.send(2).await.unwrap();
        });

        // Join the fast-channel task first, then the slow-channel task.
        fast.await.unwrap();
        slow.await.unwrap();

        // Collect both reports in arrival order.
        let mut order = Vec::new();
        while order.len() < 2 {
            order.push(done_rx.next().await.unwrap());
        }
        assert_eq!(order, vec![1, 2]);
    });
}
1768
/// Without the `external` feature, sleeping one second should take fewer
/// than 10 runtime loop iterations, read from the `runtime_iterations_total`
/// metric. This suggests the runtime skips ahead in simulated time rather
/// than stepping through it.
#[cfg(not(feature = "external"))]
#[test]
fn test_simulated_skip() {
    deterministic::Runner::default().start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Scan the encoded metrics for the first parseable iteration counter.
        let encoded = context.encode();
        let mut found = None;
        for line in encoded.lines() {
            if let Some(value) = line.strip_prefix("runtime_iterations_total ") {
                if let Ok(parsed) = value.trim().parse::<u64>() {
                    found = Some(parsed);
                    break;
                }
            }
        }
        let iterations = found.expect("missing runtime_iterations_total metric");
        assert!(iterations < 10);
    });
}
1791
/// With the `external` feature, sleeping one second is expected to take more
/// than 500 runtime loop iterations, read from the `runtime_iterations_total`
/// metric. In other words, the runtime does not skip ahead in time.
#[cfg(feature = "external")]
#[test]
fn test_realtime_no_skip() {
    deterministic::Runner::default().start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // First line carrying the counter whose value parses as an integer.
        let iterations = context
            .encode()
            .lines()
            .filter_map(|line| line.strip_prefix("runtime_iterations_total "))
            .filter_map(|value| value.trim().parse::<u64>().ok())
            .next()
            .expect("missing runtime_iterations_total metric");
        assert!(iterations > 500);
    });
}
1814}