use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal};
use commonware_utils::{hex, SystemTimeExt};
use futures::{
    channel::mpsc,
    task::{waker_ref, ArcWake},
    SinkExt, StreamExt,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::Registry,
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use sha2::{Digest, Sha256};
use std::{
    collections::{BinaryHeap, HashMap},
    future::Future,
    mem::replace,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    ops::Range,
    pin::Pin,
    sync::{Arc, Mutex, RwLock},
    task::{self, Poll, Waker},
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::trace;

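/// Range of ports handed out to dialers; attempts to bind a listener to one of
/// these ports (on localhost) are rejected.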
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;

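/// Label reserved for the root task registered by the runner.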
const ROOT_TASK: &str = "root";

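/// In-memory partition that maps blob names to blob contents.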
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;

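/// Metric label identifying which task a measurement belongs to.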
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    label: String,
}

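/// Metrics tracked by the runtime: task scheduling, network bandwidth, and storage activity.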
#[derive(Debug)]
struct Metrics {
    tasks_spawned: Family<Work, Counter>,
    tasks_running: Family<Work, Gauge>,
    task_polls: Family<Work, Counter>,

    network_bandwidth: Counter,

    open_blobs: Gauge,
    storage_reads: Counter,
    storage_read_bandwidth: Counter,
    storage_writes: Counter,
    storage_write_bandwidth: Counter,
}

impl Metrics {
    pub fn init(registry: Arc<Mutex<Registry>>) -> Self {
        let metrics = Self {
            task_polls: Family::default(),
            tasks_running: Family::default(),
            tasks_spawned: Family::default(),
            network_bandwidth: Counter::default(),
            open_blobs: Gauge::default(),
            storage_reads: Counter::default(),
            storage_read_bandwidth: Counter::default(),
            storage_writes: Counter::default(),
            storage_write_bandwidth: Counter::default(),
        };
        {
            let mut registry = registry.lock().unwrap();
            registry.register(
                "tasks_spawned",
                "Total number of tasks spawned",
                metrics.tasks_spawned.clone(),
            );
            registry.register(
                "tasks_running",
                "Number of tasks currently running",
                metrics.tasks_running.clone(),
            );
            registry.register(
                "task_polls",
                "Total number of task polls",
                metrics.task_polls.clone(),
            );
            registry.register(
                "bandwidth",
                "Total amount of data sent over network",
                metrics.network_bandwidth.clone(),
            );
            registry.register(
                "open_blobs",
                "Number of open blobs",
                metrics.open_blobs.clone(),
            );
            registry.register(
                "storage_reads",
                "Total number of disk reads",
                metrics.storage_reads.clone(),
            );
            registry.register(
                "storage_read_bandwidth",
                "Total amount of data read from disk",
                metrics.storage_read_bandwidth.clone(),
            );
            registry.register(
                "storage_writes",
                "Total number of disk writes",
                metrics.storage_writes.clone(),
            );
            registry.register(
                "storage_write_bandwidth",
                "Total amount of data written to disk",
                metrics.storage_write_bandwidth.clone(),
            );
        }
        metrics
    }
}

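/// Maintains a running hash of every operation the runtime performs (task scheduling,
/// networking, storage, randomness, and shutdown) so that separate executions can be
/// compared for determinism via [`Auditor::state`].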
pub struct Auditor {
    hash: Mutex<Vec<u8>>,
}

impl Auditor {
    fn new() -> Self {
        Self {
            hash: Mutex::new(Vec::new()),
        }
    }

    fn process_task(&self, task: u128, label: &str) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"process_task");
        hasher.update(task.to_be_bytes());
        hasher.update(label.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn stop(&self, value: i32) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"stop");
        hasher.update(value.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn stopped(&self) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"stopped");
        *hash = hasher.finalize().to_vec();
    }

    fn bind(&self, address: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"bind");
        hasher.update(address.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn dial(&self, dialer: SocketAddr, dialee: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"dial");
        hasher.update(dialer.to_string().as_bytes());
        hasher.update(dialee.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn accept(&self, dialee: SocketAddr, dialer: SocketAddr) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"accept");
        hasher.update(dialee.to_string().as_bytes());
        hasher.update(dialer.to_string().as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"send");
        hasher.update(sender.to_string().as_bytes());
        hasher.update(receiver.to_string().as_bytes());
        hasher.update(message);
        *hash = hasher.finalize().to_vec();
    }

    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"recv");
        hasher.update(receiver.to_string().as_bytes());
        hasher.update(sender.to_string().as_bytes());
        hasher.update(message);
        *hash = hasher.finalize().to_vec();
    }

    fn rand(&self, method: String) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"rand");
        hasher.update(method.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn open(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"open");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn remove(&self, partition: &str, name: Option<&[u8]>) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"remove");
        hasher.update(partition.as_bytes());
        if let Some(name) = name {
            hasher.update(name);
        }
        *hash = hasher.finalize().to_vec();
    }

    fn scan(&self, partition: &str) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"scan");
        hasher.update(partition.as_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn len(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"len");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"read_at");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(buf.to_be_bytes());
        hasher.update(offset.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"write_at");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(buf);
        hasher.update(offset.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn truncate(&self, partition: &str, name: &[u8], size: u64) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"truncate");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        hasher.update(size.to_be_bytes());
        *hash = hasher.finalize().to_vec();
    }

    fn sync(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"sync");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

    fn close(&self, partition: &str, name: &[u8]) {
        let mut hash = self.hash.lock().unwrap();
        let mut hasher = Sha256::new();
        hasher.update(&*hash);
        hasher.update(b"close");
        hasher.update(partition.as_bytes());
        hasher.update(name);
        *hash = hasher.finalize().to_vec();
    }

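    /// Returns a hex-encoded digest of all operations recorded so far; two runs with
    /// the same digest performed the same operations in the same order.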
    pub fn state(&self) -> String {
        let hash = self.hash.lock().unwrap().clone();
        hex(&hash)
    }
}

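/// A unit of work scheduled on the deterministic executor.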
struct Task {
    id: u128,
    label: String,

    tasks: Arc<Tasks>,

    root: bool,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

    completed: Mutex<bool>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.tasks.enqueue(arc_self.clone());
    }
}

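/// Shared queue of tasks that are ready to be polled.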
struct Tasks {
    counter: Mutex<u128>,
    queue: Mutex<Vec<Arc<Task>>>,
}

impl Tasks {
    fn register(
        arc_self: &Arc<Self>,
        label: &str,
        root: bool,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let mut queue = arc_self.queue.lock().unwrap();
        let id = {
            let mut l = arc_self.counter.lock().unwrap();
            let old = *l;
            *l = l.checked_add(1).expect("task counter overflow");
            old
        };
        queue.push(Arc::new(Task {
            id,
            label: label.to_string(),
            root,
            future: Mutex::new(future),
            tasks: arc_self.clone(),
            completed: Mutex::new(false),
        }));
    }

    fn enqueue(&self, task: Arc<Task>) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(task);
    }

    fn drain(&self) -> Vec<Arc<Task>> {
        let mut queue = self.queue.lock().unwrap();
        let len = queue.len();
        replace(&mut *queue, Vec::with_capacity(len))
    }

    fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}

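/// Configuration for the deterministic runtime.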
#[derive(Clone)]
pub struct Config {
    pub registry: Arc<Mutex<Registry>>,

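    /// Seed for the runtime's deterministic RNG (which also drives task scheduling).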
    pub seed: u64,

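    /// Amount of simulated time that elapses on each iteration of the task loop
    /// (must be non-zero when `timeout` is set).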
    pub cycle: Duration,

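    /// If set, the runtime panics with "runtime timeout" once simulated time advances
    /// this far beyond the start time.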
    pub timeout: Option<Duration>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            registry: Arc::new(Mutex::new(Registry::default())),
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }
}

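/// Runtime state shared by the [`Runner`] and every [`Context`] clone.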
pub struct Executor {
    cycle: Duration,
    deadline: Option<SystemTime>,
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    rng: Mutex<StdRng>,
    prefix: Mutex<String>,
    time: Mutex<SystemTime>,
    tasks: Arc<Tasks>,
    sleeping: Mutex<BinaryHeap<Alarm>>,
    partitions: Mutex<HashMap<String, Partition>>,
    signaler: Mutex<Signaler>,
    signal: Signal,
    finished: Mutex<bool>,
    recovered: Mutex<bool>,
}

impl Executor {
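    /// Initializes a new runtime from `cfg`, returning the [`Runner`] that drives
    /// execution, the root [`Context`] handed to tasks, and the [`Auditor`] that
    /// records every operation.
    ///
    /// A minimal usage sketch, mirroring this module's tests (with the `Runner` and
    /// `Spawner` traits in scope):
    ///
    /// ```ignore
    /// let (executor, context, auditor) = Executor::init(Config::default());
    /// executor.start(async move {
    ///     let _ = context.spawn("child", async move { /* ... */ }).await;
    /// });
    /// println!("auditor state: {}", auditor.state());
    /// ```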
    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
            panic!("cycle duration must be non-zero when timeout is set");
        }

        let metrics = Arc::new(Metrics::init(cfg.registry));
        let auditor = Arc::new(Auditor::new());
        let start_time = UNIX_EPOCH;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let (signaler, signal) = Signaler::new();
        let executor = Arc::new(Self {
            cycle: cfg.cycle,
            deadline,
            metrics: metrics.clone(),
            auditor: auditor.clone(),
            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
            prefix: Mutex::new(String::new()),
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks {
                queue: Mutex::new(Vec::new()),
                counter: Mutex::new(0),
            }),
            sleeping: Mutex::new(BinaryHeap::new()),
            partitions: Mutex::new(HashMap::new()),
            signaler: Mutex::new(signaler),
            signal,
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });
        (
            Runner {
                executor: executor.clone(),
            },
            Context {
                executor,
                networking: Arc::new(Networking::new(metrics, auditor.clone())),
            },
            auditor,
        )
    }

    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
        let cfg = Config {
            seed,
            ..Config::default()
        };
        Self::init(cfg)
    }

    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::init(cfg)
    }

    #[allow(clippy::should_implement_trait)]
    pub fn default() -> (Runner, Context, Arc<Auditor>) {
        Self::init(Config::default())
    }
}

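/// Drives the root task (and everything it spawns) to completion.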
pub struct Runner {
    executor: Arc<Executor>,
}

impl crate::Runner for Runner {
    fn start<F>(self, f: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Register the root task and capture its output.
        let output = Arc::new(Mutex::new(None));
        Tasks::register(
            &self.executor.tasks,
            ROOT_TASK,
            true,
            Box::pin({
                let output = output.clone();
                async move {
                    *output.lock().unwrap() = Some(f.await);
                }
            }),
        );

        let mut iter = 0;
        loop {
            // Panic if simulated time has passed the configured deadline.
            {
                let current = self.executor.time.lock().unwrap();
                if let Some(deadline) = self.executor.deadline {
                    if *current >= deadline {
                        panic!("runtime timeout");
                    }
                }
            }

            // Snapshot the tasks that are ready to run this iteration.
            let mut tasks = self.executor.tasks.drain();

            // Shuffle the snapshot with the seeded RNG so scheduling is deterministic per seed.
            {
                let mut rng = self.executor.rng.lock().unwrap();
                tasks.shuffle(&mut *rng);
            }

            trace!(iter, tasks = tasks.len(), "starting loop");
            for task in tasks {
                {
                    let mut prefix = self.executor.prefix.lock().unwrap();
                    prefix.clone_from(&task.label);
                }

                self.executor.auditor.process_task(task.id, &task.label);

                if *task.completed.lock().unwrap() {
                    trace!(id = task.id, "skipping already completed task");
                    continue;
                }
                trace!(id = task.id, "processing task");

                let waker = waker_ref(&task);
                let mut context = task::Context::from_waker(&waker);
                let mut future = task.future.lock().unwrap();

                self.executor
                    .metrics
                    .task_polls
                    .get_or_create(&Work {
                        label: task.label.clone(),
                    })
                    .inc();

                let pending = future.as_mut().poll(&mut context).is_pending();
                if pending {
                    trace!(id = task.id, "task is still pending");
                    continue;
                }

                *task.completed.lock().unwrap() = true;
                trace!(id = task.id, "task is complete");

                // Once the root task finishes, the run is over.
                if task.root {
                    *self.executor.finished.lock().unwrap() = true;
                    return output.lock().unwrap().take().unwrap();
                }
            }

            // Advance simulated time by one cycle.
            let mut current;
            {
                let mut time = self.executor.time.lock().unwrap();
                *time = time
                    .checked_add(self.executor.cycle)
                    .expect("executor time overflowed");
                current = *time;
            }
            trace!(now = current.epoch_millis(), "time advanced");

            // If nothing is runnable, jump ahead to the next pending alarm.
            if self.executor.tasks.len() == 0 {
                let mut skip = None;
                {
                    let sleeping = self.executor.sleeping.lock().unwrap();
                    if let Some(next) = sleeping.peek() {
                        if next.time > current {
                            skip = Some(next.time);
                        }
                    }
                }
                if skip.is_some() {
                    {
                        let mut time = self.executor.time.lock().unwrap();
                        *time = skip.unwrap();
                        current = *time;
                    }
                    trace!(now = current.epoch_millis(), "time skipped");
                }
            }

            // Wake any sleepers whose deadline has now passed.
            let mut to_wake = Vec::new();
            let mut remaining;
            {
                let mut sleeping = self.executor.sleeping.lock().unwrap();
                while let Some(next) = sleeping.peek() {
                    if next.time <= current {
                        let sleeper = sleeping.pop().unwrap();
                        to_wake.push(sleeper.waker);
                    } else {
                        break;
                    }
                }
                remaining = sleeping.len();
            }
            for waker in to_wake {
                waker.wake();
            }

            remaining += self.executor.tasks.len();

            // With no runnable tasks and no pending alarms, execution can never make progress.
            if remaining == 0 {
                panic!("runtime stalled");
            }
            iter += 1;
        }
    }
}

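/// Handle given to tasks for spawning, time, randomness, networking, and storage,
/// all backed by the deterministic [`Executor`].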
#[derive(Clone)]
pub struct Context {
    executor: Arc<Executor>,
    networking: Arc<Networking>,
}

impl Context {
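    /// Consumes this context and returns a fresh runtime that reuses the current simulated
    /// time, RNG state, and synced storage partitions. Useful for simulating a restart:
    /// unsynced state and pending tasks are dropped. Panics if execution has not finished
    /// or if the runtime has already been recovered.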
    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
        if !*self.executor.finished.lock().unwrap() {
            panic!("execution is not finished");
        }

        {
            let mut recovered = self.executor.recovered.lock().unwrap();
            if *recovered {
                panic!("runtime has already been recovered");
            }
            *recovered = true;
        }

        // Reset gauges describing state that does not survive a restart.
        let metrics = self.executor.metrics.clone();
        metrics.open_blobs.set(0);
        metrics.tasks_running.clear();

        let auditor = self.executor.auditor.clone();
        let (signaler, signal) = Signaler::new();
        let executor = Arc::new(Executor {
            // Carried over from the previous run.
            cycle: self.executor.cycle,
            deadline: self.executor.deadline,
            metrics: metrics.clone(),
            auditor: auditor.clone(),
            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
            time: Mutex::new(*self.executor.time.lock().unwrap()),
            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),

            // Started fresh.
            prefix: Mutex::new(String::new()),
            tasks: Arc::new(Tasks {
                queue: Mutex::new(Vec::new()),
                counter: Mutex::new(0),
            }),
            sleeping: Mutex::new(BinaryHeap::new()),
            signaler: Mutex::new(signaler),
            signal,
            finished: Mutex::new(false),
            recovered: Mutex::new(false),
        });
        (
            Runner {
                executor: executor.clone(),
            },
            Self {
                executor,
                networking: Arc::new(Networking::new(metrics, auditor.clone())),
            },
            auditor,
        )
    }
}

impl crate::Spawner for Context {
    fn spawn<F, T>(&self, label: &str, f: F) -> Handle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let label = {
            let prefix = self.executor.prefix.lock().unwrap();
            if prefix.is_empty() || *prefix == ROOT_TASK {
                label.to_string()
            } else {
                format!("{}_{}", *prefix, label)
            }
        };
        if label == ROOT_TASK {
            panic!("root task cannot be spawned");
        }
        let work = Work {
            label: label.clone(),
        };
        self.executor
            .metrics
            .tasks_spawned
            .get_or_create(&work)
            .inc();
        let gauge = self
            .executor
            .metrics
            .tasks_running
            .get_or_create(&work)
            .clone();
        let (f, handle) = Handle::init(f, gauge, false);
        Tasks::register(&self.executor.tasks, &label, false, Box::pin(f));
        handle
    }

    fn stop(&self, value: i32) {
        self.executor.auditor.stop(value);
        self.executor.signaler.lock().unwrap().signal(value);
    }

    fn stopped(&self) -> Signal {
        self.executor.auditor.stopped();
        self.executor.signal.clone()
    }
}

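/// Future that resolves once simulated time reaches `time`.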
struct Sleeper {
    executor: Arc<Executor>,
    time: SystemTime,
    registered: bool,
}

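/// Entry in the executor's sleep queue; the `Ord` impl is reversed so the earliest
/// deadline sits at the top of the `BinaryHeap`.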
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time.eq(&other.time)
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse the comparison so that the BinaryHeap yields the earliest alarm first.
        other.time.cmp(&self.time)
    }
}

impl Future for Sleeper {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        {
            let current_time = *self.executor.time.lock().unwrap();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        if !self.registered {
            self.registered = true;
            self.executor.sleeping.lock().unwrap().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}

impl Clock for Context {
    fn current(&self) -> SystemTime {
        *self.executor.time.lock().unwrap()
    }

    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        let time = self
            .current()
            .checked_add(duration)
            .expect("overflow when setting wake time");
        Sleeper {
            executor: self.executor.clone(),
            time,
            registered: false,
        }
    }

    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
        Sleeper {
            executor: self.executor.clone(),
            time: deadline,
            registered: false,
        }
    }
}

impl GClock for Context {
    type Instant = SystemTime;

    fn now(&self) -> Self::Instant {
        self.current()
    }
}

impl ReasonablyRealtime for Context {}

type Dialable = mpsc::UnboundedSender<(
    SocketAddr,
    mocks::Sink,
    mocks::Stream,
)>;

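/// Simulated network: tracks bound listeners and hands out ephemeral ports to dialers.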
struct Networking {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    ephemeral: Mutex<u16>,
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}

impl Networking {
    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
        Self {
            metrics,
            auditor,
            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
            listeners: Mutex::new(HashMap::new()),
        }
    }

    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.auditor.bind(socket);

        // Reject binds to the ephemeral port range reserved for dialers.
        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
        {
            return Err(Error::BindFailed);
        }

        let mut listeners = self.listeners.lock().unwrap();
        if listeners.contains_key(&socket) {
            return Err(Error::BindFailed);
        }

        let (sender, receiver) = mpsc::unbounded();
        listeners.insert(socket, sender);
        Ok(Listener {
            auditor: self.auditor.clone(),
            address: socket,
            listener: receiver,
            metrics: self.metrics.clone(),
        })
    }

    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        // Assign the dialer the next ephemeral port.
        let dialer = {
            let mut ephemeral = self.ephemeral.lock().unwrap();
            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
            *ephemeral = ephemeral
                .checked_add(1)
                .expect("ephemeral port range exhausted");
            dialer
        };
        self.auditor.dial(dialer, socket);

        // Look up the listener registered for the dialed address.
        let mut sender = {
            let listeners = self.listeners.lock().unwrap();
            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
            sender.clone()
        };

        // Hand the listener one half of each channel and keep the other half for the dialer.
        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
        let (dialee_sender, dialee_receiver) = mocks::Channel::init();
        sender
            .send((dialer, dialer_sender, dialee_receiver))
            .await
            .map_err(|_| Error::ConnectionFailed)?;
        Ok((
            Sink {
                metrics: self.metrics.clone(),
                auditor: self.auditor.clone(),
                me: dialer,
                peer: socket,
                sender: dialee_sender,
            },
            Stream {
                auditor: self.auditor.clone(),
                me: dialer,
                peer: socket,
                receiver: dialer_receiver,
            },
        ))
    }
}

impl crate::Network<Listener, Sink, Stream> for Context {
    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
        self.networking.bind(socket)
    }

    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
        self.networking.dial(socket).await
    }
}

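/// Listener side of a simulated socket; each accepted connection yields the dialer's
/// address and a [`Sink`]/[`Stream`] pair.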
pub struct Listener {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    address: SocketAddr,
    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
}

impl crate::Listener<Sink, Stream> for Listener {
    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
        self.auditor.accept(self.address, socket);
        Ok((
            socket,
            Sink {
                metrics: self.metrics.clone(),
                auditor: self.auditor.clone(),
                me: self.address,
                peer: socket,
                sender,
            },
            Stream {
                auditor: self.auditor.clone(),
                me: self.address,
                peer: socket,
                receiver,
            },
        ))
    }
}

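/// Write half of a simulated connection.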
pub struct Sink {
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    me: SocketAddr,
    peer: SocketAddr,
    sender: mocks::Sink,
}

impl crate::Sink for Sink {
    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
        self.auditor.send(self.me, self.peer, msg);
        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
        Ok(())
    }
}

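/// Read half of a simulated connection.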
pub struct Stream {
    auditor: Arc<Auditor>,
    me: SocketAddr,
    peer: SocketAddr,
    receiver: mocks::Stream,
}

impl crate::Stream for Stream {
    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
        self.receiver
            .recv(buf)
            .await
            .map_err(|_| Error::RecvFailed)?;
        self.auditor.recv(self.me, self.peer, buf);
        Ok(())
    }
}

impl RngCore for Context {
    fn next_u32(&mut self) -> u32 {
        self.executor.auditor.rand("next_u32".to_string());
        self.executor.rng.lock().unwrap().next_u32()
    }

    fn next_u64(&mut self) -> u64 {
        self.executor.auditor.rand("next_u64".to_string());
        self.executor.rng.lock().unwrap().next_u64()
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        self.executor.auditor.rand("fill_bytes".to_string());
        self.executor.rng.lock().unwrap().fill_bytes(dest)
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        self.executor.auditor.rand("try_fill_bytes".to_string());
        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
    }
}

impl CryptoRng for Context {}

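/// In-memory blob; contents are shared across clones and only written back to the
/// owning partition on `sync` (or `close`).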
pub struct Blob {
    executor: Arc<Executor>,

    partition: String,
    name: Vec<u8>,

    content: Arc<RwLock<Vec<u8>>>,
}

impl Blob {
    fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
        executor.metrics.open_blobs.inc();
        Self {
            executor,
            partition,
            name: name.into(),
            content: Arc::new(RwLock::new(content)),
        }
    }
}

impl Clone for Blob {
    fn clone(&self) -> Self {
        // Each clone counts as an additional open blob.
        self.executor.metrics.open_blobs.inc();
        Self {
            executor: self.executor.clone(),
            partition: self.partition.clone(),
            name: self.name.clone(),
            content: self.content.clone(),
        }
    }
}

impl crate::Storage<Blob> for Context {
    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
        self.executor.auditor.open(partition, name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        let partition_entry = partitions.entry(partition.into()).or_default();
        let content = partition_entry.entry(name.into()).or_default();
        Ok(Blob::new(
            self.executor.clone(),
            partition.into(),
            name,
            content.clone(),
        ))
    }

    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.executor.auditor.remove(partition, name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        match name {
            Some(name) => {
                partitions
                    .get_mut(partition)
                    .ok_or(Error::PartitionMissing(partition.into()))?
                    .remove(name)
                    .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
            }
            None => {
                partitions
                    .remove(partition)
                    .ok_or(Error::PartitionMissing(partition.into()))?;
            }
        }
        Ok(())
    }

    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.executor.auditor.scan(partition);
        let partitions = self.executor.partitions.lock().unwrap();
        let partition = partitions
            .get(partition)
            .ok_or(Error::PartitionMissing(partition.into()))?;
        let mut results = Vec::with_capacity(partition.len());
        for name in partition.keys() {
            results.push(name.clone());
        }
        results.sort();
        Ok(results)
    }
}

impl crate::Blob for Blob {
    async fn len(&self) -> Result<u64, Error> {
        self.executor.auditor.len(&self.partition, &self.name);
        let content = self.content.read().unwrap();
        Ok(content.len() as u64)
    }

    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
        let buf_len = buf.len();
        self.executor
            .auditor
            .read_at(&self.partition, &self.name, buf_len, offset);
        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
        let content = self.content.read().unwrap();
        let content_len = content.len();
        if offset + buf_len > content_len {
            return Err(Error::BlobInsufficientLength);
        }
        buf.copy_from_slice(&content[offset..offset + buf_len]);
        self.executor.metrics.storage_reads.inc();
        self.executor
            .metrics
            .storage_read_bandwidth
            .inc_by(buf_len as u64);
        Ok(())
    }

    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
        self.executor
            .auditor
            .write_at(&self.partition, &self.name, buf, offset);
        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
        let mut content = self.content.write().unwrap();
        let required = offset + buf.len();
        if required > content.len() {
            content.resize(required, 0);
        }
        content[offset..offset + buf.len()].copy_from_slice(buf);
        self.executor.metrics.storage_writes.inc();
        self.executor
            .metrics
            .storage_write_bandwidth
            .inc_by(buf.len() as u64);
        Ok(())
    }

    async fn truncate(&self, len: u64) -> Result<(), Error> {
        self.executor
            .auditor
            .truncate(&self.partition, &self.name, len);
        let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
        let mut content = self.content.write().unwrap();
        content.truncate(len);
        Ok(())
    }

    async fn sync(&self) -> Result<(), Error> {
        let new_content = self.content.read().unwrap().clone();

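        // Write the in-memory contents back to the owning partition so they survive
        // a runtime recovery.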
        self.executor.auditor.sync(&self.partition, &self.name);
        let mut partitions = self.executor.partitions.lock().unwrap();
        let partition = partitions
            .get_mut(&self.partition)
            .ok_or(Error::PartitionMissing(self.partition.clone()))?;
        let content = partition
            .get_mut(&self.name)
            .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
        *content = new_content;
        Ok(())
    }

    async fn close(self) -> Result<(), Error> {
        self.executor.auditor.close(&self.partition, &self.name);
        self.sync().await?;
        Ok(())
    }
}

impl Drop for Blob {
    fn drop(&mut self) {
        self.executor.metrics.open_blobs.dec();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Spawner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, runtime, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, runtime);
        (auditor.state(), messages)
    }

    #[test]
    fn test_same_seed_same_order() {
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let (executor, runtime, _) = Executor::timed(Duration::from_secs(10));
        executor.start(async move {
            loop {
                runtime.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    #[test]
    #[should_panic(expected = "root task cannot be spawned")]
    fn test_spawn_root_task() {
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            let _ = context
                .spawn(ROOT_TASK, async move {
                    panic!("root task should not run");
                })
                .await;
            panic!("root task should not be spawned");
        });
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        executor1.start({
            let context = context1.clone();
            let data = data.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                blob.sync().await.unwrap();
            }
        });
        let state1 = auditor1.state();

        let (executor2, context2, auditor2) = context1.recover();

        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        executor1.start({
            let context = context1.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
            }
        });

        let (executor2, context2, _) = context1.recover();

        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let (_, context, _) = Executor::default();

        context.recover();
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {});

        let cloned_context = context.clone();
        context.recover();

        cloned_context.recover();
    }
}