1use crate::{
26 network::{
27 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28 metered::Network as MeteredNetwork,
29 },
30 storage::{
31 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32 metered::Storage as MeteredStorage,
33 },
34 telemetry::metrics::task::Label,
35 utils::signal::{Signal, Stopper},
36 Clock, Error, Handle, ListenerOf, METRICS_PREFIX,
37};
38use commonware_macros::select;
39use commonware_utils::{hex, SystemTimeExt};
40use futures::{
41 task::{waker_ref, ArcWake},
42 Future,
43};
44use governor::clock::{Clock as GClock, ReasonablyRealtime};
45use prometheus_client::{
46 encoding::text::encode,
47 metrics::{counter::Counter, family::Family, gauge::Gauge},
48 registry::{Metric, Registry},
49};
50use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
51use sha2::{Digest, Sha256};
52use std::{
53 collections::{BinaryHeap, HashMap},
54 mem::replace,
55 net::SocketAddr,
56 pin::Pin,
57 sync::{Arc, Mutex, Weak},
58 task::{self, Poll, Waker},
59 time::{Duration, SystemTime, UNIX_EPOCH},
60};
61use tracing::trace;
62
/// Byte-keyed map of byte values.
///
/// The executor keeps one such map per partition name (see the `partitions`
/// field of [`Executor`]).
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
/// Runtime-level metrics created and registered by [`Metrics::init`].
#[derive(Debug)]
struct Metrics {
    /// Per-label counter registered as `tasks_spawned`.
    tasks_spawned: Family<Label, Counter>,
    /// Per-label gauge registered as `tasks_running`.
    tasks_running: Family<Label, Gauge>,
    /// Per-label counter registered as `task_polls`; the executor loop
    /// increments it once for every task it polls.
    task_polls: Family<Label, Counter>,

    /// Counter registered as `bandwidth`.
    network_bandwidth: Counter,
}
74
75impl Metrics {
76 pub fn init(registry: &mut Registry) -> Self {
77 let metrics = Self {
78 task_polls: Family::default(),
79 tasks_spawned: Family::default(),
80 tasks_running: Family::default(),
81 network_bandwidth: Counter::default(),
82 };
83 registry.register(
84 "tasks_spawned",
85 "Total number of tasks spawned",
86 metrics.tasks_spawned.clone(),
87 );
88 registry.register(
89 "tasks_running",
90 "Number of tasks currently running",
91 metrics.tasks_running.clone(),
92 );
93 registry.register(
94 "task_polls",
95 "Total number of task polls",
96 metrics.task_polls.clone(),
97 );
98 registry.register(
99 "bandwidth",
100 "Total amount of data sent over network",
101 metrics.network_bandwidth.clone(),
102 );
103 metrics
104 }
105}
106
/// Maintains a running hash that is updated by every audited event.
pub struct Auditor {
    /// Latest digest; empty until the first event has been recorded.
    hash: Mutex<Vec<u8>>,
}
111
112impl Default for Auditor {
113 fn default() -> Self {
114 Self {
115 hash: Vec::new().into(),
116 }
117 }
118}
119
120impl Auditor {
121 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
125 where
126 F: FnOnce(&mut Sha256),
127 {
128 let mut hash = self.hash.lock().unwrap();
129
130 let mut hasher = Sha256::new();
131 hasher.update(&*hash);
132 hasher.update(label);
133 payload(&mut hasher);
134
135 *hash = hasher.finalize().to_vec();
136 }
137
138 pub fn state(&self) -> String {
143 let hash = self.hash.lock().unwrap().clone();
144 hex(&hash)
145 }
146}
147
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    seed: u64,

    /// Amount of simulated time the executor adds to its clock on every loop iteration.
    cycle: Duration,

    /// If set, the executor panics once this much simulated time has elapsed.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns the default configuration: seed `42`, a 1ms cycle and no timeout.
    pub fn new() -> Self {
        Self {
            timeout: None,
            cycle: Duration::from_millis(1),
            seed: 42,
        }
    }

    // Setters

    /// Returns the configuration with `seed` replaced.
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }
    /// Returns the configuration with `cycle` replaced.
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }
    /// Returns the configuration with `timeout` replaced.
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    // Getters

    /// Seed for the random number generator.
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// Simulated time added per executor loop iteration.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Optional limit on simulated run time.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Checks that the configuration is usable.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle is zero.
    pub fn assert(&self) {
        let zero_cycle_with_timeout = self.cycle.is_zero() && self.timeout.is_some();
        assert!(
            !zero_cycle_with_timeout,
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [`Config::new`].
    fn default() -> Self {
        Config::new()
    }
}
217
/// State shared by every [`Context`] that belongs to one deterministic run.
pub struct Executor {
    /// Registry that `Context::register` writes to and `Context::encode` reads from.
    registry: Mutex<Registry>,
    /// Amount added to `time` on each executor loop iteration.
    cycle: Duration,
    /// Simulated time at which the executor loop panics with "runtime timeout", if any.
    deadline: Option<SystemTime>,
    /// Runtime metrics (task counters/gauges and bandwidth).
    metrics: Arc<Metrics>,
    /// Running hash of audited events.
    auditor: Arc<Auditor>,
    /// Seeded generator used to shuffle ready tasks and to serve `RngCore` calls on `Context`.
    rng: Mutex<StdRng>,
    /// Current simulated time; a fresh context starts at `UNIX_EPOCH`.
    time: Mutex<SystemTime>,
    /// Queue of tasks waiting to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms; `Alarm`'s ordering puts the earliest wake time on top of the heap.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Named partitions; cloned into the new executor by `Context::recover`.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Stop coordinator used by `Context::stop` and `Context::stopped`.
    shutdown: Mutex<Stopper>,
    /// Set to `true` once the root task has completed.
    finished: Mutex<bool>,
    /// Set to `true` once `Context::recover` has been called against this executor.
    recovered: Mutex<bool>,
}
234
/// What a [`Runner`] starts from.
enum State {
    /// A configuration from which a fresh [`Context`] is built when the runner starts.
    Config(Config),
    /// An existing [`Context`] (for example one returned by `Context::recover`) to reuse as-is.
    Context(Context),
}
239
/// Entry point of the deterministic runtime; see its `crate::Runner` implementation.
pub struct Runner {
    /// Either the configuration to build a context from or a ready-made context.
    state: State,
}
244
245impl From<Config> for Runner {
246 fn from(cfg: Config) -> Self {
247 Self::new(cfg)
248 }
249}
250
251impl From<Context> for Runner {
252 fn from(context: Context) -> Self {
253 Self {
254 state: State::Context(context),
255 }
256 }
257}
258
259impl Runner {
260 pub fn new(cfg: Config) -> Self {
262 cfg.assert();
264 Runner {
265 state: State::Config(cfg),
266 }
267 }
268
269 pub fn seeded(seed: u64) -> Self {
272 let cfg = Config {
273 seed,
274 ..Config::default()
275 };
276 Self::new(cfg)
277 }
278
279 pub fn timed(timeout: Duration) -> Self {
282 let cfg = Config {
283 timeout: Some(timeout),
284 ..Config::default()
285 };
286 Self::new(cfg)
287 }
288}
289
290impl Default for Runner {
291 fn default() -> Self {
292 Self::new(Config::default())
293 }
294}
295
296impl crate::Runner for Runner {
297 type Context = Context;
298
299 fn start<F, Fut>(self, f: F) -> Fut::Output
300 where
301 F: FnOnce(Self::Context) -> Fut,
302 Fut: Future,
303 {
304 let context = match self.state {
306 State::Config(config) => Context::new(config),
307 State::Context(context) => context,
308 };
309
310 let executor = context.executor.clone();
312 let mut root = Box::pin(f(context));
313
314 Tasks::register_root(&executor.tasks);
316
317 let mut iter = 0;
319 loop {
320 {
322 let current = executor.time.lock().unwrap();
323 if let Some(deadline) = executor.deadline {
324 if *current >= deadline {
325 panic!("runtime timeout");
326 }
327 }
328 }
329
330 let mut tasks = executor.tasks.drain();
332
333 {
335 let mut rng = executor.rng.lock().unwrap();
336 tasks.shuffle(&mut *rng);
337 }
338
339 trace!(iter, tasks = tasks.len(), "starting loop");
345 for task in tasks {
346 executor.auditor.event(b"process_task", |hasher| {
348 hasher.update(task.id.to_be_bytes());
349 hasher.update(task.label.name().as_bytes());
350 });
351 trace!(id = task.id, "processing task");
352
353 executor.metrics.task_polls.get_or_create(&task.label).inc();
355
356 let waker = waker_ref(&task);
358 let mut cx = task::Context::from_waker(&waker);
359 match &task.operation {
360 Operation::Root => {
361 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
363 trace!(id = task.id, "task is complete");
364 *executor.finished.lock().unwrap() = true;
365 return output;
366 }
367 }
368 Operation::Work { future, completed } => {
369 if *completed.lock().unwrap() {
371 trace!(id = task.id, "dropping already complete task");
372 continue;
373 }
374
375 let mut fut = future.lock().unwrap();
377 if fut.as_mut().poll(&mut cx).is_ready() {
378 trace!(id = task.id, "task is complete");
379 *completed.lock().unwrap() = true;
380 continue;
381 }
382 }
383 }
384
385 trace!(id = task.id, "task is still pending");
387 }
388
389 let mut current;
394 {
395 let mut time = executor.time.lock().unwrap();
396 *time = time
397 .checked_add(executor.cycle)
398 .expect("executor time overflowed");
399 current = *time;
400 }
401 trace!(now = current.epoch_millis(), "time advanced");
402
403 if executor.tasks.len() == 0 {
405 let mut skip = None;
406 {
407 let sleeping = executor.sleeping.lock().unwrap();
408 if let Some(next) = sleeping.peek() {
409 if next.time > current {
410 skip = Some(next.time);
411 }
412 }
413 }
414 if let Some(skip_time) = skip {
415 {
416 let mut time = executor.time.lock().unwrap();
417 *time = skip_time;
418 current = *time;
419 }
420 trace!(now = current.epoch_millis(), "time skipped");
421 }
422 }
423
424 let mut to_wake = Vec::new();
426 let mut remaining;
427 {
428 let mut sleeping = executor.sleeping.lock().unwrap();
429 while let Some(next) = sleeping.peek() {
430 if next.time <= current {
431 let sleeper = sleeping.pop().unwrap();
432 to_wake.push(sleeper.waker);
433 } else {
434 break;
435 }
436 }
437 remaining = sleeping.len();
438 }
439 for waker in to_wake {
440 waker.wake();
441 }
442
443 remaining += executor.tasks.len();
445
446 if remaining == 0 {
449 panic!("runtime stalled");
450 }
451 iter += 1;
452 }
453 }
454}
455
/// What the executor does when it polls a [`Task`].
enum Operation {
    /// Poll the root future, which is owned by the executor loop rather than the task.
    Root,
    /// Poll a spawned future owned by the task.
    Work {
        /// The spawned future.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once `future` has returned `Poll::Ready`; the executor loop skips the
        /// task when this is `true`.
        completed: Mutex<bool>,
    },
}
464
/// A unit of work scheduled on the executor.
struct Task {
    /// Identifier taken from the `Tasks` counter at registration time.
    id: u128,
    /// Metrics label attached to the task.
    label: Label,
    /// Weak handle to the owning queue; used to re-queue the task when it is woken.
    tasks: Weak<Tasks>,

    /// What polling this task means.
    operation: Operation,
}
473
474impl ArcWake for Task {
475 fn wake_by_ref(arc_self: &Arc<Self>) {
476 if let Some(tasks) = arc_self.tasks.upgrade() {
479 tasks.enqueue(arc_self.clone());
480 }
481 }
482}
483
/// Queue of tasks that are ready to be polled, plus bookkeeping for task creation.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled.
    queue: Mutex<Vec<Arc<Task>>>,
    /// Whether the root task has been registered.
    root_registered: Mutex<bool>,
}
493
494impl Tasks {
495 fn new() -> Self {
497 Self {
498 counter: Mutex::new(0),
499 queue: Mutex::new(Vec::new()),
500 root_registered: Mutex::new(false),
501 }
502 }
503
504 fn increment(&self) -> u128 {
506 let mut counter = self.counter.lock().unwrap();
507 let old = *counter;
508 *counter = counter.checked_add(1).expect("task counter overflow");
509 old
510 }
511
512 fn register_root(arc_self: &Arc<Self>) {
516 {
517 let mut registered = arc_self.root_registered.lock().unwrap();
518 assert!(!*registered, "root already registered");
519 *registered = true;
520 }
521 let id = arc_self.increment();
522 let mut queue = arc_self.queue.lock().unwrap();
523 queue.push(Arc::new(Task {
524 id,
525 label: Label::root(),
526 tasks: Arc::downgrade(arc_self),
527 operation: Operation::Root,
528 }));
529 }
530
531 fn register_work(
533 arc_self: &Arc<Self>,
534 label: Label,
535 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
536 ) {
537 let id = arc_self.increment();
538 let mut queue = arc_self.queue.lock().unwrap();
539 queue.push(Arc::new(Task {
540 id,
541 label,
542 tasks: Arc::downgrade(arc_self),
543 operation: Operation::Work {
544 future: Mutex::new(future),
545 completed: Mutex::new(false),
546 },
547 }));
548 }
549
550 fn enqueue(&self, task: Arc<Task>) {
552 let mut queue = self.queue.lock().unwrap();
553 queue.push(task);
554 }
555
556 fn drain(&self) -> Vec<Arc<Task>> {
558 let mut queue = self.queue.lock().unwrap();
559 let len = queue.len();
560 replace(&mut *queue, Vec::with_capacity(len))
561 }
562
563 fn len(&self) -> usize {
565 self.queue.lock().unwrap().len()
566 }
567}
568
/// Network stack used by [`Context`]: the deterministic network wrapped first in the
/// audited layer and then in the metered layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
570
/// Handle to the deterministic runtime.
///
/// It implements the crate's `Spawner`, `Metrics`, `Clock`, `Network` and `Storage`
/// traits as well as `RngCore`.
pub struct Context {
    /// Label prefix applied to metrics registered through this context; empty for a
    /// context created by `new` or `recover`.
    name: String,
    /// Set by `spawn_ref` / `spawn_blocking_ref`; every spawn method asserts it is `false`.
    spawned: bool,
    /// Shared executor state.
    executor: Arc<Executor>,
    /// Shared network stack.
    network: Arc<Network>,
    /// In-memory storage wrapped in the audited and metered layers.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}
581
582impl Default for Context {
583 fn default() -> Self {
584 Self::new(Config::default())
585 }
586}
587
588impl Context {
589 pub fn new(cfg: Config) -> Self {
590 let mut registry = Registry::default();
592 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
593
594 let metrics = Arc::new(Metrics::init(runtime_registry));
596 let start_time = UNIX_EPOCH;
597 let deadline = cfg
598 .timeout
599 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
600 let auditor = Arc::new(Auditor::default());
601 let storage = MeteredStorage::new(
602 AuditedStorage::new(MemStorage::default(), auditor.clone()),
603 runtime_registry,
604 );
605 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
606 let network = MeteredNetwork::new(network, runtime_registry);
607
608 let executor = Arc::new(Executor {
609 registry: Mutex::new(registry),
610 cycle: cfg.cycle,
611 deadline,
612 metrics: metrics.clone(),
613 auditor: auditor.clone(),
614 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
615 time: Mutex::new(start_time),
616 tasks: Arc::new(Tasks::new()),
617 sleeping: Mutex::new(BinaryHeap::new()),
618 partitions: Mutex::new(HashMap::new()),
619 shutdown: Mutex::new(Stopper::default()),
620 finished: Mutex::new(false),
621 recovered: Mutex::new(false),
622 });
623
624 Context {
625 name: String::new(),
626 spawned: false,
627 executor: executor.clone(),
628 network: Arc::new(network),
629 storage,
630 }
631 }
632
633 pub fn recover(self) -> Self {
645 if !*self.executor.finished.lock().unwrap() {
647 panic!("execution is not finished");
648 }
649
650 {
652 let mut recovered = self.executor.recovered.lock().unwrap();
653 if *recovered {
654 panic!("runtime has already been recovered");
655 }
656 *recovered = true;
657 }
658
659 let mut registry = Registry::default();
661 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
662 let metrics = Arc::new(Metrics::init(runtime_registry));
663
664 let auditor = self.executor.auditor.clone();
666 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
667 let network = MeteredNetwork::new(network, runtime_registry);
668
669 let executor = Arc::new(Executor {
670 cycle: self.executor.cycle,
672 deadline: self.executor.deadline,
673 auditor: auditor.clone(),
674 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
675 time: Mutex::new(*self.executor.time.lock().unwrap()),
676 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
677
678 registry: Mutex::new(registry),
680 metrics: metrics.clone(),
681 tasks: Arc::new(Tasks::new()),
682 sleeping: Mutex::new(BinaryHeap::new()),
683 shutdown: Mutex::new(Stopper::default()),
684 finished: Mutex::new(false),
685 recovered: Mutex::new(false),
686 });
687 Self {
688 name: String::new(),
689 spawned: false,
690 executor,
691 network: Arc::new(network),
692 storage: self.storage,
693 }
694 }
695
696 pub fn auditor(&self) -> &Auditor {
697 &self.executor.auditor
698 }
699}
700
701impl Clone for Context {
702 fn clone(&self) -> Self {
703 Self {
704 name: self.name.clone(),
705 spawned: false,
706 executor: self.executor.clone(),
707 network: self.network.clone(),
708 storage: self.storage.clone(),
709 }
710 }
711}
712
impl crate::Spawner for Context {
    /// Spawns the future produced by `f` as a new task and returns its handle.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context's `spawned` flag is set.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // A context may only be used to spawn once
        assert!(!self.spawned, "already spawned");

        // Label and gauge come from the `spawn_metrics!` macro (defined elsewhere)
        let (label, gauge) = spawn_metrics!(self, future);

        // Keep a handle to the executor because `self` is moved into `f`
        let executor = self.executor.clone();
        let future = f(self);
        // NOTE(review): the meaning of the trailing `false` is defined by
        // `Handle::init_future`, which is not visible in this file
        let (f, handle) = Handle::init_future(future, gauge, false);

        // Queue the task
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns a future as a
    /// new task.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context's `spawned` flag is set.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // A context may only be used to spawn once
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        // Label and gauge come from the `spawn_metrics!` macro (defined elsewhere)
        let (label, gauge) = spawn_metrics!(self, future);

        // The returned closure owns its own executor handle
        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_future(f, gauge, false);

            // Queue the task
            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Spawns the blocking closure `f` as a task and returns its handle.
    ///
    /// The closure is wrapped in an `async` block and queued like any other task.
    /// `dedicated` is only forwarded to `spawn_metrics!`.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context's `spawned` flag is set.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static,
    {
        // A context may only be used to spawn once
        assert!(!self.spawned, "already spawned");

        // Label and gauge come from the `spawn_metrics!` macro (defined elsewhere)
        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        // Keep a handle to the executor because `self` is moved into the closure
        let executor = self.executor.clone();
        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);

        // Run the closure when the task is first polled
        let f = async move { f() };
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns a blocking
    /// closure as a task.
    ///
    /// `dedicated` is only forwarded to `spawn_metrics!`.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context's `spawned` flag is set.
    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        // A context may only be used to spawn once
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        // Label and gauge come from the `spawn_metrics!` macro (defined elsewhere)
        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        // The returned closure owns its own executor handle
        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_blocking(f, gauge, false);

            // Run the closure when the task is first polled
            let f = async move { f() };
            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Signals the runtime to stop with `value` and waits for the stop to resolve.
    ///
    /// # Errors
    ///
    /// - `Error::Closed` if the stop future resolves with an error.
    /// - `Error::Timeout` if `timeout` is set and elapses first.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        // Record the stop request (and its value) in the audit trail
        self.executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Scope the guard so the lock is released before awaiting
        let stop_resolved = {
            let mut shutdown = self.executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // Without a timeout, wait on a future that never resolves
        let timeout_future = match timeout {
            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
            None => futures::future::Either::Right(futures::future::pending()),
        };
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Returns the signal obtained from the shutdown coordinator's `stopped()`.
    ///
    /// The call itself is recorded in the audit trail.
    fn stopped(&self) -> Signal {
        self.executor.auditor.event(b"stopped", |_| {});
        self.executor.shutdown.lock().unwrap().stopped()
    }
}
834
835impl crate::Metrics for Context {
836 fn with_label(&self, label: &str) -> Self {
837 let name = {
838 let prefix = self.name.clone();
839 if prefix.is_empty() {
840 label.to_string()
841 } else {
842 format!("{prefix}_{label}")
843 }
844 };
845 assert!(
846 !name.starts_with(METRICS_PREFIX),
847 "using runtime label is not allowed"
848 );
849 Self {
850 name,
851 spawned: false,
852 executor: self.executor.clone(),
853 network: self.network.clone(),
854 storage: self.storage.clone(),
855 }
856 }
857
858 fn label(&self) -> String {
859 self.name.clone()
860 }
861
862 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
863 let name = name.into();
865 let help = help.into();
866
867 self.executor.auditor.event(b"register", |hasher| {
869 hasher.update(name.as_bytes());
870 hasher.update(help.as_bytes());
871 });
872 let prefixed_name = {
873 let prefix = &self.name;
874 if prefix.is_empty() {
875 name
876 } else {
877 format!("{}_{}", *prefix, name)
878 }
879 };
880 self.executor
881 .registry
882 .lock()
883 .unwrap()
884 .register(prefixed_name, help, metric)
885 }
886
887 fn encode(&self) -> String {
888 self.executor.auditor.event(b"encode", |_| {});
889 let mut buffer = String::new();
890 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
891 buffer
892 }
893}
894
/// Future that completes once the executor's simulated clock reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Wake-up time.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
900
/// A pending wake-up: `waker` is to be invoked once the clock reaches `time`.
struct Alarm {
    /// When the alarm is due.
    time: SystemTime,
    /// Waker to invoke when the alarm is due.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when they are due at the same time; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    /// Orders alarms by *descending* time so that a max-heap (`BinaryHeap`) yields the
    /// earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
926
927impl Future for Sleeper {
928 type Output = ();
929
930 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
931 {
932 let current_time = *self.executor.time.lock().unwrap();
933 if current_time >= self.time {
934 return Poll::Ready(());
935 }
936 }
937 if !self.registered {
938 self.registered = true;
939 self.executor.sleeping.lock().unwrap().push(Alarm {
940 time: self.time,
941 waker: cx.waker().clone(),
942 });
943 }
944 Poll::Pending
945 }
946}
947
948impl Clock for Context {
949 fn current(&self) -> SystemTime {
950 *self.executor.time.lock().unwrap()
951 }
952
953 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
954 let deadline = self
955 .current()
956 .checked_add(duration)
957 .expect("overflow when setting wake time");
958 self.sleep_until(deadline)
959 }
960
961 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
962 Sleeper {
963 executor: self.executor.clone(),
964
965 time: deadline,
966 registered: false,
967 }
968 }
969}
970
971impl GClock for Context {
972 type Instant = SystemTime;
973
974 fn now(&self) -> Self::Instant {
975 self.current()
976 }
977}
978
// Marker implementation from `governor::clock`; it adds no methods of its own here.
impl ReasonablyRealtime for Context {}
980
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds `socket` by delegating to the context's network stack.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket` by delegating to the context's network stack, returning the
    /// sink/stream pair it produces.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
995
996impl RngCore for Context {
997 fn next_u32(&mut self) -> u32 {
998 self.executor.auditor.event(b"rand", |hasher| {
999 hasher.update(b"next_u32");
1000 });
1001 self.executor.rng.lock().unwrap().next_u32()
1002 }
1003
1004 fn next_u64(&mut self) -> u64 {
1005 self.executor.auditor.event(b"rand", |hasher| {
1006 hasher.update(b"next_u64");
1007 });
1008 self.executor.rng.lock().unwrap().next_u64()
1009 }
1010
1011 fn fill_bytes(&mut self, dest: &mut [u8]) {
1012 self.executor.auditor.event(b"rand", |hasher| {
1013 hasher.update(b"fill_bytes");
1014 });
1015 self.executor.rng.lock().unwrap().fill_bytes(dest)
1016 }
1017
1018 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1019 self.executor.auditor.event(b"rand", |hasher| {
1020 hasher.update(b"try_fill_bytes");
1021 });
1022 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1023 }
1024}
1025
// Marker implementation; randomness is served by the executor's seeded `StdRng`
// (see the `RngCore` implementation for `Context`).
impl CryptoRng for Context {}
1027
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Opens the blob `name` in `partition` by delegating to the context's storage.
    ///
    /// The second element of the returned tuple is used as the blob's length by the
    /// tests in this file.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Delegates removal to the context's storage.
    // NOTE(review): presumably `None` removes the whole partition — confirm against the
    // storage implementation.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Delegates scanning of `partition` to the context's storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1043
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs `run_tasks` with five tasks on a runner seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// The same seed must always produce the same output.
    #[test]
    fn test_same_seed_same_order() {
        // Record the output for each seed
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Re-run every seed and compare against the recorded output
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different outputs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s ordering makes `BinaryHeap` pop the earliest alarm first.
    #[test]
    fn test_alarm_min_heap() {
        // Push alarms out of order, including a duplicate time
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Popping must yield the times in ascending order
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A root task that sleeps forever must hit the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected when the runner is created.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data that was written and synced is readable after `recover`.
    #[test]
    fn test_recover_synced_storage_persists() {
        // First run: write and sync a blob
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Recovery itself must not change the auditor state
        assert_eq!(state, recovered_context.auditor().state());

        // Second run: the synced data is still there
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    /// Data that was written but never synced is gone after `recover`.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // First run: write a blob without syncing it
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            context
        });

        // Recover and start a second run
        let context = context.recover();
        let executor = Runner::from(context);

        // The blob opens with length zero
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Calling `recover` while the root task is still running must panic.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // The root task has not completed yet
            context.recover();
        });
    }

    /// Recovering the same executor twice (here via a clone of the context) must panic.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let executor = deterministic::Runner::default();

        // Finish a run and get the context back
        let context = executor.start(|context| async move { context });

        // The first recovery succeeds
        let cloned_context = context.clone();
        context.recover();

        // The second recovery shares the same executor and must panic
        cloned_context.recover();
    }

    /// A fresh runtime starts its clock at the UNIX epoch.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}