1use crate::{
26 network::{
27 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28 metered::Network as MeteredNetwork,
29 },
30 storage::{
31 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32 metered::Storage as MeteredStorage,
33 },
34 telemetry::metrics::task::Label,
35 utils::Signaler,
36 Clock, Error, Handle, ListenerOf, Signal, METRICS_PREFIX,
37};
38use commonware_utils::{hex, SystemTimeExt};
39use futures::{
40 task::{waker_ref, ArcWake},
41 Future,
42};
43use governor::clock::{Clock as GClock, ReasonablyRealtime};
44use prometheus_client::{
45 encoding::text::encode,
46 metrics::{counter::Counter, family::Family, gauge::Gauge},
47 registry::{Metric, Registry},
48};
49use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
50use sha2::{Digest, Sha256};
51use std::{
52 collections::{BinaryHeap, HashMap},
53 mem::replace,
54 net::SocketAddr,
55 pin::Pin,
56 sync::{Arc, Mutex, Weak},
57 task::{self, Poll, Waker},
58 time::{Duration, SystemTime, UNIX_EPOCH},
59};
60use tracing::trace;
61
/// Byte-keyed map of byte values.
///
/// NOTE(review): `Executor::partitions` keeps one of these per partition name, so this
/// presumably maps blob names to blob contents — confirm against the storage code.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
64
65#[derive(Debug)]
66struct Metrics {
67 tasks_spawned: Family<Label, Counter>,
68 tasks_running: Family<Label, Gauge>,
69 task_polls: Family<Label, Counter>,
70
71 network_bandwidth: Counter,
72}
73
74impl Metrics {
75 pub fn init(registry: &mut Registry) -> Self {
76 let metrics = Self {
77 task_polls: Family::default(),
78 tasks_spawned: Family::default(),
79 tasks_running: Family::default(),
80 network_bandwidth: Counter::default(),
81 };
82 registry.register(
83 "tasks_spawned",
84 "Total number of tasks spawned",
85 metrics.tasks_spawned.clone(),
86 );
87 registry.register(
88 "tasks_running",
89 "Number of tasks currently running",
90 metrics.tasks_running.clone(),
91 );
92 registry.register(
93 "task_polls",
94 "Total number of task polls",
95 metrics.task_polls.clone(),
96 );
97 registry.register(
98 "bandwidth",
99 "Total amount of data sent over network",
100 metrics.network_bandwidth.clone(),
101 );
102 metrics
103 }
104}
105
106pub struct Auditor {
108 hash: Mutex<Vec<u8>>,
109}
110
111impl Default for Auditor {
112 fn default() -> Self {
113 Self {
114 hash: Vec::new().into(),
115 }
116 }
117}
118
119impl Auditor {
120 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
124 where
125 F: FnOnce(&mut Sha256),
126 {
127 let mut hash = self.hash.lock().unwrap();
128
129 let mut hasher = Sha256::new();
130 hasher.update(&*hash);
131 hasher.update(label);
132 payload(&mut hasher);
133
134 *hash = hasher.finalize().to_vec();
135 }
136
137 pub fn state(&self) -> String {
142 let hash = self.hash.lock().unwrap().clone();
143 hex(&hash)
144 }
145}
146
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed used to initialise the runtime's random number generator.
    seed: u64,

    /// Amount the simulated clock advances after each pass over the task queue.
    cycle: Duration,

    /// If set, the runtime panics once this much simulated time has elapsed.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns the default configuration: seed `42`, a 1ms cycle and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    /// Returns the configuration with `seed` replaced.
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }
    /// Returns the configuration with `cycle` replaced.
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }
    /// Returns the configuration with `timeout` replaced.
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    /// Returns the configured seed.
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// Returns the configured cycle duration.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Returns the configured timeout, if any.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Validates the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        let zero_cycle = self.cycle.is_zero();
        assert!(
            !(zero_cycle && self.timeout.is_some()),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [`Config::new`].
    fn default() -> Self {
        Config::new()
    }
}
216
/// Shared state backing one run of the deterministic runtime.
pub struct Executor {
    /// Metrics registry written by `Context::register` and rendered by `Context::encode`.
    registry: Mutex<Registry>,
    /// Amount added to `time` after each pass over the task queue.
    cycle: Duration,
    /// If set, the run loop panics with "runtime timeout" once `time` reaches it.
    deadline: Option<SystemTime>,
    /// Runtime-level metrics (task polls are counted here by the run loop).
    metrics: Arc<Metrics>,
    /// Hash chain of events recorded during execution.
    auditor: Arc<Auditor>,
    /// Seeded RNG; shuffles the task order and backs the `RngCore` impl on `Context`.
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Queue of tasks waiting to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms, ordered so the earliest wake time is popped first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Partitions by name. Only copied across in `Context::recover` within this file.
    /// NOTE(review): presumably storage contents — confirm where this is populated.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Sending half of the stop signal (used by `Context::stop`).
    signaler: Mutex<Signaler>,
    /// Receiving half of the stop signal (cloned by `Context::stopped`).
    signal: Signal,
    /// Set to `true` when the root task completes.
    finished: Mutex<bool>,
    /// Set to `true` once `Context::recover` has been called on this executor.
    recovered: Mutex<bool>,
}
234
/// What a [`Runner`] was constructed from.
enum State {
    /// A configuration; a fresh `Context` is built from it when the runner starts.
    Config(Config),
    /// An existing `Context` that is used as-is when the runner starts.
    Context(Context),
}
239
240pub struct Runner {
242 state: State,
243}
244
245impl From<Config> for Runner {
246 fn from(cfg: Config) -> Self {
247 Self::new(cfg)
248 }
249}
250
251impl From<Context> for Runner {
252 fn from(context: Context) -> Self {
253 Self {
254 state: State::Context(context),
255 }
256 }
257}
258
259impl Runner {
260 pub fn new(cfg: Config) -> Self {
262 cfg.assert();
264 Runner {
265 state: State::Config(cfg),
266 }
267 }
268
269 pub fn seeded(seed: u64) -> Self {
272 let cfg = Config {
273 seed,
274 ..Config::default()
275 };
276 Self::new(cfg)
277 }
278
279 pub fn timed(timeout: Duration) -> Self {
282 let cfg = Config {
283 timeout: Some(timeout),
284 ..Config::default()
285 };
286 Self::new(cfg)
287 }
288}
289
290impl Default for Runner {
291 fn default() -> Self {
292 Self::new(Config::default())
293 }
294}
295
296impl crate::Runner for Runner {
297 type Context = Context;
298
299 fn start<F, Fut>(self, f: F) -> Fut::Output
300 where
301 F: FnOnce(Self::Context) -> Fut,
302 Fut: Future,
303 {
304 let context = match self.state {
306 State::Config(config) => Context::new(config),
307 State::Context(context) => context,
308 };
309
310 let executor = context.executor.clone();
312 let mut root = Box::pin(f(context));
313
314 Tasks::register_root(&executor.tasks);
316
317 let mut iter = 0;
319 loop {
320 {
322 let current = executor.time.lock().unwrap();
323 if let Some(deadline) = executor.deadline {
324 if *current >= deadline {
325 panic!("runtime timeout");
326 }
327 }
328 }
329
330 let mut tasks = executor.tasks.drain();
332
333 {
335 let mut rng = executor.rng.lock().unwrap();
336 tasks.shuffle(&mut *rng);
337 }
338
339 trace!(iter, tasks = tasks.len(), "starting loop");
345 for task in tasks {
346 executor.auditor.event(b"process_task", |hasher| {
348 hasher.update(task.id.to_be_bytes());
349 hasher.update(task.label.name().as_bytes());
350 });
351 trace!(id = task.id, "processing task");
352
353 executor.metrics.task_polls.get_or_create(&task.label).inc();
355
356 let waker = waker_ref(&task);
358 let mut cx = task::Context::from_waker(&waker);
359 match &task.operation {
360 Operation::Root => {
361 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
363 trace!(id = task.id, "task is complete");
364 *executor.finished.lock().unwrap() = true;
365 return output;
366 }
367 }
368 Operation::Work { future, completed } => {
369 if *completed.lock().unwrap() {
371 trace!(id = task.id, "dropping already complete task");
372 continue;
373 }
374
375 let mut fut = future.lock().unwrap();
377 if fut.as_mut().poll(&mut cx).is_ready() {
378 trace!(id = task.id, "task is complete");
379 *completed.lock().unwrap() = true;
380 continue;
381 }
382 }
383 }
384
385 trace!(id = task.id, "task is still pending");
387 }
388
389 let mut current;
394 {
395 let mut time = executor.time.lock().unwrap();
396 *time = time
397 .checked_add(executor.cycle)
398 .expect("executor time overflowed");
399 current = *time;
400 }
401 trace!(now = current.epoch_millis(), "time advanced");
402
403 if executor.tasks.len() == 0 {
405 let mut skip = None;
406 {
407 let sleeping = executor.sleeping.lock().unwrap();
408 if let Some(next) = sleeping.peek() {
409 if next.time > current {
410 skip = Some(next.time);
411 }
412 }
413 }
414 if skip.is_some() {
415 {
416 let mut time = executor.time.lock().unwrap();
417 *time = skip.unwrap();
418 current = *time;
419 }
420 trace!(now = current.epoch_millis(), "time skipped");
421 }
422 }
423
424 let mut to_wake = Vec::new();
426 let mut remaining;
427 {
428 let mut sleeping = executor.sleeping.lock().unwrap();
429 while let Some(next) = sleeping.peek() {
430 if next.time <= current {
431 let sleeper = sleeping.pop().unwrap();
432 to_wake.push(sleeper.waker);
433 } else {
434 break;
435 }
436 }
437 remaining = sleeping.len();
438 }
439 for waker in to_wake {
440 waker.wake();
441 }
442
443 remaining += executor.tasks.len();
445
446 if remaining == 0 {
449 panic!("runtime stalled");
450 }
451 iter += 1;
452 }
453 }
454}
455
/// The work a [`Task`] performs when it is polled by the run loop.
enum Operation {
    /// The root task; its future is owned by the run loop rather than by the task.
    Root,
    /// A spawned task.
    Work {
        /// The future to poll.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future returns `Poll::Ready`, so that it is never polled again.
        completed: Mutex<bool>,
    },
}
464
465struct Task {
467 id: u128,
468 label: Label,
469 tasks: Weak<Tasks>,
470
471 operation: Operation,
472}
473
474impl ArcWake for Task {
475 fn wake_by_ref(arc_self: &Arc<Self>) {
476 if let Some(tasks) = arc_self.tasks.upgrade() {
479 tasks.enqueue(arc_self.clone());
480 }
481 }
482}
483
484struct Tasks {
486 counter: Mutex<u128>,
488 queue: Mutex<Vec<Arc<Task>>>,
490 root_registered: Mutex<bool>,
492}
493
494impl Tasks {
495 fn new() -> Self {
497 Self {
498 counter: Mutex::new(0),
499 queue: Mutex::new(Vec::new()),
500 root_registered: Mutex::new(false),
501 }
502 }
503
504 fn increment(&self) -> u128 {
506 let mut counter = self.counter.lock().unwrap();
507 let old = *counter;
508 *counter = counter.checked_add(1).expect("task counter overflow");
509 old
510 }
511
512 fn register_root(arc_self: &Arc<Self>) {
516 {
517 let mut registered = arc_self.root_registered.lock().unwrap();
518 assert!(!*registered, "root already registered");
519 *registered = true;
520 }
521 let id = arc_self.increment();
522 let mut queue = arc_self.queue.lock().unwrap();
523 queue.push(Arc::new(Task {
524 id,
525 label: Label::root(),
526 tasks: Arc::downgrade(arc_self),
527 operation: Operation::Root,
528 }));
529 }
530
531 fn register_work(
533 arc_self: &Arc<Self>,
534 label: Label,
535 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
536 ) {
537 let id = arc_self.increment();
538 let mut queue = arc_self.queue.lock().unwrap();
539 queue.push(Arc::new(Task {
540 id,
541 label,
542 tasks: Arc::downgrade(arc_self),
543 operation: Operation::Work {
544 future: Mutex::new(future),
545 completed: Mutex::new(false),
546 },
547 }));
548 }
549
550 fn enqueue(&self, task: Arc<Task>) {
552 let mut queue = self.queue.lock().unwrap();
553 queue.push(task);
554 }
555
556 fn drain(&self) -> Vec<Arc<Task>> {
558 let mut queue = self.queue.lock().unwrap();
559 let len = queue.len();
560 replace(&mut *queue, Vec::with_capacity(len))
561 }
562
563 fn len(&self) -> usize {
565 self.queue.lock().unwrap().len()
566 }
567}
568
/// Network stack used by `Context`: the deterministic network wrapped in the audited layer,
/// which is in turn wrapped in the metered layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
570
/// Handle to the deterministic runtime that is passed to tasks.
///
/// It implements the runtime traits visible in this file (spawning, metrics, clock, network,
/// randomness and storage) on top of a shared [`Executor`].
pub struct Context {
    /// Label prefix applied to metrics registered through this context (empty for none).
    name: String,
    /// Set by the `*_ref` spawn methods; every spawn method asserts that it is still `false`.
    spawned: bool,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Shared network stack.
    network: Arc<Network>,
    /// Storage stack: in-memory storage wrapped in the audited and metered layers.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}

impl Default for Context {
    /// Builds a context from the default [`Config`].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
587
588impl Context {
589 pub fn new(cfg: Config) -> Self {
590 let mut registry = Registry::default();
592 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
593
594 let metrics = Arc::new(Metrics::init(runtime_registry));
596 let start_time = UNIX_EPOCH;
597 let deadline = cfg
598 .timeout
599 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
600 let (signaler, signal) = Signaler::new();
601 let auditor = Arc::new(Auditor::default());
602 let storage = MeteredStorage::new(
603 AuditedStorage::new(MemStorage::default(), auditor.clone()),
604 runtime_registry,
605 );
606 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
607 let network = MeteredNetwork::new(network, runtime_registry);
608
609 let executor = Arc::new(Executor {
610 registry: Mutex::new(registry),
611 cycle: cfg.cycle,
612 deadline,
613 metrics: metrics.clone(),
614 auditor: auditor.clone(),
615 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
616 time: Mutex::new(start_time),
617 tasks: Arc::new(Tasks::new()),
618 sleeping: Mutex::new(BinaryHeap::new()),
619 partitions: Mutex::new(HashMap::new()),
620 signaler: Mutex::new(signaler),
621 signal,
622 finished: Mutex::new(false),
623 recovered: Mutex::new(false),
624 });
625
626 Context {
627 name: String::new(),
628 spawned: false,
629 executor: executor.clone(),
630 network: Arc::new(network),
631 storage,
632 }
633 }
634
635 pub fn recover(self) -> Self {
647 if !*self.executor.finished.lock().unwrap() {
649 panic!("execution is not finished");
650 }
651
652 {
654 let mut recovered = self.executor.recovered.lock().unwrap();
655 if *recovered {
656 panic!("runtime has already been recovered");
657 }
658 *recovered = true;
659 }
660
661 let mut registry = Registry::default();
663 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
664 let metrics = Arc::new(Metrics::init(runtime_registry));
665
666 let auditor = self.executor.auditor.clone();
668 let (signaler, signal) = Signaler::new();
669 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
670 let network = MeteredNetwork::new(network, runtime_registry);
671
672 let executor = Arc::new(Executor {
673 cycle: self.executor.cycle,
675 deadline: self.executor.deadline,
676 auditor: auditor.clone(),
677 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
678 time: Mutex::new(*self.executor.time.lock().unwrap()),
679 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
680
681 registry: Mutex::new(registry),
683 metrics: metrics.clone(),
684 tasks: Arc::new(Tasks::new()),
685 sleeping: Mutex::new(BinaryHeap::new()),
686 signaler: Mutex::new(signaler),
687 signal,
688 finished: Mutex::new(false),
689 recovered: Mutex::new(false),
690 });
691 Self {
692 name: String::new(),
693 spawned: false,
694 executor,
695 network: Arc::new(network),
696 storage: self.storage,
697 }
698 }
699
700 pub fn auditor(&self) -> &Auditor {
701 &self.executor.auditor
702 }
703}
704
705impl Clone for Context {
706 fn clone(&self) -> Self {
707 Self {
708 name: self.name.clone(),
709 spawned: false,
710 executor: self.executor.clone(),
711 network: self.network.clone(),
712 storage: self.storage.clone(),
713 }
714 }
715}
716
impl crate::Spawner for Context {
    /// Spawns the future returned by `f(self)` as a new task and returns its handle.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context was already used by a `*_ref` spawn.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        // `spawn_metrics!` is defined outside this file section; it yields the task label and
        // a gauge. NOTE(review): presumably it also updates the spawn metrics — confirm.
        let (label, gauge) = spawn_metrics!(self, future);

        // Grab the executor before `self` is moved into the task.
        let executor = self.executor.clone();
        let future = f(self);
        let (f, handle) = Handle::init_future(future, gauge, false);

        // Queue the task; it is first polled on a later pass of the run loop.
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns a future as a task.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context was already used by a `*_ref` spawn.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, future);

        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_future(f, gauge, false);

            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Spawns the blocking closure `f(self)` as a task and returns its handle.
    ///
    /// In this runtime the closure runs inline inside an ordinary task; `dedicated` is only
    /// passed to `spawn_metrics!`.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context was already used by a `*_ref` spawn.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        let executor = self.executor.clone();
        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);

        // Wrap the blocking closure in a future so it can be scheduled like any other task.
        let f = async move { f() };
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns a blocking closure.
    ///
    /// # Panics
    ///
    /// Panics with "already spawned" if this context was already used by a `*_ref` spawn.
    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_blocking(f, gauge, false);

            // Wrap the blocking closure in a future so it can be scheduled like any other task.
            let f = async move { f() };
            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Records a "stop" audit event carrying `value`, then signals `value` to the signaler.
    fn stop(&self, value: i32) {
        self.executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        self.executor.signaler.lock().unwrap().signal(value);
    }

    /// Records a "stopped" audit event and returns a clone of the stop signal.
    fn stopped(&self) -> Signal {
        self.executor.auditor.event(b"stopped", |_| {});
        self.executor.signal.clone()
    }
}
820
821impl crate::Metrics for Context {
822 fn with_label(&self, label: &str) -> Self {
823 let name = {
824 let prefix = self.name.clone();
825 if prefix.is_empty() {
826 label.to_string()
827 } else {
828 format!("{prefix}_{label}")
829 }
830 };
831 assert!(
832 !name.starts_with(METRICS_PREFIX),
833 "using runtime label is not allowed"
834 );
835 Self {
836 name,
837 spawned: false,
838 executor: self.executor.clone(),
839 network: self.network.clone(),
840 storage: self.storage.clone(),
841 }
842 }
843
844 fn label(&self) -> String {
845 self.name.clone()
846 }
847
848 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
849 let name = name.into();
851 let help = help.into();
852
853 self.executor.auditor.event(b"register", |hasher| {
855 hasher.update(name.as_bytes());
856 hasher.update(help.as_bytes());
857 });
858 let prefixed_name = {
859 let prefix = &self.name;
860 if prefix.is_empty() {
861 name
862 } else {
863 format!("{}_{}", *prefix, name)
864 }
865 };
866 self.executor
867 .registry
868 .lock()
869 .unwrap()
870 .register(prefixed_name, help, metric)
871 }
872
873 fn encode(&self) -> String {
874 self.executor.auditor.event(b"encode", |_| {});
875 let mut buffer = String::new();
876 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
877 buffer
878 }
879}
880
/// Future that completes once the executor's simulated time reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Time at which the sleeper becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
886
/// A pending wake-up: `waker` is to be woken once simulated time reaches `time`.
///
/// Alarms compare by `time` only, in reverse, so that `BinaryHeap` (a max-heap) yields
/// the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when their times are equal (the waker is ignored).
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Reversed time ordering: an earlier alarm compares as greater.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
912
913impl Future for Sleeper {
914 type Output = ();
915
916 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
917 {
918 let current_time = *self.executor.time.lock().unwrap();
919 if current_time >= self.time {
920 return Poll::Ready(());
921 }
922 }
923 if !self.registered {
924 self.registered = true;
925 self.executor.sleeping.lock().unwrap().push(Alarm {
926 time: self.time,
927 waker: cx.waker().clone(),
928 });
929 }
930 Poll::Pending
931 }
932}
933
934impl Clock for Context {
935 fn current(&self) -> SystemTime {
936 *self.executor.time.lock().unwrap()
937 }
938
939 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
940 let deadline = self
941 .current()
942 .checked_add(duration)
943 .expect("overflow when setting wake time");
944 self.sleep_until(deadline)
945 }
946
947 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
948 Sleeper {
949 executor: self.executor.clone(),
950
951 time: deadline,
952 registered: false,
953 }
954 }
955}
956
/// Exposes the simulated clock through `governor`'s clock trait.
impl GClock for Context {
    type Instant = SystemTime;

    /// Returns the current simulated time (same value as `Clock::current`).
    fn now(&self) -> Self::Instant {
        self.current()
    }
}

// Marker impl with no methods overridden here.
impl ReasonablyRealtime for Context {}
966
/// Forwards network operations to the context's shared network stack.
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener to `socket` by delegating to the underlying network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket` by delegating to the underlying network, returning the sink/stream pair.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
981
982impl RngCore for Context {
983 fn next_u32(&mut self) -> u32 {
984 self.executor.auditor.event(b"rand", |hasher| {
985 hasher.update(b"next_u32");
986 });
987 self.executor.rng.lock().unwrap().next_u32()
988 }
989
990 fn next_u64(&mut self) -> u64 {
991 self.executor.auditor.event(b"rand", |hasher| {
992 hasher.update(b"next_u64");
993 });
994 self.executor.rng.lock().unwrap().next_u64()
995 }
996
997 fn fill_bytes(&mut self, dest: &mut [u8]) {
998 self.executor.auditor.event(b"rand", |hasher| {
999 hasher.update(b"fill_bytes");
1000 });
1001 self.executor.rng.lock().unwrap().fill_bytes(dest)
1002 }
1003
1004 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1005 self.executor.auditor.event(b"rand", |hasher| {
1006 hasher.update(b"try_fill_bytes");
1007 });
1008 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1009 }
1010}
1011
1012impl CryptoRng for Context {}
1013
/// Forwards storage operations to the context's storage stack.
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Opens the blob `name` in `partition` by delegating to the underlying storage.
    ///
    /// The second tuple element is compared against the written data length in this file's
    /// tests, i.e. it is the blob's length.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Removes `name` from `partition` by delegating to the underlying storage.
    ///
    /// NOTE(review): `None` presumably removes the whole partition — confirm in the storage impl.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Lists the names in `partition` by delegating to the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1029
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared `run_tasks` helper with 5 tasks on a runner seeded with `seed`.
    ///
    /// NOTE(review): `run_tasks` lives in `crate::utils`; the returned pair is presumably the
    /// audit state and the task completion order — confirm.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// Running twice with the same seed must produce identical output.
    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output for each seed.
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Second pass: every seed must reproduce its recorded output.
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different output.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s reversed ordering makes `BinaryHeap` pop the earliest time first.
    #[test]
    fn test_alarm_min_heap() {
        // Populate the heap with out-of-order (and duplicate) times.
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Popping must yield the times in ascending order.
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A root task that sleeps forever must trip the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected when the runner is constructed.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data written and synced before `recover` is readable from the recovered context.
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime.
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Write and sync a blob, then hand back the context and the audit state.
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // The auditor is carried over, so its state must be unchanged by recovery.
        assert_eq!(state, recovered_context.auditor().state());

        // The synced data must be visible in the second runtime.
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    /// Data written without a `sync` call is not visible after `recover`.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime.
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // Write to a blob but never sync it.
        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            context
        });

        // Recover and start a second runtime from the recovered context.
        let context = context.recover();
        let executor = Runner::from(context);

        // The blob must be empty in the second runtime.
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Calling `recover` while the root task is still running must panic.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // The root task has not completed yet, so this must panic.
            context.recover();
        });
    }

    /// An executor may be recovered only once, even through a cloned context.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let executor = deterministic::Runner::default();

        // Finish a run and keep its context.
        let context = executor.start(|context| async move { context });

        // The clone shares the same executor; the first recovery succeeds.
        let cloned_context = context.clone();
        context.recover();

        // The second recovery of the same executor must panic.
        cloned_context.recover();
    }

    /// Simulated time starts at `UNIX_EPOCH`.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}