1use crate::{
26 network::{
27 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28 metered::Network as MeteredNetwork,
29 },
30 storage::{
31 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32 metered::Storage as MeteredStorage,
33 },
34 utils::Signaler,
35 Clock, Error, Handle, ListenerOf, Signal, METRICS_PREFIX,
36};
37use commonware_utils::{hex, SystemTimeExt};
38use futures::{
39 task::{waker_ref, ArcWake},
40 Future,
41};
42use governor::clock::{Clock as GClock, ReasonablyRealtime};
43use prometheus_client::{
44 encoding::{text::encode, EncodeLabelSet},
45 metrics::{counter::Counter, family::Family, gauge::Gauge},
46 registry::{Metric, Registry},
47};
48use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
49use sha2::{Digest, Sha256};
50use std::{
51 collections::{BinaryHeap, HashMap},
52 mem::replace,
53 net::SocketAddr,
54 pin::Pin,
55 sync::{Arc, Mutex},
56 task::{self, Poll, Waker},
57 time::{Duration, SystemTime, UNIX_EPOCH},
58};
59use tracing::trace;
60
61pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
63
64#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
65struct Work {
66 label: String,
67}
68
69#[derive(Debug)]
70struct Metrics {
71 tasks_spawned: Family<Work, Counter>,
72 tasks_running: Family<Work, Gauge>,
73 blocking_tasks_spawned: Family<Work, Counter>,
74 blocking_tasks_running: Family<Work, Gauge>,
75 task_polls: Family<Work, Counter>,
76
77 network_bandwidth: Counter,
78}
79
80impl Metrics {
81 pub fn init(registry: &mut Registry) -> Self {
82 let metrics = Self {
83 task_polls: Family::default(),
84 tasks_spawned: Family::default(),
85 tasks_running: Family::default(),
86 blocking_tasks_spawned: Family::default(),
87 blocking_tasks_running: Family::default(),
88 network_bandwidth: Counter::default(),
89 };
90 registry.register(
91 "tasks_spawned",
92 "Total number of tasks spawned",
93 metrics.tasks_spawned.clone(),
94 );
95 registry.register(
96 "tasks_running",
97 "Number of tasks currently running",
98 metrics.tasks_running.clone(),
99 );
100 registry.register(
101 "blocking_tasks_spawned",
102 "Total number of blocking tasks spawned",
103 metrics.blocking_tasks_spawned.clone(),
104 );
105 registry.register(
106 "blocking_tasks_running",
107 "Number of blocking tasks currently running",
108 metrics.blocking_tasks_running.clone(),
109 );
110 registry.register(
111 "task_polls",
112 "Total number of task polls",
113 metrics.task_polls.clone(),
114 );
115 registry.register(
116 "bandwidth",
117 "Total amount of data sent over network",
118 metrics.network_bandwidth.clone(),
119 );
120 metrics
121 }
122}
123
124pub struct Auditor {
126 hash: Mutex<Vec<u8>>,
127}
128
129impl Default for Auditor {
130 fn default() -> Self {
131 Self {
132 hash: Vec::new().into(),
133 }
134 }
135}
136
137impl Auditor {
138 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
142 where
143 F: FnOnce(&mut Sha256),
144 {
145 let mut hash = self.hash.lock().unwrap();
146
147 let mut hasher = Sha256::new();
148 hasher.update(&*hash);
149 hasher.update(label);
150 payload(&mut hasher);
151
152 *hash = hasher.finalize().to_vec();
153 }
154
155 pub fn state(&self) -> String {
160 let hash = self.hash.lock().unwrap().clone();
161 hex(&hash)
162 }
163}
164
165#[derive(Clone)]
167pub struct Config {
168 seed: u64,
170
171 cycle: Duration,
174
175 timeout: Option<Duration>,
177}
178
179impl Config {
180 pub fn new() -> Self {
182 Self {
183 seed: 42,
184 cycle: Duration::from_millis(1),
185 timeout: None,
186 }
187 }
188
189 pub fn with_seed(mut self, seed: u64) -> Self {
192 self.seed = seed;
193 self
194 }
195 pub fn with_cycle(mut self, cycle: Duration) -> Self {
197 self.cycle = cycle;
198 self
199 }
200 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
202 self.timeout = timeout;
203 self
204 }
205
206 pub fn seed(&self) -> u64 {
209 self.seed
210 }
211 pub fn cycle(&self) -> Duration {
213 self.cycle
214 }
215 pub fn timeout(&self) -> Option<Duration> {
217 self.timeout
218 }
219
220 pub fn assert(&self) {
222 assert!(
223 self.cycle != Duration::default() || self.timeout.is_none(),
224 "cycle duration must be non-zero when timeout is set",
225 );
226 }
227}
228
229impl Default for Config {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235pub struct Executor {
237 registry: Mutex<Registry>,
238 cycle: Duration,
239 deadline: Option<SystemTime>,
240 metrics: Arc<Metrics>,
241 auditor: Arc<Auditor>,
242 rng: Mutex<StdRng>,
243 time: Mutex<SystemTime>,
244 tasks: Arc<Tasks>,
245 sleeping: Mutex<BinaryHeap<Alarm>>,
246 partitions: Mutex<HashMap<String, Partition>>,
247 signaler: Mutex<Signaler>,
248 signal: Signal,
249 finished: Mutex<bool>,
250 recovered: Mutex<bool>,
251}
252
253enum State {
254 Config(Config),
255 Context(Context),
256}
257
258pub struct Runner {
260 state: State,
261}
262
263impl From<Config> for Runner {
264 fn from(cfg: Config) -> Self {
265 Self::new(cfg)
266 }
267}
268
269impl From<Context> for Runner {
270 fn from(context: Context) -> Self {
271 Self {
272 state: State::Context(context),
273 }
274 }
275}
276
277impl Runner {
278 pub fn new(cfg: Config) -> Self {
280 cfg.assert();
282 Runner {
283 state: State::Config(cfg),
284 }
285 }
286
287 pub fn seeded(seed: u64) -> Self {
290 let cfg = Config {
291 seed,
292 ..Config::default()
293 };
294 Self::new(cfg)
295 }
296
297 pub fn timed(timeout: Duration) -> Self {
300 let cfg = Config {
301 timeout: Some(timeout),
302 ..Config::default()
303 };
304 Self::new(cfg)
305 }
306}
307
308impl Default for Runner {
309 fn default() -> Self {
310 Self::new(Config::default())
311 }
312}
313
314impl crate::Runner for Runner {
315 type Context = Context;
316
317 fn start<F, Fut>(self, f: F) -> Fut::Output
318 where
319 F: FnOnce(Self::Context) -> Fut,
320 Fut: Future,
321 {
322 let context = match self.state {
324 State::Config(config) => Context::new(config),
325 State::Context(context) => context,
326 };
327
328 let executor = context.executor.clone();
330 let mut root = Box::pin(f(context));
331
332 Tasks::register_root(&executor.tasks);
334
335 let mut iter = 0;
337 loop {
338 {
340 let current = executor.time.lock().unwrap();
341 if let Some(deadline) = executor.deadline {
342 if *current >= deadline {
343 panic!("runtime timeout");
344 }
345 }
346 }
347
348 let mut tasks = executor.tasks.drain();
350
351 {
353 let mut rng = executor.rng.lock().unwrap();
354 tasks.shuffle(&mut *rng);
355 }
356
357 trace!(iter, tasks = tasks.len(), "starting loop");
363 for task in tasks {
364 executor.auditor.event(b"process_task", |hasher| {
366 hasher.update(task.id.to_be_bytes());
367 hasher.update(task.label.as_bytes());
368 });
369 trace!(id = task.id, "processing task");
370
371 executor
373 .metrics
374 .task_polls
375 .get_or_create(&Work {
376 label: task.label.clone(),
377 })
378 .inc();
379
380 let waker = waker_ref(&task);
382 let mut cx = task::Context::from_waker(&waker);
383 match &task.operation {
384 Operation::Root => {
385 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
387 trace!(id = task.id, "task is complete");
388 *executor.finished.lock().unwrap() = true;
389 return output;
390 }
391 }
392 Operation::Work { future, completed } => {
393 if *completed.lock().unwrap() {
395 trace!(id = task.id, "dropping already complete task");
396 continue;
397 }
398
399 let mut fut = future.lock().unwrap();
401 if fut.as_mut().poll(&mut cx).is_ready() {
402 trace!(id = task.id, "task is complete");
403 *completed.lock().unwrap() = true;
404 continue;
405 }
406 }
407 }
408
409 trace!(id = task.id, "task is still pending");
411 }
412
413 let mut current;
418 {
419 let mut time = executor.time.lock().unwrap();
420 *time = time
421 .checked_add(executor.cycle)
422 .expect("executor time overflowed");
423 current = *time;
424 }
425 trace!(now = current.epoch_millis(), "time advanced");
426
427 if executor.tasks.len() == 0 {
429 let mut skip = None;
430 {
431 let sleeping = executor.sleeping.lock().unwrap();
432 if let Some(next) = sleeping.peek() {
433 if next.time > current {
434 skip = Some(next.time);
435 }
436 }
437 }
438 if skip.is_some() {
439 {
440 let mut time = executor.time.lock().unwrap();
441 *time = skip.unwrap();
442 current = *time;
443 }
444 trace!(now = current.epoch_millis(), "time skipped");
445 }
446 }
447
448 let mut to_wake = Vec::new();
450 let mut remaining;
451 {
452 let mut sleeping = executor.sleeping.lock().unwrap();
453 while let Some(next) = sleeping.peek() {
454 if next.time <= current {
455 let sleeper = sleeping.pop().unwrap();
456 to_wake.push(sleeper.waker);
457 } else {
458 break;
459 }
460 }
461 remaining = sleeping.len();
462 }
463 for waker in to_wake {
464 waker.wake();
465 }
466
467 remaining += executor.tasks.len();
469
470 if remaining == 0 {
473 panic!("runtime stalled");
474 }
475 iter += 1;
476 }
477 }
478}
479
480enum Operation {
482 Root,
483 Work {
484 future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
485 completed: Mutex<bool>,
486 },
487}
488
489struct Task {
491 id: u128,
492 label: String,
493 tasks: Arc<Tasks>,
494
495 operation: Operation,
496}
497
498impl ArcWake for Task {
499 fn wake_by_ref(arc_self: &Arc<Self>) {
500 arc_self.tasks.enqueue(arc_self.clone());
501 }
502}
503
504struct Tasks {
506 counter: Mutex<u128>,
508 queue: Mutex<Vec<Arc<Task>>>,
510 root_registered: Mutex<bool>,
512}
513
514impl Tasks {
515 fn new() -> Self {
517 Self {
518 counter: Mutex::new(0),
519 queue: Mutex::new(Vec::new()),
520 root_registered: Mutex::new(false),
521 }
522 }
523
524 fn increment(&self) -> u128 {
526 let mut counter = self.counter.lock().unwrap();
527 let old = *counter;
528 *counter = counter.checked_add(1).expect("task counter overflow");
529 old
530 }
531
532 fn register_root(arc_self: &Arc<Self>) {
536 {
537 let mut registered = arc_self.root_registered.lock().unwrap();
538 assert!(!*registered, "root already registered");
539 *registered = true;
540 }
541 let id = arc_self.increment();
542 let mut queue = arc_self.queue.lock().unwrap();
543 queue.push(Arc::new(Task {
544 id,
545 label: String::new(),
546 tasks: arc_self.clone(),
547 operation: Operation::Root,
548 }));
549 }
550
551 fn register_work(
553 arc_self: &Arc<Self>,
554 label: &str,
555 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
556 ) {
557 let id = arc_self.increment();
558 let mut queue = arc_self.queue.lock().unwrap();
559 queue.push(Arc::new(Task {
560 id,
561 label: label.to_string(),
562 tasks: arc_self.clone(),
563 operation: Operation::Work {
564 future: Mutex::new(future),
565 completed: Mutex::new(false),
566 },
567 }));
568 }
569
570 fn enqueue(&self, task: Arc<Task>) {
572 let mut queue = self.queue.lock().unwrap();
573 queue.push(task);
574 }
575
576 fn drain(&self) -> Vec<Arc<Task>> {
578 let mut queue = self.queue.lock().unwrap();
579 let len = queue.len();
580 replace(&mut *queue, Vec::with_capacity(len))
581 }
582
583 fn len(&self) -> usize {
585 self.queue.lock().unwrap().len()
586 }
587}
588
589type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
590
591pub struct Context {
595 label: String,
596 spawned: bool,
597 executor: Arc<Executor>,
598 network: Arc<Network>,
599 storage: MeteredStorage<AuditedStorage<MemStorage>>,
600}
601
602impl Default for Context {
603 fn default() -> Self {
604 Self::new(Config::default())
605 }
606}
607
608impl Context {
609 pub fn new(cfg: Config) -> Self {
610 let mut registry = Registry::default();
612 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
613
614 let metrics = Arc::new(Metrics::init(runtime_registry));
616 let start_time = UNIX_EPOCH;
617 let deadline = cfg
618 .timeout
619 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
620 let (signaler, signal) = Signaler::new();
621 let auditor = Arc::new(Auditor::default());
622 let storage = MeteredStorage::new(
623 AuditedStorage::new(MemStorage::default(), auditor.clone()),
624 runtime_registry,
625 );
626 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
627 let network = MeteredNetwork::new(network, runtime_registry);
628
629 let executor = Arc::new(Executor {
630 registry: Mutex::new(registry),
631 cycle: cfg.cycle,
632 deadline,
633 metrics: metrics.clone(),
634 auditor: auditor.clone(),
635 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
636 time: Mutex::new(start_time),
637 tasks: Arc::new(Tasks::new()),
638 sleeping: Mutex::new(BinaryHeap::new()),
639 partitions: Mutex::new(HashMap::new()),
640 signaler: Mutex::new(signaler),
641 signal,
642 finished: Mutex::new(false),
643 recovered: Mutex::new(false),
644 });
645
646 Context {
647 label: String::new(),
648 spawned: false,
649 executor: executor.clone(),
650 network: Arc::new(network),
651 storage,
652 }
653 }
654
655 pub fn recover(self) -> Self {
667 if !*self.executor.finished.lock().unwrap() {
669 panic!("execution is not finished");
670 }
671
672 {
674 let mut recovered = self.executor.recovered.lock().unwrap();
675 if *recovered {
676 panic!("runtime has already been recovered");
677 }
678 *recovered = true;
679 }
680
681 let mut registry = Registry::default();
683 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
684 let metrics = Arc::new(Metrics::init(runtime_registry));
685
686 let auditor = self.executor.auditor.clone();
688 let (signaler, signal) = Signaler::new();
689 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
690 let network = MeteredNetwork::new(network, runtime_registry);
691
692 let executor = Arc::new(Executor {
693 cycle: self.executor.cycle,
695 deadline: self.executor.deadline,
696 auditor: auditor.clone(),
697 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
698 time: Mutex::new(*self.executor.time.lock().unwrap()),
699 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
700
701 registry: Mutex::new(registry),
703 metrics: metrics.clone(),
704 tasks: Arc::new(Tasks::new()),
705 sleeping: Mutex::new(BinaryHeap::new()),
706 signaler: Mutex::new(signaler),
707 signal,
708 finished: Mutex::new(false),
709 recovered: Mutex::new(false),
710 });
711 Self {
712 label: String::new(),
713 spawned: false,
714 executor,
715 network: Arc::new(network),
716 storage: self.storage,
717 }
718 }
719
720 pub fn auditor(&self) -> &Auditor {
721 &self.executor.auditor
722 }
723}
724
725impl Clone for Context {
726 fn clone(&self) -> Self {
727 Self {
728 label: self.label.clone(),
729 spawned: false,
730 executor: self.executor.clone(),
731 network: self.network.clone(),
732 storage: self.storage.clone(),
733 }
734 }
735}
736
737impl crate::Spawner for Context {
738 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
739 where
740 F: FnOnce(Self) -> Fut + Send + 'static,
741 Fut: Future<Output = T> + Send + 'static,
742 T: Send + 'static,
743 {
744 assert!(!self.spawned, "already spawned");
746
747 let label = self.label.clone();
749 let work = Work {
750 label: label.clone(),
751 };
752 self.executor
753 .metrics
754 .tasks_spawned
755 .get_or_create(&work)
756 .inc();
757 let gauge = self
758 .executor
759 .metrics
760 .tasks_running
761 .get_or_create(&work)
762 .clone();
763
764 let executor = self.executor.clone();
766 let future = f(self);
767 let (f, handle) = Handle::init(future, gauge, false);
768
769 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
771 handle
772 }
773
774 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
775 where
776 F: Future<Output = T> + Send + 'static,
777 T: Send + 'static,
778 {
779 assert!(!self.spawned, "already spawned");
781 self.spawned = true;
782
783 let work = Work {
785 label: self.label.clone(),
786 };
787 self.executor
788 .metrics
789 .tasks_spawned
790 .get_or_create(&work)
791 .inc();
792 let gauge = self
793 .executor
794 .metrics
795 .tasks_running
796 .get_or_create(&work)
797 .clone();
798
799 let label = self.label.clone();
801 let executor = self.executor.clone();
802 move |f: F| {
803 let (f, handle) = Handle::init(f, gauge, false);
804
805 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
807 handle
808 }
809 }
810
811 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
812 where
813 F: FnOnce() -> T + Send + 'static,
814 T: Send + 'static,
815 {
816 assert!(!self.spawned, "already spawned");
818
819 let work = Work {
821 label: self.label.clone(),
822 };
823 self.executor
824 .metrics
825 .blocking_tasks_spawned
826 .get_or_create(&work)
827 .inc();
828 let gauge = self
829 .executor
830 .metrics
831 .blocking_tasks_running
832 .get_or_create(&work)
833 .clone();
834
835 let (f, handle) = Handle::init_blocking(f, gauge, false);
837
838 let f = async move { f() };
840 Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
841 handle
842 }
843
844 fn stop(&self, value: i32) {
845 self.executor.auditor.event(b"stop", |hasher| {
846 hasher.update(value.to_be_bytes());
847 });
848 self.executor.signaler.lock().unwrap().signal(value);
849 }
850
851 fn stopped(&self) -> Signal {
852 self.executor.auditor.event(b"stopped", |_| {});
853 self.executor.signal.clone()
854 }
855}
856
857impl crate::Metrics for Context {
858 fn with_label(&self, label: &str) -> Self {
859 let label = {
860 let prefix = self.label.clone();
861 if prefix.is_empty() {
862 label.to_string()
863 } else {
864 format!("{}_{}", prefix, label)
865 }
866 };
867 assert!(
868 !label.starts_with(METRICS_PREFIX),
869 "using runtime label is not allowed"
870 );
871 Self {
872 label,
873 spawned: false,
874 executor: self.executor.clone(),
875 network: self.network.clone(),
876 storage: self.storage.clone(),
877 }
878 }
879
880 fn label(&self) -> String {
881 self.label.clone()
882 }
883
884 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
885 let name = name.into();
887 let help = help.into();
888
889 self.executor.auditor.event(b"register", |hasher| {
891 hasher.update(name.as_bytes());
892 hasher.update(help.as_bytes());
893 });
894 let prefixed_name = {
895 let prefix = &self.label;
896 if prefix.is_empty() {
897 name
898 } else {
899 format!("{}_{}", *prefix, name)
900 }
901 };
902 self.executor
903 .registry
904 .lock()
905 .unwrap()
906 .register(prefixed_name, help, metric)
907 }
908
909 fn encode(&self) -> String {
910 self.executor.auditor.event(b"encode", |_| {});
911 let mut buffer = String::new();
912 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
913 buffer
914 }
915}
916
917struct Sleeper {
918 executor: Arc<Executor>,
919 time: SystemTime,
920 registered: bool,
921}
922
923struct Alarm {
924 time: SystemTime,
925 waker: Waker,
926}
927
928impl PartialEq for Alarm {
929 fn eq(&self, other: &Self) -> bool {
930 self.time.eq(&other.time)
931 }
932}
933
934impl Eq for Alarm {}
935
936impl PartialOrd for Alarm {
937 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
938 Some(self.cmp(other))
939 }
940}
941
942impl Ord for Alarm {
943 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
944 other.time.cmp(&self.time)
946 }
947}
948
949impl Future for Sleeper {
950 type Output = ();
951
952 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
953 {
954 let current_time = *self.executor.time.lock().unwrap();
955 if current_time >= self.time {
956 return Poll::Ready(());
957 }
958 }
959 if !self.registered {
960 self.registered = true;
961 self.executor.sleeping.lock().unwrap().push(Alarm {
962 time: self.time,
963 waker: cx.waker().clone(),
964 });
965 }
966 Poll::Pending
967 }
968}
969
970impl Clock for Context {
971 fn current(&self) -> SystemTime {
972 *self.executor.time.lock().unwrap()
973 }
974
975 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
976 let deadline = self
977 .current()
978 .checked_add(duration)
979 .expect("overflow when setting wake time");
980 self.sleep_until(deadline)
981 }
982
983 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
984 Sleeper {
985 executor: self.executor.clone(),
986
987 time: deadline,
988 registered: false,
989 }
990 }
991}
992
993impl GClock for Context {
994 type Instant = SystemTime;
995
996 fn now(&self) -> Self::Instant {
997 self.current()
998 }
999}
1000
1001impl ReasonablyRealtime for Context {}
1002
1003impl crate::Network for Context {
1004 type Listener = ListenerOf<Network>;
1005
1006 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1007 self.network.bind(socket).await
1008 }
1009
1010 async fn dial(
1011 &self,
1012 socket: SocketAddr,
1013 ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1014 self.network.dial(socket).await
1015 }
1016}
1017
1018impl RngCore for Context {
1019 fn next_u32(&mut self) -> u32 {
1020 self.executor.auditor.event(b"rand", |hasher| {
1021 hasher.update(b"next_u32");
1022 });
1023 self.executor.rng.lock().unwrap().next_u32()
1024 }
1025
1026 fn next_u64(&mut self) -> u64 {
1027 self.executor.auditor.event(b"rand", |hasher| {
1028 hasher.update(b"next_u64");
1029 });
1030 self.executor.rng.lock().unwrap().next_u64()
1031 }
1032
1033 fn fill_bytes(&mut self, dest: &mut [u8]) {
1034 self.executor.auditor.event(b"rand", |hasher| {
1035 hasher.update(b"fill_bytes");
1036 });
1037 self.executor.rng.lock().unwrap().fill_bytes(dest)
1038 }
1039
1040 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1041 self.executor.auditor.event(b"rand", |hasher| {
1042 hasher.update(b"try_fill_bytes");
1043 });
1044 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1045 }
1046}
1047
1048impl CryptoRng for Context {}
1049
1050impl crate::Storage for Context {
1051 type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1052
1053 async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1054 self.storage.open(partition, name).await
1055 }
1056
1057 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1058 self.storage.remove(partition, name).await
1059 }
1060
1061 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1062 self.storage.scan(partition).await
1063 }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068 use super::*;
1069 use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
1070 use commonware_macros::test_traced;
1071 use futures::task::noop_waker;
1072
1073 fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1074 let executor = deterministic::Runner::seeded(seed);
1075 run_tasks(5, executor)
1076 }
1077
1078 #[test]
1079 fn test_same_seed_same_order() {
1080 let mut outputs = Vec::new();
1082 for seed in 0..1000 {
1083 let output = run_with_seed(seed);
1084 outputs.push(output);
1085 }
1086
1087 for seed in 0..1000 {
1089 let output = run_with_seed(seed);
1090 assert_eq!(output, outputs[seed as usize]);
1091 }
1092 }
1093
1094 #[test_traced("TRACE")]
1095 fn test_different_seeds_different_order() {
1096 let output1 = run_with_seed(12345);
1097 let output2 = run_with_seed(54321);
1098 assert_ne!(output1, output2);
1099 }
1100
1101 #[test]
1102 fn test_alarm_min_heap() {
1103 let now = SystemTime::now();
1105 let alarms = vec![
1106 Alarm {
1107 time: now + Duration::new(10, 0),
1108 waker: noop_waker(),
1109 },
1110 Alarm {
1111 time: now + Duration::new(5, 0),
1112 waker: noop_waker(),
1113 },
1114 Alarm {
1115 time: now + Duration::new(15, 0),
1116 waker: noop_waker(),
1117 },
1118 Alarm {
1119 time: now + Duration::new(5, 0),
1120 waker: noop_waker(),
1121 },
1122 ];
1123 let mut heap = BinaryHeap::new();
1124 for alarm in alarms {
1125 heap.push(alarm);
1126 }
1127
1128 let mut sorted_times = Vec::new();
1130 while let Some(alarm) = heap.pop() {
1131 sorted_times.push(alarm.time);
1132 }
1133 assert_eq!(
1134 sorted_times,
1135 vec![
1136 now + Duration::new(5, 0),
1137 now + Duration::new(5, 0),
1138 now + Duration::new(10, 0),
1139 now + Duration::new(15, 0),
1140 ]
1141 );
1142 }
1143
1144 #[test]
1145 #[should_panic(expected = "runtime timeout")]
1146 fn test_timeout() {
1147 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1148 executor.start(|context| async move {
1149 loop {
1150 context.sleep(Duration::from_secs(1)).await;
1151 }
1152 });
1153 }
1154
1155 #[test]
1156 #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1157 fn test_bad_timeout() {
1158 let cfg = Config {
1159 timeout: Some(Duration::default()),
1160 cycle: Duration::default(),
1161 ..Config::default()
1162 };
1163 deterministic::Runner::new(cfg);
1164 }
1165
1166 #[test]
1167 fn test_recover_synced_storage_persists() {
1168 let executor1 = deterministic::Runner::default();
1170 let partition = "test_partition";
1171 let name = b"test_blob";
1172 let data = b"Hello, world!";
1173
1174 let (context, state) = executor1.start(|context| async move {
1176 let (blob, _) = context.open(partition, name).await.unwrap();
1177 blob.write_at(data, 0).await.unwrap();
1178 blob.sync().await.unwrap();
1179 let state = context.auditor().state();
1180 (context, state)
1181 });
1182 let recovered_context = context.recover();
1183
1184 assert_eq!(state, recovered_context.auditor().state());
1186
1187 let executor = Runner::from(recovered_context);
1189 executor.start(|context| async move {
1190 let (blob, len) = context.open(partition, name).await.unwrap();
1191 assert_eq!(len, data.len() as u64);
1192 let mut buf = vec![0; data.len()];
1193 blob.read_at(&mut buf, 0).await.unwrap();
1194 assert_eq!(buf, data);
1195 });
1196 }
1197
1198 #[test]
1199 fn test_recover_unsynced_storage_does_not_persist() {
1200 let executor = deterministic::Runner::default();
1202 let partition = "test_partition";
1203 let name = b"test_blob";
1204 let data = b"Hello, world!".to_vec();
1205
1206 let context = executor.start(|context| async move {
1208 let context = context.clone();
1209 let (blob, _) = context.open(partition, name).await.unwrap();
1210 blob.write_at(&data, 0).await.unwrap();
1211 context
1213 });
1214
1215 let context = context.recover();
1217 let executor = Runner::from(context);
1218
1219 executor.start(|context| async move {
1221 let (_, len) = context.open(partition, name).await.unwrap();
1222 assert_eq!(len, 0);
1223 });
1224 }
1225
1226 #[test]
1227 #[should_panic(expected = "execution is not finished")]
1228 fn test_recover_before_finish_panics() {
1229 let executor = deterministic::Runner::default();
1231
1232 executor.start(|context| async move {
1234 context.recover();
1236 });
1237 }
1238
1239 #[test]
1240 #[should_panic(expected = "runtime has already been recovered")]
1241 fn test_recover_twice_panics() {
1242 let executor = deterministic::Runner::default();
1244
1245 let context = executor.start(|context| async move { context });
1247
1248 let cloned_context = context.clone();
1250 context.recover();
1251
1252 cloned_context.recover();
1254 }
1255
1256 #[test]
1257 fn test_default_time_zero() {
1258 let executor = deterministic::Runner::default();
1260
1261 executor.start(|context| async move {
1262 assert_eq!(
1264 context.current().duration_since(UNIX_EPOCH).unwrap(),
1265 Duration::ZERO
1266 );
1267 });
1268 }
1269}