use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
use commonware_utils::{hex, SystemTimeExt};
use futures::{
    channel::mpsc,
    task::{waker_ref, ArcWake},
    SinkExt, StreamExt,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
use prometheus_client::{
    encoding::{text::encode, EncodeLabelSet},
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::{Metric, Registry},
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use sha2::{Digest, Sha256};
use std::{
    collections::{BinaryHeap, HashMap},
    future::Future,
    mem::replace,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    ops::Range,
    pin::Pin,
    sync::{Arc, Mutex, RwLock},
    task::{self, Poll, Waker},
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::trace;

/// Range of ports assigned to dialers when establishing simulated connections.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;

/// Map of blob name to blob contents within a simulated storage partition.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    label: String,
}

#[derive(Debug)]
struct Metrics {
    tasks_spawned: Family<Work, Counter>,
    tasks_running: Family<Work, Gauge>,
    task_polls: Family<Work, Counter>,

    network_bandwidth: Counter,

    open_blobs: Gauge,
    storage_reads: Counter,
    storage_read_bandwidth: Counter,
    storage_writes: Counter,
    storage_write_bandwidth: Counter,
}

impl Metrics {
    pub fn init(registry: &mut Registry) -> Self {
        let metrics = Self {
            task_polls: Family::default(),
            tasks_running: Family::default(),
            tasks_spawned: Family::default(),
            network_bandwidth: Counter::default(),
            open_blobs: Gauge::default(),
            storage_reads: Counter::default(),
            storage_read_bandwidth: Counter::default(),
            storage_writes: Counter::default(),
            storage_write_bandwidth: Counter::default(),
        };
        registry.register(
            "tasks_spawned",
            "Total number of tasks spawned",
            metrics.tasks_spawned.clone(),
        );
        registry.register(
            "tasks_running",
            "Number of tasks currently running",
            metrics.tasks_running.clone(),
        );
        registry.register(
            "task_polls",
            "Total number of task polls",
            metrics.task_polls.clone(),
        );
        registry.register(
            "bandwidth",
            "Total amount of data sent over network",
            metrics.network_bandwidth.clone(),
        );
        registry.register(
            "open_blobs",
            "Number of open blobs",
            metrics.open_blobs.clone(),
        );
        registry.register(
            "storage_reads",
            "Total number of disk reads",
            metrics.storage_reads.clone(),
        );
        registry.register(
            "storage_read_bandwidth",
            "Total amount of data read from disk",
            metrics.storage_read_bandwidth.clone(),
        );
        registry.register(
            "storage_writes",
            "Total number of disk writes",
            metrics.storage_writes.clone(),
        );
        registry.register(
            "storage_write_bandwidth",
            "Total amount of data written to disk",
            metrics.storage_write_bandwidth.clone(),
        );
        metrics
    }
}

/// Records a rolling hash of all runtime operations, allowing
/// executions with the same seed to be compared for determinism.
pub struct Auditor {
    hash: Mutex<Vec<u8>>,
}

impl Auditor {
    fn new() -> Self {
        Self {
            hash: Mutex::new(Vec::new()),
        }
    }

    fn process_task(&self, task: u128, label: &str) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"process_task");
        hasher.update(task.to_be_bytes());
        hasher.update(label.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn stop(&self, value: i32) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"stop");
        hasher.update(value.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn stopped(&self) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"stopped");
        *hash = hasher.finalize().to_vec();
    }

    fn bind(&self, address: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"bind");
        hasher.update(address.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn dial(&self, dialer: SocketAddr, dialee: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"dial");
        hasher.update(dialer.to_string().as_bytes());
        hasher.update(dialee.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn accept(&self, dialee: SocketAddr, dialer: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"accept");
        hasher.update(dialee.to_string().as_bytes());
        hasher.update(dialer.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"send");
        hasher.update(sender.to_string().as_bytes());
        hasher.update(receiver.to_string().as_bytes());
        hasher.update(message);
        *hash = hasher.finalize().to_vec();
    }

    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"recv");
        hasher.update(receiver.to_string().as_bytes());
        hasher.update(sender.to_string().as_bytes());
        hasher.update(message);
        *hash = hasher.finalize().to_vec();
    }

    fn rand(&self, method: String) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"rand");
        hasher.update(method.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn open(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"open");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn remove(&self, partition: &str, name: Option<&[u8]>) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"remove");
        hasher.update(partition.as_bytes());
        if let Some(name) = name {
            hasher.update(name);
        }
        *hash = hasher.finalize().to_vec();
    }

    fn scan(&self, partition: &str) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"scan");
        hasher.update(partition.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn len(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"len");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"read_at");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(buf.to_be_bytes());
        hasher.update(offset.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"write_at");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(buf);
        hasher.update(offset.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn truncate(&self, partition: &str, name: &[u8], size: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"truncate");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(size.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn sync(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"sync");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn close(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"close");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn register(&self, name: &str, help: &str) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"register");
        hasher.update(name.as_bytes());
        hasher.update(help.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn encode(&self) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"encode");
        *hash = hasher.finalize().to_vec();
    }

    /// Returns a hex-encoded digest of all operations observed so far.
    pub fn state(&self) -> String {
        let hash = self.hash.lock().unwrap().clone();
        hex(&hash)
    }
}

struct Task {
    id: u128,
    label: String,

    tasks: Arc<Tasks>,

    root: bool,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

    completed: Mutex<bool>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.tasks.enqueue(arc_self.clone());
    }
}

struct Tasks {
    counter: Mutex<u128>,
    queue: Mutex<Vec<Arc<Task>>>,
}

impl Tasks {
    fn register(
        arc_self: &Arc<Self>,
        label: &str,
        root: bool,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let mut queue = arc_self.queue.lock().unwrap();
        let id = {
            let mut l = arc_self.counter.lock().unwrap();
            let old = *l;
            *l = l.checked_add(1).expect("task counter overflow");
            old
        };
        queue.push(Arc::new(Task {
            id,
            label: label.to_string(),
            root,
            future: Mutex::new(future),
            tasks: arc_self.clone(),
            completed: Mutex::new(false),
        }));
    }

    fn enqueue(&self, task: Arc<Task>) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(task);
    }

    fn drain(&self) -> Vec<Arc<Task>> {
        let mut queue = self.queue.lock().unwrap();
        let len = queue.len();
        replace(&mut *queue, Vec::with_capacity(len))
    }

    fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}

#[derive(Clone)]
pub struct Config {
    /// Seed for the deterministic random number generator.
    pub seed: u64,

    /// Amount of simulated time the runtime advances on each iteration of the
    /// event loop.
    pub cycle: Duration,

    /// Maximum amount of simulated time the runtime may run before panicking
    /// (if set).
    pub timeout: Option<Duration>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }
}
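
// Illustrative sketch (kept as a comment so it is not compiled here): a custom
// `Config` can be passed to `Executor::init` to control the RNG seed, the amount
// of simulated time that elapses per event-loop iteration, and an optional
// timeout. The values below are arbitrary.
//
//     let cfg = Config {
//         seed: 7,
//         cycle: Duration::from_millis(5),
//         timeout: Some(Duration::from_secs(30)),
//     };
//     let (runner, context, auditor) = Executor::init(cfg);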

/// Deterministic runtime that selects which task to poll next using a seeded RNG.
pub struct Executor {
    registry: Mutex<Registry>,
    cycle: Duration,
    deadline: Option<SystemTime>,
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    rng: Mutex<StdRng>,
    time: Mutex<SystemTime>,
    tasks: Arc<Tasks>,
    sleeping: Mutex<BinaryHeap<Alarm>>,
    partitions: Mutex<HashMap<String, Partition>>,
    signaler: Mutex<Signaler>,
    signal: Signal,
    finished: Mutex<bool>,
    recovered: Mutex<bool>,
}

impl Executor {
    /// Initialize a new runtime with the given config.
    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
        // Ensure config is valid
        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
            panic!("cycle duration must be non-zero when timeout is set");
        }

        // Create runtime state
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);

        let metrics = Arc::new(Metrics::init(runtime_registry));
        let auditor = Arc::new(Auditor::new());
        let start_time = UNIX_EPOCH;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let (signaler, signal) = Signaler::new();
        let executor = Arc::new(Self {
            registry: Mutex::new(registry),
            cycle: cfg.cycle,
            deadline,
            metrics: metrics.clone(),
            auditor: auditor.clone(),
            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks {
                queue: Mutex::new(Vec::new()),
                counter: Mutex::new(0),
            }),
            sleeping: Mutex::new(BinaryHeap::new()),
            partitions: Mutex::new(HashMap::new()),
            signaler: Mutex::new(signaler),
            signal,
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });
        (
            Runner {
                executor: executor.clone(),
            },
            Context {
                label: String::new(),
                spawned: false,
                executor,
                networking: Arc::new(Networking::new(metrics, auditor.clone())),
            },
            auditor,
        )
    }

    /// Initialize a runtime with the default config and the provided seed.
    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
        let cfg = Config {
            seed,
            ..Config::default()
        };
        Self::init(cfg)
    }

    /// Initialize a runtime with the default config and the provided timeout.
    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::init(cfg)
    }

    /// Initialize a runtime with the default config.
    #[allow(clippy::should_implement_trait)]
    pub fn default() -> (Runner, Context, Arc<Auditor>) {
        Self::init(Config::default())
    }
}
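
// Illustrative sketch (kept as a comment so it is not compiled here): two runs
// initialized with the same seed should drive tasks in the same order and end
// with the same `Auditor::state()`, which is what the tests below rely on.
//
//     use crate::Runner as _;
//
//     let (runner, context, auditor) = Executor::seeded(42);
//     runner.start(async move {
//         context.sleep(Duration::from_millis(10)).await;
//     });
//     let state = auditor.state();
//     // Re-running `Executor::seeded(42)` with the same workload yields `state` again.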

/// Runs registered tasks until the root task completes.
pub struct Runner {
    executor: Arc<Executor>,
}

impl crate::Runner for Runner {
    fn start<F>(self, f: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Register the root task
        let output = Arc::new(Mutex::new(None));
        Tasks::register(
            &self.executor.tasks,
            "",
            true,
            Box::pin({
                let output = output.clone();
                async move {
                    *output.lock().unwrap() = Some(f.await);
                }
            }),
        );

        // Process tasks until the root task completes or the runtime stalls
        let mut iter = 0;
        loop {
            // Ensure the configured deadline has not been exceeded
            {
                let current = self.executor.time.lock().unwrap();
                if let Some(deadline) = self.executor.deadline {
                    if *current >= deadline {
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all runnable tasks and shuffle them with the seeded RNG
            let mut tasks = self.executor.tasks.drain();
            {
                let mut rng = self.executor.rng.lock().unwrap();
                tasks.shuffle(&mut *rng);
            }

            trace!(iter, tasks = tasks.len(), "starting loop");
            for task in tasks {
                self.executor.auditor.process_task(task.id, &task.label);

                // Skip tasks that have already completed (they may have been re-enqueued by a waker)
                if *task.completed.lock().unwrap() {
                    trace!(id = task.id, "skipping already completed task");
                    continue;
                }
                trace!(id = task.id, "processing task");

                let waker = waker_ref(&task);
                let mut context = task::Context::from_waker(&waker);
                let mut future = task.future.lock().unwrap();

                // Record the poll
                self.executor
                    .metrics
                    .task_polls
                    .get_or_create(&Work {
                        label: task.label.clone(),
                    })
                    .inc();

                let pending = future.as_mut().poll(&mut context).is_pending();
                if pending {
                    trace!(id = task.id, "task is still pending");
                    continue;
                }

                // Mark the task as complete
                *task.completed.lock().unwrap() = true;
                trace!(id = task.id, "task is complete");

                // Completion of the root task ends the run
                if task.root {
                    *self.executor.finished.lock().unwrap() = true;
                    return output.lock().unwrap().take().unwrap();
                }
            }

            // Advance simulated time by one cycle
            let mut current;
            {
                let mut time = self.executor.time.lock().unwrap();
                *time = time
                    .checked_add(self.executor.cycle)
                    .expect("executor time overflowed");
                current = *time;
            }
            trace!(now = current.epoch_millis(), "time advanced");

            // If no tasks are runnable, skip ahead to the next sleeping task
            if self.executor.tasks.len() == 0 {
                let mut skip = None;
                {
                    let sleeping = self.executor.sleeping.lock().unwrap();
                    if let Some(next) = sleeping.peek() {
                        if next.time > current {
                            skip = Some(next.time);
                        }
                    }
                }
                if skip.is_some() {
                    {
                        let mut time = self.executor.time.lock().unwrap();
                        *time = skip.unwrap();
                        current = *time;
                    }
                    trace!(now = current.epoch_millis(), "time skipped");
                }
            }

            // Wake any sleeping tasks whose deadlines have passed
            let mut to_wake = Vec::new();
            let mut remaining;
            {
                let mut sleeping = self.executor.sleeping.lock().unwrap();
                while let Some(next) = sleeping.peek() {
                    if next.time <= current {
                        let sleeper = sleeping.pop().unwrap();
                        to_wake.push(sleeper.waker);
                    } else {
                        break;
                    }
                }
                remaining = sleeping.len();
            }
            for waker in to_wake {
                waker.wake();
            }

            // Account for any tasks that are ready to run on the next iteration
            remaining += self.executor.tasks.len();

            // If nothing remains to run or wake, the runtime has stalled
            if remaining == 0 {
                panic!("runtime stalled");
            }
            iter += 1;
        }
    }
}

/// Handle used by tasks to spawn work and access time, networking, storage, and randomness.
pub struct Context {
    label: String,
    spawned: bool,
    executor: Arc<Executor>,
    networking: Arc<Networking>,
}

impl Context {
    /// Recover the state of a finished runtime (auditor, RNG, time, and synced storage)
    /// and use it to initialize a new runtime. Panics if the previous runtime has not
    /// finished or has already been recovered.
    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
        // Ensure the previous runtime has finished
        if !*self.executor.finished.lock().unwrap() {
            panic!("execution is not finished");
        }

        // Ensure the runtime has not already been recovered
        {
            let mut recovered = self.executor.recovered.lock().unwrap();
            if *recovered {
                panic!("runtime has already been recovered");
            }
            *recovered = true;
        }

        // Create fresh metrics for the new runtime
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Carry over the auditor, RNG, time, and synced partitions; reset everything else
        let auditor = self.executor.auditor.clone();
        let (signaler, signal) = Signaler::new();
        let executor = Arc::new(Executor {
            // Copied state
            cycle: self.executor.cycle,
            deadline: self.executor.deadline,
            auditor: auditor.clone(),
            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
            time: Mutex::new(*self.executor.time.lock().unwrap()),
            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),

            // Fresh state
            registry: Mutex::new(registry),
            metrics: metrics.clone(),
            tasks: Arc::new(Tasks {
                queue: Mutex::new(Vec::new()),
                counter: Mutex::new(0),
            }),
            sleeping: Mutex::new(BinaryHeap::new()),
            signaler: Mutex::new(signaler),
            signal,
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });
        (
            Runner {
                executor: executor.clone(),
            },
            Self {
                label: String::new(),
                spawned: false,
                executor,
                networking: Arc::new(Networking::new(metrics, auditor.clone())),
            },
            auditor,
        )
    }
}
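
// Illustrative sketch (kept as a comment so it is not compiled here): after a run
// finishes, `recover` produces a new runner/context that shares the auditor and any
// storage that was synced, which the recovery tests below exercise in full.
//
//     let (runner, context, _) = Executor::default();
//     runner.start({
//         let context = context.clone();
//         async move {
//             let blob = context.open("partition", b"name").await.unwrap();
//             blob.write_at(b"data", 0).await.unwrap();
//             blob.sync().await.unwrap();
//         }
//     });
//     let (runner, context, _) = context.recover();
//     // `runner.start(...)` can now read back the synced blob.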

impl Clone for Context {
    fn clone(&self) -> Self {
        Self {
            label: self.label.clone(),
            spawned: false,
            executor: self.executor.clone(),
            networking: self.networking.clone(),
        }
    }
}

impl crate::Spawner for Context {
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");

        let label = self.label.clone();
        let work = Work {
            label: label.clone(),
        };
        self.executor
            .metrics
            .tasks_spawned
            .get_or_create(&work)
            .inc();
        let gauge = self
            .executor
            .metrics
            .tasks_running
            .get_or_create(&work)
            .clone();

        let executor = self.executor.clone();
        let future = f(self);
        let (f, handle) = Handle::init(future, gauge, false);

        Tasks::register(&executor.tasks, &label, false, Box::pin(f));
        handle
    }

    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        assert!(!self.spawned, "already spawned");
        self.spawned = true;

        let work = Work {
            label: self.label.clone(),
        };
        self.executor
            .metrics
            .tasks_spawned
            .get_or_create(&work)
            .inc();
        let gauge = self
            .executor
            .metrics
            .tasks_running
            .get_or_create(&work)
            .clone();

        let label = self.label.clone();
        let executor = self.executor.clone();
        move |f: F| {
            let (f, handle) = Handle::init(f, gauge, false);

            Tasks::register(&executor.tasks, &label, false, Box::pin(f));
            handle
        }
    }

    fn stop(&self, value: i32) {
        self.executor.auditor.stop(value);
        self.executor.signaler.lock().unwrap().signal(value);
    }

    fn stopped(&self) -> Signal {
        self.executor.auditor.stopped();
        self.executor.signal.clone()
    }
}

impl crate::Metrics for Context {
    fn with_label(&self, label: &str) -> Self {
        let label = {
            let prefix = self.label.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{}_{}", prefix, label)
            }
        };
        assert!(
            !label.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            label,
            spawned: false,
            executor: self.executor.clone(),
            networking: self.networking.clone(),
        }
    }

    fn label(&self) -> String {
        self.label.clone()
    }

    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let name = name.into();
        let help = help.into();

        self.executor.auditor.register(&name, &help);
        let prefixed_name = {
            let prefix = &self.label;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };
        self.executor
            .registry
            .lock()
            .unwrap()
            .register(prefixed_name, help, metric)
    }

    fn encode(&self) -> String {
        self.executor.auditor.encode();
        let mut buffer = String::new();
        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
        buffer
    }
}

struct Sleeper {
    executor: Arc<Executor>,
    time: SystemTime,
    registered: bool,
}

struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time.eq(&other.time)
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse the ordering so the BinaryHeap behaves as a min-heap on time
        other.time.cmp(&self.time)
    }
}

impl Future for Sleeper {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        {
            let current_time = *self.executor.time.lock().unwrap();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        if !self.registered {
            self.registered = true;
            self.executor.sleeping.lock().unwrap().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}

impl Clock for Context {
    fn current(&self) -> SystemTime {
        *self.executor.time.lock().unwrap()
    }

    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        let time = self
            .current()
            .checked_add(duration)
            .expect("overflow when setting wake time");
        Sleeper {
            executor: self.executor.clone(),
            time,
            registered: false,
        }
    }

    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
        Sleeper {
            executor: self.executor.clone(),
            time: deadline,
            registered: false,
        }
    }
}

impl GClock for Context {
    type Instant = SystemTime;

    fn now(&self) -> Self::Instant {
        self.current()
    }
}

impl ReasonablyRealtime for Context {}

type Dialable = mpsc::UnboundedSender<(
    SocketAddr,    // dialer's address
    mocks::Sink,   // sink that sends to the dialer
    mocks::Stream, // stream that receives from the dialer
)>;

/// Simulated networking layer that routes connections between bound listeners and dialers.
struct Networking {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    ephemeral: Mutex<u16>,
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}

impl Networking {
    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
        Self {
            metrics,
            auditor,
            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
            listeners: Mutex::new(HashMap::new()),
        }
    }

    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.auditor.bind(socket);

        // Disallow binding to the ephemeral port range reserved for dialers
        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
        {
            return Err(Error::BindFailed);
        }

        // Ensure the address is not already bound
        let mut listeners = self.listeners.lock().unwrap();
        if listeners.contains_key(&socket) {
            return Err(Error::BindFailed);
        }

        let (sender, receiver) = mpsc::unbounded();
        listeners.insert(socket, sender);
        Ok(Listener {
            auditor: self.auditor.clone(),
            address: socket,
            listener: receiver,
            metrics: self.metrics.clone(),
        })
    }

    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        // Assign the dialer the next ephemeral port
        let dialer = {
            let mut ephemeral = self.ephemeral.lock().unwrap();
            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
            *ephemeral = ephemeral
                .checked_add(1)
                .expect("ephemeral port range exhausted");
            dialer
        };
        self.auditor.dial(dialer, socket);

        // Look up the listener for the dialed address
        let mut sender = {
            let listeners = self.listeners.lock().unwrap();
            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
            sender.clone()
        };

        // Construct the directional channels and send the listener its halves
        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
        let (dialee_sender, dialee_receiver) = mocks::Channel::init();
        sender
            .send((dialer, dialer_sender, dialee_receiver))
            .await
            .map_err(|_| Error::ConnectionFailed)?;
        Ok((
            Sink {
                metrics: self.metrics.clone(),
                auditor: self.auditor.clone(),
                me: dialer,
                peer: socket,
                sender: dialee_sender,
            },
            Stream {
                auditor: self.auditor.clone(),
                me: dialer,
                peer: socket,
                receiver: dialer_receiver,
            },
        ))
    }
}

impl crate::Network<Listener, Sink, Stream> for Context {
    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.networking.bind(socket)
    }

    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        self.networking.dial(socket).await
    }
}
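
// Illustrative sketch (kept as a comment so it is not compiled here): inside a
// running task, a context can bind a listener while another task (or the same one)
// dials it, yielding an in-memory (Sink, Stream) pair on each side. The address
// below is arbitrary.
//
//     let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
//     let mut listener = context.bind(addr).await.unwrap();
//     let (mut dialer_sink, mut dialer_stream) = context.dial(addr).await.unwrap();
//     let (peer, mut listener_sink, mut listener_stream) = listener.accept().await.unwrap();
//     dialer_sink.send(b"hello").await.unwrap();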

/// Implementation of the `Listener` trait for the simulated network.
pub struct Listener {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    address: SocketAddr,
    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
}

impl crate::Listener<Sink, Stream> for Listener {
    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
        self.auditor.accept(self.address, socket);
        Ok((
            socket,
            Sink {
                metrics: self.metrics.clone(),
                auditor: self.auditor.clone(),
                me: self.address,
                peer: socket,
                sender,
            },
            Stream {
                auditor: self.auditor.clone(),
                me: self.address,
                peer: socket,
                receiver,
            },
        ))
    }
}

/// Implementation of the `Sink` trait for the simulated network.
pub struct Sink {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    me: SocketAddr,
    peer: SocketAddr,
    sender: mocks::Sink,
}

impl crate::Sink for Sink {
    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
        self.auditor.send(self.me, self.peer, msg);
        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
        Ok(())
    }
}

/// Implementation of the `Stream` trait for the simulated network.
pub struct Stream {
    auditor: Arc<Auditor>,
    me: SocketAddr,
    peer: SocketAddr,
    receiver: mocks::Stream,
}

impl crate::Stream for Stream {
    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
        self.receiver
            .recv(buf)
            .await
            .map_err(|_| Error::RecvFailed)?;
        self.auditor.recv(self.me, self.peer, buf);
        Ok(())
    }
}

impl RngCore for Context {
    fn next_u32(&mut self) -> u32 {
        self.executor.auditor.rand("next_u32".to_string());
        self.executor.rng.lock().unwrap().next_u32()
    }

    fn next_u64(&mut self) -> u64 {
        self.executor.auditor.rand("next_u64".to_string());
        self.executor.rng.lock().unwrap().next_u64()
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        self.executor.auditor.rand("fill_bytes".to_string());
        self.executor.rng.lock().unwrap().fill_bytes(dest)
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        self.executor.auditor.rand("try_fill_bytes".to_string());
        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
    }
}

impl CryptoRng for Context {}

/// In-memory blob handle. Writes update the handle's buffer immediately but are only
/// persisted to the owning partition when `sync` (or `close`) is called.
pub struct Blob {
    executor: Arc<Executor>,

    partition: String,
    name: Vec<u8>,

    // Cloned handles share the same content buffer.
    content: Arc<RwLock<Vec<u8>>>,
}

impl Blob {
    fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
        executor.metrics.open_blobs.inc();
        Self {
            executor,
            partition,
            name: name.into(),
            content: Arc::new(RwLock::new(content)),
        }
    }
}

impl Clone for Blob {
    fn clone(&self) -> Self {
        // A manual Clone impl is required to track the number of open blob handles.
        self.executor.metrics.open_blobs.inc();
        Self {
            executor: self.executor.clone(),
            partition: self.partition.clone(),
            name: self.name.clone(),
            content: self.content.clone(),
        }
    }
}

impl crate::Storage<Blob> for Context {
    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
        self.executor.auditor.open(partition, name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        let partition_entry = partitions.entry(partition.into()).or_default();
        let content = partition_entry.entry(name.into()).or_default();
        Ok(Blob::new(
            self.executor.clone(),
            partition.into(),
            name,
            content.clone(),
        ))
    }

    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.executor.auditor.remove(partition, name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        match name {
            Some(name) => {
                partitions
                    .get_mut(partition)
                    .ok_or(Error::PartitionMissing(partition.into()))?
                    .remove(name)
                    .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
            }
            None => {
                partitions
                    .remove(partition)
                    .ok_or(Error::PartitionMissing(partition.into()))?;
            }
        }
        Ok(())
    }

    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.executor.auditor.scan(partition);
        let partitions = self.executor.partitions.lock().unwrap();
        let partition = partitions
            .get(partition)
            .ok_or(Error::PartitionMissing(partition.into()))?;
        let mut results = Vec::with_capacity(partition.len());
        for name in partition.keys() {
            results.push(name.clone());
        }
        results.sort();
        Ok(results)
    }
}

impl crate::Blob for Blob {
    async fn len(&self) -> Result<u64, Error> {
        self.executor.auditor.len(&self.partition, &self.name);
        let content = self.content.read().unwrap();
        Ok(content.len() as u64)
    }

    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
        let buf_len = buf.len();
        self.executor
            .auditor
            .read_at(&self.partition, &self.name, buf_len, offset);
        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
        let content = self.content.read().unwrap();
        let content_len = content.len();
        if offset + buf_len > content_len {
            return Err(Error::BlobInsufficientLength);
        }
        buf.copy_from_slice(&content[offset..offset + buf_len]);
        self.executor.metrics.storage_reads.inc();
        self.executor
            .metrics
            .storage_read_bandwidth
            .inc_by(buf_len as u64);
        Ok(())
    }

    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
        self.executor
            .auditor
            .write_at(&self.partition, &self.name, buf, offset);
        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
        let mut content = self.content.write().unwrap();
        let required = offset + buf.len();
        if required > content.len() {
            content.resize(required, 0);
        }
        content[offset..offset + buf.len()].copy_from_slice(buf);
        self.executor.metrics.storage_writes.inc();
        self.executor
            .metrics
            .storage_write_bandwidth
            .inc_by(buf.len() as u64);
        Ok(())
    }

    async fn truncate(&self, len: u64) -> Result<(), Error> {
        self.executor
            .auditor
            .truncate(&self.partition, &self.name, len);
        let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
        let mut content = self.content.write().unwrap();
        content.truncate(len);
        Ok(())
    }

    async fn sync(&self) -> Result<(), Error> {
        // Snapshot the in-memory contents
        let new_content = self.content.read().unwrap().clone();

        // Persist the snapshot to the owning partition
        self.executor.auditor.sync(&self.partition, &self.name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        let partition = partitions
            .get_mut(&self.partition)
            .ok_or(Error::PartitionMissing(self.partition.clone()))?;
        let content = partition
            .get_mut(&self.name)
            .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
        *content = new_content;
        Ok(())
    }

    async fn close(self) -> Result<(), Error> {
        self.executor.auditor.close(&self.partition, &self.name);
        self.sync().await?;
        Ok(())
    }
}
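
// Illustrative sketch (kept as a comment so it is not compiled here): blobs are
// opened from a named partition, written and read at explicit offsets, and must be
// synced (or closed) for their contents to survive `Context::recover`.
//
//     let blob = context.open("partition", b"name").await.unwrap();
//     blob.write_at(b"hello", 0).await.unwrap();
//     let mut buf = vec![0u8; 5];
//     blob.read_at(&mut buf, 0).await.unwrap();
//     assert_eq!(blob.len().await.unwrap(), 5);
//     blob.close().await.unwrap();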

impl Drop for Blob {
    fn drop(&mut self) {
        self.executor.metrics.open_blobs.dec();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        (auditor.state(), messages)
    }

    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Ensure re-running with the same seeds reproduces the same outputs
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Populate the heap with alarms out of order
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Ensure alarms pop in ascending order of time
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let (executor, context, _) = Executor::timed(Duration::from_secs(10));
        executor.start(async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // Run a task that writes and syncs a blob
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        executor1.start({
            let context = context1.clone();
            let data = data.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                blob.sync().await.unwrap();
            }
        });
        let state1 = auditor1.state();

        // Recover the runtime and ensure the auditor state is unchanged
        let (executor2, context2, auditor2) = context1.recover();
        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        // Ensure the synced data is still readable
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Run a task that writes a blob without syncing it
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        executor1.start({
            let context = context1.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
            }
        });

        // Recover the runtime and ensure the unsynced data is gone
        let (executor2, context2, _) = context1.recover();
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let (_, context, _) = Executor::default();

        context.recover();
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {});

        let cloned_context = context.clone();
        context.recover();

        // Recovering a cloned context after the original should panic
        cloned_context.recover();
    }
1612}