1use crate::{
40 network::{
41 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42 metered::Network as MeteredNetwork,
43 },
44 storage::{
45 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46 metered::Storage as MeteredStorage,
47 },
48 telemetry::metrics::task::Label,
49 utils::{
50 signal::{Signal, Stopper},
51 supervision::Tree,
52 Panicker,
53 },
54 Clock, Error, Execution, Handle, ListenerOf, Panicked, METRICS_PREFIX,
55};
56#[cfg(feature = "external")]
57use crate::{Blocker, Pacer};
58use commonware_macros::select;
59use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
60#[cfg(feature = "external")]
61use futures::task::noop_waker;
62use futures::{
63 future::BoxFuture,
64 task::{waker, ArcWake},
65 Future, FutureExt,
66};
67use governor::clock::{Clock as GClock, ReasonablyRealtime};
68#[cfg(feature = "external")]
69use pin_project::pin_project;
70use prometheus_client::{
71 encoding::text::encode,
72 metrics::{counter::Counter, family::Family, gauge::Gauge},
73 registry::{Metric, Registry},
74};
75use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
76use sha2::{Digest as _, Sha256};
77use std::{
78 collections::{BTreeMap, BinaryHeap},
79 mem::{replace, take},
80 net::SocketAddr,
81 pin::Pin,
82 sync::{Arc, Mutex, Weak},
83 task::{self, Poll, Waker},
84 time::{Duration, SystemTime, UNIX_EPOCH},
85};
86use tracing::{info_span, trace, Instrument};
87
/// Metrics owned by the deterministic runtime itself (created and registered by `Metrics::init`).
#[derive(Debug)]
struct Metrics {
    /// Executor loop iterations; incremented once at the end of every iteration of the run loop.
    iterations: Counter,
    /// Registered as "Total number of tasks spawned", keyed by task label.
    tasks_spawned: Family<Label, Counter>,
    /// Registered as "Number of tasks currently running", keyed by task label.
    tasks_running: Family<Label, Gauge>,
    /// Number of times a task with a given label has been polled by the run loop.
    task_polls: Family<Label, Counter>,

    /// Registered as `bandwidth` ("Total amount of data sent over network"); presumably
    /// updated outside this file — TODO confirm.
    network_bandwidth: Counter,
}
97
98impl Metrics {
99 pub fn init(registry: &mut Registry) -> Self {
100 let metrics = Self {
101 iterations: Counter::default(),
102 task_polls: Family::default(),
103 tasks_spawned: Family::default(),
104 tasks_running: Family::default(),
105 network_bandwidth: Counter::default(),
106 };
107 registry.register(
108 "iterations",
109 "Total number of iterations",
110 metrics.iterations.clone(),
111 );
112 registry.register(
113 "tasks_spawned",
114 "Total number of tasks spawned",
115 metrics.tasks_spawned.clone(),
116 );
117 registry.register(
118 "tasks_running",
119 "Number of tasks currently running",
120 metrics.tasks_running.clone(),
121 );
122 registry.register(
123 "task_polls",
124 "Total number of task polls",
125 metrics.task_polls.clone(),
126 );
127 registry.register(
128 "bandwidth",
129 "Total amount of data sent over network",
130 metrics.network_bandwidth.clone(),
131 );
132 metrics
133 }
134}
135
136type Digest = [u8; 32];
138
139pub struct Auditor {
141 digest: Mutex<Digest>,
142}
143
144impl Default for Auditor {
145 fn default() -> Self {
146 Self {
147 digest: Digest::default().into(),
148 }
149 }
150}
151
152impl Auditor {
153 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
157 where
158 F: FnOnce(&mut Sha256),
159 {
160 let mut digest = self.digest.lock().unwrap();
161
162 let mut hasher = Sha256::new();
163 hasher.update(digest.as_ref());
164 hasher.update(label);
165 payload(&mut hasher);
166
167 *digest = hasher.finalize().into();
168 }
169
170 pub fn state(&self) -> String {
175 let hash = self.digest.lock().unwrap();
176 hex(hash.as_ref())
177 }
178}
179
180#[derive(Clone)]
182pub struct Config {
183 seed: u64,
185
186 cycle: Duration,
189
190 timeout: Option<Duration>,
192
193 catch_panics: bool,
195}
196
197impl Config {
198 pub fn new() -> Self {
200 Self {
201 seed: 42,
202 cycle: Duration::from_millis(1),
203 timeout: None,
204 catch_panics: false,
205 }
206 }
207
208 pub fn with_seed(mut self, seed: u64) -> Self {
211 self.seed = seed;
212 self
213 }
214 pub fn with_cycle(mut self, cycle: Duration) -> Self {
216 self.cycle = cycle;
217 self
218 }
219 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
221 self.timeout = timeout;
222 self
223 }
224 pub fn with_catch_panics(mut self, catch_panics: bool) -> Self {
226 self.catch_panics = catch_panics;
227 self
228 }
229
230 pub fn seed(&self) -> u64 {
233 self.seed
234 }
235 pub fn cycle(&self) -> Duration {
237 self.cycle
238 }
239 pub fn timeout(&self) -> Option<Duration> {
241 self.timeout
242 }
243 pub fn catch_panics(&self) -> bool {
245 self.catch_panics
246 }
247
248 pub fn assert(&self) {
250 assert!(
251 self.cycle != Duration::default() || self.timeout.is_none(),
252 "cycle duration must be non-zero when timeout is set",
253 );
254 assert!(
255 self.cycle >= SYSTEM_TIME_PRECISION,
256 "cycle duration must be greater than or equal to system time precision"
257 );
258 }
259}
260
261impl Default for Config {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
/// Owns all state of a single run.
///
/// The runner holds the only strong reference. `Context`s, `Sleeper`s and (with `external`)
/// `Waiter`s hold `Weak` references to it.
pub struct Executor {
    /// Root metrics registry; runtime metrics live under a `METRICS_PREFIX` sub-registry.
    registry: Mutex<Registry>,
    /// Simulated time added on every executor iteration.
    cycle: Duration,
    /// Absolute simulated time at which the run panics with "runtime timeout", if any.
    deadline: Option<SystemTime>,
    metrics: Arc<Metrics>,
    /// Running hash of recorded events; carried across checkpoints.
    auditor: Arc<Auditor>,
    /// Seeded RNG used for task shuffling and `RngCore` on `Context`.
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Registry of runnable tasks and the ready queue.
    tasks: Arc<Tasks>,
    /// Pending alarms; `Alarm`'s reversed ordering makes `peek`/`pop` return the earliest one.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Backs `Spawner::stop` / `Spawner::stopped`.
    shutdown: Mutex<Stopper>,
    /// Shared with spawned task handles; its `catch()` setting is saved into the checkpoint.
    panicker: Panicker,
}
281
282impl Executor {
283 fn advance_time(&self) -> SystemTime {
288 #[cfg(feature = "external")]
289 std::thread::sleep(self.cycle);
290
291 let mut time = self.time.lock().unwrap();
292 *time = time
293 .checked_add(self.cycle)
294 .expect("executor time overflowed");
295 let now = *time;
296 trace!(now = now.epoch_millis(), "time advanced");
297 now
298 }
299
300 fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
305 if cfg!(feature = "external") || self.tasks.ready() != 0 {
306 return current;
307 }
308
309 let mut skip_until = None;
310 {
311 let sleeping = self.sleeping.lock().unwrap();
312 if let Some(next) = sleeping.peek() {
313 if next.time > current {
314 skip_until = Some(next.time);
315 }
316 }
317 }
318
319 if let Some(deadline) = skip_until {
320 let mut time = self.time.lock().unwrap();
321 *time = deadline;
322 let now = *time;
323 trace!(now = now.epoch_millis(), "time skipped");
324 now
325 } else {
326 current
327 }
328 }
329
330 fn wake_ready_sleepers(&self, current: SystemTime) {
332 let mut sleeping = self.sleeping.lock().unwrap();
333 while let Some(next) = sleeping.peek() {
334 if next.time <= current {
335 let sleeper = sleeping.pop().unwrap();
336 sleeper.waker.wake();
337 } else {
338 break;
339 }
340 }
341 }
342
343 fn assert_liveness(&self) {
347 if cfg!(feature = "external") || self.tasks.ready() != 0 {
348 return;
349 }
350
351 panic!("runtime stalled");
352 }
353}
354
355pub struct Checkpoint {
359 cycle: Duration,
360 deadline: Option<SystemTime>,
361 auditor: Arc<Auditor>,
362 rng: Mutex<StdRng>,
363 time: Mutex<SystemTime>,
364 storage: Arc<Storage>,
365 catch_panics: bool,
366}
367
368impl Checkpoint {
369 pub fn auditor(&self) -> Arc<Auditor> {
371 self.auditor.clone()
372 }
373}
374
/// Where a `Runner` gets its initial runtime state from.
// `Checkpoint` is much larger than `Config`; presumably left unboxed because a `State` is
// built once per run — NOTE(review): confirm this is the intended trade-off.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Build a fresh runtime from a configuration.
    Config(Config),
    /// Resume from the state captured at the end of a previous run.
    Checkpoint(Checkpoint),
}

/// Runs a root future, and everything it spawns, on the deterministic executor in simulated time.
pub struct Runner {
    state: State,
}
385
386impl From<Config> for Runner {
387 fn from(cfg: Config) -> Self {
388 Self::new(cfg)
389 }
390}
391
392impl From<Checkpoint> for Runner {
393 fn from(checkpoint: Checkpoint) -> Self {
394 Self {
395 state: State::Checkpoint(checkpoint),
396 }
397 }
398}
399
400impl Runner {
401 pub fn new(cfg: Config) -> Self {
403 cfg.assert();
405 Runner {
406 state: State::Config(cfg),
407 }
408 }
409
410 pub fn seeded(seed: u64) -> Self {
413 let cfg = Config {
414 seed,
415 ..Config::default()
416 };
417 Self::new(cfg)
418 }
419
420 pub fn timed(timeout: Duration) -> Self {
423 let cfg = Config {
424 timeout: Some(timeout),
425 ..Config::default()
426 };
427 Self::new(cfg)
428 }
429
430 pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
433 where
434 F: FnOnce(Context) -> Fut,
435 Fut: Future,
436 {
437 let (context, executor, panicked) = match self.state {
439 State::Config(config) => Context::new(config),
440 State::Checkpoint(checkpoint) => Context::recover(checkpoint),
441 };
442
443 let storage = context.storage.clone();
445 let mut root = Box::pin(panicked.interrupt(f(context)));
446
447 Tasks::register_root(&executor.tasks);
449
450 let output = loop {
452 {
454 let current = executor.time.lock().unwrap();
455 if let Some(deadline) = executor.deadline {
456 if *current >= deadline {
457 panic!("runtime timeout");
458 }
459 }
460 }
461
462 let mut queue = executor.tasks.drain();
464
465 if queue.len() > 1 {
467 let mut rng = executor.rng.lock().unwrap();
468 queue.shuffle(&mut *rng);
469 }
470
471 trace!(
477 iter = executor.metrics.iterations.get(),
478 tasks = queue.len(),
479 "starting loop"
480 );
481 let mut output = None;
482 for id in queue {
483 let Some(task) = executor.tasks.get(id) else {
485 trace!(id, "skipping missing task");
486 continue;
487 };
488
489 executor.auditor.event(b"process_task", |hasher| {
491 hasher.update(task.id.to_be_bytes());
492 hasher.update(task.label.name().as_bytes());
493 });
494 executor.metrics.task_polls.get_or_create(&task.label).inc();
495 trace!(id, "processing task");
496
497 let waker = waker(Arc::new(TaskWaker {
499 id,
500 tasks: Arc::downgrade(&executor.tasks),
501 }));
502 let mut cx = task::Context::from_waker(&waker);
503
504 match &task.mode {
506 Mode::Root => {
507 if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
509 trace!(id, "root task is complete");
510 output = Some(result);
511 break;
512 }
513 }
514 Mode::Work(future) => {
515 let mut fut_opt = future.lock().unwrap();
517 let Some(fut) = fut_opt.as_mut() else {
518 trace!(id, "skipping already complete task");
519
520 executor.tasks.remove(id);
522 continue;
523 };
524
525 if fut.as_mut().poll(&mut cx).is_ready() {
527 trace!(id, "task is complete");
528
529 executor.tasks.remove(id);
531 *fut_opt = None;
532 continue;
533 }
534 }
535 }
536
537 trace!(id, "task is still pending");
539 }
540
541 if let Some(output) = output {
543 break output;
544 }
545
546 let mut current = executor.advance_time();
548 current = executor.skip_idle_time(current);
549
550 executor.wake_ready_sleepers(current);
552 executor.assert_liveness();
553
554 executor.metrics.iterations.inc();
556 };
557
558 executor.sleeping.lock().unwrap().clear(); let tasks = executor.tasks.clear();
566 for task in tasks {
567 let Mode::Work(future) = &task.mode else {
568 continue;
569 };
570 *future.lock().unwrap() = None;
571 }
572
573 assert!(
576 Arc::weak_count(&executor) == 0,
577 "executor still has weak references"
578 );
579
580 let executor = Arc::into_inner(executor).expect("executor still has strong references");
582
583 let checkpoint = Checkpoint {
585 cycle: executor.cycle,
586 deadline: executor.deadline,
587 auditor: executor.auditor,
588 rng: executor.rng,
589 time: executor.time,
590 storage,
591 catch_panics: executor.panicker.catch(),
592 };
593
594 (output, checkpoint)
595 }
596}
597
598impl Default for Runner {
599 fn default() -> Self {
600 Self::new(Config::default())
601 }
602}
603
604impl crate::Runner for Runner {
605 type Context = Context;
606
607 fn start<F, Fut>(self, f: F) -> Fut::Output
608 where
609 F: FnOnce(Self::Context) -> Fut,
610 Fut: Future,
611 {
612 let (output, _) = self.start_and_recover(f);
613 output
614 }
615}
616
/// How a registered task is driven by the run loop.
enum Mode {
    /// The root future passed to `start_and_recover`. It is owned and polled by the run loop
    /// itself rather than stored here.
    Root,
    /// A spawned future. It is set to `None` when it completes or when the run is torn down.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

/// A task registered with the executor.
struct Task {
    /// Unique id handed out by `Tasks::increment`.
    id: u128,
    /// Label used for per-task metrics and audit events.
    label: Label,

    mode: Mode,
}

/// Waker payload that re-queues a task by id.
struct TaskWaker {
    id: u128,

    /// Held weakly; once the task registry is dropped, waking does nothing.
    tasks: Weak<Tasks>,
}
637
638impl ArcWake for TaskWaker {
639 fn wake_by_ref(arc_self: &Arc<Self>) {
640 if let Some(tasks) = arc_self.tasks.upgrade() {
645 tasks.queue(arc_self.id);
646 }
647 }
648}
649
/// Registry of tasks plus the queue of task ids waiting to be polled.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Ids queued for polling. Entries are not deduplicated and may refer to tasks that have
    /// since completed; the run loop skips ids missing from `running`.
    ready: Mutex<Vec<u128>>,
    /// Registered tasks, keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
659
660impl Tasks {
661 fn new() -> Self {
663 Self {
664 counter: Mutex::new(0),
665 ready: Mutex::new(Vec::new()),
666 running: Mutex::new(BTreeMap::new()),
667 }
668 }
669
670 fn increment(&self) -> u128 {
672 let mut counter = self.counter.lock().unwrap();
673 let old = *counter;
674 *counter = counter.checked_add(1).expect("task counter overflow");
675 old
676 }
677
678 fn register_root(arc_self: &Arc<Self>) {
682 let id = arc_self.increment();
683 let task = Arc::new(Task {
684 id,
685 label: Label::root(),
686 mode: Mode::Root,
687 });
688 arc_self.register(id, task);
689 }
690
691 fn register_work(
693 arc_self: &Arc<Self>,
694 label: Label,
695 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
696 ) {
697 let id = arc_self.increment();
698 let task = Arc::new(Task {
699 id,
700 label,
701 mode: Mode::Work(Mutex::new(Some(future))),
702 });
703 arc_self.register(id, task);
704 }
705
706 fn register(&self, id: u128, task: Arc<Task>) {
708 self.running.lock().unwrap().insert(id, task);
710
711 self.queue(id);
713 }
714
715 fn queue(&self, id: u128) {
717 let mut ready = self.ready.lock().unwrap();
718 ready.push(id);
719 }
720
721 fn drain(&self) -> Vec<u128> {
723 let mut queue = self.ready.lock().unwrap();
724 let len = queue.len();
725 replace(&mut *queue, Vec::with_capacity(len))
726 }
727
728 fn ready(&self) -> usize {
730 self.ready.lock().unwrap().len()
731 }
732
733 fn get(&self, id: u128) -> Option<Arc<Task>> {
738 let running = self.running.lock().unwrap();
739 running.get(&id).cloned()
740 }
741
742 fn remove(&self, id: u128) {
744 self.running.lock().unwrap().remove(&id);
745 }
746
747 fn clear(&self) -> Vec<Arc<Task>> {
749 self.ready.lock().unwrap().clear();
751
752 let running: BTreeMap<u128, Arc<Task>> = {
754 let mut running = self.running.lock().unwrap();
755 take(&mut *running)
756 };
757 running.into_values().collect()
758 }
759}
760
/// Network stack of the deterministic runtime: deterministic network, wrapped in auditing, then metrics.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack of the deterministic runtime: in-memory storage, wrapped in auditing, then metrics.
type Storage = MeteredStorage<AuditedStorage<MemStorage>>;

/// Handle to the deterministic runtime that is passed to every task.
pub struct Context {
    /// Metric name prefix accumulated through `with_label` (empty at the root).
    name: String,
    /// Weak handle to the executor. Methods that need the executor panic with
    /// "executor already dropped" once the run has finished.
    executor: Weak<Executor>,
    network: Arc<Network>,
    storage: Arc<Storage>,
    /// This context's node in the supervision tree.
    tree: Arc<Tree>,
    /// Execution mode requested for the next spawn; reset when spawning.
    execution: Execution,
    /// Whether the next spawned task should run inside a tracing span; reset when spawning.
    instrumented: bool,
}
776
777impl Clone for Context {
778 fn clone(&self) -> Self {
779 let (child, _) = Tree::child(&self.tree);
780 Self {
781 name: self.name.clone(),
782 executor: self.executor.clone(),
783 network: self.network.clone(),
784 storage: self.storage.clone(),
785
786 tree: child,
787 execution: Execution::default(),
788 instrumented: false,
789 }
790 }
791}
792
793impl Context {
794 fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
795 let mut registry = Registry::default();
797 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
798
799 let metrics = Arc::new(Metrics::init(runtime_registry));
801 let start_time = UNIX_EPOCH;
802 let deadline = cfg
803 .timeout
804 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
805 let auditor = Arc::new(Auditor::default());
806 let storage = MeteredStorage::new(
807 AuditedStorage::new(MemStorage::default(), auditor.clone()),
808 runtime_registry,
809 );
810 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
811 let network = MeteredNetwork::new(network, runtime_registry);
812
813 let (panicker, panicked) = Panicker::new(cfg.catch_panics);
815
816 let executor = Arc::new(Executor {
817 registry: Mutex::new(registry),
818 cycle: cfg.cycle,
819 deadline,
820 metrics: metrics.clone(),
821 auditor: auditor.clone(),
822 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
823 time: Mutex::new(start_time),
824 tasks: Arc::new(Tasks::new()),
825 sleeping: Mutex::new(BinaryHeap::new()),
826 shutdown: Mutex::new(Stopper::default()),
827 panicker,
828 });
829
830 (
831 Self {
832 name: String::new(),
833 executor: Arc::downgrade(&executor),
834 network: Arc::new(network),
835 storage: Arc::new(storage),
836 tree: Tree::root(),
837 execution: Execution::default(),
838 instrumented: false,
839 },
840 executor,
841 panicked,
842 )
843 }
844
845 fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
857 let mut registry = Registry::default();
859 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
860 let metrics = Arc::new(Metrics::init(runtime_registry));
861
862 let network =
864 AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
865 let network = MeteredNetwork::new(network, runtime_registry);
866
867 let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
869
870 let executor = Arc::new(Executor {
871 cycle: checkpoint.cycle,
873 deadline: checkpoint.deadline,
874 auditor: checkpoint.auditor,
875 rng: checkpoint.rng,
876 time: checkpoint.time,
877
878 registry: Mutex::new(registry),
880 metrics: metrics.clone(),
881 tasks: Arc::new(Tasks::new()),
882 sleeping: Mutex::new(BinaryHeap::new()),
883 shutdown: Mutex::new(Stopper::default()),
884 panicker,
885 });
886 (
887 Self {
888 name: String::new(),
889 executor: Arc::downgrade(&executor),
890 network: Arc::new(network),
891 storage: checkpoint.storage,
892 tree: Tree::root(),
893 execution: Execution::default(),
894 instrumented: false,
895 },
896 executor,
897 panicked,
898 )
899 }
900
901 fn executor(&self) -> Arc<Executor> {
903 self.executor.upgrade().expect("executor already dropped")
904 }
905
906 fn metrics(&self) -> Arc<Metrics> {
908 self.executor().metrics.clone()
909 }
910
911 pub fn auditor(&self) -> Arc<Auditor> {
913 self.executor().auditor.clone()
914 }
915}
916
impl crate::Spawner for Context {
    /// Requests `Execution::Dedicated` for the next task spawned from this context.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Requests `Execution::Shared(blocking)` for the next task spawned from this context.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Requests that the next spawned task run inside a `task` tracing span.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawns `f(context)` as a new task on the deterministic executor.
    ///
    /// The context handed to `f` has its execution mode and instrumentation flag reset, and it
    /// gets a new child node in the supervision tree. If `Tree::child` reports `aborted`, `f` is
    /// never called and `Handle::closed` is returned.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Label and metric for the new task; `spawn_metrics!` is defined elsewhere in the crate.
        let (label, metric) = spawn_metrics!(self);

        // Capture the supervision parent and the instrumentation request, then reset the
        // per-spawn settings on the context that will be handed to `f`.
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        // `aborted` presumably means the parent node has already been aborted — TODO confirm.
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor();
        // Optionally wrap in a detached (`parent: None`) span named after the task label.
        let future: BoxFuture<T> = if is_instrumented {
            f(self)
                .instrument(info_span!(parent: None, "task", name = %label.name()))
                .boxed()
        } else {
            f(self).boxed()
        };
        // `Handle::init` yields the future to schedule plus the handle returned to the caller.
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Register the task's aborter (if any) with the parent node, presumably so that
        // aborting the parent also aborts this task.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Records a `stop` audit event and signals shutdown with `value`.
    ///
    /// Returns `Ok(())` once the stop future resolves successfully, `Err(Error::Closed)` if it
    /// resolves with an error, or `Err(Error::Timeout)` if `timeout` is set and that much
    /// simulated time elapses first.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // Without a timeout, race against a future that never resolves.
        let timeout_future = match timeout {
            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
            None => futures::future::Either::Right(futures::future::pending()),
        };
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Records a `stopped` audit event and returns the shutdown `Signal` from the stopper.
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Bound to a local so the lock guard is dropped before `executor`.
        let stopped = executor.shutdown.lock().unwrap().stopped();
        stopped
    }
}
1011
1012impl crate::Metrics for Context {
1013 fn with_label(&self, label: &str) -> Self {
1014 let name = {
1015 let prefix = self.name.clone();
1016 if prefix.is_empty() {
1017 label.to_string()
1018 } else {
1019 format!("{prefix}_{label}")
1020 }
1021 };
1022 assert!(
1023 !name.starts_with(METRICS_PREFIX),
1024 "using runtime label is not allowed"
1025 );
1026 Self {
1027 name,
1028 ..self.clone()
1029 }
1030 }
1031
1032 fn label(&self) -> String {
1033 self.name.clone()
1034 }
1035
1036 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1037 let name = name.into();
1039 let help = help.into();
1040
1041 let executor = self.executor();
1043 executor.auditor.event(b"register", |hasher| {
1044 hasher.update(name.as_bytes());
1045 hasher.update(help.as_bytes());
1046 });
1047 let prefixed_name = {
1048 let prefix = &self.name;
1049 if prefix.is_empty() {
1050 name
1051 } else {
1052 format!("{}_{}", *prefix, name)
1053 }
1054 };
1055 executor
1056 .registry
1057 .lock()
1058 .unwrap()
1059 .register(prefixed_name, help, metric);
1060 }
1061
1062 fn encode(&self) -> String {
1063 let executor = self.executor();
1064 executor.auditor.event(b"encode", |_| {});
1065 let mut buffer = String::new();
1066 encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1067 buffer
1068 }
1069}
1070
/// Future returned by `Clock::sleep` / `Clock::sleep_until` on `Context`.
///
/// It resolves once simulated time reaches `time`.
struct Sleeper {
    executor: Weak<Executor>,
    /// Simulated time at which the sleeper becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed onto the executor's sleeping heap.
    registered: bool,
}

impl Sleeper {
    /// Upgrades the weak executor handle; panics with "executor already dropped" if it is gone.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}
1083
/// A pending wake-up: `waker` is woken once simulated time reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

/// Two alarms are equal when they fire at the same instant; the waker is ignored.
impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Orders alarms by *reversed* time, so that `BinaryHeap` (a max-heap) yields the earliest
/// alarm first.
impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1109
1110impl Future for Sleeper {
1111 type Output = ();
1112
1113 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1114 let executor = self.executor();
1115 {
1116 let current_time = *executor.time.lock().unwrap();
1117 if current_time >= self.time {
1118 return Poll::Ready(());
1119 }
1120 }
1121 if !self.registered {
1122 self.registered = true;
1123 executor.sleeping.lock().unwrap().push(Alarm {
1124 time: self.time,
1125 waker: cx.waker().clone(),
1126 });
1127 }
1128 Poll::Pending
1129 }
1130}
1131
1132impl Clock for Context {
1133 fn current(&self) -> SystemTime {
1134 *self.executor().time.lock().unwrap()
1135 }
1136
1137 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1138 let deadline = self
1139 .current()
1140 .checked_add(duration)
1141 .expect("overflow when setting wake time");
1142 self.sleep_until(deadline)
1143 }
1144
1145 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1146 Sleeper {
1147 executor: self.executor.clone(),
1148
1149 time: deadline,
1150 registered: false,
1151 }
1152 }
1153}
1154
/// Future that pairs a wrapped future with a simulated-time release point (built by
/// `Pacer::pace`).
///
/// The wrapped future is polled once eagerly with a no-op waker. The `Waiter` stays pending
/// until simulated time reaches `target`. It then yields the stashed result, or, if the wrapped
/// future was not ready yet, blocks the current thread until it is.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Weak handle used to read simulated time and to register an alarm.
    executor: Weak<Executor>,
    /// Simulated time at which the result may be released.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output produced by the eager first poll, if the future was already ready then.
    ready: Option<F::Output>,
    /// Whether the eager first poll has happened.
    started: bool,
    /// Whether an alarm for `target` has been pushed onto the sleeping heap.
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // First poll only: drive the wrapped future once with a no-op waker and stash its
        // output if it is already available.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Stay pending until simulated time reaches `target`, registering a single alarm with
        // the caller's waker the first time through.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock().unwrap();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Target reached: release the stashed output if the eager poll produced one.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise re-poll the wrapped future with the blocker's waker until it completes.
        // `blocker.wait()` presumably parks the thread until that waker fires — TODO confirm.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1228
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` in a `Waiter` that withholds its result until `latency` of simulated time
    /// has passed.
    ///
    /// # Panics
    ///
    /// Panics if the release time overflows `SystemTime`.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let now = self.current();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: Weak::clone(&self.executor),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1255
/// Exposes the simulated clock to `governor`, using `SystemTime` as its instant type.
impl GClock for Context {
    type Instant = SystemTime;

    /// Current simulated time (same value as `Clock::current`).
    fn now(&self) -> Self::Instant {
        self.current()
    }
}

/// Opts the simulated clock into `governor`'s `ReasonablyRealtime` marker trait.
impl ReasonablyRealtime for Context {}
1265
/// Networking backed by the metered, audited deterministic network held by this context.
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener on `socket` by delegating to the underlying network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket` by delegating to the underlying network, returning its sink/stream pair.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1280
1281impl RngCore for Context {
1282 fn next_u32(&mut self) -> u32 {
1283 let executor = self.executor();
1284 executor.auditor.event(b"rand", |hasher| {
1285 hasher.update(b"next_u32");
1286 });
1287 let result = executor.rng.lock().unwrap().next_u32();
1288 result
1289 }
1290
1291 fn next_u64(&mut self) -> u64 {
1292 let executor = self.executor();
1293 executor.auditor.event(b"rand", |hasher| {
1294 hasher.update(b"next_u64");
1295 });
1296 let result = executor.rng.lock().unwrap().next_u64();
1297 result
1298 }
1299
1300 fn fill_bytes(&mut self, dest: &mut [u8]) {
1301 let executor = self.executor();
1302 executor.auditor.event(b"rand", |hasher| {
1303 hasher.update(b"fill_bytes");
1304 });
1305 executor.rng.lock().unwrap().fill_bytes(dest);
1306 }
1307
1308 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1309 let executor = self.executor();
1310 executor.auditor.event(b"rand", |hasher| {
1311 hasher.update(b"try_fill_bytes");
1312 });
1313 let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1314 result
1315 }
1316}
1317
1318impl CryptoRng for Context {}
1319
/// Storage backed by the shared metered, audited in-memory storage held by this context.
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    /// Opens blob `name` in `partition` by delegating to the underlying storage.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Removes blob `name` from `partition`, or the whole partition when `name` is `None`
    /// (presumably — delegated to the underlying storage; confirm semantics there).
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Lists the blob names in `partition` by delegating to the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1335
1336#[cfg(test)]
1337mod tests {
1338 use super::*;
1339 #[cfg(feature = "external")]
1340 use crate::FutureExt;
1341 #[cfg(feature = "external")]
1342 use crate::Spawner;
1343 use crate::{deterministic, reschedule, utils::run_tasks, Blob, Metrics, Runner as _, Storage};
1344 use commonware_macros::test_traced;
1345 #[cfg(not(feature = "external"))]
1346 use futures::future::pending;
1347 #[cfg(feature = "external")]
1348 use futures::{channel::mpsc, SinkExt, StreamExt};
1349 use futures::{channel::oneshot, task::noop_waker};
1350
1351 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1352 let executor = deterministic::Runner::seeded(seed);
1353 run_tasks(5, executor)
1354 }
1355
1356 #[test]
1357 fn test_same_seed_same_order() {
1358 let mut outputs = Vec::new();
1360 for seed in 0..1000 {
1361 let output = run_with_seed(seed);
1362 outputs.push(output);
1363 }
1364
1365 for seed in 0..1000 {
1367 let output = run_with_seed(seed);
1368 assert_eq!(output, outputs[seed as usize]);
1369 }
1370 }
1371
1372 #[test_traced("TRACE")]
1373 fn test_different_seeds_different_order() {
1374 let output1 = run_with_seed(12345);
1375 let output2 = run_with_seed(54321);
1376 assert_ne!(output1, output2);
1377 }
1378
1379 #[test]
1380 fn test_alarm_min_heap() {
1381 let now = SystemTime::now();
1383 let alarms = vec![
1384 Alarm {
1385 time: now + Duration::new(10, 0),
1386 waker: noop_waker(),
1387 },
1388 Alarm {
1389 time: now + Duration::new(5, 0),
1390 waker: noop_waker(),
1391 },
1392 Alarm {
1393 time: now + Duration::new(15, 0),
1394 waker: noop_waker(),
1395 },
1396 Alarm {
1397 time: now + Duration::new(5, 0),
1398 waker: noop_waker(),
1399 },
1400 ];
1401 let mut heap = BinaryHeap::new();
1402 for alarm in alarms {
1403 heap.push(alarm);
1404 }
1405
1406 let mut sorted_times = Vec::new();
1408 while let Some(alarm) = heap.pop() {
1409 sorted_times.push(alarm.time);
1410 }
1411 assert_eq!(
1412 sorted_times,
1413 vec![
1414 now + Duration::new(5, 0),
1415 now + Duration::new(5, 0),
1416 now + Duration::new(10, 0),
1417 now + Duration::new(15, 0),
1418 ]
1419 );
1420 }
1421
1422 #[test]
1423 #[should_panic(expected = "runtime timeout")]
1424 fn test_timeout() {
1425 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1426 executor.start(|context| async move {
1427 loop {
1428 context.sleep(Duration::from_secs(1)).await;
1429 }
1430 });
1431 }
1432
1433 #[test]
1434 #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1435 fn test_bad_timeout() {
1436 let cfg = Config {
1437 timeout: Some(Duration::default()),
1438 cycle: Duration::default(),
1439 ..Config::default()
1440 };
1441 deterministic::Runner::new(cfg);
1442 }
1443
1444 #[test]
1445 #[should_panic(
1446 expected = "cycle duration must be greater than or equal to system time precision"
1447 )]
1448 fn test_bad_cycle() {
1449 let cfg = Config {
1450 cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1451 ..Config::default()
1452 };
1453 deterministic::Runner::new(cfg);
1454 }
1455
1456 #[test]
1457 fn test_recover_synced_storage_persists() {
1458 let executor1 = deterministic::Runner::default();
1460 let partition = "test_partition";
1461 let name = b"test_blob";
1462 let data = b"Hello, world!";
1463
1464 let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1466 let (blob, _) = context.open(partition, name).await.unwrap();
1467 blob.write_at(Vec::from(data), 0).await.unwrap();
1468 blob.sync().await.unwrap();
1469 context.auditor().state()
1470 });
1471
1472 assert_eq!(state, checkpoint.auditor.state());
1474
1475 let executor = Runner::from(checkpoint);
1477 executor.start(|context| async move {
1478 let (blob, len) = context.open(partition, name).await.unwrap();
1479 assert_eq!(len, data.len() as u64);
1480 let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1481 assert_eq!(read.as_ref(), data);
1482 });
1483 }
1484
1485 #[test]
1486 #[should_panic(expected = "goodbye")]
1487 fn test_recover_panic_handling() {
1488 let executor1 = deterministic::Runner::default();
1490 let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1491 reschedule().await;
1492 });
1493
1494 let executor = Runner::from(checkpoint);
1496 executor.start(|_| async move {
1497 panic!("goodbye");
1498 });
1499 }
1500
/// Data written to a blob but never synced must not survive a checkpoint/recover round trip.
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
    let executor = deterministic::Runner::default();
    let partition = "test_partition";
    let name = b"test_blob";
    let data = Vec::from("Hello, world!");

    // Write without calling `sync`. The async block already owns `context`, so no clone is
    // needed to call `open` on it.
    let (_, checkpoint) = executor.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(data, 0).await.unwrap();
    });

    // After recovery the blob is expected to be empty because the write was never synced.
    let executor = Runner::from(checkpoint);
    executor.start(|context| async move {
        let (_, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, 0);
    });
}
1525
/// Returning the context out of the root future must be reported as a leaked executor reference.
#[test]
#[should_panic(expected = "executor still has weak references")]
fn test_context_return() {
    let runner = deterministic::Runner::default();

    // The root future hands its context back to the caller instead of dropping it.
    // This test expects the runtime to panic with the message above.
    let leaked = runner.start(|context| async move { context });

    drop(leaked);
}
1541
/// A default-configured runner's clock must start exactly at the Unix epoch.
#[test]
fn test_default_time_zero() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        let since_epoch = context.current().duration_since(UNIX_EPOCH).unwrap();
        assert_eq!(since_epoch, Duration::ZERO);
    });
}
1555
/// A root future that can never complete must be reported as a stalled runtime.
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_stall() {
    let runner = deterministic::Runner::default();

    // `pending` never resolves, so the runtime has no way to make progress.
    runner.start(|_context| async move { pending::<()>().await });
}
1568
/// Without the `external` feature, waiting on an event produced by a real OS thread is
/// expected to end in a "runtime stalled" panic (checked by `should_panic`).
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_external_simulated() {
    let executor = deterministic::Runner::default();

    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        // The runtime is expected to panic before this fires, and `rx` may already be dropped
        // by then, so a failed send is the normal outcome here. Ignore it rather than unwrap:
        // a panic in this detached thread would only show up as noise after the test finished.
        let _ = tx.send(());
    });

    executor.start(|_| async move {
        rx.await.unwrap();
    });
}
1588
/// With the `external` feature enabled, the root future waits on a channel that a real OS
/// thread resolves after one second of wall-clock time, and the run completes normally.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime() {
    let runner = deterministic::Runner::default();

    let (sender, receiver) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        sender.send(()).unwrap();
    });

    runner.start(|_| async move {
        receiver.await.unwrap();
    });
}
1607
/// Two external events paced with different latencies: checks the real and simulated elapsed
/// time seen by each task, and the order in which the tasks report completion.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime_variable() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        // Snapshot both clocks before any external event is produced.
        let real_start = SystemTime::now();
        let sim_start = context.current();
        let (slow_tx, slow_rx) = oneshot::channel();
        let (fast_tx, fast_rx) = oneshot::channel();
        let (mut order_tx, mut order_rx) = mpsc::channel(2);

        let delay = Duration::from_secs(1);

        // This event arrives only after `delay` of real time...
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            slow_tx.send(()).unwrap();
        });

        // ...while this one is sent right away.
        std::thread::spawn(move || {
            std::thread::sleep(Duration::ZERO);
            fast_tx.send(()).unwrap();
        });

        // Paced with zero latency: real time must exceed `delay`, simulated time must not.
        let slow = context.clone().spawn({
            let mut order_tx = order_tx.clone();
            move |context| async move {
                slow_rx.pace(&context, Duration::ZERO).await.unwrap();
                let real_elapsed = SystemTime::now().duration_since(real_start).unwrap();
                assert!(real_elapsed > delay);
                let sim_elapsed = context.current().duration_since(sim_start).unwrap();
                assert!(sim_elapsed < delay);
                order_tx.send(1).await.unwrap();
            }
        });

        // Paced with `delay` latency: both clocks must have advanced by at least `delay`.
        let fast = context.clone().spawn(move |context| async move {
            fast_rx.pace(&context, delay).await.unwrap();
            let real_elapsed = SystemTime::now().duration_since(real_start).unwrap();
            assert!(real_elapsed >= delay);
            let sim_elapsed = context.current().duration_since(sim_start).unwrap();
            assert!(sim_elapsed >= delay);
            order_tx.send(2).await.unwrap();
        });

        fast.await.unwrap();
        slow.await.unwrap();

        // The zero-latency task is expected to report before the one paced by `delay`.
        let mut observed = Vec::with_capacity(2);
        while observed.len() < 2 {
            observed.push(order_rx.next().await.unwrap());
        }
        assert_eq!(observed, vec![1, 2]);
    });
}
1671
/// Without the `external` feature, sleeping one second should take fewer than 10 executor
/// iterations, as reported by the `runtime_iterations_total` metric.
#[cfg(not(feature = "external"))]
#[test]
fn test_simulated_skip() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Pull the iteration counter out of the text-encoded metrics.
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .filter_map(|line| line.strip_prefix("runtime_iterations_total "))
            .find_map(|raw| raw.trim().parse::<u64>().ok())
            .expect("missing runtime_iterations_total metric");

        assert!(iterations < 10);
    });
}
1694
/// With the `external` feature, sleeping one second should take more than 500 executor
/// iterations, as reported by the `runtime_iterations_total` metric.
#[cfg(feature = "external")]
#[test]
fn test_realtime_no_skip() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Pull the iteration counter out of the text-encoded metrics.
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .filter_map(|line| line.strip_prefix("runtime_iterations_total "))
            .find_map(|raw| raw.trim().parse::<u64>().ok())
            .expect("missing runtime_iterations_total metric");

        assert!(iterations > 500);
    });
}
1717}