1use crate::{
40 network::{
41 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42 metered::Network as MeteredNetwork,
43 },
44 storage::{
45 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46 metered::Storage as MeteredStorage,
47 },
48 telemetry::metrics::task::Label,
49 utils::{
50 signal::{Signal, Stopper},
51 supervision::Tree,
52 Panicker,
53 },
54 validate_label, Clock, Error, Execution, Handle, ListenerOf, Metrics as _, Panicked,
55 Spawner as _, METRICS_PREFIX,
56};
57#[cfg(feature = "external")]
58use crate::{Blocker, Pacer};
59use commonware_codec::Encode;
60use commonware_macros::select;
61use commonware_parallel::ThreadPool;
62use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
63#[cfg(feature = "external")]
64use futures::task::noop_waker;
65use futures::{
66 future::BoxFuture,
67 task::{waker, ArcWake},
68 Future, FutureExt,
69};
70use governor::clock::{Clock as GClock, ReasonablyRealtime};
71#[cfg(feature = "external")]
72use pin_project::pin_project;
73use prometheus_client::{
74 encoding::text::encode,
75 metrics::{counter::Counter, family::Family, gauge::Gauge},
76 registry::{Metric, Registry},
77};
78use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
79use rand_core::CryptoRngCore;
80use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
81use sha2::{Digest as _, Sha256};
82use std::{
83 collections::{BTreeMap, BinaryHeap, HashMap},
84 mem::{replace, take},
85 net::{IpAddr, SocketAddr},
86 num::NonZeroUsize,
87 panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
88 pin::Pin,
89 sync::{Arc, Mutex, Weak},
90 task::{self, Poll, Waker},
91 time::{Duration, SystemTime, UNIX_EPOCH},
92};
93use tracing::{info_span, trace, Instrument};
94
95#[derive(Debug)]
96struct Metrics {
97 iterations: Counter,
98 tasks_spawned: Family<Label, Counter>,
99 tasks_running: Family<Label, Gauge>,
100 task_polls: Family<Label, Counter>,
101
102 network_bandwidth: Counter,
103}
104
105impl Metrics {
106 pub fn init(registry: &mut Registry) -> Self {
107 let metrics = Self {
108 iterations: Counter::default(),
109 task_polls: Family::default(),
110 tasks_spawned: Family::default(),
111 tasks_running: Family::default(),
112 network_bandwidth: Counter::default(),
113 };
114 registry.register(
115 "iterations",
116 "Total number of iterations",
117 metrics.iterations.clone(),
118 );
119 registry.register(
120 "tasks_spawned",
121 "Total number of tasks spawned",
122 metrics.tasks_spawned.clone(),
123 );
124 registry.register(
125 "tasks_running",
126 "Number of tasks currently running",
127 metrics.tasks_running.clone(),
128 );
129 registry.register(
130 "task_polls",
131 "Total number of task polls",
132 metrics.task_polls.clone(),
133 );
134 registry.register(
135 "bandwidth",
136 "Total amount of data sent over network",
137 metrics.network_bandwidth.clone(),
138 );
139 metrics
140 }
141}
142
143type Digest = [u8; 32];
145
146pub struct Auditor {
148 digest: Mutex<Digest>,
149}
150
151impl Default for Auditor {
152 fn default() -> Self {
153 Self {
154 digest: Digest::default().into(),
155 }
156 }
157}
158
159impl Auditor {
160 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
164 where
165 F: FnOnce(&mut Sha256),
166 {
167 let mut digest = self.digest.lock().unwrap();
168
169 let mut hasher = Sha256::new();
170 hasher.update(digest.as_ref());
171 hasher.update(label);
172 payload(&mut hasher);
173
174 *digest = hasher.finalize().into();
175 }
176
177 pub fn state(&self) -> String {
182 let hash = self.digest.lock().unwrap();
183 hex(hash.as_ref())
184 }
185}
186
187pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
189
190pub struct Config {
192 rng: BoxDynRng,
194
195 cycle: Duration,
198
199 timeout: Option<Duration>,
201
202 catch_panics: bool,
204}
205
206impl Config {
207 pub fn new() -> Self {
209 Self {
210 rng: Box::new(StdRng::seed_from_u64(42)),
211 cycle: Duration::from_millis(1),
212 timeout: None,
213 catch_panics: false,
214 }
215 }
216
217 pub fn with_seed(self, seed: u64) -> Self {
220 self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
221 }
222
223 pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
229 self.rng = rng;
230 self
231 }
232
233 pub const fn with_cycle(mut self, cycle: Duration) -> Self {
235 self.cycle = cycle;
236 self
237 }
238 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
240 self.timeout = timeout;
241 self
242 }
243 pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
245 self.catch_panics = catch_panics;
246 self
247 }
248
249 pub const fn cycle(&self) -> Duration {
252 self.cycle
253 }
254 pub const fn timeout(&self) -> Option<Duration> {
256 self.timeout
257 }
258 pub const fn catch_panics(&self) -> bool {
260 self.catch_panics
261 }
262
263 pub fn assert(&self) {
265 assert!(
266 self.cycle != Duration::default() || self.timeout.is_none(),
267 "cycle duration must be non-zero when timeout is set",
268 );
269 assert!(
270 self.cycle >= SYSTEM_TIME_PRECISION,
271 "cycle duration must be greater than or equal to system time precision"
272 );
273 }
274}
275
276impl Default for Config {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282pub struct Executor {
284 registry: Mutex<Registry>,
285 cycle: Duration,
286 deadline: Option<SystemTime>,
287 metrics: Arc<Metrics>,
288 auditor: Arc<Auditor>,
289 rng: Mutex<BoxDynRng>,
290 time: Mutex<SystemTime>,
291 tasks: Arc<Tasks>,
292 sleeping: Mutex<BinaryHeap<Alarm>>,
293 shutdown: Mutex<Stopper>,
294 panicker: Panicker,
295 dns: Mutex<HashMap<String, Vec<IpAddr>>>,
296}
297
298impl Executor {
299 fn advance_time(&self) -> SystemTime {
304 #[cfg(feature = "external")]
305 std::thread::sleep(self.cycle);
306
307 let mut time = self.time.lock().unwrap();
308 *time = time
309 .checked_add(self.cycle)
310 .expect("executor time overflowed");
311 let now = *time;
312 trace!(now = now.epoch_millis(), "time advanced");
313 now
314 }
315
316 fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
321 if cfg!(feature = "external") || self.tasks.ready() != 0 {
322 return current;
323 }
324
325 let mut skip_until = None;
326 {
327 let sleeping = self.sleeping.lock().unwrap();
328 if let Some(next) = sleeping.peek() {
329 if next.time > current {
330 skip_until = Some(next.time);
331 }
332 }
333 }
334
335 skip_until.map_or(current, |deadline| {
336 let mut time = self.time.lock().unwrap();
337 *time = deadline;
338 let now = *time;
339 trace!(now = now.epoch_millis(), "time skipped");
340 now
341 })
342 }
343
344 fn wake_ready_sleepers(&self, current: SystemTime) {
346 let mut sleeping = self.sleeping.lock().unwrap();
347 while let Some(next) = sleeping.peek() {
348 if next.time <= current {
349 let sleeper = sleeping.pop().unwrap();
350 sleeper.waker.wake();
351 } else {
352 break;
353 }
354 }
355 }
356
357 fn assert_liveness(&self) {
361 if cfg!(feature = "external") || self.tasks.ready() != 0 {
362 return;
363 }
364
365 panic!("runtime stalled");
366 }
367}
368
369pub struct Checkpoint {
373 cycle: Duration,
374 deadline: Option<SystemTime>,
375 auditor: Arc<Auditor>,
376 rng: Mutex<BoxDynRng>,
377 time: Mutex<SystemTime>,
378 storage: Arc<Storage>,
379 dns: Mutex<HashMap<String, Vec<IpAddr>>>,
380 catch_panics: bool,
381}
382
383impl Checkpoint {
384 pub fn auditor(&self) -> Arc<Auditor> {
386 self.auditor.clone()
387 }
388}
389
/// Where a [`Runner`] takes its initial state from.
// Both variants are stored inline, so the size-difference lint is silenced.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a fresh runtime from a configuration.
    Config(Config),
    /// Resume from the state captured at the end of a previous run.
    Checkpoint(Checkpoint),
}
395
396pub struct Runner {
398 state: State,
399}
400
401impl From<Config> for Runner {
402 fn from(cfg: Config) -> Self {
403 Self::new(cfg)
404 }
405}
406
407impl From<Checkpoint> for Runner {
408 fn from(checkpoint: Checkpoint) -> Self {
409 Self {
410 state: State::Checkpoint(checkpoint),
411 }
412 }
413}
414
impl Runner {
    /// Creates a runner from `cfg`.
    ///
    /// # Panics
    ///
    /// Panics if `cfg` fails [`Config::assert`].
    pub fn new(cfg: Config) -> Self {
        // Reject invalid configurations before any state is built.
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Creates a runner with the default configuration and the given RNG seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Creates a runner with the default configuration and the given timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Runs the future produced by `f` to completion and returns its output
    /// together with a [`Checkpoint`] of the runtime state.
    ///
    /// # Panics
    ///
    /// Panics with "runtime timeout" if the clock reaches the configured
    /// deadline, with "runtime stalled" if no task is ready after time has
    /// advanced, and if any `Weak` reference to the executor (e.g. a
    /// `Context`) is still alive after cleanup. A panic raised inside the
    /// loop is re-raised after cleanup has run.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Build the context and executor, either fresh or from a checkpoint.
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Keep a handle to storage so it can be placed in the checkpoint.
        let storage = context.storage.clone();
        // Pin the root future on the heap, wrapped by `panicked.interrupt`.
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register (and queue) the root task.
        Tasks::register_root(&executor.tasks);

        // Drive tasks until the root completes. The loop runs inside
        // `catch_unwind` so the cleanup below happens even if it panics.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Enforce the deadline, if one is configured.
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Release the time lock before panicking.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Take every task that is currently ready.
            let mut queue = executor.tasks.drain();

            // Randomize poll order using the runtime RNG; the RNG is left
            // untouched when there is nothing to reorder.
            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // A queued id may refer to a task that has since been removed.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record the poll in the audit digest and the metrics.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Waking re-queues this task id; the waker only holds a weak
                // reference to the task set.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                match &task.mode {
                    Mode::Root => {
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            // Stop immediately: remaining queued tasks are not polled.
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        let mut fut_opt = future.lock().unwrap();
                        // The future slot is emptied once the task has finished.
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            executor.tasks.remove(id);
                            continue;
                        };

                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Unregister the task and drop its future.
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                trace!(id, "task is still pending");
            }

            // The root task finished: leave the loop with its output.
            if let Some(output) = output {
                break output;
            }

            // Advance the clock by one cycle, then jump ahead to the next
            // alarm if nothing is ready to run.
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake everything that is now due, then make sure progress is possible.
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            executor.metrics.iterations.inc();
        }));

        // Cleanup (runs whether or not the loop panicked): discard pending
        // alarms, then drop every remaining task future.
        executor.sleeping.lock().unwrap().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        // Drop the root future along with anything it still owns.
        drop(root);

        // Nothing holding a weak executor reference may outlive the run.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Re-raise a captured panic now that cleanup is done.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Take ownership of the executor to move its state into the checkpoint.
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
}
623
624impl Default for Runner {
625 fn default() -> Self {
626 Self::new(Config::default())
627 }
628}
629
630impl crate::Runner for Runner {
631 type Context = Context;
632
633 fn start<F, Fut>(self, f: F) -> Fut::Output
634 where
635 F: FnOnce(Self::Context) -> Fut,
636 Fut: Future,
637 {
638 let (output, _) = self.start_and_recover(f);
639 output
640 }
641}
642
/// How a task's future is stored.
enum Mode {
    /// The root task; its future is owned by the run loop, not by the task.
    Root,
    /// A spawned task. The slot is set to `None` once the future completes
    /// or the runtime is cleaned up.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
648
/// A task registered with the executor.
struct Task {
    /// Identifier assigned from the task counter at registration.
    id: u128,
    /// Label used for auditing and per-label metrics.
    label: Label,

    /// Whether this is the root task or a spawned one (with its future).
    mode: Mode,
}
656
657struct TaskWaker {
659 id: u128,
660
661 tasks: Weak<Tasks>,
662}
663
664impl ArcWake for TaskWaker {
665 fn wake_by_ref(arc_self: &Arc<Self>) {
666 if let Some(tasks) = arc_self.tasks.upgrade() {
671 tasks.queue(arc_self.id);
672 }
673 }
674}
675
676struct Tasks {
678 counter: Mutex<u128>,
680 ready: Mutex<Vec<u128>>,
682 running: Mutex<BTreeMap<u128, Arc<Task>>>,
684}
685
686impl Tasks {
687 const fn new() -> Self {
689 Self {
690 counter: Mutex::new(0),
691 ready: Mutex::new(Vec::new()),
692 running: Mutex::new(BTreeMap::new()),
693 }
694 }
695
696 fn increment(&self) -> u128 {
698 let mut counter = self.counter.lock().unwrap();
699 let old = *counter;
700 *counter = counter.checked_add(1).expect("task counter overflow");
701 old
702 }
703
704 fn register_root(arc_self: &Arc<Self>) {
708 let id = arc_self.increment();
709 let task = Arc::new(Task {
710 id,
711 label: Label::root(),
712 mode: Mode::Root,
713 });
714 arc_self.register(id, task);
715 }
716
717 fn register_work(
719 arc_self: &Arc<Self>,
720 label: Label,
721 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
722 ) {
723 let id = arc_self.increment();
724 let task = Arc::new(Task {
725 id,
726 label,
727 mode: Mode::Work(Mutex::new(Some(future))),
728 });
729 arc_self.register(id, task);
730 }
731
732 fn register(&self, id: u128, task: Arc<Task>) {
734 self.running.lock().unwrap().insert(id, task);
736
737 self.queue(id);
739 }
740
741 fn queue(&self, id: u128) {
743 let mut ready = self.ready.lock().unwrap();
744 ready.push(id);
745 }
746
747 fn drain(&self) -> Vec<u128> {
749 let mut queue = self.ready.lock().unwrap();
750 let len = queue.len();
751 replace(&mut *queue, Vec::with_capacity(len))
752 }
753
754 fn ready(&self) -> usize {
756 self.ready.lock().unwrap().len()
757 }
758
759 fn get(&self, id: u128) -> Option<Arc<Task>> {
764 let running = self.running.lock().unwrap();
765 running.get(&id).cloned()
766 }
767
768 fn remove(&self, id: u128) {
770 self.running.lock().unwrap().remove(&id);
771 }
772
773 fn clear(&self) -> Vec<Arc<Task>> {
775 self.ready.lock().unwrap().clear();
777
778 let running: BTreeMap<u128, Arc<Task>> = {
780 let mut running = self.running.lock().unwrap();
781 take(&mut *running)
782 };
783 running.into_values().collect()
784 }
785}
786
787type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
788type Storage = MeteredStorage<AuditedStorage<MemStorage>>;
789
790pub struct Context {
794 name: String,
795 executor: Weak<Executor>,
796 network: Arc<Network>,
797 storage: Arc<Storage>,
798 tree: Arc<Tree>,
799 execution: Execution,
800 instrumented: bool,
801}
802
803impl Clone for Context {
804 fn clone(&self) -> Self {
805 let (child, _) = Tree::child(&self.tree);
806 Self {
807 name: self.name.clone(),
808 executor: self.executor.clone(),
809 network: self.network.clone(),
810 storage: self.storage.clone(),
811
812 tree: child,
813 execution: Execution::default(),
814 instrumented: false,
815 }
816 }
817}
818
819impl Context {
820 fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
821 let mut registry = Registry::default();
823 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
824
825 let metrics = Arc::new(Metrics::init(runtime_registry));
827 let start_time = UNIX_EPOCH;
828 let deadline = cfg
829 .timeout
830 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
831 let auditor = Arc::new(Auditor::default());
832 let storage = MeteredStorage::new(
833 AuditedStorage::new(MemStorage::default(), auditor.clone()),
834 runtime_registry,
835 );
836 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
837 let network = MeteredNetwork::new(network, runtime_registry);
838
839 let (panicker, panicked) = Panicker::new(cfg.catch_panics);
841
842 let executor = Arc::new(Executor {
843 registry: Mutex::new(registry),
844 cycle: cfg.cycle,
845 deadline,
846 metrics,
847 auditor,
848 rng: Mutex::new(cfg.rng),
849 time: Mutex::new(start_time),
850 tasks: Arc::new(Tasks::new()),
851 sleeping: Mutex::new(BinaryHeap::new()),
852 shutdown: Mutex::new(Stopper::default()),
853 panicker,
854 dns: Mutex::new(HashMap::new()),
855 });
856
857 (
858 Self {
859 name: String::new(),
860 executor: Arc::downgrade(&executor),
861 network: Arc::new(network),
862 storage: Arc::new(storage),
863 tree: Tree::root(),
864 execution: Execution::default(),
865 instrumented: false,
866 },
867 executor,
868 panicked,
869 )
870 }
871
872 fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
884 let mut registry = Registry::default();
886 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
887 let metrics = Arc::new(Metrics::init(runtime_registry));
888
889 let network =
891 AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
892 let network = MeteredNetwork::new(network, runtime_registry);
893
894 let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
896
897 let executor = Arc::new(Executor {
898 cycle: checkpoint.cycle,
900 deadline: checkpoint.deadline,
901 auditor: checkpoint.auditor,
902 rng: checkpoint.rng,
903 time: checkpoint.time,
904 dns: checkpoint.dns,
905
906 registry: Mutex::new(registry),
908 metrics,
909 tasks: Arc::new(Tasks::new()),
910 sleeping: Mutex::new(BinaryHeap::new()),
911 shutdown: Mutex::new(Stopper::default()),
912 panicker,
913 });
914 (
915 Self {
916 name: String::new(),
917 executor: Arc::downgrade(&executor),
918 network: Arc::new(network),
919 storage: checkpoint.storage,
920 tree: Tree::root(),
921 execution: Execution::default(),
922 instrumented: false,
923 },
924 executor,
925 panicked,
926 )
927 }
928
929 fn executor(&self) -> Arc<Executor> {
931 self.executor.upgrade().expect("executor already dropped")
932 }
933
934 fn metrics(&self) -> Arc<Metrics> {
936 self.executor().metrics.clone()
937 }
938
939 pub fn auditor(&self) -> Arc<Auditor> {
941 self.executor().auditor.clone()
942 }
943
944 pub fn storage_audit(&self) -> Digest {
946 self.storage.inner().inner().audit()
947 }
948
949 pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
954 let executor = self.executor();
956 let host = host.into();
957 executor.auditor.event(b"resolver_register", |hasher| {
958 hasher.update(host.as_bytes());
959 hasher.update(addrs.encode());
960 });
961
962 let mut dns = executor.dns.lock().unwrap();
964 match addrs {
965 Some(addrs) => {
966 dns.insert(host, addrs);
967 }
968 None => {
969 dns.remove(&host);
970 }
971 }
972 }
973}
974
impl crate::Spawner for Context {
    /// Requests dedicated execution for the next spawned task.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Requests shared execution (with the given `blocking` flag) for the
    /// next spawned task.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Requests that the next spawned task run inside a tracing span.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawns the future produced by `f` as a new task and returns its handle.
    ///
    /// If the supervision tree reports the new child as aborted, no task is
    /// registered and a closed handle is returned.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Label and metric for this task (produced by the `spawn_metrics!` macro).
        let (label, metric) = spawn_metrics!(self);

        // Remember the parent node and the instrumentation request, then
        // reset the per-spawn settings before the context is handed to `f`.
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        // Build the task future, optionally wrapped in a parentless span
        // named after the task label.
        let executor = self.executor();
        let future: BoxFuture<'_, T> = if is_instrumented {
            f(self)
                .instrument(info_span!(parent: None, "task", name = %label.name()))
                .boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        // Register the task; registration also queues it for its first poll.
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Let the parent node abort this task, when the handle exposes an aborter.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Signals shutdown with `value` and waits for the stop to resolve.
    ///
    /// # Errors
    ///
    /// Returns `Error::Timeout` if `timeout` elapses first, or
    /// `Error::Closed` if waiting on the stop itself fails.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Hold the shutdown lock only long enough to issue the stop.
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // Without a timeout, the timeout branch never completes.
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Returns a signal tied to the runtime's shutdown state.
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Bind the result so the lock guard is released before `executor` is dropped.
        let stopped = executor.shutdown.lock().unwrap().stopped();
        stopped
    }
}
1069
1070impl crate::RayonPoolSpawner for Context {
1071 fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError> {
1072 let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1073
1074 if rayon::current_thread_index().is_none() {
1075 builder = builder.use_current_thread()
1076 }
1077
1078 builder
1079 .spawn_handler(move |thread| {
1080 self.with_label("rayon_thread")
1081 .dedicated()
1082 .spawn(move |_| async move { thread.run() });
1083 Ok(())
1084 })
1085 .build()
1086 .map(Arc::new)
1087 }
1088}
1089
1090impl crate::Metrics for Context {
1091 fn with_label(&self, label: &str) -> Self {
1092 validate_label(label);
1094
1095 let name = {
1097 let prefix = self.name.clone();
1098 if prefix.is_empty() {
1099 label.to_string()
1100 } else {
1101 format!("{prefix}_{label}")
1102 }
1103 };
1104 assert!(
1105 !name.starts_with(METRICS_PREFIX),
1106 "using runtime label is not allowed"
1107 );
1108 Self {
1109 name,
1110 ..self.clone()
1111 }
1112 }
1113
1114 fn label(&self) -> String {
1115 self.name.clone()
1116 }
1117
1118 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1119 let name = name.into();
1121 let help = help.into();
1122
1123 let executor = self.executor();
1125 executor.auditor.event(b"register", |hasher| {
1126 hasher.update(name.as_bytes());
1127 hasher.update(help.as_bytes());
1128 });
1129 let prefixed_name = {
1130 let prefix = &self.name;
1131 if prefix.is_empty() {
1132 name
1133 } else {
1134 format!("{}_{}", *prefix, name)
1135 }
1136 };
1137 executor
1138 .registry
1139 .lock()
1140 .unwrap()
1141 .register(prefixed_name, help, metric);
1142 }
1143
1144 fn encode(&self) -> String {
1145 let executor = self.executor();
1146 executor.auditor.event(b"encode", |_| {});
1147 let mut buffer = String::new();
1148 encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1149 buffer
1150 }
1151}
1152
1153struct Sleeper {
1154 executor: Weak<Executor>,
1155 time: SystemTime,
1156 registered: bool,
1157}
1158
1159impl Sleeper {
1160 fn executor(&self) -> Arc<Executor> {
1162 self.executor.upgrade().expect("executor already dropped")
1163 }
1164}
1165
/// A wake-up scheduled for a given point in runtime time.
///
/// Equality and ordering consider only `time`, and the ordering is reversed
/// so that a `BinaryHeap<Alarm>` (a max-heap) yields the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Earlier times compare as "greater" so they surface first in a max-heap.
        self.time.cmp(&other.time).reverse()
    }
}
1191
1192impl Future for Sleeper {
1193 type Output = ();
1194
1195 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1196 let executor = self.executor();
1197 {
1198 let current_time = *executor.time.lock().unwrap();
1199 if current_time >= self.time {
1200 return Poll::Ready(());
1201 }
1202 }
1203 if !self.registered {
1204 self.registered = true;
1205 executor.sleeping.lock().unwrap().push(Alarm {
1206 time: self.time,
1207 waker: cx.waker().clone(),
1208 });
1209 }
1210 Poll::Pending
1211 }
1212}
1213
1214impl Clock for Context {
1215 fn current(&self) -> SystemTime {
1216 *self.executor().time.lock().unwrap()
1217 }
1218
1219 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1220 let deadline = self
1221 .current()
1222 .checked_add(duration)
1223 .expect("overflow when setting wake time");
1224 self.sleep_until(deadline)
1225 }
1226
1227 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1228 Sleeper {
1229 executor: self.executor.clone(),
1230
1231 time: deadline,
1232 registered: false,
1233 }
1234 }
1235}
1236
/// Future that resolves with the output of `future`, but never before the
/// runtime clock has reached `target`.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    executor: Weak<Executor>,
    /// Runtime time before which the output is withheld.
    target: SystemTime,
    #[pin]
    future: F,
    /// Output captured if the inner future completed on its initial poll.
    ready: Option<F::Output>,
    /// Whether the inner future has been polled for the first time.
    started: bool,
    /// Whether an alarm for `target` has already been pushed.
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // On the first poll, poll the inner future once with a no-op waker
        // and stash the output if it is already complete.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Stay pending until the runtime clock reaches the target, pushing
        // a single alarm to be woken at that time.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock().unwrap();
        if current_time < *this.target {
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Target reached: hand out the stashed output, if there is one.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise drive the inner future to completion on this thread,
        // waiting on the blocker between polls.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1310
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wraps `future` so that its output is delivered no earlier than
    /// `latency` after the current runtime time.
    ///
    /// # Panics
    ///
    /// Panics if the target time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // The target is fixed now, not when the returned future is first polled.
        let target = self
            .current()
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: Weak::clone(&self.executor),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1337
1338impl GClock for Context {
1339 type Instant = SystemTime;
1340
1341 fn now(&self) -> Self::Instant {
1342 self.current()
1343 }
1344}
1345
1346impl ReasonablyRealtime for Context {}
1347
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener to `socket` by delegating to the context's network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket` by delegating to the context's network, returning the
    /// sink/stream pair of the connection.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1362
1363impl crate::Resolver for Context {
1364 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1365 let executor = self.executor();
1367 let dns = executor.dns.lock().unwrap();
1368 let result = dns.get(host).cloned();
1369 drop(dns);
1370
1371 executor.auditor.event(b"resolve", |hasher| {
1373 hasher.update(host.as_bytes());
1374 hasher.update(result.encode());
1375 });
1376 result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1377 }
1378}
1379
1380impl RngCore for Context {
1381 fn next_u32(&mut self) -> u32 {
1382 let executor = self.executor();
1383 executor.auditor.event(b"rand", |hasher| {
1384 hasher.update(b"next_u32");
1385 });
1386 let result = executor.rng.lock().unwrap().next_u32();
1387 result
1388 }
1389
1390 fn next_u64(&mut self) -> u64 {
1391 let executor = self.executor();
1392 executor.auditor.event(b"rand", |hasher| {
1393 hasher.update(b"next_u64");
1394 });
1395 let result = executor.rng.lock().unwrap().next_u64();
1396 result
1397 }
1398
1399 fn fill_bytes(&mut self, dest: &mut [u8]) {
1400 let executor = self.executor();
1401 executor.auditor.event(b"rand", |hasher| {
1402 hasher.update(b"fill_bytes");
1403 });
1404 executor.rng.lock().unwrap().fill_bytes(dest);
1405 }
1406
1407 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1408 let executor = self.executor();
1409 executor.auditor.event(b"rand", |hasher| {
1410 hasher.update(b"try_fill_bytes");
1411 });
1412 let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1413 result
1414 }
1415}
1416
1417impl CryptoRng for Context {}
1418
// All operations delegate directly to the context's storage stack.
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    /// Opens the blob `name` in `partition`, forwarding `versions` to the
    /// underlying storage.
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }

    /// Forwards the removal of `name` (an `Option`) in `partition` to the
    /// underlying storage.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Forwards a scan of `partition` to the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1439
1440#[cfg(test)]
1441mod tests {
1442 use super::*;
1443 #[cfg(feature = "external")]
1444 use crate::FutureExt;
1445 #[cfg(feature = "external")]
1446 use crate::Spawner;
1447 use crate::{
1448 deterministic, reschedule, utils::run_tasks, Blob, Metrics, Resolver, Runner as _, Storage,
1449 };
1450 use commonware_macros::test_traced;
1451 #[cfg(not(feature = "external"))]
1452 use futures::future::pending;
1453 #[cfg(feature = "external")]
1454 use futures::{channel::mpsc, SinkExt, StreamExt};
1455 use futures::{channel::oneshot, task::noop_waker};
1456
1457 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1458 let executor = deterministic::Runner::seeded(seed);
1459 run_tasks(5, executor)
1460 }
1461
1462 #[test]
1463 fn test_same_seed_same_order() {
1464 let mut outputs = Vec::new();
1466 for seed in 0..1000 {
1467 let output = run_with_seed(seed);
1468 outputs.push(output);
1469 }
1470
1471 for seed in 0..1000 {
1473 let output = run_with_seed(seed);
1474 assert_eq!(output, outputs[seed as usize]);
1475 }
1476 }
1477
1478 #[test_traced("TRACE")]
1479 fn test_different_seeds_different_order() {
1480 let output1 = run_with_seed(12345);
1481 let output2 = run_with_seed(54321);
1482 assert_ne!(output1, output2);
1483 }
1484
/// `Alarm`s stored in a `BinaryHeap` must pop earliest-deadline first.
#[test]
fn test_alarm_min_heap() {
    let base = SystemTime::now();
    let at = |secs: u64| base + Duration::new(secs, 0);

    // Insert deadlines out of order, including a duplicate.
    let mut heap = BinaryHeap::new();
    for secs in [10, 5, 15, 5] {
        heap.push(Alarm {
            time: at(secs),
            waker: noop_waker(),
        });
    }

    // Drain the heap and record the order in which deadlines come out.
    let popped: Vec<_> = std::iter::from_fn(|| heap.pop())
        .map(|alarm| alarm.time)
        .collect();
    assert_eq!(popped, vec![at(5), at(5), at(10), at(15)]);
}
1527
/// A runner built with a 10 second timeout must panic with "runtime timeout"
/// when its root task never finishes.
#[test]
#[should_panic(expected = "runtime timeout")]
fn test_timeout() {
    let limit = Duration::from_secs(10);
    deterministic::Runner::timed(limit).start(|context| async move {
        // Sleep in one-second steps forever; the task never returns on its own.
        loop {
            context.sleep(Duration::from_secs(1)).await;
        }
    });
}
1538
/// Constructing a runner with a timeout set and a zero cycle must panic.
#[test]
#[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
fn test_bad_timeout() {
    let zero = Duration::default();
    deterministic::Runner::new(Config {
        timeout: Some(zero),
        cycle: zero,
        ..Config::default()
    });
}
1549
/// Constructing a runner whose cycle is shorter than `SYSTEM_TIME_PRECISION`
/// must panic.
#[test]
#[should_panic(
    expected = "cycle duration must be greater than or equal to system time precision"
)]
fn test_bad_cycle() {
    // One nanosecond below the system time precision.
    let too_short = SYSTEM_TIME_PRECISION - Duration::from_nanos(1);
    deterministic::Runner::new(Config {
        cycle: too_short,
        ..Config::default()
    });
}
1561
/// Data that was written and synced in one run must be readable from a runner
/// rebuilt from that run's checkpoint.
#[test]
fn test_recover_synced_storage_persists() {
    let partition = "test_partition";
    let name = b"test_blob";
    let data = b"Hello, world!";

    // First run: write the payload and sync it before the task returns.
    let first_run = deterministic::Runner::default();
    let (state, checkpoint) = first_run.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(data.to_vec(), 0).await.unwrap();
        blob.sync().await.unwrap();
        context.auditor().state()
    });

    // The checkpoint's auditor must report the state the task observed.
    assert_eq!(state, checkpoint.auditor.state());

    // Second run: reopen the blob from the recovered runner and read it back.
    Runner::from(checkpoint).start(|context| async move {
        let (blob, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, data.len() as u64);
        let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
        assert_eq!(read.as_ref(), data);
    });
}
1590
/// A panic raised inside a runner rebuilt from a checkpoint must propagate out
/// of `start`.
#[test]
#[should_panic(expected = "goodbye")]
fn test_recover_panic_handling() {
    // First run: await `reschedule()` once and finish without panicking.
    let first_run = deterministic::Runner::default();
    let (_, checkpoint) = first_run.start_and_recover(|_| async move {
        reschedule().await;
    });

    // Second run: panic from the root task of the recovered runner.
    Runner::from(checkpoint).start(|_| async move {
        panic!("goodbye");
    });
}
1606
/// Data written without a `sync` must not be visible from a runner rebuilt
/// from the checkpoint: the blob reopens with length 0.
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
    let first_run = deterministic::Runner::default();
    let partition = "test_partition";
    let name = b"test_blob";
    let payload = b"Hello, world!".to_vec();

    // First run: write the payload but never call `sync`.
    let (_, checkpoint) = first_run.start_and_recover(|context| async move {
        let context = context.clone();
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(payload, 0).await.unwrap();
    });

    // Second run: the same blob must come back empty.
    Runner::from(checkpoint).start(|context| async move {
        let (_, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, 0);
    });
}
1631
/// Host mappings registered with the resolver in one run must still resolve
/// from a runner rebuilt from that run's checkpoint.
#[test]
fn test_recover_dns_mappings_persist() {
    let first_run = deterministic::Runner::default();
    let host = "example.com";
    let addrs = vec![
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
    ];

    // First run: register the mapping (a copy, so `addrs` stays available below).
    let registered = addrs.clone();
    let (state, checkpoint) = first_run.start_and_recover(|context| async move {
        context.resolver_register(host, Some(registered));
        context.auditor().state()
    });

    // The checkpoint's auditor must report the state the task observed.
    assert_eq!(state, checkpoint.auditor.state());

    // Second run: the recovered runner resolves the host to the same addresses.
    Runner::from(checkpoint).start(move |context| async move {
        let resolved = context.resolve(host).await.unwrap();
        assert_eq!(resolved, addrs);
    });
}
1661
/// Returning the context out of the root task is expected to panic with
/// "executor still has weak references".
#[test]
#[should_panic(expected = "executor still has weak references")]
fn test_context_return() {
    let runner = deterministic::Runner::default();

    // Hand the context back out of the root task as its return value.
    let escaped = runner.start(|context| async move { context });

    drop(escaped);
}
1677
/// A default runner's clock must start exactly at `UNIX_EPOCH`.
#[test]
fn test_default_time_zero() {
    deterministic::Runner::default().start(|context| async move {
        let since_epoch = context.current().duration_since(UNIX_EPOCH).unwrap();
        assert_eq!(since_epoch, Duration::ZERO);
    });
}
1691
/// Without the `external` feature, a root task that awaits a future which
/// never completes must panic with "runtime stalled".
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_stall() {
    deterministic::Runner::default().start(|_| async move {
        // `pending` never resolves.
        pending::<()>().await;
    });
}
1704
/// Without the `external` feature, awaiting a signal that only a real OS
/// thread can deliver must panic with "runtime stalled".
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_external_simulated() {
    let executor = deterministic::Runner::default();

    // Deliver the signal from a detached thread after one second of real time.
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        // This test expects the runner to stall before the signal arrives, so
        // `rx` may already be dropped and the send may fail. Ignore the result
        // rather than panicking inside the detached thread.
        let _ = tx.send(());
    });

    // The root task can only make progress once the external thread fires.
    executor.start(|_| async move {
        rx.await.unwrap();
    });
}
1724
/// With the `external` feature, the root task can complete on a signal sent
/// from a real OS thread after one second of wall-clock time.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime() {
    let runner = deterministic::Runner::default();

    // Deliver the signal from a detached thread after a real one-second sleep.
    let (done_tx, done_rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        done_tx.send(()).unwrap();
    });

    // The root task finishes once the signal arrives.
    runner.start(|_| async move {
        done_rx.await.unwrap();
    });
}
1743
/// With the `external` feature, checks how `pace` relates wall-clock time to
/// simulated time for two externally signalled tasks, and the order in which
/// they report their results.
#[cfg(feature = "external")]
#[test]
fn test_external_realtime_variable() {
    let runner = deterministic::Runner::default();

    runner.start(|context| async move {
        // Reference points on the wall clock and on the runtime's clock.
        let wall_start = SystemTime::now();
        let sim_start = context.current();
        let (slow_tx, slow_rx) = oneshot::channel();
        let (fast_tx, fast_rx) = oneshot::channel();
        let (mut results_tx, mut results_rx) = mpsc::channel(2);

        // External signal sent after one second of real time.
        let first_wait = Duration::from_secs(1);
        std::thread::spawn(move || {
            std::thread::sleep(first_wait);
            slow_tx.send(()).unwrap();
        });

        // External signal sent after a zero-length sleep.
        std::thread::spawn(move || {
            std::thread::sleep(Duration::ZERO);
            fast_tx.send(()).unwrap();
        });

        // Task 1: slow signal paced with `Duration::ZERO`. More than
        // `first_wait` of real time must pass, but less than `first_wait` on
        // the runtime's clock.
        let first = context.clone().spawn({
            let mut results_tx = results_tx.clone();
            move |context| async move {
                slow_rx.pace(&context, Duration::ZERO).await.unwrap();
                let wall_elapsed = wall_start.elapsed().unwrap();
                assert!(wall_elapsed > first_wait);
                let sim_elapsed = context.current().duration_since(sim_start).unwrap();
                assert!(sim_elapsed < first_wait);
                results_tx.send(1).await.unwrap();
            }
        });

        // Task 2: fast signal paced with `first_wait`. At least `first_wait`
        // must pass on both clocks.
        let second = context.clone().spawn(move |context| async move {
            fast_rx.pace(&context, first_wait).await.unwrap();
            let wall_elapsed = wall_start.elapsed().unwrap();
            assert!(wall_elapsed >= first_wait);
            let sim_elapsed = context.current().duration_since(sim_start).unwrap();
            assert!(sim_elapsed >= first_wait);
            results_tx.send(2).await.unwrap();
        });

        // Join both tasks, surfacing any assertion failure inside them.
        second.await.unwrap();
        first.await.unwrap();

        // Task 1 must have reported before task 2.
        let order = [
            results_rx.next().await.unwrap(),
            results_rx.next().await.unwrap(),
        ];
        assert_eq!(order, [1, 2]);
    });
}
1807
/// Without the `external` feature, a one-second sleep must complete in fewer
/// than 10 runtime iterations, as reported by `runtime_iterations_total`.
#[cfg(not(feature = "external"))]
#[test]
fn test_simulated_skip() {
    deterministic::Runner::default().start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Pull the iteration counter out of the encoded metrics text.
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .filter_map(|line| line.strip_prefix("runtime_iterations_total "))
            .find_map(|value| value.trim().parse::<u64>().ok())
            .expect("missing runtime_iterations_total metric");
        assert!(iterations < 10);
    });
}
1830
/// With the `external` feature, a one-second sleep must take more than 500
/// runtime iterations, as reported by `runtime_iterations_total`.
#[cfg(feature = "external")]
#[test]
fn test_realtime_no_skip() {
    deterministic::Runner::default().start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Pull the iteration counter out of the encoded metrics text.
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .filter_map(|line| line.strip_prefix("runtime_iterations_total "))
            .map(str::trim)
            .find_map(|value| value.parse::<u64>().ok())
            .expect("missing runtime_iterations_total metric");
        assert!(iterations > 500);
    });
}
1853}