1use crate::{
26 network::{
27 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28 metered::Network as MeteredNetwork,
29 },
30 storage::{
31 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32 metered::Storage as MeteredStorage,
33 },
34 telemetry::metrics::task::Label,
35 utils::signal::{Signal, Stopper},
36 Clock, Error, Handle, ListenerOf, METRICS_PREFIX,
37};
38use commonware_macros::select;
39use commonware_utils::{hex, SystemTimeExt};
40use futures::{
41 future::AbortHandle,
42 task::{waker_ref, ArcWake},
43 Future,
44};
45use governor::clock::{Clock as GClock, ReasonablyRealtime};
46use prometheus_client::{
47 encoding::text::encode,
48 metrics::{counter::Counter, family::Family, gauge::Gauge},
49 registry::{Metric, Registry},
50};
51use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
52use sha2::{Digest, Sha256};
53use std::{
54 collections::{BinaryHeap, HashMap},
55 mem::replace,
56 net::SocketAddr,
57 pin::Pin,
58 sync::{Arc, Mutex, Weak},
59 task::{self, Poll, Waker},
60 time::{Duration, SystemTime, UNIX_EPOCH},
61};
62use tracing::trace;
63
/// Byte-keyed map of byte values.
///
/// NOTE(review): presumably maps a blob name to that blob's contents within a
/// storage partition (the executor keeps a `HashMap<String, Partition>`) — confirm.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
66
/// Runtime-level metrics; created and registered by `Metrics::init`.
#[derive(Debug)]
struct Metrics {
    /// Registered as `tasks_spawned` ("Total number of tasks spawned").
    tasks_spawned: Family<Label, Counter>,
    /// Registered as `tasks_running` ("Number of tasks currently running").
    tasks_running: Family<Label, Gauge>,
    /// Registered as `task_polls`; incremented by the executor loop each time a task is polled.
    task_polls: Family<Label, Counter>,

    /// Registered as `bandwidth` ("Total amount of data sent over network").
    network_bandwidth: Counter,
}
75
76impl Metrics {
77 pub fn init(registry: &mut Registry) -> Self {
78 let metrics = Self {
79 task_polls: Family::default(),
80 tasks_spawned: Family::default(),
81 tasks_running: Family::default(),
82 network_bandwidth: Counter::default(),
83 };
84 registry.register(
85 "tasks_spawned",
86 "Total number of tasks spawned",
87 metrics.tasks_spawned.clone(),
88 );
89 registry.register(
90 "tasks_running",
91 "Number of tasks currently running",
92 metrics.tasks_running.clone(),
93 );
94 registry.register(
95 "task_polls",
96 "Total number of task polls",
97 metrics.task_polls.clone(),
98 );
99 registry.register(
100 "bandwidth",
101 "Total amount of data sent over network",
102 metrics.network_bandwidth.clone(),
103 );
104 metrics
105 }
106}
107
/// Tracks a running hash over the events reported to it.
pub struct Auditor {
    /// Current hash state; empty until the first event is recorded.
    hash: Mutex<Vec<u8>>,
}
112
113impl Default for Auditor {
114 fn default() -> Self {
115 Self {
116 hash: Vec::new().into(),
117 }
118 }
119}
120
121impl Auditor {
122 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
126 where
127 F: FnOnce(&mut Sha256),
128 {
129 let mut hash = self.hash.lock().unwrap();
130
131 let mut hasher = Sha256::new();
132 hasher.update(&*hash);
133 hasher.update(label);
134 payload(&mut hasher);
135
136 *hash = hasher.finalize().to_vec();
137 }
138
139 pub fn state(&self) -> String {
144 let hash = self.hash.lock().unwrap().clone();
145 hex(&hash)
146 }
147}
148
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed used to initialize the executor's random number generator.
    seed: u64,

    /// Amount the executor's clock advances after each pass over the ready tasks.
    cycle: Duration,

    /// If set, the run panics once the executor's clock reaches this offset from its start time.
    timeout: Option<Duration>,
}

impl Config {
    /// Configuration with seed `42`, a 1ms cycle and no timeout.
    pub fn new() -> Self {
        Self {
            timeout: None,
            cycle: Duration::from_millis(1),
            seed: 42,
        }
    }

    /// Replace the seed.
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }
    /// Replace the cycle duration.
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }
    /// Replace the timeout.
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    /// Configured seed.
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// Configured cycle duration.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Configured timeout, if any.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Validate the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        let zero_cycle_with_timeout = self.cycle.is_zero() && self.timeout.is_some();
        assert!(
            !zero_cycle_with_timeout,
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
218
/// Shared state of one deterministic run; held behind an `Arc` by every `Context`.
pub struct Executor {
    /// Metrics registry used by `Context::register` and `Context::encode`.
    registry: Mutex<Registry>,
    /// Amount the clock advances at the end of each executor loop iteration.
    cycle: Duration,
    /// If set, the executor loop panics with "runtime timeout" once `time` reaches it.
    deadline: Option<SystemTime>,
    /// Runtime metrics (task polls, etc.).
    metrics: Arc<Metrics>,
    /// Auditor that receives an event for every task processed and for other runtime operations.
    auditor: Arc<Auditor>,
    /// Seeded RNG; shuffles ready tasks and backs the `RngCore` impl on `Context`.
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Queue of tasks ready to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms, ordered so the earliest wake time is popped first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Carried over (cloned) by `Context::recover`.
    // NOTE(review): not read or written elsewhere in the visible code — confirm whether still used.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Backs `Spawner::stop` / `Spawner::stopped`.
    shutdown: Mutex<Stopper>,
    /// Set to `true` when the root task completes.
    finished: Mutex<bool>,
    /// Set to `true` once `Context::recover` has been called on this executor.
    recovered: Mutex<bool>,
}
235
/// What a `Runner` was built from.
enum State {
    /// A configuration; a new `Context` is created from it when the runner starts.
    Config(Config),
    /// An existing `Context` (e.g. one returned by `Context::recover`), used as-is.
    Context(Context),
}
240
/// Entry point for running a root future on the deterministic executor.
pub struct Runner {
    /// Either the configuration to start from or an existing context to reuse.
    state: State,
}
245
246impl From<Config> for Runner {
247 fn from(cfg: Config) -> Self {
248 Self::new(cfg)
249 }
250}
251
252impl From<Context> for Runner {
253 fn from(context: Context) -> Self {
254 Self {
255 state: State::Context(context),
256 }
257 }
258}
259
260impl Runner {
261 pub fn new(cfg: Config) -> Self {
263 cfg.assert();
265 Runner {
266 state: State::Config(cfg),
267 }
268 }
269
270 pub fn seeded(seed: u64) -> Self {
273 let cfg = Config {
274 seed,
275 ..Config::default()
276 };
277 Self::new(cfg)
278 }
279
280 pub fn timed(timeout: Duration) -> Self {
283 let cfg = Config {
284 timeout: Some(timeout),
285 ..Config::default()
286 };
287 Self::new(cfg)
288 }
289}
290
291impl Default for Runner {
292 fn default() -> Self {
293 Self::new(Config::default())
294 }
295}
296
impl crate::Runner for Runner {
    type Context = Context;

    /// Run `f`'s future as the root task until it completes and return its output.
    ///
    /// Each loop iteration: check the deadline, drain and shuffle the ready queue, poll every
    /// drained task once, advance the clock by `cycle` (skipping ahead to the next alarm if no
    /// task is ready), then wake every sleeper whose alarm is due.
    ///
    /// # Panics
    ///
    /// - "runtime timeout" if a deadline is configured and the clock has reached it.
    /// - "runtime stalled" if no task is ready and no alarm is pending.
    /// - "executor time overflowed" if advancing the clock overflows.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future,
    {
        // Use the provided context or build a fresh one from the configuration.
        let context = match self.state {
            State::Config(config) => Context::new(config),
            State::Context(context) => context,
        };

        // Keep a handle to the executor; the context itself is moved into the root future.
        let executor = context.executor.clone();
        let mut root = Box::pin(f(context));

        // Enqueue the root task so the first iteration polls it.
        Tasks::register_root(&executor.tasks);

        let mut iter = 0;
        loop {
            // Enforce the deadline (if any) against the simulated clock.
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        panic!("runtime timeout");
                    }
                }
            }

            // Take everything that is currently ready; tasks woken while polling land in the
            // (now empty) queue and are handled on the next iteration.
            let mut tasks = executor.tasks.drain();

            // Shuffle with the seeded RNG so the poll order depends on the seed.
            {
                let mut rng = executor.rng.lock().unwrap();
                tasks.shuffle(&mut *rng);
            }

            trace!(iter, tasks = tasks.len(), "starting loop");
            for task in tasks {
                // Record the task being processed (id and label name) in the audit hash.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                trace!(id = task.id, "processing task");

                executor.metrics.task_polls.get_or_create(&task.label).inc();

                // The waker re-enqueues this task when invoked (see `ArcWake for Task`).
                let waker = waker_ref(&task);
                let mut cx = task::Context::from_waker(&waker);
                match &task.operation {
                    Operation::Root => {
                        // Root completion ends the run.
                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
                            trace!(id = task.id, "task is complete");
                            *executor.finished.lock().unwrap() = true;
                            return output;
                        }
                    }
                    Operation::Work { future, completed } => {
                        // A completed task may still be queued by a late wake; never poll it again.
                        if *completed.lock().unwrap() {
                            trace!(id = task.id, "dropping already complete task");
                            continue;
                        }

                        let mut fut = future.lock().unwrap();
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id = task.id, "task is complete");
                            *completed.lock().unwrap() = true;
                            continue;
                        }
                    }
                }

                trace!(id = task.id, "task is still pending");
            }

            // Advance the clock by one cycle.
            let mut current;
            {
                let mut time = executor.time.lock().unwrap();
                *time = time
                    .checked_add(executor.cycle)
                    .expect("executor time overflowed");
                current = *time;
            }
            trace!(now = current.epoch_millis(), "time advanced");

            // If nothing is ready, jump straight to the earliest pending alarm (when it is in
            // the future) instead of stepping one cycle at a time.
            if executor.tasks.len() == 0 {
                let mut skip = None;
                {
                    let sleeping = executor.sleeping.lock().unwrap();
                    if let Some(next) = sleeping.peek() {
                        if next.time > current {
                            skip = Some(next.time);
                        }
                    }
                }
                if let Some(skip_time) = skip {
                    {
                        let mut time = executor.time.lock().unwrap();
                        *time = skip_time;
                        current = *time;
                    }
                    trace!(now = current.epoch_millis(), "time skipped");
                }
            }

            // Collect every alarm that is due; wakers are invoked after the lock is released.
            let mut to_wake = Vec::new();
            let mut remaining;
            {
                let mut sleeping = executor.sleeping.lock().unwrap();
                while let Some(next) = sleeping.peek() {
                    if next.time <= current {
                        let sleeper = sleeping.pop().unwrap();
                        to_wake.push(sleeper.waker);
                    } else {
                        break;
                    }
                }
                remaining = sleeping.len();
            }
            for waker in to_wake {
                waker.wake();
            }

            // Outstanding work = alarms still pending + tasks now in the ready queue.
            remaining += executor.tasks.len();

            // Nothing ready and nothing sleeping: the root can never make progress.
            if remaining == 0 {
                panic!("runtime stalled");
            }
            iter += 1;
        }
    }
}
456
/// What polling a `Task` does.
enum Operation {
    /// The root task; its future is owned by `Runner::start`, not by the task.
    Root,
    /// A spawned task that owns its future.
    Work {
        /// The future to poll.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has returned `Poll::Ready`, so it is never polled again.
        completed: Mutex<bool>,
    },
}
465
/// A unit of work tracked by the executor.
struct Task {
    /// Identifier assigned from the `Tasks` counter at registration.
    id: u128,
    /// Metrics label for the task.
    label: Label,
    /// Weak reference to the owning queue, used to re-enqueue the task when it is woken.
    tasks: Weak<Tasks>,

    /// What to poll when the task runs.
    operation: Operation,
}
474
475impl ArcWake for Task {
476 fn wake_by_ref(arc_self: &Arc<Self>) {
477 if let Some(tasks) = arc_self.tasks.upgrade() {
480 tasks.enqueue(arc_self.clone());
481 }
482 }
483}
484
/// Ready queue plus bookkeeping for task registration.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled.
    queue: Mutex<Vec<Arc<Task>>>,
    /// Whether the root task has been registered (it may only be registered once).
    root_registered: Mutex<bool>,
}
494
495impl Tasks {
496 fn new() -> Self {
498 Self {
499 counter: Mutex::new(0),
500 queue: Mutex::new(Vec::new()),
501 root_registered: Mutex::new(false),
502 }
503 }
504
505 fn increment(&self) -> u128 {
507 let mut counter = self.counter.lock().unwrap();
508 let old = *counter;
509 *counter = counter.checked_add(1).expect("task counter overflow");
510 old
511 }
512
513 fn register_root(arc_self: &Arc<Self>) {
517 {
518 let mut registered = arc_self.root_registered.lock().unwrap();
519 assert!(!*registered, "root already registered");
520 *registered = true;
521 }
522 let id = arc_self.increment();
523 let mut queue = arc_self.queue.lock().unwrap();
524 queue.push(Arc::new(Task {
525 id,
526 label: Label::root(),
527 tasks: Arc::downgrade(arc_self),
528 operation: Operation::Root,
529 }));
530 }
531
532 fn register_work(
534 arc_self: &Arc<Self>,
535 label: Label,
536 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
537 ) {
538 let id = arc_self.increment();
539 let mut queue = arc_self.queue.lock().unwrap();
540 queue.push(Arc::new(Task {
541 id,
542 label,
543 tasks: Arc::downgrade(arc_self),
544 operation: Operation::Work {
545 future: Mutex::new(future),
546 completed: Mutex::new(false),
547 },
548 }));
549 }
550
551 fn enqueue(&self, task: Arc<Task>) {
553 let mut queue = self.queue.lock().unwrap();
554 queue.push(task);
555 }
556
557 fn drain(&self) -> Vec<Arc<Task>> {
559 let mut queue = self.queue.lock().unwrap();
560 let len = queue.len();
561 replace(&mut *queue, Vec::with_capacity(len))
562 }
563
564 fn len(&self) -> usize {
566 self.queue.lock().unwrap().len()
567 }
568}
569
/// Network stack used by `Context`: the deterministic network wrapped by the audited layer,
/// which is in turn wrapped by the metered layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
571
/// Handle to the deterministic runtime, passed to the root future and to spawned tasks.
pub struct Context {
    /// Label prefix; used as the prefix for registered metric names (see `with_label`).
    name: String,
    /// Whether this context has already been used to spawn (a context may spawn only once).
    spawned: bool,
    /// Shared executor state.
    executor: Arc<Executor>,
    /// Shared network implementation.
    network: Arc<Network>,
    /// Storage implementation: in-memory storage wrapped by audited and metered layers.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
    /// Abort handles of tasks spawned as children via `spawn_child`.
    children: Arc<Mutex<Vec<AbortHandle>>>,
}
583
584impl Default for Context {
585 fn default() -> Self {
586 Self::new(Config::default())
587 }
588}
589
impl Context {
    /// Build a new context (and its executor) from `cfg`.
    ///
    /// The clock starts at `UNIX_EPOCH`; if `cfg.timeout` is set, the deadline is
    /// `UNIX_EPOCH + timeout` (panics with "timeout overflowed" if that addition overflows).
    pub fn new(cfg: Config) -> Self {
        let mut registry = Registry::default();
        // All runtime-owned metrics live under the `METRICS_PREFIX` sub-registry.
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = UNIX_EPOCH;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        // Storage and network share one auditor so their events feed the same hash.
        let auditor = Arc::new(Auditor::default());
        let storage = MeteredStorage::new(
            AuditedStorage::new(MemStorage::default(), auditor.clone()),
            runtime_registry,
        );
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            cycle: cfg.cycle,
            deadline,
            metrics: metrics.clone(),
            auditor: auditor.clone(),
            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            partitions: Mutex::new(HashMap::new()),
            shutdown: Mutex::new(Stopper::default()),
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });

        Context {
            name: String::new(),
            spawned: false,
            executor: executor.clone(),
            network: Arc::new(network),
            storage,
            children: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Build a new context that continues from a finished run.
    ///
    /// The new executor keeps the cycle, deadline, auditor, RNG state, current time and
    /// partitions of the old one, and this context's storage is moved into the result. The
    /// metrics registry, metrics, task queue, sleepers, shutdown state and network are created
    /// fresh, and the returned context has an empty label.
    ///
    /// # Panics
    ///
    /// - "execution is not finished" if the root task of this executor has not completed.
    /// - "runtime has already been recovered" if `recover` was already called on this executor
    ///   (including via a clone of this context).
    pub fn recover(self) -> Self {
        if !*self.executor.finished.lock().unwrap() {
            panic!("execution is not finished");
        }

        // Mark the old executor as recovered so it cannot be recovered a second time.
        {
            let mut recovered = self.executor.recovered.lock().unwrap();
            if *recovered {
                panic!("runtime has already been recovered");
            }
            *recovered = true;
        }

        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Reuse the auditor so the audit hash continues across the recovery.
        let auditor = self.executor.auditor.clone();
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        let executor = Arc::new(Executor {
            // Carried over from the previous executor.
            cycle: self.executor.cycle,
            deadline: self.executor.deadline,
            auditor: auditor.clone(),
            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
            time: Mutex::new(*self.executor.time.lock().unwrap()),
            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),

            // Created fresh for the new run.
            registry: Mutex::new(registry),
            metrics: metrics.clone(),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });
        Self {
            name: String::new(),
            spawned: false,
            executor,
            network: Arc::new(network),
            storage: self.storage,
            children: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Borrow the auditor shared by this context's executor.
    pub fn auditor(&self) -> &Auditor {
        &self.executor.auditor
    }
}
704
705impl Clone for Context {
706 fn clone(&self) -> Self {
707 Self {
708 name: self.name.clone(),
709 spawned: false,
710 executor: self.executor.clone(),
711 network: self.network.clone(),
712 storage: self.storage.clone(),
713 children: self.children.clone(),
714 }
715 }
716}
717
impl crate::Spawner for Context {
    /// Spawn the future produced by `f` as a new task and return its handle.
    ///
    /// The context handed to `f` gets a new, empty children list.
    ///
    /// Panics with "already spawned" if this context was already used to spawn.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        // NOTE(review): `spawn_metrics!` is defined elsewhere; it yields the task label and a
        // gauge — presumably also updating the spawn metrics. Confirm against the macro.
        let (label, gauge) = spawn_metrics!(self, future);

        // Keep a handle to the executor; `self` is moved into `f` below.
        let executor = self.executor.clone();

        // The spawned task tracks its own children, separate from the parent's list.
        let children = Arc::new(Mutex::new(Vec::new()));
        self.children = children.clone();

        let future = f(self);
        let (f, handle) = Handle::init_future(future, gauge, false, children);

        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Mark this context as spawned and return a closure that spawns a given future later.
    ///
    /// Panics with "already spawned" if this context was already used to spawn.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        // The context is only borrowed here, so the flag is what prevents a second spawn.
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, future);

        let executor = self.executor.clone();

        move |f: F| {
            // The task spawned through this closure starts with a fresh, empty children list.
            let (f, handle) =
                Handle::init_future(f, gauge, false, Arc::new(Mutex::new(Vec::new())));

            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Spawn like `spawn`, and additionally record the new task's abort handle (when one is
    /// available) in this context's children list.
    fn spawn_child<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Capture the parent's list before `spawn` replaces it on the moved context.
        let parent_children = self.children.clone();

        let child_handle = self.spawn(f);

        if let Some(abort_handle) = child_handle.abort_handle() {
            parent_children.lock().unwrap().push(abort_handle);
        }

        child_handle
    }

    /// Spawn the synchronous closure `f` as a task; it runs inline when the task is polled.
    ///
    /// `dedicated` is only forwarded to `spawn_metrics!`.
    ///
    /// Panics with "already spawned" if this context was already used to spawn.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        let executor = self.executor.clone();
        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);

        // Wrap the blocking closure in a future so it can be queued like any other task.
        let f = async move { f() };
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Mark this context as spawned and return a closure that spawns a blocking closure later.
    ///
    /// Panics with "already spawned" if this context was already used to spawn.
    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_blocking(f, gauge, false);

            let f = async move { f() };
            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Signal shutdown with `value` and wait for the stop to resolve.
    ///
    /// Returns `Err(Error::Timeout)` if `timeout` is set and elapses (on the runtime's clock)
    /// first, and `Err(Error::Closed)` if the stop future itself resolves with an error.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        self.executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Scope the lock so it is not held across the await below.
        let stop_resolved = {
            let mut shutdown = self.executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // With no timeout, race against a future that never completes.
        let timeout_future = match timeout {
            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
            None => futures::future::Either::Right(futures::future::pending()),
        };
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Return the signal obtained from the executor's `Stopper`; the call is recorded in the
    /// audit hash.
    fn stopped(&self) -> Signal {
        self.executor.auditor.event(b"stopped", |_| {});
        self.executor.shutdown.lock().unwrap().stopped()
    }
}
867
868impl crate::Metrics for Context {
869 fn with_label(&self, label: &str) -> Self {
870 let name = {
871 let prefix = self.name.clone();
872 if prefix.is_empty() {
873 label.to_string()
874 } else {
875 format!("{prefix}_{label}")
876 }
877 };
878 assert!(
879 !name.starts_with(METRICS_PREFIX),
880 "using runtime label is not allowed"
881 );
882 Self {
883 name,
884 spawned: false,
885 executor: self.executor.clone(),
886 network: self.network.clone(),
887 storage: self.storage.clone(),
888 children: self.children.clone(),
889 }
890 }
891
892 fn label(&self) -> String {
893 self.name.clone()
894 }
895
896 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
897 let name = name.into();
899 let help = help.into();
900
901 self.executor.auditor.event(b"register", |hasher| {
903 hasher.update(name.as_bytes());
904 hasher.update(help.as_bytes());
905 });
906 let prefixed_name = {
907 let prefix = &self.name;
908 if prefix.is_empty() {
909 name
910 } else {
911 format!("{}_{}", *prefix, name)
912 }
913 };
914 self.executor
915 .registry
916 .lock()
917 .unwrap()
918 .register(prefixed_name, help, metric)
919 }
920
921 fn encode(&self) -> String {
922 self.executor.auditor.event(b"encode", |_| {});
923 let mut buffer = String::new();
924 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
925 buffer
926 }
927}
928
/// Future that completes once the executor's clock reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Time at which the future becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper (done at most once).
    registered: bool,
}
934
/// A pending wake-up: `waker` should be invoked once the clock reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

/// Alarms are compared by `time` only; the waker is ignored.
impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Ordering is reversed on `time` so that a max-heap (`BinaryHeap`) pops the earliest alarm first.
impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
960
961impl Future for Sleeper {
962 type Output = ();
963
964 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
965 {
966 let current_time = *self.executor.time.lock().unwrap();
967 if current_time >= self.time {
968 return Poll::Ready(());
969 }
970 }
971 if !self.registered {
972 self.registered = true;
973 self.executor.sleeping.lock().unwrap().push(Alarm {
974 time: self.time,
975 waker: cx.waker().clone(),
976 });
977 }
978 Poll::Pending
979 }
980}
981
982impl Clock for Context {
983 fn current(&self) -> SystemTime {
984 *self.executor.time.lock().unwrap()
985 }
986
987 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
988 let deadline = self
989 .current()
990 .checked_add(duration)
991 .expect("overflow when setting wake time");
992 self.sleep_until(deadline)
993 }
994
995 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
996 Sleeper {
997 executor: self.executor.clone(),
998
999 time: deadline,
1000 registered: false,
1001 }
1002 }
1003}
1004
1005impl GClock for Context {
1006 type Instant = SystemTime;
1007
1008 fn now(&self) -> Self::Instant {
1009 self.current()
1010 }
1011}
1012
// Marker impl with no overridden items; it lets `Context` be used where `governor` requires a
// `ReasonablyRealtime` clock.
impl ReasonablyRealtime for Context {}
1014
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Bind a listener on `socket`; delegates to the context's network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dial `socket`, returning the sink/stream pair; delegates to the context's network.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1029
1030impl RngCore for Context {
1031 fn next_u32(&mut self) -> u32 {
1032 self.executor.auditor.event(b"rand", |hasher| {
1033 hasher.update(b"next_u32");
1034 });
1035 self.executor.rng.lock().unwrap().next_u32()
1036 }
1037
1038 fn next_u64(&mut self) -> u64 {
1039 self.executor.auditor.event(b"rand", |hasher| {
1040 hasher.update(b"next_u64");
1041 });
1042 self.executor.rng.lock().unwrap().next_u64()
1043 }
1044
1045 fn fill_bytes(&mut self, dest: &mut [u8]) {
1046 self.executor.auditor.event(b"rand", |hasher| {
1047 hasher.update(b"fill_bytes");
1048 });
1049 self.executor.rng.lock().unwrap().fill_bytes(dest)
1050 }
1051
1052 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1053 self.executor.auditor.event(b"rand", |hasher| {
1054 hasher.update(b"try_fill_bytes");
1055 });
1056 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1057 }
1058}
1059
// Marker impl so `Context` can be passed to APIs that require a `CryptoRng`.
// NOTE(review): output is fully determined by the configured seed — presumably intended for
// deterministic testing only; confirm this is never used where real entropy is required.
impl CryptoRng for Context {}
1061
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Open the blob `name` in `partition`; delegates to the context's storage.
    ///
    /// The tests in this file treat the returned `u64` as the blob's length.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Remove from `partition`; delegates to the context's storage.
    // NOTE(review): presumably `None` removes the whole partition — confirm against the
    // `Storage` trait documentation.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// List `partition`; delegates to the context's storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1077
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Run `run_tasks(5, ..)` on a runner seeded with `seed` and return its output.
    // NOTE(review): presumably (auditor state, task completion order) — confirm in `run_tasks`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// Running twice with the same seed must produce identical output, for 1000 seeds.
    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output for every seed.
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Second pass: every seed must reproduce its recorded output.
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different output.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s ordering makes `BinaryHeap` pop the earliest time first (duplicates included).
    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        // Inserted out of order, with a duplicate time.
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Popping must yield times in ascending order.
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A root task that sleeps forever must hit the configured 10s timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected when the runner is constructed.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data written and synced before the run ends is readable after `recover`, and the audit
    /// state is unchanged by `recover` itself.
    #[test]
    fn test_recover_synced_storage_persists() {
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // First run: write and sync a blob, then hand back the context and the audit state.
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Recovering must not alter the audit hash.
        assert_eq!(state, recovered_context.auditor().state());

        // Second run: the synced data must still be there.
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    /// Data written but never synced is gone after `recover`.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // First run: write without calling `sync`.
        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            context
        });

        let context = context.recover();
        let executor = Runner::from(context);

        // Second run: the blob is reported with length zero.
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// `recover` panics when called while the root task is still running.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.recover();
        });
    }

    /// `recover` panics on the second call, even when made through a clone of the context.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move { context });

        // The clone shares the same executor, and therefore the same `recovered` flag.
        let cloned_context = context.clone();
        context.recover();

        cloned_context.recover();
    }

    /// The clock of a default runner reads exactly `UNIX_EPOCH` on the first poll.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}