1use crate::{
26 mocks, storage::audited::Storage as AuditedStorage, storage::memory::Storage as MemStorage,
27 storage::metered::Storage as MeteredStorage, utils::Signaler, Clock, Error, Handle, Signal,
28 METRICS_PREFIX,
29};
30use commonware_utils::{hex, SystemTimeExt};
31use futures::{
32 channel::mpsc,
33 task::{waker_ref, ArcWake},
34 SinkExt, StreamExt,
35};
36use governor::clock::{Clock as GClock, ReasonablyRealtime};
37use prometheus_client::{
38 encoding::{text::encode, EncodeLabelSet},
39 metrics::{counter::Counter, family::Family, gauge::Gauge},
40 registry::{Metric, Registry},
41};
42use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
43use sha2::{Digest, Sha256};
44use std::{
45 collections::{BinaryHeap, HashMap},
46 future::Future,
47 mem::replace,
48 net::{IpAddr, Ipv4Addr, SocketAddr},
49 ops::Range,
50 pin::Pin,
51 sync::{Arc, Mutex},
52 task::{self, Poll, Waker},
53 time::{Duration, SystemTime, UNIX_EPOCH},
54};
55use tracing::trace;
56
/// Ports reserved for dialers: `Networking::new` starts handing out dialer
/// ports at the lower bound, and `Networking::bind` refuses localhost sockets
/// whose port falls inside this range.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
59
/// Byte-keyed map of byte values; `Executor` keeps one of these per named
/// partition.
///
/// NOTE(review): presumably maps blob name to blob contents — confirm against
/// the storage implementation.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
62
/// Label set used as the key of every per-task metric family in [`Metrics`].
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the context/task the sample is attributed to.
    label: String,
}
67
/// Runtime metrics recorded by the executor and the mock network.
#[derive(Debug)]
struct Metrics {
    /// Total number of tasks spawned, per label.
    tasks_spawned: Family<Work, Counter>,
    /// Number of tasks currently running, per label.
    tasks_running: Family<Work, Gauge>,
    /// Total number of blocking tasks spawned, per label.
    blocking_tasks_spawned: Family<Work, Counter>,
    /// Number of blocking tasks currently running, per label.
    blocking_tasks_running: Family<Work, Gauge>,
    /// Total number of task polls, per label.
    task_polls: Family<Work, Counter>,

    /// Total amount of data sent over the network (incremented by message
    /// length on every successful `Sink::send`).
    network_bandwidth: Counter,
}
78
79impl Metrics {
80 pub fn init(registry: &mut Registry) -> Self {
81 let metrics = Self {
82 task_polls: Family::default(),
83 tasks_spawned: Family::default(),
84 tasks_running: Family::default(),
85 blocking_tasks_spawned: Family::default(),
86 blocking_tasks_running: Family::default(),
87 network_bandwidth: Counter::default(),
88 };
89 registry.register(
90 "tasks_spawned",
91 "Total number of tasks spawned",
92 metrics.tasks_spawned.clone(),
93 );
94 registry.register(
95 "tasks_running",
96 "Number of tasks currently running",
97 metrics.tasks_running.clone(),
98 );
99 registry.register(
100 "blocking_tasks_spawned",
101 "Total number of blocking tasks spawned",
102 metrics.blocking_tasks_spawned.clone(),
103 );
104 registry.register(
105 "blocking_tasks_running",
106 "Number of blocking tasks currently running",
107 metrics.blocking_tasks_running.clone(),
108 );
109 registry.register(
110 "task_polls",
111 "Total number of task polls",
112 metrics.task_polls.clone(),
113 );
114 registry.register(
115 "bandwidth",
116 "Total amount of data sent over network",
117 metrics.network_bandwidth.clone(),
118 );
119 metrics
120 }
121}
122
/// Running digest over every audited runtime event.
///
/// The digest starts out empty and is replaced each time an event is
/// recorded; [`Auditor::state`] exposes it as a hex string.
pub struct Auditor {
    /// Current digest bytes (empty until the first event is recorded).
    hash: Mutex<Vec<u8>>,
}

impl Default for Auditor {
    /// Creates an auditor whose digest is empty.
    fn default() -> Self {
        let empty: Vec<u8> = Vec::new();
        Self {
            hash: Mutex::new(empty),
        }
    }
}
135
136impl Auditor {
137 fn process_task(&self, task: u128, label: &str) {
138 let mut hash = self.hash.lock().unwrap();
139 let mut hasher = Sha256::new();
140 hasher.update(&*hash);
141 hasher.update(b"process_task");
142 hasher.update(task.to_be_bytes());
143 hasher.update(label.as_bytes());
144 *hash = hasher.finalize().to_vec();
145 }
146
147 fn stop(&self, value: i32) {
148 let mut hash = self.hash.lock().unwrap();
149 let mut hasher = Sha256::new();
150 hasher.update(&*hash);
151 hasher.update(b"stop");
152 hasher.update(value.to_be_bytes());
153 *hash = hasher.finalize().to_vec();
154 }
155
156 fn stopped(&self) {
157 let mut hash = self.hash.lock().unwrap();
158 let mut hasher = Sha256::new();
159 hasher.update(&*hash);
160 hasher.update(b"stopped");
161 *hash = hasher.finalize().to_vec();
162 }
163
164 fn bind(&self, address: SocketAddr) {
165 let mut hash = self.hash.lock().unwrap();
166 let mut hasher = Sha256::new();
167 hasher.update(&*hash);
168 hasher.update(b"bind");
169 hasher.update(address.to_string().as_bytes());
170 *hash = hasher.finalize().to_vec();
171 }
172
173 fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
174 let mut hash = self.hash.lock().unwrap();
175 let mut hasher = Sha256::new();
176 hasher.update(&*hash);
177 hasher.update(b"dial");
178 hasher.update(dialer.to_string().as_bytes());
179 hasher.update(listener.to_string().as_bytes());
180 *hash = hasher.finalize().to_vec();
181 }
182
183 fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
184 let mut hash = self.hash.lock().unwrap();
185 let mut hasher = Sha256::new();
186 hasher.update(&*hash);
187 hasher.update(b"accept");
188 hasher.update(listener.to_string().as_bytes());
189 hasher.update(dialer.to_string().as_bytes());
190 *hash = hasher.finalize().to_vec();
191 }
192
193 fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
194 let mut hash = self.hash.lock().unwrap();
195 let mut hasher = Sha256::new();
196 hasher.update(&*hash);
197 hasher.update(b"send");
198 hasher.update(sender.to_string().as_bytes());
199 hasher.update(receiver.to_string().as_bytes());
200 hasher.update(message);
201 *hash = hasher.finalize().to_vec();
202 }
203
204 fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
205 let mut hash = self.hash.lock().unwrap();
206 let mut hasher = Sha256::new();
207 hasher.update(&*hash);
208 hasher.update(b"recv");
209 hasher.update(receiver.to_string().as_bytes());
210 hasher.update(sender.to_string().as_bytes());
211 hasher.update(message);
212 *hash = hasher.finalize().to_vec();
213 }
214
215 fn rand(&self, method: String) {
216 let mut hash = self.hash.lock().unwrap();
217 let mut hasher = Sha256::new();
218 hasher.update(&*hash);
219 hasher.update(b"rand");
220 hasher.update(method.as_bytes());
221 *hash = hasher.finalize().to_vec();
222 }
223
224 pub(crate) fn open(&self, partition: &str, name: &[u8]) {
225 let mut hash = self.hash.lock().unwrap();
226 let mut hasher = Sha256::new();
227 hasher.update(&*hash);
228 hasher.update(b"open");
229 hasher.update(partition.as_bytes());
230 hasher.update(name);
231 *hash = hasher.finalize().to_vec();
232 }
233
234 pub(crate) fn remove(&self, partition: &str, name: Option<&[u8]>) {
235 let mut hash = self.hash.lock().unwrap();
236 let mut hasher = Sha256::new();
237 hasher.update(&*hash);
238 hasher.update(b"remove");
239 hasher.update(partition.as_bytes());
240 if let Some(name) = name {
241 hasher.update(name);
242 }
243 *hash = hasher.finalize().to_vec();
244 }
245
246 pub(crate) fn scan(&self, partition: &str) {
247 let mut hash = self.hash.lock().unwrap();
248 let mut hasher = Sha256::new();
249 hasher.update(&*hash);
250 hasher.update(b"scan");
251 hasher.update(partition.as_bytes());
252 *hash = hasher.finalize().to_vec();
253 }
254
255 pub(crate) fn len(&self, partition: &str, name: &[u8]) {
256 let mut hash = self.hash.lock().unwrap();
257 let mut hasher = Sha256::new();
258 hasher.update(&*hash);
259 hasher.update(b"len");
260 hasher.update(partition.as_bytes());
261 hasher.update(name);
262 *hash = hasher.finalize().to_vec();
263 }
264
265 pub(crate) fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
266 let mut hash = self.hash.lock().unwrap();
267 let mut hasher = Sha256::new();
268 hasher.update(&*hash);
269 hasher.update(b"read_at");
270 hasher.update(partition.as_bytes());
271 hasher.update(name);
272 hasher.update(buf.to_be_bytes());
273 hasher.update(offset.to_be_bytes());
274 *hash = hasher.finalize().to_vec();
275 }
276
277 pub(crate) fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
278 let mut hash = self.hash.lock().unwrap();
279 let mut hasher = Sha256::new();
280 hasher.update(&*hash);
281 hasher.update(b"write_at");
282 hasher.update(partition.as_bytes());
283 hasher.update(name);
284 hasher.update(buf);
285 hasher.update(offset.to_be_bytes());
286 *hash = hasher.finalize().to_vec();
287 }
288
289 pub(crate) fn truncate(&self, partition: &str, name: &[u8], size: u64) {
290 let mut hash = self.hash.lock().unwrap();
291 let mut hasher = Sha256::new();
292 hasher.update(&*hash);
293 hasher.update(b"truncate");
294 hasher.update(partition.as_bytes());
295 hasher.update(name);
296 hasher.update(size.to_be_bytes());
297 *hash = hasher.finalize().to_vec();
298 }
299
300 pub(crate) fn sync(&self, partition: &str, name: &[u8]) {
301 let mut hash = self.hash.lock().unwrap();
302 let mut hasher = Sha256::new();
303 hasher.update(&*hash);
304 hasher.update(b"sync");
305 hasher.update(partition.as_bytes());
306 hasher.update(name);
307 *hash = hasher.finalize().to_vec();
308 }
309
310 pub(crate) fn close(&self, partition: &str, name: &[u8]) {
311 let mut hash = self.hash.lock().unwrap();
312 let mut hasher = Sha256::new();
313 hasher.update(&*hash);
314 hasher.update(b"close");
315 hasher.update(partition.as_bytes());
316 hasher.update(name);
317 *hash = hasher.finalize().to_vec();
318 }
319
320 fn register(&self, name: &str, help: &str) {
321 let mut hash = self.hash.lock().unwrap();
322 let mut hasher = Sha256::new();
323 hasher.update(&*hash);
324 hasher.update(b"register");
325 hasher.update(name.as_bytes());
326 hasher.update(help.as_bytes());
327 *hash = hasher.finalize().to_vec();
328 }
329
330 fn encode(&self) {
331 let mut hash = self.hash.lock().unwrap();
332 let mut hasher = Sha256::new();
333 hasher.update(&*hash);
334 hasher.update(b"encode");
335 *hash = hasher.finalize().to_vec();
336 }
337
338 pub fn state(&self) -> String {
343 let hash = self.hash.lock().unwrap().clone();
344 hex(&hash)
345 }
346}
347
/// Configuration for the deterministic [`Executor`].
#[derive(Clone)]
pub struct Config {
    /// Seed for the executor's random number generator.
    pub seed: u64,

    /// Amount by which the executor's clock advances after each pass over
    /// the task queue.
    pub cycle: Duration,

    /// If set, the runtime panics once its clock reaches this much time
    /// past the start.
    pub timeout: Option<Duration>,
}

impl Default for Config {
    /// Seed `42`, a one-millisecond cycle and no timeout.
    fn default() -> Self {
        let seed = 42;
        let cycle = Duration::from_millis(1);
        Self {
            seed,
            cycle,
            timeout: None,
        }
    }
}
371
/// Shared state of the deterministic runtime.
pub struct Executor {
    /// Metrics registry that `Context::register` writes to and
    /// `Context::encode` reads from.
    registry: Mutex<Registry>,
    /// Amount the clock advances after each pass over the task queue.
    cycle: Duration,
    /// Time at which the runtime panics with "runtime timeout", if any.
    deadline: Option<SystemTime>,
    /// Runtime metrics.
    metrics: Arc<Metrics>,
    /// Digest of every audited event.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (also used to shuffle runnable tasks).
    rng: Mutex<StdRng>,
    /// Current virtual time.
    time: Mutex<SystemTime>,
    /// Queue of runnable tasks.
    tasks: Arc<Tasks>,
    /// Pending alarms registered by sleeping futures.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Named partitions; copied into the new executor on `Context::recover`.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Sending half of the stop signal (used by `Spawner::stop`).
    signaler: Mutex<Signaler>,
    /// Stop signal handed out by `Spawner::stopped`.
    signal: Signal,
    /// Set once the root future has completed.
    finished: Mutex<bool>,
    /// Set once `Context::recover` has been called for this executor.
    recovered: Mutex<bool>,
}
389
390impl Executor {
391 pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
393 if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
395 panic!("cycle duration must be non-zero when timeout is set");
396 }
397
398 let mut registry = Registry::default();
400 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
401
402 let metrics = Arc::new(Metrics::init(runtime_registry));
404 let auditor = Arc::new(Auditor::default());
405 let start_time = UNIX_EPOCH;
406 let deadline = cfg
407 .timeout
408 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
409 let (signaler, signal) = Signaler::new();
410 let storage = MeteredStorage::new(
411 AuditedStorage::new(MemStorage::default(), auditor.clone()),
412 runtime_registry,
413 );
414 let executor = Arc::new(Self {
415 registry: Mutex::new(registry),
416 cycle: cfg.cycle,
417 deadline,
418 metrics: metrics.clone(),
419 auditor: auditor.clone(),
420 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
421 time: Mutex::new(start_time),
422 tasks: Arc::new(Tasks::new()),
423 sleeping: Mutex::new(BinaryHeap::new()),
424 partitions: Mutex::new(HashMap::new()),
425 signaler: Mutex::new(signaler),
426 signal,
427 finished: Mutex::new(false),
428 recovered: Mutex::new(false),
429 });
430 (
431 Runner {
432 executor: executor.clone(),
433 },
434 Context {
435 label: String::new(),
436 spawned: false,
437 executor,
438 networking: Arc::new(Networking::new(metrics, auditor.clone())),
439 storage,
440 },
441 auditor,
442 )
443 }
444
445 pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
448 let cfg = Config {
449 seed,
450 ..Config::default()
451 };
452 Self::init(cfg)
453 }
454
455 pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
458 let cfg = Config {
459 timeout: Some(timeout),
460 ..Config::default()
461 };
462 Self::init(cfg)
463 }
464
465 #[allow(clippy::should_implement_trait)]
468 pub fn default() -> (Runner, Context, Arc<Auditor>) {
469 Self::init(Config::default())
470 }
471}
472
/// What a [`Task`] runs when it is polled.
enum Operation {
    /// The root future, which is owned and polled directly by `Runner::start`.
    Root,
    /// A spawned future.
    Work {
        /// The future to poll.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has returned `Poll::Ready`, so later wake-ups
        /// of the same task are dropped instead of polling it again.
        completed: Mutex<bool>,
    },
}
481
/// A unit of work tracked by the executor.
struct Task {
    /// Identifier assigned from the `Tasks` counter.
    id: u128,
    /// Label of the context that created the task (empty for the root).
    label: String,
    /// Queue the task re-enqueues itself on when woken.
    tasks: Arc<Tasks>,

    /// What to run when the task is polled.
    operation: Operation,
}
490
491impl ArcWake for Task {
492 fn wake_by_ref(arc_self: &Arc<Self>) {
493 arc_self.tasks.enqueue(arc_self.clone());
494 }
495}
496
/// Run queue shared between the executor and task wakers.
struct Tasks {
    /// Next task identifier to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled.
    queue: Mutex<Vec<Arc<Task>>>,
    /// Whether the root task has been registered.
    root_registered: Mutex<bool>,
}
506
507impl Tasks {
508 fn new() -> Self {
510 Self {
511 counter: Mutex::new(0),
512 queue: Mutex::new(Vec::new()),
513 root_registered: Mutex::new(false),
514 }
515 }
516
517 fn increment(&self) -> u128 {
519 let mut counter = self.counter.lock().unwrap();
520 let old = *counter;
521 *counter = counter.checked_add(1).expect("task counter overflow");
522 old
523 }
524
525 fn register_root(arc_self: &Arc<Self>) {
529 {
530 let mut registered = arc_self.root_registered.lock().unwrap();
531 assert!(!*registered, "root already registered");
532 *registered = true;
533 }
534 let id = arc_self.increment();
535 let mut queue = arc_self.queue.lock().unwrap();
536 queue.push(Arc::new(Task {
537 id,
538 label: String::new(),
539 tasks: arc_self.clone(),
540 operation: Operation::Root,
541 }));
542 }
543
544 fn register_work(
546 arc_self: &Arc<Self>,
547 label: &str,
548 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
549 ) {
550 let id = arc_self.increment();
551 let mut queue = arc_self.queue.lock().unwrap();
552 queue.push(Arc::new(Task {
553 id,
554 label: label.to_string(),
555 tasks: arc_self.clone(),
556 operation: Operation::Work {
557 future: Mutex::new(future),
558 completed: Mutex::new(false),
559 },
560 }));
561 }
562
563 fn enqueue(&self, task: Arc<Task>) {
565 let mut queue = self.queue.lock().unwrap();
566 queue.push(task);
567 }
568
569 fn drain(&self) -> Vec<Arc<Task>> {
571 let mut queue = self.queue.lock().unwrap();
572 let len = queue.len();
573 replace(&mut *queue, Vec::with_capacity(len))
574 }
575
576 fn len(&self) -> usize {
578 self.queue.lock().unwrap().len()
579 }
580}
581
/// Drives the deterministic runtime (see the `crate::Runner` implementation).
pub struct Runner {
    /// Shared runtime state.
    executor: Arc<Executor>,
}
586
587impl crate::Runner for Runner {
588 fn start<F>(self, f: F) -> F::Output
589 where
590 F: Future,
591 {
592 let mut root = Box::pin(f);
594
595 Tasks::register_root(&self.executor.tasks);
597
598 let mut iter = 0;
600 loop {
601 {
603 let current = self.executor.time.lock().unwrap();
604 if let Some(deadline) = self.executor.deadline {
605 if *current >= deadline {
606 panic!("runtime timeout");
607 }
608 }
609 }
610
611 let mut tasks = self.executor.tasks.drain();
613
614 {
616 let mut rng = self.executor.rng.lock().unwrap();
617 tasks.shuffle(&mut *rng);
618 }
619
620 trace!(iter, tasks = tasks.len(), "starting loop");
626 for task in tasks {
627 self.executor.auditor.process_task(task.id, &task.label);
629 trace!(id = task.id, "processing task");
630
631 self.executor
633 .metrics
634 .task_polls
635 .get_or_create(&Work {
636 label: task.label.clone(),
637 })
638 .inc();
639
640 let waker = waker_ref(&task);
642 let mut cx = task::Context::from_waker(&waker);
643 match &task.operation {
644 Operation::Root => {
645 if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
647 trace!(id = task.id, "task is complete");
648 *self.executor.finished.lock().unwrap() = true;
649 return output;
650 }
651 }
652 Operation::Work { future, completed } => {
653 if *completed.lock().unwrap() {
655 trace!(id = task.id, "dropping already complete task");
656 continue;
657 }
658
659 let mut fut = future.lock().unwrap();
661 if fut.as_mut().poll(&mut cx).is_ready() {
662 trace!(id = task.id, "task is complete");
663 *completed.lock().unwrap() = true;
664 continue;
665 }
666 }
667 }
668
669 trace!(id = task.id, "task is still pending");
671 }
672
673 let mut current;
678 {
679 let mut time = self.executor.time.lock().unwrap();
680 *time = time
681 .checked_add(self.executor.cycle)
682 .expect("executor time overflowed");
683 current = *time;
684 }
685 trace!(now = current.epoch_millis(), "time advanced");
686
687 if self.executor.tasks.len() == 0 {
689 let mut skip = None;
690 {
691 let sleeping = self.executor.sleeping.lock().unwrap();
692 if let Some(next) = sleeping.peek() {
693 if next.time > current {
694 skip = Some(next.time);
695 }
696 }
697 }
698 if skip.is_some() {
699 {
700 let mut time = self.executor.time.lock().unwrap();
701 *time = skip.unwrap();
702 current = *time;
703 }
704 trace!(now = current.epoch_millis(), "time skipped");
705 }
706 }
707
708 let mut to_wake = Vec::new();
710 let mut remaining;
711 {
712 let mut sleeping = self.executor.sleeping.lock().unwrap();
713 while let Some(next) = sleeping.peek() {
714 if next.time <= current {
715 let sleeper = sleeping.pop().unwrap();
716 to_wake.push(sleeper.waker);
717 } else {
718 break;
719 }
720 }
721 remaining = sleeping.len();
722 }
723 for waker in to_wake {
724 waker.wake();
725 }
726
727 remaining += self.executor.tasks.len();
729
730 if remaining == 0 {
733 panic!("runtime stalled");
734 }
735 iter += 1;
736 }
737 }
738}
739
/// Handle to the deterministic runtime, implementing the runtime traits
/// (spawning, metrics, clock, network, randomness and storage).
pub struct Context {
    /// Label applied to tasks spawned and metrics registered via this context.
    label: String,
    /// Set once `spawn_ref` has been called on this context.
    spawned: bool,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Mock network shared by all clones of this context.
    networking: Arc<Networking>,
    /// Metered, audited in-memory storage.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}
750
751impl Context {
752 pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
764 if !*self.executor.finished.lock().unwrap() {
766 panic!("execution is not finished");
767 }
768
769 {
771 let mut recovered = self.executor.recovered.lock().unwrap();
772 if *recovered {
773 panic!("runtime has already been recovered");
774 }
775 *recovered = true;
776 }
777
778 let mut registry = Registry::default();
780 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
781 let metrics = Arc::new(Metrics::init(runtime_registry));
782
783 let auditor = self.executor.auditor.clone();
785 let (signaler, signal) = Signaler::new();
786 let executor = Arc::new(Executor {
787 cycle: self.executor.cycle,
789 deadline: self.executor.deadline,
790 auditor: auditor.clone(),
791 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
792 time: Mutex::new(*self.executor.time.lock().unwrap()),
793 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
794
795 registry: Mutex::new(registry),
797 metrics: metrics.clone(),
798 tasks: Arc::new(Tasks::new()),
799 sleeping: Mutex::new(BinaryHeap::new()),
800 signaler: Mutex::new(signaler),
801 signal,
802 finished: Mutex::new(false),
803 recovered: Mutex::new(false),
804 });
805 (
806 Runner {
807 executor: executor.clone(),
808 },
809 Self {
810 label: String::new(),
811 spawned: false,
812 executor,
813 networking: Arc::new(Networking::new(metrics, auditor.clone())),
814 storage: self.storage,
815 },
816 auditor,
817 )
818 }
819}
820
821impl Clone for Context {
822 fn clone(&self) -> Self {
823 Self {
824 label: self.label.clone(),
825 spawned: false,
826 executor: self.executor.clone(),
827 networking: self.networking.clone(),
828 storage: self.storage.clone(),
829 }
830 }
831}
832
833impl crate::Spawner for Context {
834 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
835 where
836 F: FnOnce(Self) -> Fut + Send + 'static,
837 Fut: Future<Output = T> + Send + 'static,
838 T: Send + 'static,
839 {
840 assert!(!self.spawned, "already spawned");
842
843 let label = self.label.clone();
845 let work = Work {
846 label: label.clone(),
847 };
848 self.executor
849 .metrics
850 .tasks_spawned
851 .get_or_create(&work)
852 .inc();
853 let gauge = self
854 .executor
855 .metrics
856 .tasks_running
857 .get_or_create(&work)
858 .clone();
859
860 let executor = self.executor.clone();
862 let future = f(self);
863 let (f, handle) = Handle::init(future, gauge, false);
864
865 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
867 handle
868 }
869
870 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
871 where
872 F: Future<Output = T> + Send + 'static,
873 T: Send + 'static,
874 {
875 assert!(!self.spawned, "already spawned");
877 self.spawned = true;
878
879 let work = Work {
881 label: self.label.clone(),
882 };
883 self.executor
884 .metrics
885 .tasks_spawned
886 .get_or_create(&work)
887 .inc();
888 let gauge = self
889 .executor
890 .metrics
891 .tasks_running
892 .get_or_create(&work)
893 .clone();
894
895 let label = self.label.clone();
897 let executor = self.executor.clone();
898 move |f: F| {
899 let (f, handle) = Handle::init(f, gauge, false);
900
901 Tasks::register_work(&executor.tasks, &label, Box::pin(f));
903 handle
904 }
905 }
906
907 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
908 where
909 F: FnOnce() -> T + Send + 'static,
910 T: Send + 'static,
911 {
912 assert!(!self.spawned, "already spawned");
914
915 let work = Work {
917 label: self.label.clone(),
918 };
919 self.executor
920 .metrics
921 .blocking_tasks_spawned
922 .get_or_create(&work)
923 .inc();
924 let gauge = self
925 .executor
926 .metrics
927 .blocking_tasks_running
928 .get_or_create(&work)
929 .clone();
930
931 let (f, handle) = Handle::init_blocking(f, gauge, false);
933
934 let f = async move { f() };
936 Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
937 handle
938 }
939
940 fn stop(&self, value: i32) {
941 self.executor.auditor.stop(value);
942 self.executor.signaler.lock().unwrap().signal(value);
943 }
944
945 fn stopped(&self) -> Signal {
946 self.executor.auditor.stopped();
947 self.executor.signal.clone()
948 }
949}
950
951impl crate::Metrics for Context {
952 fn with_label(&self, label: &str) -> Self {
953 let label = {
954 let prefix = self.label.clone();
955 if prefix.is_empty() {
956 label.to_string()
957 } else {
958 format!("{}_{}", prefix, label)
959 }
960 };
961 assert!(
962 !label.starts_with(METRICS_PREFIX),
963 "using runtime label is not allowed"
964 );
965 Self {
966 label,
967 spawned: false,
968 executor: self.executor.clone(),
969 networking: self.networking.clone(),
970 storage: self.storage.clone(),
971 }
972 }
973
974 fn label(&self) -> String {
975 self.label.clone()
976 }
977
978 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
979 let name = name.into();
981 let help = help.into();
982
983 self.executor.auditor.register(&name, &help);
985 let prefixed_name = {
986 let prefix = &self.label;
987 if prefix.is_empty() {
988 name
989 } else {
990 format!("{}_{}", *prefix, name)
991 }
992 };
993 self.executor
994 .registry
995 .lock()
996 .unwrap()
997 .register(prefixed_name, help, metric)
998 }
999
1000 fn encode(&self) -> String {
1001 self.executor.auditor.encode();
1002 let mut buffer = String::new();
1003 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
1004 buffer
1005 }
1006}
1007
/// Future that resolves once the executor's clock reaches `time`.
struct Sleeper {
    /// Runtime whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Time at which the future becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper, so it is
    /// registered at most once.
    registered: bool,
}
1013
/// A pending wake-up: `waker` should be woken once the clock reaches `time`.
struct Alarm {
    /// When the alarm is due.
    time: SystemTime,
    /// Waker of the sleeping task.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Alarms compare equal when they are due at the same time; the waker is
    /// ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    /// Orders alarms by *descending* time so that `BinaryHeap` (a max-heap)
    /// pops the earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1039
1040impl Future for Sleeper {
1041 type Output = ();
1042
1043 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1044 {
1045 let current_time = *self.executor.time.lock().unwrap();
1046 if current_time >= self.time {
1047 return Poll::Ready(());
1048 }
1049 }
1050 if !self.registered {
1051 self.registered = true;
1052 self.executor.sleeping.lock().unwrap().push(Alarm {
1053 time: self.time,
1054 waker: cx.waker().clone(),
1055 });
1056 }
1057 Poll::Pending
1058 }
1059}
1060
1061impl Clock for Context {
1062 fn current(&self) -> SystemTime {
1063 *self.executor.time.lock().unwrap()
1064 }
1065
1066 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1067 let deadline = self
1068 .current()
1069 .checked_add(duration)
1070 .expect("overflow when setting wake time");
1071 self.sleep_until(deadline)
1072 }
1073
1074 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1075 Sleeper {
1076 executor: self.executor.clone(),
1077
1078 time: deadline,
1079 registered: false,
1080 }
1081 }
1082}
1083
/// `governor` clock backed by the executor's virtual time.
impl GClock for Context {
    type Instant = SystemTime;

    /// Returns the executor's current virtual time.
    fn now(&self) -> Self::Instant {
        self.current()
    }
}

// Marker impl: no methods are overridden here.
impl ReasonablyRealtime for Context {}
1093
/// Sending half of a listener's accept queue. Each item is one incoming
/// connection: the dialer's address and the sink/stream pair the listener
/// will use to talk to it.
type Dialable = mpsc::UnboundedSender<(
    SocketAddr,
    mocks::Sink,
    mocks::Stream,
)>;
1099
/// In-memory mock network shared by all contexts of a runtime.
struct Networking {
    /// Runtime metrics (handed to every sink and listener it creates).
    metrics: Arc<Metrics>,
    /// Auditor that records binds, dials and traffic.
    auditor: Arc<Auditor>,
    /// Next port to assign to a dialer.
    ephemeral: Mutex<u16>,
    /// Bound sockets and the queue used to deliver connections to each.
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}
1112
1113impl Networking {
1114 fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1115 Self {
1116 metrics,
1117 auditor,
1118 ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1119 listeners: Mutex::new(HashMap::new()),
1120 }
1121 }
1122
1123 fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1124 self.auditor.bind(socket);
1125
1126 if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1129 && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1130 {
1131 return Err(Error::BindFailed);
1132 }
1133
1134 let mut listeners = self.listeners.lock().unwrap();
1136 if listeners.contains_key(&socket) {
1137 return Err(Error::BindFailed);
1138 }
1139
1140 let (sender, receiver) = mpsc::unbounded();
1142 listeners.insert(socket, sender);
1143 Ok(Listener {
1144 auditor: self.auditor.clone(),
1145 address: socket,
1146 listener: receiver,
1147 metrics: self.metrics.clone(),
1148 })
1149 }
1150
1151 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1152 let dialer = {
1154 let mut ephemeral = self.ephemeral.lock().unwrap();
1155 let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1156 *ephemeral = ephemeral
1157 .checked_add(1)
1158 .expect("ephemeral port range exhausted");
1159 dialer
1160 };
1161 self.auditor.dial(dialer, socket);
1162
1163 let mut sender = {
1165 let listeners = self.listeners.lock().unwrap();
1166 let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1167 sender.clone()
1168 };
1169
1170 let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1172 let (listener_sender, listener_receiver) = mocks::Channel::init();
1173 sender
1174 .send((dialer, dialer_sender, listener_receiver))
1175 .await
1176 .map_err(|_| Error::ConnectionFailed)?;
1177 Ok((
1178 Sink {
1179 metrics: self.metrics.clone(),
1180 auditor: self.auditor.clone(),
1181 me: dialer,
1182 peer: socket,
1183 sender: listener_sender,
1184 },
1185 Stream {
1186 auditor: self.auditor.clone(),
1187 me: dialer,
1188 peer: socket,
1189 receiver: dialer_receiver,
1190 },
1191 ))
1192 }
1193}
1194
/// Network operations, delegated to the shared [`Networking`] instance.
impl crate::Network<Listener, Sink, Stream> for Context {
    /// Binds a listener to `socket` (see `Networking::bind`).
    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.networking.bind(socket)
    }

    /// Dials `socket` (see `Networking::dial`).
    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        self.networking.dial(socket).await
    }
}
1204
/// Listener bound to an address on the mock network.
pub struct Listener {
    /// Runtime metrics, passed on to accepted sinks.
    metrics: Arc<Metrics>,
    /// Auditor that records accepted connections.
    auditor: Arc<Auditor>,
    /// Address this listener is bound to.
    address: SocketAddr,
    /// Queue of incoming connections pushed by `Networking::dial`.
    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
}
1212
1213impl crate::Listener<Sink, Stream> for Listener {
1214 async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1215 let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1216 self.auditor.accept(self.address, socket);
1217 Ok((
1218 socket,
1219 Sink {
1220 metrics: self.metrics.clone(),
1221 auditor: self.auditor.clone(),
1222 me: self.address,
1223 peer: socket,
1224 sender,
1225 },
1226 Stream {
1227 auditor: self.auditor.clone(),
1228 me: self.address,
1229 peer: socket,
1230 receiver,
1231 },
1232 ))
1233 }
1234}
1235
/// Sending half of a mock connection.
pub struct Sink {
    /// Runtime metrics (bandwidth is counted on successful sends).
    metrics: Arc<Metrics>,
    /// Auditor that records every send.
    auditor: Arc<Auditor>,
    /// Local address.
    me: SocketAddr,
    /// Remote address.
    peer: SocketAddr,
    /// Underlying mock channel.
    sender: mocks::Sink,
}
1244
1245impl crate::Sink for Sink {
1246 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1247 self.auditor.send(self.me, self.peer, msg);
1248 self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1249 self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1250 Ok(())
1251 }
1252}
1253
/// Receiving half of a mock connection.
pub struct Stream {
    /// Auditor that records every successful receive.
    auditor: Arc<Auditor>,
    /// Local address.
    me: SocketAddr,
    /// Remote address.
    peer: SocketAddr,
    /// Underlying mock channel.
    receiver: mocks::Stream,
}
1261
1262impl crate::Stream for Stream {
1263 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1264 self.receiver
1265 .recv(buf)
1266 .await
1267 .map_err(|_| Error::RecvFailed)?;
1268 self.auditor.recv(self.me, self.peer, buf);
1269 Ok(())
1270 }
1271}
1272
1273impl RngCore for Context {
1274 fn next_u32(&mut self) -> u32 {
1275 self.executor.auditor.rand("next_u32".to_string());
1276 self.executor.rng.lock().unwrap().next_u32()
1277 }
1278
1279 fn next_u64(&mut self) -> u64 {
1280 self.executor.auditor.rand("next_u64".to_string());
1281 self.executor.rng.lock().unwrap().next_u64()
1282 }
1283
1284 fn fill_bytes(&mut self, dest: &mut [u8]) {
1285 self.executor.auditor.rand("fill_bytes".to_string());
1286 self.executor.rng.lock().unwrap().fill_bytes(dest)
1287 }
1288
1289 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1290 self.executor.auditor.rand("try_fill_bytes".to_string());
1291 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1292 }
1293}
1294
1295impl CryptoRng for Context {}
1296
/// Storage operations, delegated to the context's metered and audited
/// in-memory storage.
impl crate::Storage for Context {
    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;

    /// Opens blob `name` in `partition`.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<Self::Blob, Error> {
        self.storage.open(partition, name).await
    }

    /// Removes from `partition`, forwarding the optional `name` unchanged.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// Scans `partition`, returning a list of byte strings.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared `run_tasks` workload on a runtime seeded with `seed`
    /// and returns the final auditor state together with the workload output.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        (auditor.state(), messages)
    }

    /// The same seed must always produce the same audit state and output.
    #[test]
    fn test_same_seed_same_order() {
        // Record the outcome for each seed
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Re-run every seed and compare against the recorded outcome
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two different seeds are expected to produce different outcomes.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// `Alarm`'s reversed ordering makes `BinaryHeap` pop earliest-first.
    #[test]
    fn test_alarm_min_heap() {
        // Populate the heap out of order, including a duplicate time
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Alarms must come out in ascending time order
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A root future that never finishes trips the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let (executor, context, _) = Executor::timed(Duration::from_secs(10));
        executor.start(async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout combined with a zero cycle is rejected at construction.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    /// Data that was written and synced is readable after `recover`, and the
    /// auditor state carries over unchanged.
    #[test]
    fn test_recover_synced_storage_persists() {
        // First runtime
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write and sync a blob
        executor1.start({
            let context = context1.clone();
            let data = data.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                blob.sync().await.unwrap();
            }
        });
        let state1 = auditor1.state();

        // Recover into a second runtime
        let (executor2, context2, auditor2) = context1.recover();

        // Recovery itself must not add audit events
        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        // The synced data is still there
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    /// Data that was written but never synced is gone after `recover`.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // First runtime
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write a blob without syncing it
        executor1.start({
            let context = context1.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
            }
        });

        // Recover into a second runtime
        let (executor2, context2, _) = context1.recover();

        // The blob exists but is empty
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// `recover` refuses to run before the root future has completed.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Runtime that is never started
        let (_, context, _) = Executor::default();

        context.recover();
    }

    /// A runtime can only be recovered once, even through a cloned context.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let (executor, context, _) = Executor::default();

        // Finish the run
        executor.start(async move {});

        // First recovery succeeds
        let cloned_context = context.clone();
        context.recover();

        // Second recovery of the same executor must panic
        cloned_context.recover();
    }
}