1use crate::{
26 mocks,
27 storage::{
28 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
29 metered::Storage as MeteredStorage,
30 },
31 utils::Signaler,
32 Clock, Error, Handle, Signal, METRICS_PREFIX,
33};
34use commonware_utils::{hex, SystemTimeExt};
35use futures::{
36 channel::mpsc,
37 task::{waker_ref, ArcWake},
38 Future, SinkExt, StreamExt,
39};
40use governor::clock::{Clock as GClock, ReasonablyRealtime};
41use prometheus_client::{
42 encoding::{text::encode, EncodeLabelSet},
43 metrics::{counter::Counter, family::Family, gauge::Gauge},
44 registry::{Metric, Registry},
45};
46use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
47use sha2::{Digest, Sha256};
48use std::{
49 collections::{BinaryHeap, HashMap},
50 mem::replace,
51 net::{IpAddr, Ipv4Addr, SocketAddr},
52 ops::Range,
53 pin::Pin,
54 sync::{Arc, Mutex},
55 task::{self, Poll, Waker},
56 time::{Duration, SystemTime, UNIX_EPOCH},
57};
58use tracing::trace;
59
/// Ports reserved for outbound (dialer) connections.
///
/// `Networking::dial` assigns dialer ports starting at the beginning of this
/// range, and `Networking::bind` refuses to bind a localhost address whose port
/// falls inside it.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
62
/// Byte-keyed map of byte values held within a single named partition.
///
/// NOTE(review): presumably maps blob names to blob contents — confirm against
/// the storage implementation.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
/// Label set attached to the per-task metric families in [`Metrics`].
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the context the task was spawned from (empty for the root task).
    label: String,
}
70
71#[derive(Debug)]
72struct Metrics {
73 tasks_spawned: Family<Work, Counter>,
74 tasks_running: Family<Work, Gauge>,
75 blocking_tasks_spawned: Family<Work, Counter>,
76 blocking_tasks_running: Family<Work, Gauge>,
77 task_polls: Family<Work, Counter>,
78
79 network_bandwidth: Counter,
80}
81
82impl Metrics {
83 pub fn init(registry: &mut Registry) -> Self {
84 let metrics = Self {
85 task_polls: Family::default(),
86 tasks_spawned: Family::default(),
87 tasks_running: Family::default(),
88 blocking_tasks_spawned: Family::default(),
89 blocking_tasks_running: Family::default(),
90 network_bandwidth: Counter::default(),
91 };
92 registry.register(
93 "tasks_spawned",
94 "Total number of tasks spawned",
95 metrics.tasks_spawned.clone(),
96 );
97 registry.register(
98 "tasks_running",
99 "Number of tasks currently running",
100 metrics.tasks_running.clone(),
101 );
102 registry.register(
103 "blocking_tasks_spawned",
104 "Total number of blocking tasks spawned",
105 metrics.blocking_tasks_spawned.clone(),
106 );
107 registry.register(
108 "blocking_tasks_running",
109 "Number of blocking tasks currently running",
110 metrics.blocking_tasks_running.clone(),
111 );
112 registry.register(
113 "task_polls",
114 "Total number of task polls",
115 metrics.task_polls.clone(),
116 );
117 registry.register(
118 "bandwidth",
119 "Total amount of data sent over network",
120 metrics.network_bandwidth.clone(),
121 );
122 metrics
123 }
124}
125
126pub struct Auditor {
128 hash: Mutex<Vec<u8>>,
129}
130
131impl Default for Auditor {
132 fn default() -> Self {
133 Self {
134 hash: Vec::new().into(),
135 }
136 }
137}
138
139impl Auditor {
140 pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
144 where
145 F: FnOnce(&mut Sha256),
146 {
147 let mut hash = self.hash.lock().unwrap();
148
149 let mut hasher = Sha256::new();
150 hasher.update(&*hash);
151 hasher.update(label);
152 payload(&mut hasher);
153
154 *hash = hasher.finalize().to_vec();
155 }
156
157 pub fn state(&self) -> String {
162 let hash = self.hash.lock().unwrap().clone();
163 hex(&hash)
164 }
165}
166
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    seed: u64,

    /// Amount virtual time advances after each pass over the ready tasks.
    cycle: Duration,

    /// If set, the runtime panics once this much virtual time has elapsed.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns the default configuration: seed `42`, a 1ms cycle and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    /// Returns this configuration with `seed` replaced.
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }

    /// Returns this configuration with `cycle` replaced.
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }

    /// Returns this configuration with `timeout` replaced.
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    /// Seed for the runtime's random number generator.
    pub fn seed(&self) -> u64 {
        self.seed
    }

    /// Amount virtual time advances after each pass over the ready tasks.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }

    /// Optional limit on elapsed virtual time.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Validates the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        let zero_cycle_with_timeout = self.cycle.is_zero() && self.timeout.is_some();
        assert!(
            !zero_cycle_with_timeout,
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
236
/// Shared state of the deterministic runtime.
///
/// A single `Executor` is shared (behind an `Arc`) by every `Context` derived
/// from the same run.
pub struct Executor {
    /// Metrics registry written by `Context::register` and serialized by `Context::encode`.
    registry: Mutex<Registry>,
    /// Amount virtual time advances after each pass over the ready tasks.
    cycle: Duration,
    /// Virtual time at which the run loop panics with "runtime timeout", if set.
    deadline: Option<SystemTime>,
    /// Runtime-level metrics.
    metrics: Arc<Metrics>,
    /// Digest of audited events.
    auditor: Arc<Auditor>,
    /// Seeded RNG used to shuffle ready tasks and to back `Context`'s `RngCore` impl.
    rng: Mutex<StdRng>,
    /// Current virtual time.
    time: Mutex<SystemTime>,
    /// Queue of tasks that are ready to be polled.
    tasks: Arc<Tasks>,
    /// Alarms registered by sleeping futures; the earliest alarm is popped first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// NOTE(review): in the visible code this is only carried across
    /// `Context::recover`; presumably storage partitions — confirm.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Sending half of the stop signal (used by `Context::stop`).
    signaler: Mutex<Signaler>,
    /// Receiving half of the stop signal (cloned by `Context::stopped`).
    signal: Signal,
    /// Set once the root task has completed.
    finished: Mutex<bool>,
    /// Set once `Context::recover` has been called on this executor.
    recovered: Mutex<bool>,
}
254
/// What a [`Runner`] will be started from.
enum State {
    /// Build a fresh context from this configuration when the runner starts.
    Config(Config),
    /// Run on an already constructed context (for example one returned by `Context::recover`).
    Context(Context),
}
259
260pub struct Runner {
262 state: State,
263}
264
265impl From<Config> for Runner {
266 fn from(cfg: Config) -> Self {
267 Self::new(cfg)
268 }
269}
270
271impl From<Context> for Runner {
272 fn from(context: Context) -> Self {
273 Self {
274 state: State::Context(context),
275 }
276 }
277}
278
279impl Runner {
280 pub fn new(cfg: Config) -> Self {
282 cfg.assert();
284 Runner {
285 state: State::Config(cfg),
286 }
287 }
288
289 pub fn seeded(seed: u64) -> Self {
292 let cfg = Config {
293 seed,
294 ..Config::default()
295 };
296 Self::new(cfg)
297 }
298
299 pub fn timed(timeout: Duration) -> Self {
302 let cfg = Config {
303 timeout: Some(timeout),
304 ..Config::default()
305 };
306 Self::new(cfg)
307 }
308}
309
310impl Default for Runner {
311 fn default() -> Self {
312 Self::new(Config::default())
313 }
314}
315
316impl crate::Runner for Runner {
317 type Context = Context;
318
319 fn start<F, Fut>(self, f: F) -> Fut::Output
320 where
321 F: FnOnce(Self::Context) -> Fut,
322 Fut: Future,
323 {
324 let context = match self.state {
326 State::Config(config) => Context::new(config),
327 State::Context(context) => context,
328 };
329
330 let executor = context.executor.clone();
332 let mut root = Box::pin(f(context));
333
334 Tasks::register_root(&executor.tasks);
336
337 let mut iter = 0;
339 loop {
340 {
342 let current = executor.time.lock().unwrap();
343 if let Some(deadline) = executor.deadline {
344 if *current >= deadline {
345 panic!("runtime timeout");
346 }
347 }
348 }
349
350 let mut tasks = executor.tasks.drain();
352
353 {
355 let mut rng = executor.rng.lock().unwrap();
356 tasks.shuffle(&mut *rng);
357 }
358
359 trace!(iter, tasks = tasks.len(), "starting loop");
365 for task in tasks {
366 executor.auditor.event(b"process_task", |hasher| {
368 hasher.update(task.id.to_be_bytes());
369 hasher.update(task.label.as_bytes());
370 });
371 trace!(id = task.id, "processing task");
372
373 executor
375 .metrics
376 .task_polls
377 .get_or_create(&Work {
378 label: task.label.clone(),
379 })
380 .inc();
381
382 let waker = waker_ref(&task);
384 let mut cx = task::Context::from_waker(&waker);
385 match &task.operation {
386 Operation::Root => {
387 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
389 trace!(id = task.id, "task is complete");
390 *executor.finished.lock().unwrap() = true;
391 return output;
392 }
393 }
394 Operation::Work { future, completed } => {
395 if *completed.lock().unwrap() {
397 trace!(id = task.id, "dropping already complete task");
398 continue;
399 }
400
401 let mut fut = future.lock().unwrap();
403 if fut.as_mut().poll(&mut cx).is_ready() {
404 trace!(id = task.id, "task is complete");
405 *completed.lock().unwrap() = true;
406 continue;
407 }
408 }
409 }
410
411 trace!(id = task.id, "task is still pending");
413 }
414
415 let mut current;
420 {
421 let mut time = executor.time.lock().unwrap();
422 *time = time
423 .checked_add(executor.cycle)
424 .expect("executor time overflowed");
425 current = *time;
426 }
427 trace!(now = current.epoch_millis(), "time advanced");
428
429 if executor.tasks.len() == 0 {
431 let mut skip = None;
432 {
433 let sleeping = executor.sleeping.lock().unwrap();
434 if let Some(next) = sleeping.peek() {
435 if next.time > current {
436 skip = Some(next.time);
437 }
438 }
439 }
440 if skip.is_some() {
441 {
442 let mut time = executor.time.lock().unwrap();
443 *time = skip.unwrap();
444 current = *time;
445 }
446 trace!(now = current.epoch_millis(), "time skipped");
447 }
448 }
449
450 let mut to_wake = Vec::new();
452 let mut remaining;
453 {
454 let mut sleeping = executor.sleeping.lock().unwrap();
455 while let Some(next) = sleeping.peek() {
456 if next.time <= current {
457 let sleeper = sleeping.pop().unwrap();
458 to_wake.push(sleeper.waker);
459 } else {
460 break;
461 }
462 }
463 remaining = sleeping.len();
464 }
465 for waker in to_wake {
466 waker.wake();
467 }
468
469 remaining += executor.tasks.len();
471
472 if remaining == 0 {
475 panic!("runtime stalled");
476 }
477 iter += 1;
478 }
479 }
480}
481
/// What polling a [`Task`] actually runs.
enum Operation {
    /// The root future; it is owned and polled directly by `Runner::start`.
    Root,
    /// A spawned future.
    Work {
        /// The future to poll.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has completed so it is not polled again.
        completed: Mutex<bool>,
    },
}
490
491struct Task {
493 id: u128,
494 label: String,
495 tasks: Arc<Tasks>,
496
497 operation: Operation,
498}
499
500impl ArcWake for Task {
501 fn wake_by_ref(arc_self: &Arc<Self>) {
502 arc_self.tasks.enqueue(arc_self.clone());
503 }
504}
505
506struct Tasks {
508 counter: Mutex<u128>,
510 queue: Mutex<Vec<Arc<Task>>>,
512 root_registered: Mutex<bool>,
514}
515
516impl Tasks {
517 fn new() -> Self {
519 Self {
520 counter: Mutex::new(0),
521 queue: Mutex::new(Vec::new()),
522 root_registered: Mutex::new(false),
523 }
524 }
525
526 fn increment(&self) -> u128 {
528 let mut counter = self.counter.lock().unwrap();
529 let old = *counter;
530 *counter = counter.checked_add(1).expect("task counter overflow");
531 old
532 }
533
534 fn register_root(arc_self: &Arc<Self>) {
538 {
539 let mut registered = arc_self.root_registered.lock().unwrap();
540 assert!(!*registered, "root already registered");
541 *registered = true;
542 }
543 let id = arc_self.increment();
544 let mut queue = arc_self.queue.lock().unwrap();
545 queue.push(Arc::new(Task {
546 id,
547 label: String::new(),
548 tasks: arc_self.clone(),
549 operation: Operation::Root,
550 }));
551 }
552
553 fn register_work(
555 arc_self: &Arc<Self>,
556 label: &str,
557 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
558 ) {
559 let id = arc_self.increment();
560 let mut queue = arc_self.queue.lock().unwrap();
561 queue.push(Arc::new(Task {
562 id,
563 label: label.to_string(),
564 tasks: arc_self.clone(),
565 operation: Operation::Work {
566 future: Mutex::new(future),
567 completed: Mutex::new(false),
568 },
569 }));
570 }
571
572 fn enqueue(&self, task: Arc<Task>) {
574 let mut queue = self.queue.lock().unwrap();
575 queue.push(task);
576 }
577
578 fn drain(&self) -> Vec<Arc<Task>> {
580 let mut queue = self.queue.lock().unwrap();
581 let len = queue.len();
582 replace(&mut *queue, Vec::with_capacity(len))
583 }
584
585 fn len(&self) -> usize {
587 self.queue.lock().unwrap().len()
588 }
589}
590
591pub struct Context {
595 label: String,
596 spawned: bool,
597 executor: Arc<Executor>,
598 networking: Arc<Networking>,
599 storage: MeteredStorage<AuditedStorage<MemStorage>>,
600}
601
602impl Default for Context {
603 fn default() -> Self {
604 Self::new(Config::default())
605 }
606}
607
608impl Context {
609 pub fn new(cfg: Config) -> Self {
610 let mut registry = Registry::default();
612 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
613
614 let metrics = Arc::new(Metrics::init(runtime_registry));
616 let start_time = UNIX_EPOCH;
617 let deadline = cfg
618 .timeout
619 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
620 let (signaler, signal) = Signaler::new();
621 let auditor = Arc::new(Auditor::default());
622 let storage = MeteredStorage::new(
623 AuditedStorage::new(MemStorage::default(), auditor.clone()),
624 runtime_registry,
625 );
626 let executor = Arc::new(Executor {
627 registry: Mutex::new(registry),
628 cycle: cfg.cycle,
629 deadline,
630 metrics: metrics.clone(),
631 auditor: auditor.clone(),
632 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
633 time: Mutex::new(start_time),
634 tasks: Arc::new(Tasks::new()),
635 sleeping: Mutex::new(BinaryHeap::new()),
636 partitions: Mutex::new(HashMap::new()),
637 signaler: Mutex::new(signaler),
638 signal,
639 finished: Mutex::new(false),
640 recovered: Mutex::new(false),
641 });
642 Context {
643 label: String::new(),
644 spawned: false,
645 executor: executor.clone(),
646 networking: Arc::new(Networking::new(metrics, auditor)),
647 storage,
648 }
649 }
650
651 pub fn recover(self) -> Self {
663 if !*self.executor.finished.lock().unwrap() {
665 panic!("execution is not finished");
666 }
667
668 {
670 let mut recovered = self.executor.recovered.lock().unwrap();
671 if *recovered {
672 panic!("runtime has already been recovered");
673 }
674 *recovered = true;
675 }
676
677 let mut registry = Registry::default();
679 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
680 let metrics = Arc::new(Metrics::init(runtime_registry));
681
682 let auditor = self.executor.auditor.clone();
684 let (signaler, signal) = Signaler::new();
685 let executor = Arc::new(Executor {
686 cycle: self.executor.cycle,
688 deadline: self.executor.deadline,
689 auditor: auditor.clone(),
690 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
691 time: Mutex::new(*self.executor.time.lock().unwrap()),
692 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
693
694 registry: Mutex::new(registry),
696 metrics: metrics.clone(),
697 tasks: Arc::new(Tasks::new()),
698 sleeping: Mutex::new(BinaryHeap::new()),
699 signaler: Mutex::new(signaler),
700 signal,
701 finished: Mutex::new(false),
702 recovered: Mutex::new(false),
703 });
704 Self {
705 label: String::new(),
706 spawned: false,
707 executor,
708 networking: Arc::new(Networking::new(metrics, auditor.clone())),
709 storage: self.storage,
710 }
711 }
712
713 pub fn auditor(&self) -> &Auditor {
714 &self.executor.auditor
715 }
716}
717
718impl Clone for Context {
719 fn clone(&self) -> Self {
720 Self {
721 label: self.label.clone(),
722 spawned: false,
723 executor: self.executor.clone(),
724 networking: self.networking.clone(),
725 storage: self.storage.clone(),
726 }
727 }
728}
729
730impl crate::Spawner for Context {
731 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
732 where
733 F: FnOnce(Self) -> Fut + Send + 'static,
734 Fut: Future<Output = T> + Send + 'static,
735 T: Send + 'static,
736 {
737 assert!(!self.spawned, "already spawned");
739
740 let label = self.label.clone();
742 let work = Work {
743 label: label.clone(),
744 };
745 self.executor
746 .metrics
747 .tasks_spawned
748 .get_or_create(&work)
749 .inc();
750 let gauge = self
751 .executor
752 .metrics
753 .tasks_running
754 .get_or_create(&work)
755 .clone();
756
757 let executor = self.executor.clone();
759 let future = f(self);
760 let (f, handle) = Handle::init(future, gauge, false);
761
762 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
764 handle
765 }
766
767 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
768 where
769 F: Future<Output = T> + Send + 'static,
770 T: Send + 'static,
771 {
772 assert!(!self.spawned, "already spawned");
774 self.spawned = true;
775
776 let work = Work {
778 label: self.label.clone(),
779 };
780 self.executor
781 .metrics
782 .tasks_spawned
783 .get_or_create(&work)
784 .inc();
785 let gauge = self
786 .executor
787 .metrics
788 .tasks_running
789 .get_or_create(&work)
790 .clone();
791
792 let label = self.label.clone();
794 let executor = self.executor.clone();
795 move |f: F| {
796 let (f, handle) = Handle::init(f, gauge, false);
797
798 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
800 handle
801 }
802 }
803
804 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
805 where
806 F: FnOnce() -> T + Send + 'static,
807 T: Send + 'static,
808 {
809 assert!(!self.spawned, "already spawned");
811
812 let work = Work {
814 label: self.label.clone(),
815 };
816 self.executor
817 .metrics
818 .blocking_tasks_spawned
819 .get_or_create(&work)
820 .inc();
821 let gauge = self
822 .executor
823 .metrics
824 .blocking_tasks_running
825 .get_or_create(&work)
826 .clone();
827
828 let (f, handle) = Handle::init_blocking(f, gauge, false);
830
831 let f = async move { f() };
833 Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
834 handle
835 }
836
837 fn stop(&self, value: i32) {
838 self.executor.auditor.event(b"stop", |hasher| {
839 hasher.update(value.to_be_bytes());
840 });
841 self.executor.signaler.lock().unwrap().signal(value);
842 }
843
844 fn stopped(&self) -> Signal {
845 self.executor.auditor.event(b"stopped", |_| {});
846 self.executor.signal.clone()
847 }
848}
849
850impl crate::Metrics for Context {
851 fn with_label(&self, label: &str) -> Self {
852 let label = {
853 let prefix = self.label.clone();
854 if prefix.is_empty() {
855 label.to_string()
856 } else {
857 format!("{}_{}", prefix, label)
858 }
859 };
860 assert!(
861 !label.starts_with(METRICS_PREFIX),
862 "using runtime label is not allowed"
863 );
864 Self {
865 label,
866 spawned: false,
867 executor: self.executor.clone(),
868 networking: self.networking.clone(),
869 storage: self.storage.clone(),
870 }
871 }
872
873 fn label(&self) -> String {
874 self.label.clone()
875 }
876
877 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
878 let name = name.into();
880 let help = help.into();
881
882 self.executor.auditor.event(b"register", |hasher| {
884 hasher.update(name.as_bytes());
885 hasher.update(help.as_bytes());
886 });
887 let prefixed_name = {
888 let prefix = &self.label;
889 if prefix.is_empty() {
890 name
891 } else {
892 format!("{}_{}", *prefix, name)
893 }
894 };
895 self.executor
896 .registry
897 .lock()
898 .unwrap()
899 .register(prefixed_name, help, metric)
900 }
901
902 fn encode(&self) -> String {
903 self.executor.auditor.event(b"encode", |_| {});
904 let mut buffer = String::new();
905 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
906 buffer
907 }
908}
909
/// Future that completes once virtual time reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap this sleeper uses.
    executor: Arc<Executor>,
    /// Virtual time at which the sleeper completes.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
915
/// A pending wake-up: `waker` should be woken once virtual time reaches `time`.
///
/// Equality and ordering consider only `time`, and the ordering is reversed so
/// that a max-heap (`BinaryHeap`) yields the earliest alarm first.
struct Alarm {
    /// Virtual time at which the alarm is due.
    time: SystemTime,
    /// Waker to invoke when the alarm is due.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when they are due at the same time.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    /// Delegates to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Earlier alarms compare as greater, so they surface first in a max-heap.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
941
942impl Future for Sleeper {
943 type Output = ();
944
945 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
946 {
947 let current_time = *self.executor.time.lock().unwrap();
948 if current_time >= self.time {
949 return Poll::Ready(());
950 }
951 }
952 if !self.registered {
953 self.registered = true;
954 self.executor.sleeping.lock().unwrap().push(Alarm {
955 time: self.time,
956 waker: cx.waker().clone(),
957 });
958 }
959 Poll::Pending
960 }
961}
962
963impl Clock for Context {
964 fn current(&self) -> SystemTime {
965 *self.executor.time.lock().unwrap()
966 }
967
968 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
969 let deadline = self
970 .current()
971 .checked_add(duration)
972 .expect("overflow when setting wake time");
973 self.sleep_until(deadline)
974 }
975
976 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
977 Sleeper {
978 executor: self.executor.clone(),
979
980 time: deadline,
981 registered: false,
982 }
983 }
984}
985
986impl GClock for Context {
987 type Instant = SystemTime;
988
989 fn now(&self) -> Self::Instant {
990 self.current()
991 }
992}
993
994impl ReasonablyRealtime for Context {}
995
/// Sender used to hand a new connection to a bound listener.
///
/// `Networking::dial` sends one tuple per connection and `Listener::accept`
/// receives it.
type Dialable = mpsc::UnboundedSender<(
    // Address assigned to the dialer
    SocketAddr,
    // Sink the listener side writes to (read by the dialer)
    mocks::Sink,
    // Stream the listener side reads from (written by the dialer)
    mocks::Stream,
)>;
1001
1002struct Networking {
1009 metrics: Arc<Metrics>,
1010 auditor: Arc<Auditor>,
1011 ephemeral: Mutex<u16>,
1012 listeners: Mutex<HashMap<SocketAddr, Dialable>>,
1013}
1014
1015impl Networking {
1016 fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1017 Self {
1018 metrics,
1019 auditor,
1020 ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1021 listeners: Mutex::new(HashMap::new()),
1022 }
1023 }
1024
1025 fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1026 self.auditor.event(b"bind", |hasher| {
1027 hasher.update(socket.to_string().as_bytes());
1028 });
1029
1030 if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1033 && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1034 {
1035 return Err(Error::BindFailed);
1036 }
1037
1038 let mut listeners = self.listeners.lock().unwrap();
1040 if listeners.contains_key(&socket) {
1041 return Err(Error::BindFailed);
1042 }
1043
1044 let (sender, receiver) = mpsc::unbounded();
1046 listeners.insert(socket, sender);
1047 Ok(Listener {
1048 auditor: self.auditor.clone(),
1049 address: socket,
1050 listener: receiver,
1051 metrics: self.metrics.clone(),
1052 })
1053 }
1054
1055 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1056 let dialer = {
1058 let mut ephemeral = self.ephemeral.lock().unwrap();
1059 let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1060 *ephemeral = ephemeral
1061 .checked_add(1)
1062 .expect("ephemeral port range exhausted");
1063 dialer
1064 };
1065 self.auditor.event(b"dial", |hasher| {
1066 hasher.update(dialer.to_string().as_bytes());
1067 hasher.update(socket.to_string().as_bytes());
1068 });
1069
1070 let mut sender = {
1072 let listeners = self.listeners.lock().unwrap();
1073 let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1074 sender.clone()
1075 };
1076
1077 let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1079 let (listener_sender, listener_receiver) = mocks::Channel::init();
1080 sender
1081 .send((dialer, dialer_sender, listener_receiver))
1082 .await
1083 .map_err(|_| Error::ConnectionFailed)?;
1084 Ok((
1085 Sink {
1086 metrics: self.metrics.clone(),
1087 auditor: self.auditor.clone(),
1088 me: dialer,
1089 peer: socket,
1090 sender: listener_sender,
1091 },
1092 Stream {
1093 auditor: self.auditor.clone(),
1094 me: dialer,
1095 peer: socket,
1096 receiver: dialer_receiver,
1097 },
1098 ))
1099 }
1100}
1101
1102impl crate::Network for Context {
1103 type Listener = Listener;
1104
1105 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1106 self.networking.bind(socket)
1107 }
1108
1109 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1110 self.networking.dial(socket).await
1111 }
1112}
1113
1114pub struct Listener {
1116 metrics: Arc<Metrics>,
1117 auditor: Arc<Auditor>,
1118 address: SocketAddr,
1119 listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1120}
1121
1122impl crate::Listener for Listener {
1123 type Sink = Sink;
1124 type Stream = Stream;
1125
1126 async fn accept(&mut self) -> Result<(SocketAddr, Self::Sink, Self::Stream), Error> {
1127 let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1128 self.auditor.event(b"accept", |hasher| {
1129 hasher.update(self.address.to_string().as_bytes());
1130 hasher.update(socket.to_string().as_bytes());
1131 });
1132 Ok((
1133 socket,
1134 Sink {
1135 metrics: self.metrics.clone(),
1136 auditor: self.auditor.clone(),
1137 me: self.address,
1138 peer: socket,
1139 sender,
1140 },
1141 Stream {
1142 auditor: self.auditor.clone(),
1143 me: self.address,
1144 peer: socket,
1145 receiver,
1146 },
1147 ))
1148 }
1149}
1150
1151pub struct Sink {
1153 metrics: Arc<Metrics>,
1154 auditor: Arc<Auditor>,
1155 me: SocketAddr,
1156 peer: SocketAddr,
1157 sender: mocks::Sink,
1158}
1159
1160impl crate::Sink for Sink {
1161 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1162 self.auditor.event(b"send", |hasher| {
1163 hasher.update(self.me.to_string().as_bytes());
1164 hasher.update(self.peer.to_string().as_bytes());
1165 hasher.update(msg);
1166 });
1167 self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1168 self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1169 Ok(())
1170 }
1171}
1172
1173pub struct Stream {
1175 auditor: Arc<Auditor>,
1176 me: SocketAddr,
1177 peer: SocketAddr,
1178 receiver: mocks::Stream,
1179}
1180
1181impl crate::Stream for Stream {
1182 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1183 self.receiver
1184 .recv(buf)
1185 .await
1186 .map_err(|_| Error::RecvFailed)?;
1187 self.auditor.event(b"recv", |hasher| {
1188 hasher.update(self.me.to_string().as_bytes());
1189 hasher.update(self.peer.to_string().as_bytes());
1190 hasher.update(buf);
1191 });
1192 Ok(())
1193 }
1194}
1195
1196impl RngCore for Context {
1197 fn next_u32(&mut self) -> u32 {
1198 self.executor.auditor.event(b"rand", |hasher| {
1199 hasher.update(b"next_u32");
1200 });
1201 self.executor.rng.lock().unwrap().next_u32()
1202 }
1203
1204 fn next_u64(&mut self) -> u64 {
1205 self.executor.auditor.event(b"rand", |hasher| {
1206 hasher.update(b"next_u64");
1207 });
1208 self.executor.rng.lock().unwrap().next_u64()
1209 }
1210
1211 fn fill_bytes(&mut self, dest: &mut [u8]) {
1212 self.executor.auditor.event(b"rand", |hasher| {
1213 hasher.update(b"fill_bytes");
1214 });
1215 self.executor.rng.lock().unwrap().fill_bytes(dest)
1216 }
1217
1218 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1219 self.executor.auditor.event(b"rand", |hasher| {
1220 hasher.update(b"try_fill_bytes");
1221 });
1222 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1223 }
1224}
1225
1226impl CryptoRng for Context {}
1227
1228impl crate::Storage for Context {
1229 type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1230
1231 async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1232 self.storage.open(partition, name).await
1233 }
1234
1235 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1236 self.storage.remove(partition, name).await
1237 }
1238
1239 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1240 self.storage.scan(partition).await
1241 }
1242}
1243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared task workload on a runner seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        run_tasks(5, deterministic::Runner::seeded(seed))
    }

    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output of every seed
        let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

        // Second pass: every seed must reproduce its recorded output
        for (seed, expected) in (0..1000).zip(outputs) {
            let output = run_with_seed(seed);
            assert_eq!(output, expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let first = run_with_seed(12345);
        let second = run_with_seed(54321);
        assert_ne!(first, second);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Build a heap of alarms in arbitrary order (with a duplicate time)
        let now = SystemTime::now();
        let mut heap: BinaryHeap<Alarm> = [10, 5, 15, 5]
            .into_iter()
            .map(|secs| Alarm {
                time: now + Duration::new(secs, 0),
                waker: noop_waker(),
            })
            .collect();

        // Alarms must pop earliest-first
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        // Sleeping forever must eventually hit the 10s virtual deadline
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        // A timeout combined with a zero cycle is rejected at construction
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // First run: write and sync a blob
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Recovery must carry the auditor state over unchanged
        assert_eq!(state, recovered_context.auditor().state());

        // Second run: the synced data must still be readable
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; data.len()];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // First run: write a blob without syncing it
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(&data, 0).await.unwrap();
            context
        });

        // Second run on the recovered context: the unsynced write must be gone
        let context = context.recover();
        let executor = Runner::from(context);

        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();

        // Recovering from inside the running root task is not allowed
        executor.start(|context| async move {
            context.recover();
        });
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let executor = deterministic::Runner::default();

        let context = executor.start(|context| async move { context });

        // The clone shares the executor, so the second recovery must panic
        let cloned_context = context.clone();
        context.recover();

        cloned_context.recover();
    }

    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();

        // Virtual time starts exactly at the Unix epoch
        executor.start(|context| async move {
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}