1use crate::{
26 network::{
27 audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28 metered::Network as MeteredNetwork,
29 },
30 storage::{
31 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32 metered::Storage as MeteredStorage,
33 },
34 telemetry::metrics::task::Label,
35 utils::Signaler,
36 Clock, Error, Handle, ListenerOf, Signal, METRICS_PREFIX,
37};
38use commonware_utils::{hex, SystemTimeExt};
39use futures::{
40 task::{waker_ref, ArcWake},
41 Future,
42};
43use governor::clock::{Clock as GClock, ReasonablyRealtime};
44use prometheus_client::{
45 encoding::text::encode,
46 metrics::{counter::Counter, family::Family, gauge::Gauge},
47 registry::{Metric, Registry},
48};
49use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
50use sha2::{Digest, Sha256};
51use std::{
52 collections::{BinaryHeap, HashMap},
53 mem::replace,
54 net::SocketAddr,
55 pin::Pin,
56 sync::{Arc, Mutex},
57 task::{self, Poll, Waker},
58 time::{Duration, SystemTime, UNIX_EPOCH},
59};
60use tracing::trace;
61
/// A map from byte-string keys to byte-string values.
///
/// The executor keeps one `Partition` per `String` key in its `partitions` field.
// NOTE(review): presumably blob name -> blob contents for one storage partition — confirm.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
64
/// Metrics recorded by the runtime itself.
///
/// The task families are keyed by task [`Label`].
#[derive(Debug)]
struct Metrics {
    /// Registered as `tasks_spawned` ("Total number of tasks spawned").
    tasks_spawned: Family<Label, Counter>,
    /// Registered as `tasks_running` ("Number of tasks currently running").
    tasks_running: Family<Label, Gauge>,
    /// Registered as `task_polls`; the run loop increments it once per task poll.
    task_polls: Family<Label, Counter>,

    /// Registered as `bandwidth` ("Total amount of data sent over network").
    network_bandwidth: Counter,
}
73
74impl Metrics {
75 pub fn init(registry: &mut Registry) -> Self {
76 let metrics = Self {
77 task_polls: Family::default(),
78 tasks_spawned: Family::default(),
79 tasks_running: Family::default(),
80 network_bandwidth: Counter::default(),
81 };
82 registry.register(
83 "tasks_spawned",
84 "Total number of tasks spawned",
85 metrics.tasks_spawned.clone(),
86 );
87 registry.register(
88 "tasks_running",
89 "Number of tasks currently running",
90 metrics.tasks_running.clone(),
91 );
92 registry.register(
93 "task_polls",
94 "Total number of task polls",
95 metrics.task_polls.clone(),
96 );
97 registry.register(
98 "bandwidth",
99 "Total amount of data sent over network",
100 metrics.network_bandwidth.clone(),
101 );
102 metrics
103 }
104}
105
/// Maintains a running hash over the events reported to it.
///
/// A default auditor starts with an empty hash; every recorded event replaces
/// the hash with a new digest that also covers the previous one.
#[derive(Default)]
pub struct Auditor {
    /// Current digest bytes (empty until the first event is recorded).
    hash: Mutex<Vec<u8>>,
}
118
119impl Auditor {
120 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
124 where
125 F: FnOnce(&mut Sha256),
126 {
127 let mut hash = self.hash.lock().unwrap();
128
129 let mut hasher = Sha256::new();
130 hasher.update(&*hash);
131 hasher.update(label);
132 payload(&mut hasher);
133
134 *hash = hasher.finalize().to_vec();
135 }
136
137 pub fn state(&self) -> String {
142 let hash = self.hash.lock().unwrap().clone();
143 hex(&hash)
144 }
145}
146
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    seed: u64,

    /// Amount of virtual time added to the clock after each pass of the run loop.
    cycle: Duration,

    /// Optional limit on virtual time; `None` means no limit.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns the default configuration: seed `42`, a 1ms cycle, and no timeout.
    pub fn new() -> Self {
        Self {
            timeout: None,
            cycle: Duration::from_millis(1),
            seed: 42,
        }
    }

    /// Returns the configuration with `seed` replaced.
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }

    /// Returns the configuration with `cycle` replaced.
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }

    /// Returns the configuration with `timeout` replaced.
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    /// The configured seed.
    pub fn seed(&self) -> u64 {
        self.seed
    }

    /// The configured cycle duration.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }

    /// The configured timeout, if any.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Validates the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        let zero_cycle_with_timeout = self.timeout.is_some() && self.cycle == Duration::ZERO;
        assert!(
            !zero_cycle_with_timeout,
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [`Config::new`].
    fn default() -> Self {
        Config::new()
    }
}
216
/// Shared state of the deterministic runtime.
///
/// One instance is shared (behind an `Arc`) by the run loop and every `Context`.
pub struct Executor {
    /// Metrics registry that `Context::register` writes to and `Context::encode` reads from.
    registry: Mutex<Registry>,
    /// Virtual time added to `time` after each pass of the run loop.
    cycle: Duration,
    /// If set, the run loop panics with "runtime timeout" once `time` reaches it.
    deadline: Option<SystemTime>,
    /// Runtime metrics (task polls etc.).
    metrics: Arc<Metrics>,
    /// Running hash of audited events.
    auditor: Arc<Auditor>,
    /// Seeded RNG; shuffles the task order and backs the `RngCore` impl of `Context`.
    rng: Mutex<StdRng>,
    /// Current virtual time.
    time: Mutex<SystemTime>,
    /// Queue of tasks that are ready to be polled.
    tasks: Arc<Tasks>,
    /// Alarms of sleeping tasks, ordered so the earliest wake time is on top.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Partitions keyed by name; cloned into the new executor by `Context::recover`.
    // NOTE(review): not otherwise read or written in the visible code — confirm usage.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Used by `Context::stop` to emit the stop value.
    signaler: Mutex<Signaler>,
    /// Cloned and handed out by `Context::stopped`.
    signal: Signal,
    /// Set to `true` when the root task completes.
    finished: Mutex<bool>,
    /// Set to `true` the first time `Context::recover` is called on this executor.
    recovered: Mutex<bool>,
}
234
/// What a [`Runner`] was built from.
enum State {
    /// A configuration; a fresh `Context` is created from it when the runner starts.
    Config(Config),
    /// An existing `Context` (e.g. one returned by `Context::recover`) to run with as-is.
    Context(Context),
}
239
/// Entry point of the deterministic runtime; implements `crate::Runner`.
pub struct Runner {
    /// Either the configuration or the pre-built context to start from.
    state: State,
}
244
impl From<Config> for Runner {
    /// Equivalent to [`Runner::new`]; panics if `cfg` fails `Config::assert`.
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}
250
impl From<Context> for Runner {
    /// Builds a runner that will start with the given `context` instead of creating a new one.
    fn from(context: Context) -> Self {
        Self {
            state: State::Context(context),
        }
    }
}
258
259impl Runner {
260 pub fn new(cfg: Config) -> Self {
262 cfg.assert();
264 Runner {
265 state: State::Config(cfg),
266 }
267 }
268
269 pub fn seeded(seed: u64) -> Self {
272 let cfg = Config {
273 seed,
274 ..Config::default()
275 };
276 Self::new(cfg)
277 }
278
279 pub fn timed(timeout: Duration) -> Self {
282 let cfg = Config {
283 timeout: Some(timeout),
284 ..Config::default()
285 };
286 Self::new(cfg)
287 }
288}
289
impl Default for Runner {
    /// Creates a runner from the default [`Config`].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
295
296impl crate::Runner for Runner {
297 type Context = Context;
298
299 fn start<F, Fut>(self, f: F) -> Fut::Output
300 where
301 F: FnOnce(Self::Context) -> Fut,
302 Fut: Future,
303 {
304 let context = match self.state {
306 State::Config(config) => Context::new(config),
307 State::Context(context) => context,
308 };
309
310 let executor = context.executor.clone();
312 let mut root = Box::pin(f(context));
313
314 Tasks::register_root(&executor.tasks);
316
317 let mut iter = 0;
319 loop {
320 {
322 let current = executor.time.lock().unwrap();
323 if let Some(deadline) = executor.deadline {
324 if *current >= deadline {
325 panic!("runtime timeout");
326 }
327 }
328 }
329
330 let mut tasks = executor.tasks.drain();
332
333 {
335 let mut rng = executor.rng.lock().unwrap();
336 tasks.shuffle(&mut *rng);
337 }
338
339 trace!(iter, tasks = tasks.len(), "starting loop");
345 for task in tasks {
346 executor.auditor.event(b"process_task", |hasher| {
348 hasher.update(task.id.to_be_bytes());
349 hasher.update(task.label.name().as_bytes());
350 });
351 trace!(id = task.id, "processing task");
352
353 executor.metrics.task_polls.get_or_create(&task.label).inc();
355
356 let waker = waker_ref(&task);
358 let mut cx = task::Context::from_waker(&waker);
359 match &task.operation {
360 Operation::Root => {
361 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
363 trace!(id = task.id, "task is complete");
364 *executor.finished.lock().unwrap() = true;
365 return output;
366 }
367 }
368 Operation::Work { future, completed } => {
369 if *completed.lock().unwrap() {
371 trace!(id = task.id, "dropping already complete task");
372 continue;
373 }
374
375 let mut fut = future.lock().unwrap();
377 if fut.as_mut().poll(&mut cx).is_ready() {
378 trace!(id = task.id, "task is complete");
379 *completed.lock().unwrap() = true;
380 continue;
381 }
382 }
383 }
384
385 trace!(id = task.id, "task is still pending");
387 }
388
389 let mut current;
394 {
395 let mut time = executor.time.lock().unwrap();
396 *time = time
397 .checked_add(executor.cycle)
398 .expect("executor time overflowed");
399 current = *time;
400 }
401 trace!(now = current.epoch_millis(), "time advanced");
402
403 if executor.tasks.len() == 0 {
405 let mut skip = None;
406 {
407 let sleeping = executor.sleeping.lock().unwrap();
408 if let Some(next) = sleeping.peek() {
409 if next.time > current {
410 skip = Some(next.time);
411 }
412 }
413 }
414 if skip.is_some() {
415 {
416 let mut time = executor.time.lock().unwrap();
417 *time = skip.unwrap();
418 current = *time;
419 }
420 trace!(now = current.epoch_millis(), "time skipped");
421 }
422 }
423
424 let mut to_wake = Vec::new();
426 let mut remaining;
427 {
428 let mut sleeping = executor.sleeping.lock().unwrap();
429 while let Some(next) = sleeping.peek() {
430 if next.time <= current {
431 let sleeper = sleeping.pop().unwrap();
432 to_wake.push(sleeper.waker);
433 } else {
434 break;
435 }
436 }
437 remaining = sleeping.len();
438 }
439 for waker in to_wake {
440 waker.wake();
441 }
442
443 remaining += executor.tasks.len();
445
446 if remaining == 0 {
449 panic!("runtime stalled");
450 }
451 iter += 1;
452 }
453 }
454}
455
/// What polling a [`Task`] does.
enum Operation {
    /// The root task; its future is owned by the run loop rather than stored here.
    Root,
    /// A spawned task.
    Work {
        /// The task's future, polled by the run loop.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has returned `Poll::Ready`; the run loop skips such tasks.
        completed: Mutex<bool>,
    },
}
464
/// A unit of work tracked by the executor.
struct Task {
    /// Identifier taken from the `Tasks` counter at registration time.
    id: u128,
    /// Label used for metrics and audit events.
    label: Label,
    /// Queue this task re-enqueues itself on when woken.
    tasks: Arc<Tasks>,

    /// What to do when the task is polled.
    operation: Operation,
}
473
impl ArcWake for Task {
    /// Waking a task pushes it back onto its queue so the run loop will poll it again.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.tasks.enqueue(arc_self.clone());
    }
}
479
/// Queue of tasks that are ready to be polled, plus task-id bookkeeping.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled.
    queue: Mutex<Vec<Arc<Task>>>,
    /// Whether a root task has already been registered on this queue.
    root_registered: Mutex<bool>,
}
489
490impl Tasks {
491 fn new() -> Self {
493 Self {
494 counter: Mutex::new(0),
495 queue: Mutex::new(Vec::new()),
496 root_registered: Mutex::new(false),
497 }
498 }
499
500 fn increment(&self) -> u128 {
502 let mut counter = self.counter.lock().unwrap();
503 let old = *counter;
504 *counter = counter.checked_add(1).expect("task counter overflow");
505 old
506 }
507
508 fn register_root(arc_self: &Arc<Self>) {
512 {
513 let mut registered = arc_self.root_registered.lock().unwrap();
514 assert!(!*registered, "root already registered");
515 *registered = true;
516 }
517 let id = arc_self.increment();
518 let mut queue = arc_self.queue.lock().unwrap();
519 queue.push(Arc::new(Task {
520 id,
521 label: Label::root(),
522 tasks: arc_self.clone(),
523 operation: Operation::Root,
524 }));
525 }
526
527 fn register_work(
529 arc_self: &Arc<Self>,
530 label: Label,
531 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
532 ) {
533 let id = arc_self.increment();
534 let mut queue = arc_self.queue.lock().unwrap();
535 queue.push(Arc::new(Task {
536 id,
537 label,
538 tasks: arc_self.clone(),
539 operation: Operation::Work {
540 future: Mutex::new(future),
541 completed: Mutex::new(false),
542 },
543 }));
544 }
545
546 fn enqueue(&self, task: Arc<Task>) {
548 let mut queue = self.queue.lock().unwrap();
549 queue.push(task);
550 }
551
552 fn drain(&self) -> Vec<Arc<Task>> {
554 let mut queue = self.queue.lock().unwrap();
555 let len = queue.len();
556 replace(&mut *queue, Vec::with_capacity(len))
557 }
558
559 fn len(&self) -> usize {
561 self.queue.lock().unwrap().len()
562 }
563}
564
/// The network stack used by [`Context`]: the deterministic network wrapped by the
/// audited layer, wrapped in turn by the metered layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
566
/// Handle to the deterministic runtime that is passed to the root task and to spawned tasks.
pub struct Context {
    /// Label prefix; extended by `with_label` and prepended to metric names in `register`.
    name: String,
    /// Set by `spawn_ref`/`spawn_blocking_ref`; every spawn method asserts it is still `false`.
    spawned: bool,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Shared network implementation.
    network: Arc<Network>,
    /// Storage implementation (in-memory storage wrapped by audited and metered layers).
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}
577
impl Default for Context {
    /// Creates a context from the default [`Config`].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
583
584impl Context {
585 pub fn new(cfg: Config) -> Self {
586 let mut registry = Registry::default();
588 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
589
590 let metrics = Arc::new(Metrics::init(runtime_registry));
592 let start_time = UNIX_EPOCH;
593 let deadline = cfg
594 .timeout
595 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
596 let (signaler, signal) = Signaler::new();
597 let auditor = Arc::new(Auditor::default());
598 let storage = MeteredStorage::new(
599 AuditedStorage::new(MemStorage::default(), auditor.clone()),
600 runtime_registry,
601 );
602 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
603 let network = MeteredNetwork::new(network, runtime_registry);
604
605 let executor = Arc::new(Executor {
606 registry: Mutex::new(registry),
607 cycle: cfg.cycle,
608 deadline,
609 metrics: metrics.clone(),
610 auditor: auditor.clone(),
611 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
612 time: Mutex::new(start_time),
613 tasks: Arc::new(Tasks::new()),
614 sleeping: Mutex::new(BinaryHeap::new()),
615 partitions: Mutex::new(HashMap::new()),
616 signaler: Mutex::new(signaler),
617 signal,
618 finished: Mutex::new(false),
619 recovered: Mutex::new(false),
620 });
621
622 Context {
623 name: String::new(),
624 spawned: false,
625 executor: executor.clone(),
626 network: Arc::new(network),
627 storage,
628 }
629 }
630
631 pub fn recover(self) -> Self {
643 if !*self.executor.finished.lock().unwrap() {
645 panic!("execution is not finished");
646 }
647
648 {
650 let mut recovered = self.executor.recovered.lock().unwrap();
651 if *recovered {
652 panic!("runtime has already been recovered");
653 }
654 *recovered = true;
655 }
656
657 let mut registry = Registry::default();
659 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
660 let metrics = Arc::new(Metrics::init(runtime_registry));
661
662 let auditor = self.executor.auditor.clone();
664 let (signaler, signal) = Signaler::new();
665 let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
666 let network = MeteredNetwork::new(network, runtime_registry);
667
668 let executor = Arc::new(Executor {
669 cycle: self.executor.cycle,
671 deadline: self.executor.deadline,
672 auditor: auditor.clone(),
673 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
674 time: Mutex::new(*self.executor.time.lock().unwrap()),
675 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
676
677 registry: Mutex::new(registry),
679 metrics: metrics.clone(),
680 tasks: Arc::new(Tasks::new()),
681 sleeping: Mutex::new(BinaryHeap::new()),
682 signaler: Mutex::new(signaler),
683 signal,
684 finished: Mutex::new(false),
685 recovered: Mutex::new(false),
686 });
687 Self {
688 name: String::new(),
689 spawned: false,
690 executor,
691 network: Arc::new(network),
692 storage: self.storage,
693 }
694 }
695
696 pub fn auditor(&self) -> &Auditor {
697 &self.executor.auditor
698 }
699}
700
701impl Clone for Context {
702 fn clone(&self) -> Self {
703 Self {
704 name: self.name.clone(),
705 spawned: false,
706 executor: self.executor.clone(),
707 network: self.network.clone(),
708 storage: self.storage.clone(),
709 }
710 }
711}
712
impl crate::Spawner for Context {
    /// Spawns the future returned by `f` as a new task and returns its handle.
    ///
    /// Panics with "already spawned" if this context was already used by one of the
    /// `*_ref` spawn methods.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        // NOTE(review): `spawn_metrics!` is defined outside this view; presumably it
        // derives the task label and the running-task gauge from the context — confirm.
        let (label, gauge) = spawn_metrics!(self, future);

        // Keep the executor, since `self` is moved into `f`.
        let executor = self.executor.clone();
        let future = f(self);
        // NOTE(review): the meaning of the trailing `false` is not visible here — confirm.
        let (f, handle) = Handle::init_future(future, gauge, false);

        // Queue the task for the run loop.
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns the future it is
    /// given.
    ///
    /// Panics with "already spawned" if the context was already marked as spawned.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, future);

        // The closure owns everything it needs, so it does not borrow `self`.
        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_future(f, gauge, false);

            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Spawns the blocking closure `f` as a task and returns its handle.
    ///
    /// The closure is wrapped in an `async` block and queued like any other task, so it
    /// runs when the run loop polls that task. `dedicated` is only forwarded to
    /// `spawn_metrics!`.
    ///
    /// Panics with "already spawned" if this context was already used by one of the
    /// `*_ref` spawn methods.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        // Keep the executor, since `self` is moved into the closure below.
        let executor = self.executor.clone();
        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);

        // Run the blocking closure from inside a future so the run loop can poll it.
        let f = async move { f() };
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        handle
    }

    /// Marks this context as spawned and returns a closure that spawns the blocking
    /// closure it is given.
    ///
    /// Panics with "already spawned" if the context was already marked as spawned.
    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);

        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init_blocking(f, gauge, false);

            // Run the blocking closure from inside a future so the run loop can poll it.
            let f = async move { f() };
            Tasks::register_work(&executor.tasks, label, Box::pin(f));
            handle
        }
    }

    /// Records a `stop` audit event carrying `value`, then signals `value` to listeners.
    fn stop(&self, value: i32) {
        self.executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        self.executor.signaler.lock().unwrap().signal(value);
    }

    /// Records a `stopped` audit event and returns a clone of the executor's stop signal.
    fn stopped(&self) -> Signal {
        self.executor.auditor.event(b"stopped", |_| {});
        self.executor.signal.clone()
    }
}
816
817impl crate::Metrics for Context {
818 fn with_label(&self, label: &str) -> Self {
819 let name = {
820 let prefix = self.name.clone();
821 if prefix.is_empty() {
822 label.to_string()
823 } else {
824 format!("{prefix}_{label}")
825 }
826 };
827 assert!(
828 !name.starts_with(METRICS_PREFIX),
829 "using runtime label is not allowed"
830 );
831 Self {
832 name,
833 spawned: false,
834 executor: self.executor.clone(),
835 network: self.network.clone(),
836 storage: self.storage.clone(),
837 }
838 }
839
840 fn label(&self) -> String {
841 self.name.clone()
842 }
843
844 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
845 let name = name.into();
847 let help = help.into();
848
849 self.executor.auditor.event(b"register", |hasher| {
851 hasher.update(name.as_bytes());
852 hasher.update(help.as_bytes());
853 });
854 let prefixed_name = {
855 let prefix = &self.name;
856 if prefix.is_empty() {
857 name
858 } else {
859 format!("{}_{}", *prefix, name)
860 }
861 };
862 self.executor
863 .registry
864 .lock()
865 .unwrap()
866 .register(prefixed_name, help, metric)
867 }
868
869 fn encode(&self) -> String {
870 self.executor.auditor.event(b"encode", |_| {});
871 let mut buffer = String::new();
872 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
873 buffer
874 }
875}
876
/// Future that completes once the executor's virtual clock reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Virtual time at which the sleeper is done.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
882
/// A pending wake-up: `waker` should be woken once the virtual clock reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

/// Alarms compare by wake time only; the waker is ignored.
impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// The ordering is reversed so that a max-heap (`BinaryHeap`) yields the earliest alarm first.
impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
908
909impl Future for Sleeper {
910 type Output = ();
911
912 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
913 {
914 let current_time = *self.executor.time.lock().unwrap();
915 if current_time >= self.time {
916 return Poll::Ready(());
917 }
918 }
919 if !self.registered {
920 self.registered = true;
921 self.executor.sleeping.lock().unwrap().push(Alarm {
922 time: self.time,
923 waker: cx.waker().clone(),
924 });
925 }
926 Poll::Pending
927 }
928}
929
930impl Clock for Context {
931 fn current(&self) -> SystemTime {
932 *self.executor.time.lock().unwrap()
933 }
934
935 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
936 let deadline = self
937 .current()
938 .checked_add(duration)
939 .expect("overflow when setting wake time");
940 self.sleep_until(deadline)
941 }
942
943 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
944 Sleeper {
945 executor: self.executor.clone(),
946
947 time: deadline,
948 registered: false,
949 }
950 }
951}
952
/// Lets `governor` read time from the runtime's virtual clock.
impl GClock for Context {
    type Instant = SystemTime;

    /// Returns the same value as `Clock::current`.
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
960
// Marker impl with no overridden items; it relies on the `GClock` impl for `Context`.
impl ReasonablyRealtime for Context {}
962
/// Network operations are forwarded unchanged to the context's shared network.
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    /// Binds a listener to `socket` via the underlying network.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dials `socket` via the underlying network, returning the sink/stream pair.
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
977
978impl RngCore for Context {
979 fn next_u32(&mut self) -> u32 {
980 self.executor.auditor.event(b"rand", |hasher| {
981 hasher.update(b"next_u32");
982 });
983 self.executor.rng.lock().unwrap().next_u32()
984 }
985
986 fn next_u64(&mut self) -> u64 {
987 self.executor.auditor.event(b"rand", |hasher| {
988 hasher.update(b"next_u64");
989 });
990 self.executor.rng.lock().unwrap().next_u64()
991 }
992
993 fn fill_bytes(&mut self, dest: &mut [u8]) {
994 self.executor.auditor.event(b"rand", |hasher| {
995 hasher.update(b"fill_bytes");
996 });
997 self.executor.rng.lock().unwrap().fill_bytes(dest)
998 }
999
1000 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1001 self.executor.auditor.event(b"rand", |hasher| {
1002 hasher.update(b"try_fill_bytes");
1003 });
1004 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1005 }
1006}
1007
// Marker impl; randomness comes from the executor's seeded `StdRng` (see the `RngCore` impl).
impl CryptoRng for Context {}
1009
/// Storage operations are forwarded unchanged to the context's storage.
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Opens the blob `name` in `partition` via the underlying storage.
    // NOTE(review): the tests treat the returned `u64` as the blob's length — confirm.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Forwards the removal of `name` (an `Option`) in `partition` to the underlying storage.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Forwards a scan of `partition` to the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1025
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared `run_tasks` helper on a runner seeded with `seed`.
    // NOTE(review): `run_tasks` is defined elsewhere; presumably the result is the final
    // auditor state plus the order in which the tasks ran — confirm.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// Running the same seed twice must produce identical output.
    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output for each seed.
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Second pass: each seed must reproduce what was recorded.
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different output.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s ordering makes `BinaryHeap` pop the earliest time first.
    #[test]
    fn test_alarm_min_heap() {
        // Insert alarms out of order, including a duplicate time.
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Popping must yield the times in ascending order.
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A root task that sleeps forever must hit the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected when the runner is created.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data that was written and synced is readable after `recover`.
    #[test]
    fn test_recover_synced_storage_persists() {
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // First run: write, sync, and capture the auditor state.
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Recovering must not change the auditor state.
        assert_eq!(state, recovered_context.auditor().state());

        // Second run: the synced data must still be there.
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    /// Data that was written but never synced is gone after `recover`.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // First run: write without calling `sync`.
        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            context
        });

        let context = context.recover();
        let executor = Runner::from(context);

        // Second run: the blob must be empty.
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Calling `recover` while the root task is still running panics.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            context.recover();
        });
    }

    /// Recovering the same executor twice (through a cloned context) panics.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move { context });

        // The clone shares the executor with `context`.
        let cloned_context = context.clone();
        context.recover();

        cloned_context.recover();
    }

    /// The virtual clock starts at the Unix epoch.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}