1use crate::{
26 mocks,
27 storage::{
28 audited::Storage as AuditedStorage, memory::Storage as MemStorage,
29 metered::Storage as MeteredStorage,
30 },
31 utils::Signaler,
32 Clock, Error, Handle, Signal, METRICS_PREFIX,
33};
34use commonware_utils::{hex, SystemTimeExt};
35use futures::{
36 channel::mpsc,
37 task::{waker_ref, ArcWake},
38 Future, SinkExt, StreamExt,
39};
40use governor::clock::{Clock as GClock, ReasonablyRealtime};
41use prometheus_client::{
42 encoding::{text::encode, EncodeLabelSet},
43 metrics::{counter::Counter, family::Family, gauge::Gauge},
44 registry::{Metric, Registry},
45};
46use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
47use sha2::{Digest, Sha256};
48use std::{
49 collections::{BinaryHeap, HashMap},
50 mem::replace,
51 net::{IpAddr, Ipv4Addr, SocketAddr},
52 ops::Range,
53 pin::Pin,
54 sync::{Arc, Mutex},
55 task::{self, Poll, Waker},
56 time::{Duration, SystemTime, UNIX_EPOCH},
57};
58use tracing::trace;
59
/// Port range reserved for dialers: `Networking::dial` starts allocating at its lower bound
/// and `Networking::bind` refuses localhost sockets whose port falls inside it.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
62
/// Contents of a single storage partition.
///
/// NOTE(review): presumably maps a blob name to the blob's bytes; confirm against the storage
/// implementation, which is not visible here.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
/// Label set attached to the per-task metric families in [`Metrics`].
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the context the task was spawned from (empty for the root task).
    label: String,
}
70
/// Metrics collected by the runtime itself.
#[derive(Debug)]
struct Metrics {
    /// Total number of tasks spawned, per label.
    tasks_spawned: Family<Work, Counter>,
    /// Number of tasks currently running, per label.
    tasks_running: Family<Work, Gauge>,
    /// Total number of blocking tasks spawned, per label.
    blocking_tasks_spawned: Family<Work, Counter>,
    /// Number of blocking tasks currently running, per label.
    blocking_tasks_running: Family<Work, Gauge>,
    /// Total number of task polls, per label.
    task_polls: Family<Work, Counter>,

    /// Total number of bytes sent over the simulated network.
    network_bandwidth: Counter,
}
81
82impl Metrics {
83 pub fn init(registry: &mut Registry) -> Self {
84 let metrics = Self {
85 task_polls: Family::default(),
86 tasks_spawned: Family::default(),
87 tasks_running: Family::default(),
88 blocking_tasks_spawned: Family::default(),
89 blocking_tasks_running: Family::default(),
90 network_bandwidth: Counter::default(),
91 };
92 registry.register(
93 "tasks_spawned",
94 "Total number of tasks spawned",
95 metrics.tasks_spawned.clone(),
96 );
97 registry.register(
98 "tasks_running",
99 "Number of tasks currently running",
100 metrics.tasks_running.clone(),
101 );
102 registry.register(
103 "blocking_tasks_spawned",
104 "Total number of blocking tasks spawned",
105 metrics.blocking_tasks_spawned.clone(),
106 );
107 registry.register(
108 "blocking_tasks_running",
109 "Number of blocking tasks currently running",
110 metrics.blocking_tasks_running.clone(),
111 );
112 registry.register(
113 "task_polls",
114 "Total number of task polls",
115 metrics.task_polls.clone(),
116 );
117 registry.register(
118 "bandwidth",
119 "Total amount of data sent over network",
120 metrics.network_bandwidth.clone(),
121 );
122 metrics
123 }
124}
125
/// Records runtime events by folding each one into a running SHA-256 digest.
///
/// The digest can be read back as a hex string with `state()`.
pub struct Auditor {
    /// Current digest; empty until the first event is recorded.
    hash: Mutex<Vec<u8>>,
}
130
131impl Default for Auditor {
132 fn default() -> Self {
133 Self {
134 hash: Vec::new().into(),
135 }
136 }
137}
138
139impl Auditor {
140 fn process_task(&self, task: u128, label: &str) {
141 let mut hash = self.hash.lock().unwrap();
142 let mut hasher = Sha256::new();
143 hasher.update(&*hash);
144 hasher.update(b"process_task");
145 hasher.update(task.to_be_bytes());
146 hasher.update(label.as_bytes());
147 *hash = hasher.finalize().to_vec();
148 }
149
150 fn stop(&self, value: i32) {
151 let mut hash = self.hash.lock().unwrap();
152 let mut hasher = Sha256::new();
153 hasher.update(&*hash);
154 hasher.update(b"stop");
155 hasher.update(value.to_be_bytes());
156 *hash = hasher.finalize().to_vec();
157 }
158
159 fn stopped(&self) {
160 let mut hash = self.hash.lock().unwrap();
161 let mut hasher = Sha256::new();
162 hasher.update(&*hash);
163 hasher.update(b"stopped");
164 *hash = hasher.finalize().to_vec();
165 }
166
167 fn bind(&self, address: SocketAddr) {
168 let mut hash = self.hash.lock().unwrap();
169 let mut hasher = Sha256::new();
170 hasher.update(&*hash);
171 hasher.update(b"bind");
172 hasher.update(address.to_string().as_bytes());
173 *hash = hasher.finalize().to_vec();
174 }
175
176 fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
177 let mut hash = self.hash.lock().unwrap();
178 let mut hasher = Sha256::new();
179 hasher.update(&*hash);
180 hasher.update(b"dial");
181 hasher.update(dialer.to_string().as_bytes());
182 hasher.update(listener.to_string().as_bytes());
183 *hash = hasher.finalize().to_vec();
184 }
185
186 fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
187 let mut hash = self.hash.lock().unwrap();
188 let mut hasher = Sha256::new();
189 hasher.update(&*hash);
190 hasher.update(b"accept");
191 hasher.update(listener.to_string().as_bytes());
192 hasher.update(dialer.to_string().as_bytes());
193 *hash = hasher.finalize().to_vec();
194 }
195
196 fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
197 let mut hash = self.hash.lock().unwrap();
198 let mut hasher = Sha256::new();
199 hasher.update(&*hash);
200 hasher.update(b"send");
201 hasher.update(sender.to_string().as_bytes());
202 hasher.update(receiver.to_string().as_bytes());
203 hasher.update(message);
204 *hash = hasher.finalize().to_vec();
205 }
206
207 fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
208 let mut hash = self.hash.lock().unwrap();
209 let mut hasher = Sha256::new();
210 hasher.update(&*hash);
211 hasher.update(b"recv");
212 hasher.update(receiver.to_string().as_bytes());
213 hasher.update(sender.to_string().as_bytes());
214 hasher.update(message);
215 *hash = hasher.finalize().to_vec();
216 }
217
218 fn rand(&self, method: String) {
219 let mut hash = self.hash.lock().unwrap();
220 let mut hasher = Sha256::new();
221 hasher.update(&*hash);
222 hasher.update(b"rand");
223 hasher.update(method.as_bytes());
224 *hash = hasher.finalize().to_vec();
225 }
226
227 pub(crate) fn open(&self, partition: &str, name: &[u8]) {
228 let mut hash = self.hash.lock().unwrap();
229 let mut hasher = Sha256::new();
230 hasher.update(&*hash);
231 hasher.update(b"open");
232 hasher.update(partition.as_bytes());
233 hasher.update(name);
234 *hash = hasher.finalize().to_vec();
235 }
236
237 pub(crate) fn remove(&self, partition: &str, name: Option<&[u8]>) {
238 let mut hash = self.hash.lock().unwrap();
239 let mut hasher = Sha256::new();
240 hasher.update(&*hash);
241 hasher.update(b"remove");
242 hasher.update(partition.as_bytes());
243 if let Some(name) = name {
244 hasher.update(name);
245 }
246 *hash = hasher.finalize().to_vec();
247 }
248
249 pub(crate) fn scan(&self, partition: &str) {
250 let mut hash = self.hash.lock().unwrap();
251 let mut hasher = Sha256::new();
252 hasher.update(&*hash);
253 hasher.update(b"scan");
254 hasher.update(partition.as_bytes());
255 *hash = hasher.finalize().to_vec();
256 }
257
258 pub(crate) fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
259 let mut hash = self.hash.lock().unwrap();
260 let mut hasher = Sha256::new();
261 hasher.update(&*hash);
262 hasher.update(b"read_at");
263 hasher.update(partition.as_bytes());
264 hasher.update(name);
265 hasher.update(buf.to_be_bytes());
266 hasher.update(offset.to_be_bytes());
267 *hash = hasher.finalize().to_vec();
268 }
269
270 pub(crate) fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
271 let mut hash = self.hash.lock().unwrap();
272 let mut hasher = Sha256::new();
273 hasher.update(&*hash);
274 hasher.update(b"write_at");
275 hasher.update(partition.as_bytes());
276 hasher.update(name);
277 hasher.update(buf);
278 hasher.update(offset.to_be_bytes());
279 *hash = hasher.finalize().to_vec();
280 }
281
282 pub(crate) fn truncate(&self, partition: &str, name: &[u8], size: u64) {
283 let mut hash = self.hash.lock().unwrap();
284 let mut hasher = Sha256::new();
285 hasher.update(&*hash);
286 hasher.update(b"truncate");
287 hasher.update(partition.as_bytes());
288 hasher.update(name);
289 hasher.update(size.to_be_bytes());
290 *hash = hasher.finalize().to_vec();
291 }
292
293 pub(crate) fn sync(&self, partition: &str, name: &[u8]) {
294 let mut hash = self.hash.lock().unwrap();
295 let mut hasher = Sha256::new();
296 hasher.update(&*hash);
297 hasher.update(b"sync");
298 hasher.update(partition.as_bytes());
299 hasher.update(name);
300 *hash = hasher.finalize().to_vec();
301 }
302
303 pub(crate) fn close(&self, partition: &str, name: &[u8]) {
304 let mut hash = self.hash.lock().unwrap();
305 let mut hasher = Sha256::new();
306 hasher.update(&*hash);
307 hasher.update(b"close");
308 hasher.update(partition.as_bytes());
309 hasher.update(name);
310 *hash = hasher.finalize().to_vec();
311 }
312
313 fn register(&self, name: &str, help: &str) {
314 let mut hash = self.hash.lock().unwrap();
315 let mut hasher = Sha256::new();
316 hasher.update(&*hash);
317 hasher.update(b"register");
318 hasher.update(name.as_bytes());
319 hasher.update(help.as_bytes());
320 *hash = hasher.finalize().to_vec();
321 }
322
323 fn encode(&self) {
324 let mut hash = self.hash.lock().unwrap();
325 let mut hasher = Sha256::new();
326 hasher.update(&*hash);
327 hasher.update(b"encode");
328 *hash = hasher.finalize().to_vec();
329 }
330
331 pub fn state(&self) -> String {
336 let hash = self.hash.lock().unwrap().clone();
337 hex(&hash)
338 }
339}
340
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    pub seed: u64,

    /// Amount the simulated clock advances after each pass over the ready tasks.
    pub cycle: Duration,

    /// If set, the runtime panics once this much simulated time has elapsed.
    pub timeout: Option<Duration>,
}

impl Config {
    /// Validates the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while `cycle` is zero.
    pub fn assert(&self) {
        let zero_cycle_with_timeout =
            self.timeout.is_some() && self.cycle == Duration::default();
        assert!(
            !zero_cycle_with_timeout,
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Seed 42, a one-millisecond cycle, and no timeout.
    fn default() -> Self {
        let seed = 42;
        let cycle = Duration::from_millis(1);
        Self {
            seed,
            cycle,
            timeout: None,
        }
    }
}
373
/// Shared state of the deterministic runtime.
pub struct Executor {
    /// Registry that metrics are registered in and encoded from.
    registry: Mutex<Registry>,
    /// Amount the simulated clock advances on every loop iteration.
    cycle: Duration,
    /// Simulated time at which the runtime panics with "runtime timeout", if any.
    deadline: Option<SystemTime>,
    /// Runtime metrics.
    metrics: Arc<Metrics>,
    /// Auditor that records runtime events.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (shuffles ready tasks and backs `RngCore`).
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Queue of tasks ready to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms ordered so the earliest wake time is popped first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Storage partitions by name; carried over to the new executor by `Context::recover`.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Sending half of the stop signal.
    signaler: Mutex<Signaler>,
    /// Receiving half of the stop signal; cloned for each `stopped()` caller.
    signal: Signal,
    /// Set once the root task has completed.
    finished: Mutex<bool>,
    /// Set once `Context::recover` has been called on a context of this executor.
    recovered: Mutex<bool>,
}
391
/// What a [`Runner`] starts from.
enum State {
    /// Build a fresh [`Context`] from this configuration when started.
    Config(Config),
    /// Run with an already-constructed [`Context`] (e.g. one returned by `Context::recover`).
    Context(Context),
}
396
/// Entry point of the deterministic runtime.
pub struct Runner {
    /// Configuration or context that `start` will run with.
    state: State,
}
401
impl From<Config> for Runner {
    /// Equivalent to [`Runner::new`]; panics if the configuration is invalid.
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}
407
impl From<Context> for Runner {
    /// Wraps an existing context so that `start` runs with it instead of building a new one.
    fn from(context: Context) -> Self {
        Self {
            state: State::Context(context),
        }
    }
}
415
416impl Runner {
417 pub fn new(cfg: Config) -> Self {
419 cfg.assert();
421 Runner {
422 state: State::Config(cfg),
423 }
424 }
425
426 pub fn seeded(seed: u64) -> Self {
429 let cfg = Config {
430 seed,
431 ..Config::default()
432 };
433 Self::new(cfg)
434 }
435
436 pub fn timed(timeout: Duration) -> Self {
439 let cfg = Config {
440 timeout: Some(timeout),
441 ..Config::default()
442 };
443 Self::new(cfg)
444 }
445}
446
impl Default for Runner {
    /// Creates a runner with the default [`Config`].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
452
453impl crate::Runner for Runner {
454 type Context = Context;
455
456 fn start<F, Fut>(self, f: F) -> Fut::Output
457 where
458 F: FnOnce(Self::Context) -> Fut,
459 Fut: Future,
460 {
461 let context = match self.state {
463 State::Config(config) => Context::new(config),
464 State::Context(context) => context,
465 };
466
467 let executor = context.executor.clone();
469 let mut root = Box::pin(f(context));
470
471 Tasks::register_root(&executor.tasks);
473
474 let mut iter = 0;
476 loop {
477 {
479 let current = executor.time.lock().unwrap();
480 if let Some(deadline) = executor.deadline {
481 if *current >= deadline {
482 panic!("runtime timeout");
483 }
484 }
485 }
486
487 let mut tasks = executor.tasks.drain();
489
490 {
492 let mut rng = executor.rng.lock().unwrap();
493 tasks.shuffle(&mut *rng);
494 }
495
496 trace!(iter, tasks = tasks.len(), "starting loop");
502 for task in tasks {
503 executor.auditor.process_task(task.id, &task.label);
505 trace!(id = task.id, "processing task");
506
507 executor
509 .metrics
510 .task_polls
511 .get_or_create(&Work {
512 label: task.label.clone(),
513 })
514 .inc();
515
516 let waker = waker_ref(&task);
518 let mut cx = task::Context::from_waker(&waker);
519 match &task.operation {
520 Operation::Root => {
521 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
523 trace!(id = task.id, "task is complete");
524 *executor.finished.lock().unwrap() = true;
525 return output;
526 }
527 }
528 Operation::Work { future, completed } => {
529 if *completed.lock().unwrap() {
531 trace!(id = task.id, "dropping already complete task");
532 continue;
533 }
534
535 let mut fut = future.lock().unwrap();
537 if fut.as_mut().poll(&mut cx).is_ready() {
538 trace!(id = task.id, "task is complete");
539 *completed.lock().unwrap() = true;
540 continue;
541 }
542 }
543 }
544
545 trace!(id = task.id, "task is still pending");
547 }
548
549 let mut current;
554 {
555 let mut time = executor.time.lock().unwrap();
556 *time = time
557 .checked_add(executor.cycle)
558 .expect("executor time overflowed");
559 current = *time;
560 }
561 trace!(now = current.epoch_millis(), "time advanced");
562
563 if executor.tasks.len() == 0 {
565 let mut skip = None;
566 {
567 let sleeping = executor.sleeping.lock().unwrap();
568 if let Some(next) = sleeping.peek() {
569 if next.time > current {
570 skip = Some(next.time);
571 }
572 }
573 }
574 if skip.is_some() {
575 {
576 let mut time = executor.time.lock().unwrap();
577 *time = skip.unwrap();
578 current = *time;
579 }
580 trace!(now = current.epoch_millis(), "time skipped");
581 }
582 }
583
584 let mut to_wake = Vec::new();
586 let mut remaining;
587 {
588 let mut sleeping = executor.sleeping.lock().unwrap();
589 while let Some(next) = sleeping.peek() {
590 if next.time <= current {
591 let sleeper = sleeping.pop().unwrap();
592 to_wake.push(sleeper.waker);
593 } else {
594 break;
595 }
596 }
597 remaining = sleeping.len();
598 }
599 for waker in to_wake {
600 waker.wake();
601 }
602
603 remaining += executor.tasks.len();
605
606 if remaining == 0 {
609 panic!("runtime stalled");
610 }
611 iter += 1;
612 }
613 }
614}
615
/// What polling a [`Task`] means.
enum Operation {
    /// The root future, which is owned by `Runner::start` rather than by the task.
    Root,
    /// A spawned future owned by the task.
    Work {
        /// The future to poll.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has returned `Poll::Ready`; it is never polled again afterwards.
        completed: Mutex<bool>,
    },
}
624
/// A unit of work scheduled on the executor.
struct Task {
    /// Identifier assigned from the `Tasks` counter at registration.
    id: u128,
    /// Label of the context that spawned the task (empty for the root task).
    label: String,
    /// Queue the task re-enqueues itself on when woken.
    tasks: Arc<Tasks>,

    /// What to poll when the task runs.
    operation: Operation,
}
633
634impl ArcWake for Task {
635 fn wake_by_ref(arc_self: &Arc<Self>) {
636 arc_self.tasks.enqueue(arc_self.clone());
637 }
638}
639
/// Queue of tasks that are ready to be polled.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks ready to be polled on the next loop iteration.
    queue: Mutex<Vec<Arc<Task>>>,
    /// Whether the root task has been registered (allowed only once).
    root_registered: Mutex<bool>,
}
649
650impl Tasks {
651 fn new() -> Self {
653 Self {
654 counter: Mutex::new(0),
655 queue: Mutex::new(Vec::new()),
656 root_registered: Mutex::new(false),
657 }
658 }
659
660 fn increment(&self) -> u128 {
662 let mut counter = self.counter.lock().unwrap();
663 let old = *counter;
664 *counter = counter.checked_add(1).expect("task counter overflow");
665 old
666 }
667
668 fn register_root(arc_self: &Arc<Self>) {
672 {
673 let mut registered = arc_self.root_registered.lock().unwrap();
674 assert!(!*registered, "root already registered");
675 *registered = true;
676 }
677 let id = arc_self.increment();
678 let mut queue = arc_self.queue.lock().unwrap();
679 queue.push(Arc::new(Task {
680 id,
681 label: String::new(),
682 tasks: arc_self.clone(),
683 operation: Operation::Root,
684 }));
685 }
686
687 fn register_work(
689 arc_self: &Arc<Self>,
690 label: &str,
691 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
692 ) {
693 let id = arc_self.increment();
694 let mut queue = arc_self.queue.lock().unwrap();
695 queue.push(Arc::new(Task {
696 id,
697 label: label.to_string(),
698 tasks: arc_self.clone(),
699 operation: Operation::Work {
700 future: Mutex::new(future),
701 completed: Mutex::new(false),
702 },
703 }));
704 }
705
706 fn enqueue(&self, task: Arc<Task>) {
708 let mut queue = self.queue.lock().unwrap();
709 queue.push(task);
710 }
711
712 fn drain(&self) -> Vec<Arc<Task>> {
714 let mut queue = self.queue.lock().unwrap();
715 let len = queue.len();
716 replace(&mut *queue, Vec::with_capacity(len))
717 }
718
719 fn len(&self) -> usize {
721 self.queue.lock().unwrap().len()
722 }
723}
724
/// Handle to the deterministic runtime, passed to every task.
///
/// Implements the runtime traits (spawning, metrics, clock, network, storage, randomness) on
/// top of a shared [`Executor`].
pub struct Context {
    /// Label prefix applied to spawned tasks and registered metrics.
    label: String,
    /// Whether `spawn_ref` has already been called on this context.
    spawned: bool,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Simulated network shared by all clones of this context.
    networking: Arc<Networking>,
    /// Metered, audited, in-memory storage.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}
735
impl Default for Context {
    /// Creates a context with the default [`Config`].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
741
742impl Context {
743 pub fn new(cfg: Config) -> Self {
744 let mut registry = Registry::default();
746 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
747
748 let metrics = Arc::new(Metrics::init(runtime_registry));
750 let start_time = UNIX_EPOCH;
751 let deadline = cfg
752 .timeout
753 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
754 let (signaler, signal) = Signaler::new();
755 let auditor = Arc::new(Auditor::default());
756 let storage = MeteredStorage::new(
757 AuditedStorage::new(MemStorage::default(), auditor.clone()),
758 runtime_registry,
759 );
760 let executor = Arc::new(Executor {
761 registry: Mutex::new(registry),
762 cycle: cfg.cycle,
763 deadline,
764 metrics: metrics.clone(),
765 auditor: auditor.clone(),
766 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
767 time: Mutex::new(start_time),
768 tasks: Arc::new(Tasks::new()),
769 sleeping: Mutex::new(BinaryHeap::new()),
770 partitions: Mutex::new(HashMap::new()),
771 signaler: Mutex::new(signaler),
772 signal,
773 finished: Mutex::new(false),
774 recovered: Mutex::new(false),
775 });
776 Context {
777 label: String::new(),
778 spawned: false,
779 executor: executor.clone(),
780 networking: Arc::new(Networking::new(metrics, auditor)),
781 storage,
782 }
783 }
784
785 pub fn recover(self) -> Self {
797 if !*self.executor.finished.lock().unwrap() {
799 panic!("execution is not finished");
800 }
801
802 {
804 let mut recovered = self.executor.recovered.lock().unwrap();
805 if *recovered {
806 panic!("runtime has already been recovered");
807 }
808 *recovered = true;
809 }
810
811 let mut registry = Registry::default();
813 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
814 let metrics = Arc::new(Metrics::init(runtime_registry));
815
816 let auditor = self.executor.auditor.clone();
818 let (signaler, signal) = Signaler::new();
819 let executor = Arc::new(Executor {
820 cycle: self.executor.cycle,
822 deadline: self.executor.deadline,
823 auditor: auditor.clone(),
824 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
825 time: Mutex::new(*self.executor.time.lock().unwrap()),
826 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
827
828 registry: Mutex::new(registry),
830 metrics: metrics.clone(),
831 tasks: Arc::new(Tasks::new()),
832 sleeping: Mutex::new(BinaryHeap::new()),
833 signaler: Mutex::new(signaler),
834 signal,
835 finished: Mutex::new(false),
836 recovered: Mutex::new(false),
837 });
838 Self {
839 label: String::new(),
840 spawned: false,
841 executor,
842 networking: Arc::new(Networking::new(metrics, auditor.clone())),
843 storage: self.storage,
844 }
845 }
846
847 pub fn auditor(&self) -> &Auditor {
848 &self.executor.auditor
849 }
850}
851
852impl Clone for Context {
853 fn clone(&self) -> Self {
854 Self {
855 label: self.label.clone(),
856 spawned: false,
857 executor: self.executor.clone(),
858 networking: self.networking.clone(),
859 storage: self.storage.clone(),
860 }
861 }
862}
863
864impl crate::Spawner for Context {
865 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
866 where
867 F: FnOnce(Self) -> Fut + Send + 'static,
868 Fut: Future<Output = T> + Send + 'static,
869 T: Send + 'static,
870 {
871 assert!(!self.spawned, "already spawned");
873
874 let label = self.label.clone();
876 let work = Work {
877 label: label.clone(),
878 };
879 self.executor
880 .metrics
881 .tasks_spawned
882 .get_or_create(&work)
883 .inc();
884 let gauge = self
885 .executor
886 .metrics
887 .tasks_running
888 .get_or_create(&work)
889 .clone();
890
891 let executor = self.executor.clone();
893 let future = f(self);
894 let (f, handle) = Handle::init(future, gauge, false);
895
896 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
898 handle
899 }
900
901 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
902 where
903 F: Future<Output = T> + Send + 'static,
904 T: Send + 'static,
905 {
906 assert!(!self.spawned, "already spawned");
908 self.spawned = true;
909
910 let work = Work {
912 label: self.label.clone(),
913 };
914 self.executor
915 .metrics
916 .tasks_spawned
917 .get_or_create(&work)
918 .inc();
919 let gauge = self
920 .executor
921 .metrics
922 .tasks_running
923 .get_or_create(&work)
924 .clone();
925
926 let label = self.label.clone();
928 let executor = self.executor.clone();
929 move |f: F| {
930 let (f, handle) = Handle::init(f, gauge, false);
931
932 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
934 handle
935 }
936 }
937
938 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
939 where
940 F: FnOnce() -> T + Send + 'static,
941 T: Send + 'static,
942 {
943 assert!(!self.spawned, "already spawned");
945
946 let work = Work {
948 label: self.label.clone(),
949 };
950 self.executor
951 .metrics
952 .blocking_tasks_spawned
953 .get_or_create(&work)
954 .inc();
955 let gauge = self
956 .executor
957 .metrics
958 .blocking_tasks_running
959 .get_or_create(&work)
960 .clone();
961
962 let (f, handle) = Handle::init_blocking(f, gauge, false);
964
965 let f = async move { f() };
967 Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
968 handle
969 }
970
971 fn stop(&self, value: i32) {
972 self.executor.auditor.stop(value);
973 self.executor.signaler.lock().unwrap().signal(value);
974 }
975
976 fn stopped(&self) -> Signal {
977 self.executor.auditor.stopped();
978 self.executor.signal.clone()
979 }
980}
981
982impl crate::Metrics for Context {
983 fn with_label(&self, label: &str) -> Self {
984 let label = {
985 let prefix = self.label.clone();
986 if prefix.is_empty() {
987 label.to_string()
988 } else {
989 format!("{}_{}", prefix, label)
990 }
991 };
992 assert!(
993 !label.starts_with(METRICS_PREFIX),
994 "using runtime label is not allowed"
995 );
996 Self {
997 label,
998 spawned: false,
999 executor: self.executor.clone(),
1000 networking: self.networking.clone(),
1001 storage: self.storage.clone(),
1002 }
1003 }
1004
1005 fn label(&self) -> String {
1006 self.label.clone()
1007 }
1008
1009 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1010 let name = name.into();
1012 let help = help.into();
1013
1014 self.executor.auditor.register(&name, &help);
1016 let prefixed_name = {
1017 let prefix = &self.label;
1018 if prefix.is_empty() {
1019 name
1020 } else {
1021 format!("{}_{}", *prefix, name)
1022 }
1023 };
1024 self.executor
1025 .registry
1026 .lock()
1027 .unwrap()
1028 .register(prefixed_name, help, metric)
1029 }
1030
1031 fn encode(&self) -> String {
1032 self.executor.auditor.encode();
1033 let mut buffer = String::new();
1034 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
1035 buffer
1036 }
1037}
1038
/// Future that completes once the executor's simulated time reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap are consulted.
    executor: Arc<Executor>,
    /// Simulated time at which the sleeper becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
1044
/// A pending wake-up: `waker` is woken once simulated time reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Alarms are equal when their wake times are equal; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Orders alarms by wake time, reversed, so that a max-heap such as `BinaryHeap` pops the
    /// earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1070
1071impl Future for Sleeper {
1072 type Output = ();
1073
1074 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1075 {
1076 let current_time = *self.executor.time.lock().unwrap();
1077 if current_time >= self.time {
1078 return Poll::Ready(());
1079 }
1080 }
1081 if !self.registered {
1082 self.registered = true;
1083 self.executor.sleeping.lock().unwrap().push(Alarm {
1084 time: self.time,
1085 waker: cx.waker().clone(),
1086 });
1087 }
1088 Poll::Pending
1089 }
1090}
1091
1092impl Clock for Context {
1093 fn current(&self) -> SystemTime {
1094 *self.executor.time.lock().unwrap()
1095 }
1096
1097 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1098 let deadline = self
1099 .current()
1100 .checked_add(duration)
1101 .expect("overflow when setting wake time");
1102 self.sleep_until(deadline)
1103 }
1104
1105 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1106 Sleeper {
1107 executor: self.executor.clone(),
1108
1109 time: deadline,
1110 registered: false,
1111 }
1112 }
1113}
1114
impl GClock for Context {
    type Instant = SystemTime;

    /// Reports the runtime's simulated time to `governor`.
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
1122
// Marker impl with no methods overridden here.
// NOTE(review): presumably required so `governor` rate limiters accept `Context` as a clock;
// confirm against the `governor` documentation.
impl ReasonablyRealtime for Context {}
1124
/// Channel a dialer uses to hand a new connection to a listener.
type Dialable = mpsc::UnboundedSender<(
    SocketAddr,    // Dialer's address
    mocks::Sink,   // Passed by `dial` as the dialer-side sink; becomes the listener's sender
    mocks::Stream, // Passed by `dial` as the listener-side stream; becomes the listener's receiver
)>;
1130
/// In-memory network: listeners are registered by address and dialers connect to them through
/// channels.
struct Networking {
    /// Runtime metrics, shared with every sink and listener created here.
    metrics: Arc<Metrics>,
    /// Auditor that records bind, dial, accept, send and recv events.
    auditor: Arc<Auditor>,
    /// Port assigned to the next dialer.
    ephemeral: Mutex<u16>,
    /// Bound listeners by address.
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}
1143
1144impl Networking {
1145 fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1146 Self {
1147 metrics,
1148 auditor,
1149 ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1150 listeners: Mutex::new(HashMap::new()),
1151 }
1152 }
1153
1154 fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1155 self.auditor.bind(socket);
1156
1157 if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1160 && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1161 {
1162 return Err(Error::BindFailed);
1163 }
1164
1165 let mut listeners = self.listeners.lock().unwrap();
1167 if listeners.contains_key(&socket) {
1168 return Err(Error::BindFailed);
1169 }
1170
1171 let (sender, receiver) = mpsc::unbounded();
1173 listeners.insert(socket, sender);
1174 Ok(Listener {
1175 auditor: self.auditor.clone(),
1176 address: socket,
1177 listener: receiver,
1178 metrics: self.metrics.clone(),
1179 })
1180 }
1181
1182 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1183 let dialer = {
1185 let mut ephemeral = self.ephemeral.lock().unwrap();
1186 let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1187 *ephemeral = ephemeral
1188 .checked_add(1)
1189 .expect("ephemeral port range exhausted");
1190 dialer
1191 };
1192 self.auditor.dial(dialer, socket);
1193
1194 let mut sender = {
1196 let listeners = self.listeners.lock().unwrap();
1197 let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1198 sender.clone()
1199 };
1200
1201 let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1203 let (listener_sender, listener_receiver) = mocks::Channel::init();
1204 sender
1205 .send((dialer, dialer_sender, listener_receiver))
1206 .await
1207 .map_err(|_| Error::ConnectionFailed)?;
1208 Ok((
1209 Sink {
1210 metrics: self.metrics.clone(),
1211 auditor: self.auditor.clone(),
1212 me: dialer,
1213 peer: socket,
1214 sender: listener_sender,
1215 },
1216 Stream {
1217 auditor: self.auditor.clone(),
1218 me: dialer,
1219 peer: socket,
1220 receiver: dialer_receiver,
1221 },
1222 ))
1223 }
1224}
1225
impl crate::Network<Listener, Sink, Stream> for Context {
    /// Binds a listener on the shared simulated network (see `Networking::bind`).
    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.networking.bind(socket)
    }

    /// Dials a listener on the shared simulated network (see `Networking::dial`).
    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        self.networking.dial(socket).await
    }
}
1235
/// Listener bound to an address on the simulated network.
pub struct Listener {
    /// Runtime metrics passed on to accepted sinks.
    metrics: Arc<Metrics>,
    /// Auditor that records accepted connections.
    auditor: Arc<Auditor>,
    /// Address this listener is bound to.
    address: SocketAddr,
    /// Incoming connections handed over by dialers.
    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
}
1243
1244impl crate::Listener<Sink, Stream> for Listener {
1245 async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1246 let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1247 self.auditor.accept(self.address, socket);
1248 Ok((
1249 socket,
1250 Sink {
1251 metrics: self.metrics.clone(),
1252 auditor: self.auditor.clone(),
1253 me: self.address,
1254 peer: socket,
1255 sender,
1256 },
1257 Stream {
1258 auditor: self.auditor.clone(),
1259 me: self.address,
1260 peer: socket,
1261 receiver,
1262 },
1263 ))
1264 }
1265}
1266
/// Sending half of a simulated connection.
pub struct Sink {
    /// Runtime metrics; the bandwidth counter is bumped on every successful send.
    metrics: Arc<Metrics>,
    /// Auditor that records every send.
    auditor: Arc<Auditor>,
    /// Local address.
    me: SocketAddr,
    /// Remote address.
    peer: SocketAddr,
    /// Underlying mock sink.
    sender: mocks::Sink,
}
1275
1276impl crate::Sink for Sink {
1277 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1278 self.auditor.send(self.me, self.peer, msg);
1279 self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1280 self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1281 Ok(())
1282 }
1283}
1284
/// Receiving half of a simulated connection.
pub struct Stream {
    /// Auditor that records every successful receive.
    auditor: Arc<Auditor>,
    /// Local address.
    me: SocketAddr,
    /// Remote address.
    peer: SocketAddr,
    /// Underlying mock stream.
    receiver: mocks::Stream,
}
1292
1293impl crate::Stream for Stream {
1294 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1295 self.receiver
1296 .recv(buf)
1297 .await
1298 .map_err(|_| Error::RecvFailed)?;
1299 self.auditor.recv(self.me, self.peer, buf);
1300 Ok(())
1301 }
1302}
1303
1304impl RngCore for Context {
1305 fn next_u32(&mut self) -> u32 {
1306 self.executor.auditor.rand("next_u32".to_string());
1307 self.executor.rng.lock().unwrap().next_u32()
1308 }
1309
1310 fn next_u64(&mut self) -> u64 {
1311 self.executor.auditor.rand("next_u64".to_string());
1312 self.executor.rng.lock().unwrap().next_u64()
1313 }
1314
1315 fn fill_bytes(&mut self, dest: &mut [u8]) {
1316 self.executor.auditor.rand("fill_bytes".to_string());
1317 self.executor.rng.lock().unwrap().fill_bytes(dest)
1318 }
1319
1320 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1321 self.executor.auditor.rand("try_fill_bytes".to_string());
1322 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1323 }
1324}
1325
// Marker impl so `Context` can be passed where a `CryptoRng` is required.
// NOTE(review): the backing generator is a seeded `StdRng`, so its output is reproducible by
// design; confirm callers only rely on this in simulation.
impl CryptoRng for Context {}
1327
// Every storage operation is delegated to the context's metered, audited, in-memory storage.
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Opens blob `name` in `partition` via the underlying storage.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Removes blob `name` from `partition` via the underlying storage.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Scans `partition` via the underlying storage.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs five tasks on a runtime seeded with `seed` and returns what `run_tasks` reports.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// The same seed must always produce the same output.
    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Re-run with the same seeds and compare
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different outputs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s reversed ordering makes `BinaryHeap` pop the earliest alarm first.
    #[test]
    fn test_alarm_min_heap() {
        // Populate heap
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Verify alarms come out earliest-first
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A task that sleeps forever must hit the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected when the runner is created.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data that was written and synced is readable after recovery, and the auditor state
    /// carries over unchanged.
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Run some tasks, sync storage, and recover the runtime
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Verify auditor state is the same
        assert_eq!(state, recovered_context.auditor().state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; data.len()];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    /// Data that was written but never synced is gone after recovery.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Run a task that writes without syncing
        let context = executor.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(&data, 0).await.unwrap();
            // Intentionally no sync
            context
        });

        // Recover the runtime
        let context = context.recover();
        let executor = Runner::from(context);

        // Check that unsynced storage does not persist after recovery
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Recovering while the root task is still running must panic.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Attempt to recover before the runtime has finished
            context.recover();
        });
    }

    /// Recovering twice from the same executor must panic.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Finish runtime
        let context = executor.start(|context| async move { context });

        // Recover for the first time
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using a clone that shares the same executor
        cloned_context.recover();
    }

    /// Simulated time starts at the Unix epoch.
    #[test]
    fn test_default_time_zero() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Check that the time is zero
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}