1use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
26use commonware_utils::{hex, SystemTimeExt};
27use futures::{
28 channel::mpsc,
29 task::{waker_ref, ArcWake},
30 SinkExt, StreamExt,
31};
32use governor::clock::{Clock as GClock, ReasonablyRealtime};
33use prometheus_client::{
34 encoding::{text::encode, EncodeLabelSet},
35 metrics::{counter::Counter, family::Family, gauge::Gauge},
36 registry::{Metric, Registry},
37};
38use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
39use sha2::{Digest, Sha256};
40use std::{
41 collections::{BinaryHeap, HashMap},
42 future::Future,
43 mem::replace,
44 net::{IpAddr, Ipv4Addr, SocketAddr},
45 ops::Range,
46 pin::Pin,
47 sync::{Arc, Mutex, RwLock},
48 task::{self, Poll, Waker},
49 time::{Duration, SystemTime, UNIX_EPOCH},
50};
51use tracing::trace;
52
/// Ports treated as ephemeral by the mock network: dialers are assigned ports
/// counting up from the start of this range, and binding a localhost listener
/// to a port inside it is rejected.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;

/// Contents of one storage partition, mapping blob name to blob bytes.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
58
/// Label set attached to the per-task metric families.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the task (taken from the context that spawned it).
    label: String,
}
63
/// Runtime metrics, registered by `Metrics::init`.
#[derive(Debug)]
struct Metrics {
    /// Total number of tasks spawned, per task label.
    tasks_spawned: Family<Work, Counter>,
    /// Number of tasks currently running, per task label.
    tasks_running: Family<Work, Gauge>,
    /// Total number of blocking tasks spawned, per task label.
    blocking_tasks_spawned: Family<Work, Counter>,
    /// Number of blocking tasks currently running, per task label.
    blocking_tasks_running: Family<Work, Gauge>,
    /// Total number of task polls, per task label.
    task_polls: Family<Work, Counter>,

    /// Total number of bytes sent over the (mock) network.
    network_bandwidth: Counter,

    /// Number of open blobs (every `Blob` handle, including clones, counts).
    open_blobs: Gauge,
    /// Total number of storage reads.
    storage_reads: Counter,
    /// Total number of bytes read from storage.
    storage_read_bandwidth: Counter,
    /// Total number of storage writes.
    storage_writes: Counter,
    /// Total number of bytes written to storage.
    storage_write_bandwidth: Counter,
}
80
81impl Metrics {
82 pub fn init(registry: &mut Registry) -> Self {
83 let metrics = Self {
84 task_polls: Family::default(),
85 tasks_spawned: Family::default(),
86 tasks_running: Family::default(),
87 blocking_tasks_spawned: Family::default(),
88 blocking_tasks_running: Family::default(),
89 network_bandwidth: Counter::default(),
90 open_blobs: Gauge::default(),
91 storage_reads: Counter::default(),
92 storage_read_bandwidth: Counter::default(),
93 storage_writes: Counter::default(),
94 storage_write_bandwidth: Counter::default(),
95 };
96 registry.register(
97 "tasks_spawned",
98 "Total number of tasks spawned",
99 metrics.tasks_spawned.clone(),
100 );
101 registry.register(
102 "tasks_running",
103 "Number of tasks currently running",
104 metrics.tasks_running.clone(),
105 );
106 registry.register(
107 "blocking_tasks_spawned",
108 "Total number of blocking tasks spawned",
109 metrics.blocking_tasks_spawned.clone(),
110 );
111 registry.register(
112 "blocking_tasks_running",
113 "Number of blocking tasks currently running",
114 metrics.blocking_tasks_running.clone(),
115 );
116 registry.register(
117 "task_polls",
118 "Total number of task polls",
119 metrics.task_polls.clone(),
120 );
121 registry.register(
122 "bandwidth",
123 "Total amount of data sent over network",
124 metrics.network_bandwidth.clone(),
125 );
126 registry.register(
127 "open_blobs",
128 "Number of open blobs",
129 metrics.open_blobs.clone(),
130 );
131 registry.register(
132 "storage_reads",
133 "Total number of disk reads",
134 metrics.storage_reads.clone(),
135 );
136 registry.register(
137 "storage_read_bandwidth",
138 "Total amount of data read from disk",
139 metrics.storage_read_bandwidth.clone(),
140 );
141 registry.register(
142 "storage_writes",
143 "Total number of disk writes",
144 metrics.storage_writes.clone(),
145 );
146 registry.register(
147 "storage_write_bandwidth",
148 "Total amount of data written to disk",
149 metrics.storage_write_bandwidth.clone(),
150 );
151 metrics
152 }
153}
154
/// Records runtime events by folding each one into a running SHA-256 digest,
/// which can be read back with [`Auditor::state`].
pub struct Auditor {
    /// Digest of all events recorded so far (empty until the first event).
    hash: Mutex<Vec<u8>>,
}
159
160impl Auditor {
161 fn new() -> Self {
162 Self {
163 hash: Mutex::new(Vec::new()),
164 }
165 }
166
167 fn process_task(&self, task: u128, label: &str) {
168 let mut hash = self.hash.lock().unwrap();
169 let mut hasher = Sha256::new();
170 hasher.update(&*hash);
171 hasher.update(b"process_task");
172 hasher.update(task.to_be_bytes());
173 hasher.update(label.as_bytes());
174 *hash = hasher.finalize().to_vec();
175 }
176
177 fn stop(&self, value: i32) {
178 let mut hash = self.hash.lock().unwrap();
179 let mut hasher = Sha256::new();
180 hasher.update(&*hash);
181 hasher.update(b"stop");
182 hasher.update(value.to_be_bytes());
183 *hash = hasher.finalize().to_vec();
184 }
185
186 fn stopped(&self) {
187 let mut hash = self.hash.lock().unwrap();
188 let mut hasher = Sha256::new();
189 hasher.update(&*hash);
190 hasher.update(b"stopped");
191 *hash = hasher.finalize().to_vec();
192 }
193
194 fn bind(&self, address: SocketAddr) {
195 let mut hash = self.hash.lock().unwrap();
196 let mut hasher = Sha256::new();
197 hasher.update(&*hash);
198 hasher.update(b"bind");
199 hasher.update(address.to_string().as_bytes());
200 *hash = hasher.finalize().to_vec();
201 }
202
203 fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
204 let mut hash = self.hash.lock().unwrap();
205 let mut hasher = Sha256::new();
206 hasher.update(&*hash);
207 hasher.update(b"dial");
208 hasher.update(dialer.to_string().as_bytes());
209 hasher.update(listener.to_string().as_bytes());
210 *hash = hasher.finalize().to_vec();
211 }
212
213 fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
214 let mut hash = self.hash.lock().unwrap();
215 let mut hasher = Sha256::new();
216 hasher.update(&*hash);
217 hasher.update(b"accept");
218 hasher.update(listener.to_string().as_bytes());
219 hasher.update(dialer.to_string().as_bytes());
220 *hash = hasher.finalize().to_vec();
221 }
222
223 fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
224 let mut hash = self.hash.lock().unwrap();
225 let mut hasher = Sha256::new();
226 hasher.update(&*hash);
227 hasher.update(b"send");
228 hasher.update(sender.to_string().as_bytes());
229 hasher.update(receiver.to_string().as_bytes());
230 hasher.update(message);
231 *hash = hasher.finalize().to_vec();
232 }
233
234 fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
235 let mut hash = self.hash.lock().unwrap();
236 let mut hasher = Sha256::new();
237 hasher.update(&*hash);
238 hasher.update(b"recv");
239 hasher.update(receiver.to_string().as_bytes());
240 hasher.update(sender.to_string().as_bytes());
241 hasher.update(message);
242 *hash = hasher.finalize().to_vec();
243 }
244
245 fn rand(&self, method: String) {
246 let mut hash = self.hash.lock().unwrap();
247 let mut hasher = Sha256::new();
248 hasher.update(&*hash);
249 hasher.update(b"rand");
250 hasher.update(method.as_bytes());
251 *hash = hasher.finalize().to_vec();
252 }
253
254 fn open(&self, partition: &str, name: &[u8]) {
255 let mut hash = self.hash.lock().unwrap();
256 let mut hasher = Sha256::new();
257 hasher.update(&*hash);
258 hasher.update(b"open");
259 hasher.update(partition.as_bytes());
260 hasher.update(name);
261 *hash = hasher.finalize().to_vec();
262 }
263
264 fn remove(&self, partition: &str, name: Option<&[u8]>) {
265 let mut hash = self.hash.lock().unwrap();
266 let mut hasher = Sha256::new();
267 hasher.update(&*hash);
268 hasher.update(b"remove");
269 hasher.update(partition.as_bytes());
270 if let Some(name) = name {
271 hasher.update(name);
272 }
273 *hash = hasher.finalize().to_vec();
274 }
275
276 fn scan(&self, partition: &str) {
277 let mut hash = self.hash.lock().unwrap();
278 let mut hasher = Sha256::new();
279 hasher.update(&*hash);
280 hasher.update(b"scan");
281 hasher.update(partition.as_bytes());
282 *hash = hasher.finalize().to_vec();
283 }
284
285 fn len(&self, partition: &str, name: &[u8]) {
286 let mut hash = self.hash.lock().unwrap();
287 let mut hasher = Sha256::new();
288 hasher.update(&*hash);
289 hasher.update(b"len");
290 hasher.update(partition.as_bytes());
291 hasher.update(name);
292 *hash = hasher.finalize().to_vec();
293 }
294
295 fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
296 let mut hash = self.hash.lock().unwrap();
297 let mut hasher = Sha256::new();
298 hasher.update(&*hash);
299 hasher.update(b"read_at");
300 hasher.update(partition.as_bytes());
301 hasher.update(name);
302 hasher.update(buf.to_be_bytes());
303 hasher.update(offset.to_be_bytes());
304 *hash = hasher.finalize().to_vec();
305 }
306
307 fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
308 let mut hash = self.hash.lock().unwrap();
309 let mut hasher = Sha256::new();
310 hasher.update(&*hash);
311 hasher.update(b"write_at");
312 hasher.update(partition.as_bytes());
313 hasher.update(name);
314 hasher.update(buf);
315 hasher.update(offset.to_be_bytes());
316 *hash = hasher.finalize().to_vec();
317 }
318
319 fn truncate(&self, partition: &str, name: &[u8], size: u64) {
320 let mut hash = self.hash.lock().unwrap();
321 let mut hasher = Sha256::new();
322 hasher.update(&*hash);
323 hasher.update(b"truncate");
324 hasher.update(partition.as_bytes());
325 hasher.update(name);
326 hasher.update(size.to_be_bytes());
327 *hash = hasher.finalize().to_vec();
328 }
329
330 fn sync(&self, partition: &str, name: &[u8]) {
331 let mut hash = self.hash.lock().unwrap();
332 let mut hasher = Sha256::new();
333 hasher.update(&*hash);
334 hasher.update(b"sync");
335 hasher.update(partition.as_bytes());
336 hasher.update(name);
337 *hash = hasher.finalize().to_vec();
338 }
339
340 fn close(&self, partition: &str, name: &[u8]) {
341 let mut hash = self.hash.lock().unwrap();
342 let mut hasher = Sha256::new();
343 hasher.update(&*hash);
344 hasher.update(b"close");
345 hasher.update(partition.as_bytes());
346 hasher.update(name);
347 *hash = hasher.finalize().to_vec();
348 }
349
350 fn register(&self, name: &str, help: &str) {
351 let mut hash = self.hash.lock().unwrap();
352 let mut hasher = Sha256::new();
353 hasher.update(&*hash);
354 hasher.update(b"register");
355 hasher.update(name.as_bytes());
356 hasher.update(help.as_bytes());
357 *hash = hasher.finalize().to_vec();
358 }
359
360 fn encode(&self) {
361 let mut hash = self.hash.lock().unwrap();
362 let mut hasher = Sha256::new();
363 hasher.update(&*hash);
364 hasher.update(b"encode");
365 *hash = hasher.finalize().to_vec();
366 }
367
368 pub fn state(&self) -> String {
373 let hash = self.hash.lock().unwrap().clone();
374 hex(&hash)
375 }
376}
377
/// A unit of work scheduled on the executor.
struct Task {
    /// Identifier assigned from the `Tasks` counter at registration.
    id: u128,
    /// Label the task was registered with (used for auditing and metrics).
    label: String,

    /// Queue this task re-enqueues itself onto when woken.
    tasks: Arc<Tasks>,

    /// Whether this is the root task; the run loop returns when it completes.
    root: bool,
    /// The future driven by the run loop.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

    /// Set once the future has resolved so later wake-ups are skipped.
    completed: Mutex<bool>,
}
389
390impl ArcWake for Task {
391 fn wake_by_ref(arc_self: &Arc<Self>) {
392 arc_self.tasks.enqueue(arc_self.clone());
393 }
394}
395
/// Run queue shared between the executor and task wakers.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled on the next loop iteration.
    queue: Mutex<Vec<Arc<Task>>>,
}
400
401impl Tasks {
402 fn register(
403 arc_self: &Arc<Self>,
404 label: &str,
405 root: bool,
406 future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
407 ) {
408 let mut queue = arc_self.queue.lock().unwrap();
409 let id = {
410 let mut l = arc_self.counter.lock().unwrap();
411 let old = *l;
412 *l = l.checked_add(1).expect("task counter overflow");
413 old
414 };
415 queue.push(Arc::new(Task {
416 id,
417 label: label.to_string(),
418 root,
419 future: Mutex::new(future),
420 tasks: arc_self.clone(),
421 completed: Mutex::new(false),
422 }));
423 }
424
425 fn enqueue(&self, task: Arc<Task>) {
426 let mut queue = self.queue.lock().unwrap();
427 queue.push(task);
428 }
429
430 fn drain(&self) -> Vec<Arc<Task>> {
431 let mut queue = self.queue.lock().unwrap();
432 let len = queue.len();
433 replace(&mut *queue, Vec::with_capacity(len))
434 }
435
436 fn len(&self) -> usize {
437 self.queue.lock().unwrap().len()
438 }
439}
440
/// Configuration for the deterministic runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    pub seed: u64,

    /// Amount the runtime's clock advances on every iteration of the run loop.
    pub cycle: Duration,

    /// If set, the runtime panics once its clock has advanced this far past
    /// its starting time.
    pub timeout: Option<Duration>,
}

impl Default for Config {
    /// Seed `42`, a one-millisecond cycle, and no timeout.
    fn default() -> Self {
        let cycle = Duration::from_millis(1);
        Self {
            timeout: None,
            cycle,
            seed: 42,
        }
    }
}
464
/// Shared state of the deterministic runtime.
pub struct Executor {
    /// Registry holding runtime metrics and metrics registered through a context.
    registry: Mutex<Registry>,
    /// Amount the clock advances on every iteration of the run loop.
    cycle: Duration,
    /// Time at which the run loop panics with a timeout, if any.
    deadline: Option<SystemTime>,
    /// Runtime metrics.
    metrics: Arc<Metrics>,
    /// Auditor that records runtime events.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (task shuffling and `RngCore` calls).
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Run queue.
    tasks: Arc<Tasks>,
    /// Alarms of sleeping tasks, earliest first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Storage partitions, keyed by partition name.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Used to broadcast a stop value.
    signaler: Mutex<Signaler>,
    /// Signal handed out to tasks waiting for a stop.
    signal: Signal,
    /// Set once the root task has completed.
    finished: Mutex<bool>,
    /// Set once this executor has been used to build a recovered runtime.
    recovered: Mutex<bool>,
}
482
483impl Executor {
484 pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
486 if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
488 panic!("cycle duration must be non-zero when timeout is set");
489 }
490
491 let mut registry = Registry::default();
493 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
494
495 let metrics = Arc::new(Metrics::init(runtime_registry));
497 let auditor = Arc::new(Auditor::new());
498 let start_time = UNIX_EPOCH;
499 let deadline = cfg
500 .timeout
501 .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
502 let (signaler, signal) = Signaler::new();
503 let executor = Arc::new(Self {
504 registry: Mutex::new(registry),
505 cycle: cfg.cycle,
506 deadline,
507 metrics: metrics.clone(),
508 auditor: auditor.clone(),
509 rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
510 time: Mutex::new(start_time),
511 tasks: Arc::new(Tasks {
512 queue: Mutex::new(Vec::new()),
513 counter: Mutex::new(0),
514 }),
515 sleeping: Mutex::new(BinaryHeap::new()),
516 partitions: Mutex::new(HashMap::new()),
517 signaler: Mutex::new(signaler),
518 signal,
519 finished: Mutex::new(false),
520 recovered: Mutex::new(false),
521 });
522 (
523 Runner {
524 executor: executor.clone(),
525 },
526 Context {
527 label: String::new(),
528 spawned: false,
529 executor,
530 networking: Arc::new(Networking::new(metrics, auditor.clone())),
531 },
532 auditor,
533 )
534 }
535
536 pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
539 let cfg = Config {
540 seed,
541 ..Config::default()
542 };
543 Self::init(cfg)
544 }
545
546 pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
549 let cfg = Config {
550 timeout: Some(timeout),
551 ..Config::default()
552 };
553 Self::init(cfg)
554 }
555
556 #[allow(clippy::should_implement_trait)]
559 pub fn default() -> (Runner, Context, Arc<Auditor>) {
560 Self::init(Config::default())
561 }
562}
563
/// Drives the run loop of the deterministic runtime.
pub struct Runner {
    /// Shared runtime state.
    executor: Arc<Executor>,
}
568
569impl crate::Runner for Runner {
570 fn start<F>(self, f: F) -> F::Output
571 where
572 F: Future + Send + 'static,
573 F::Output: Send + 'static,
574 {
575 let output = Arc::new(Mutex::new(None));
577 Tasks::register(
578 &self.executor.tasks,
579 "",
580 true,
581 Box::pin({
582 let output = output.clone();
583 async move {
584 *output.lock().unwrap() = Some(f.await);
585 }
586 }),
587 );
588
589 let mut iter = 0;
591 loop {
592 {
594 let current = self.executor.time.lock().unwrap();
595 if let Some(deadline) = self.executor.deadline {
596 if *current >= deadline {
597 panic!("runtime timeout");
598 }
599 }
600 }
601
602 let mut tasks = self.executor.tasks.drain();
604
605 {
607 let mut rng = self.executor.rng.lock().unwrap();
608 tasks.shuffle(&mut *rng);
609 }
610
611 trace!(iter, tasks = tasks.len(), "starting loop");
617 for task in tasks {
618 self.executor.auditor.process_task(task.id, &task.label);
620
621 if *task.completed.lock().unwrap() {
623 trace!(id = task.id, "skipping already completed task");
624 continue;
625 }
626 trace!(id = task.id, "processing task");
627
628 let waker = waker_ref(&task);
630 let mut context = task::Context::from_waker(&waker);
631 let mut future = task.future.lock().unwrap();
632
633 self.executor
635 .metrics
636 .task_polls
637 .get_or_create(&Work {
638 label: task.label.clone(),
639 })
640 .inc();
641
642 let pending = future.as_mut().poll(&mut context).is_pending();
645 if pending {
646 trace!(id = task.id, "task is still pending");
647 continue;
648 }
649
650 *task.completed.lock().unwrap() = true;
652 trace!(id = task.id, "task is complete");
653
654 if task.root {
656 *self.executor.finished.lock().unwrap() = true;
657 return output.lock().unwrap().take().unwrap();
658 }
659 }
660
661 let mut current;
666 {
667 let mut time = self.executor.time.lock().unwrap();
668 *time = time
669 .checked_add(self.executor.cycle)
670 .expect("executor time overflowed");
671 current = *time;
672 }
673 trace!(now = current.epoch_millis(), "time advanced",);
674
675 if self.executor.tasks.len() == 0 {
677 let mut skip = None;
678 {
679 let sleeping = self.executor.sleeping.lock().unwrap();
680 if let Some(next) = sleeping.peek() {
681 if next.time > current {
682 skip = Some(next.time);
683 }
684 }
685 }
686 if skip.is_some() {
687 {
688 let mut time = self.executor.time.lock().unwrap();
689 *time = skip.unwrap();
690 current = *time;
691 }
692 trace!(now = current.epoch_millis(), "time skipped",);
693 }
694 }
695
696 let mut to_wake = Vec::new();
698 let mut remaining;
699 {
700 let mut sleeping = self.executor.sleeping.lock().unwrap();
701 while let Some(next) = sleeping.peek() {
702 if next.time <= current {
703 let sleeper = sleeping.pop().unwrap();
704 to_wake.push(sleeper.waker);
705 } else {
706 break;
707 }
708 }
709 remaining = sleeping.len();
710 }
711 for waker in to_wake {
712 waker.wake();
713 }
714
715 remaining += self.executor.tasks.len();
717
718 if remaining == 0 {
721 panic!("runtime stalled");
722 }
723 iter += 1;
724 }
725 }
726}
727
/// Handle through which tasks use the deterministic runtime (spawning,
/// metrics, clock, network, randomness and storage).
pub struct Context {
    /// Label applied to tasks spawned and metrics registered through this context.
    label: String,
    /// Set by `spawn_ref`; a context whose flag is set refuses to spawn again.
    spawned: bool,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Shared mock network.
    networking: Arc<Networking>,
}
737
738impl Context {
739 pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
751 if !*self.executor.finished.lock().unwrap() {
753 panic!("execution is not finished");
754 }
755
756 {
758 let mut recovered = self.executor.recovered.lock().unwrap();
759 if *recovered {
760 panic!("runtime has already been recovered");
761 }
762 *recovered = true;
763 }
764
765 let mut registry = Registry::default();
767 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
768 let metrics = Arc::new(Metrics::init(runtime_registry));
769
770 let auditor = self.executor.auditor.clone();
772 let (signaler, signal) = Signaler::new();
773 let executor = Arc::new(Executor {
774 cycle: self.executor.cycle,
776 deadline: self.executor.deadline,
777 auditor: auditor.clone(),
778 rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
779 time: Mutex::new(*self.executor.time.lock().unwrap()),
780 partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
781
782 registry: Mutex::new(registry),
784 metrics: metrics.clone(),
785 tasks: Arc::new(Tasks {
786 queue: Mutex::new(Vec::new()),
787 counter: Mutex::new(0),
788 }),
789 sleeping: Mutex::new(BinaryHeap::new()),
790 signaler: Mutex::new(signaler),
791 signal,
792 finished: Mutex::new(false),
793 recovered: Mutex::new(false),
794 });
795 (
796 Runner {
797 executor: executor.clone(),
798 },
799 Self {
800 label: String::new(),
801 spawned: false,
802 executor,
803 networking: Arc::new(Networking::new(metrics, auditor.clone())),
804 },
805 auditor,
806 )
807 }
808}
809
810impl Clone for Context {
811 fn clone(&self) -> Self {
812 Self {
813 label: self.label.clone(),
814 spawned: false,
815 executor: self.executor.clone(),
816 networking: self.networking.clone(),
817 }
818 }
819}
820
821impl crate::Spawner for Context {
822 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
823 where
824 F: FnOnce(Self) -> Fut + Send + 'static,
825 Fut: Future<Output = T> + Send + 'static,
826 T: Send + 'static,
827 {
828 assert!(!self.spawned, "already spawned");
830
831 let label = self.label.clone();
833 let work = Work {
834 label: label.clone(),
835 };
836 self.executor
837 .metrics
838 .tasks_spawned
839 .get_or_create(&work)
840 .inc();
841 let gauge = self
842 .executor
843 .metrics
844 .tasks_running
845 .get_or_create(&work)
846 .clone();
847
848 let executor = self.executor.clone();
850 let future = f(self);
851 let (f, handle) = Handle::init(future, gauge, false);
852
853 Tasks::register(&executor.tasks, &label, false, Box::pin(f));
855 handle
856 }
857
858 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
859 where
860 F: Future<Output = T> + Send + 'static,
861 T: Send + 'static,
862 {
863 assert!(!self.spawned, "already spawned");
865 self.spawned = true;
866
867 let work = Work {
869 label: self.label.clone(),
870 };
871 self.executor
872 .metrics
873 .tasks_spawned
874 .get_or_create(&work)
875 .inc();
876 let gauge = self
877 .executor
878 .metrics
879 .tasks_running
880 .get_or_create(&work)
881 .clone();
882
883 let label = self.label.clone();
885 let executor = self.executor.clone();
886 move |f: F| {
887 let (f, handle) = Handle::init(f, gauge, false);
888
889 Tasks::register(&executor.tasks, &label, false, Box::pin(f));
891 handle
892 }
893 }
894
895 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
896 where
897 F: FnOnce() -> T + Send + 'static,
898 T: Send + 'static,
899 {
900 assert!(!self.spawned, "already spawned");
902
903 let work = Work {
905 label: self.label.clone(),
906 };
907 self.executor
908 .metrics
909 .blocking_tasks_spawned
910 .get_or_create(&work)
911 .inc();
912 let gauge = self
913 .executor
914 .metrics
915 .blocking_tasks_running
916 .get_or_create(&work)
917 .clone();
918
919 let (f, handle) = Handle::init_blocking(f, gauge, false);
921
922 let f = async move { f() };
924 Tasks::register(&self.executor.tasks, &self.label, false, Box::pin(f));
925 handle
926 }
927
928 fn stop(&self, value: i32) {
929 self.executor.auditor.stop(value);
930 self.executor.signaler.lock().unwrap().signal(value);
931 }
932
933 fn stopped(&self) -> Signal {
934 self.executor.auditor.stopped();
935 self.executor.signal.clone()
936 }
937}
938
939impl crate::Metrics for Context {
940 fn with_label(&self, label: &str) -> Self {
941 let label = {
942 let prefix = self.label.clone();
943 if prefix.is_empty() {
944 label.to_string()
945 } else {
946 format!("{}_{}", prefix, label)
947 }
948 };
949 assert!(
950 !label.starts_with(METRICS_PREFIX),
951 "using runtime label is not allowed"
952 );
953 Self {
954 label,
955 spawned: false,
956 executor: self.executor.clone(),
957 networking: self.networking.clone(),
958 }
959 }
960
961 fn label(&self) -> String {
962 self.label.clone()
963 }
964
965 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
966 let name = name.into();
968 let help = help.into();
969
970 self.executor.auditor.register(&name, &help);
972 let prefixed_name = {
973 let prefix = &self.label;
974 if prefix.is_empty() {
975 name
976 } else {
977 format!("{}_{}", *prefix, name)
978 }
979 };
980 self.executor
981 .registry
982 .lock()
983 .unwrap()
984 .register(prefixed_name, help, metric)
985 }
986
987 fn encode(&self) -> String {
988 self.executor.auditor.encode();
989 let mut buffer = String::new();
990 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
991 buffer
992 }
993}
994
/// Future that resolves once the executor's clock reaches `time`.
struct Sleeper {
    /// Runtime whose clock and alarm heap are used.
    executor: Arc<Executor>,
    /// Time at which the sleeper becomes ready.
    time: SystemTime,
    /// Whether an alarm has already been pushed for this sleeper.
    registered: bool,
}
1000
/// A pending wake-up: `waker` is to be woken once the clock reaches `time`.
struct Alarm {
    /// Time at which the alarm is due.
    time: SystemTime,
    /// Waker of the sleeping task.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Alarms compare by due time only; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Ordering is reversed on `time` so that a max-heap (`BinaryHeap`)
    /// yields the earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1026
1027impl Future for Sleeper {
1028 type Output = ();
1029
1030 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1031 {
1032 let current_time = *self.executor.time.lock().unwrap();
1033 if current_time >= self.time {
1034 return Poll::Ready(());
1035 }
1036 }
1037 if !self.registered {
1038 self.registered = true;
1039 self.executor.sleeping.lock().unwrap().push(Alarm {
1040 time: self.time,
1041 waker: cx.waker().clone(),
1042 });
1043 }
1044 Poll::Pending
1045 }
1046}
1047
1048impl Clock for Context {
1049 fn current(&self) -> SystemTime {
1050 *self.executor.time.lock().unwrap()
1051 }
1052
1053 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1054 let time = self
1055 .current()
1056 .checked_add(duration)
1057 .expect("overflow when setting wake time");
1058 Sleeper {
1059 executor: self.executor.clone(),
1060
1061 time,
1062 registered: false,
1063 }
1064 }
1065
1066 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1067 Sleeper {
1068 executor: self.executor.clone(),
1069
1070 time: deadline,
1071 registered: false,
1072 }
1073 }
1074}
1075
1076impl GClock for Context {
1077 type Instant = SystemTime;
1078
1079 fn now(&self) -> Self::Instant {
1080 self.current()
1081 }
1082}
1083
1084impl ReasonablyRealtime for Context {}
1085
/// Channel used to hand a new connection to a bound listener.
type Dialable = mpsc::UnboundedSender<(
    // Address assigned to the dialer.
    SocketAddr,
    // Sink the listener side wraps (presumably delivers to the dialer).
    mocks::Sink,
    // Stream the listener side wraps (presumably fed by the dialer).
    mocks::Stream,
)>;
1091
/// In-memory network shared by every clone of a context.
struct Networking {
    /// Runtime metrics (bandwidth is recorded by sinks).
    metrics: Arc<Metrics>,
    /// Auditor that records network events.
    auditor: Arc<Auditor>,
    /// Next port to assign to a dialer.
    ephemeral: Mutex<u16>,
    /// Bound addresses and the channel used to deliver connections to each.
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}
1104
1105impl Networking {
1106 fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1107 Self {
1108 metrics,
1109 auditor,
1110 ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1111 listeners: Mutex::new(HashMap::new()),
1112 }
1113 }
1114
1115 fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1116 self.auditor.bind(socket);
1117
1118 if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1121 && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1122 {
1123 return Err(Error::BindFailed);
1124 }
1125
1126 let mut listeners = self.listeners.lock().unwrap();
1128 if listeners.contains_key(&socket) {
1129 return Err(Error::BindFailed);
1130 }
1131
1132 let (sender, receiver) = mpsc::unbounded();
1134 listeners.insert(socket, sender);
1135 Ok(Listener {
1136 auditor: self.auditor.clone(),
1137 address: socket,
1138 listener: receiver,
1139 metrics: self.metrics.clone(),
1140 })
1141 }
1142
1143 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1144 let dialer = {
1146 let mut ephemeral = self.ephemeral.lock().unwrap();
1147 let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1148 *ephemeral = ephemeral
1149 .checked_add(1)
1150 .expect("ephemeral port range exhausted");
1151 dialer
1152 };
1153 self.auditor.dial(dialer, socket);
1154
1155 let mut sender = {
1157 let listeners = self.listeners.lock().unwrap();
1158 let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1159 sender.clone()
1160 };
1161
1162 let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1164 let (listener_sender, listener_receiver) = mocks::Channel::init();
1165 sender
1166 .send((dialer, dialer_sender, listener_receiver))
1167 .await
1168 .map_err(|_| Error::ConnectionFailed)?;
1169 Ok((
1170 Sink {
1171 metrics: self.metrics.clone(),
1172 auditor: self.auditor.clone(),
1173 me: dialer,
1174 peer: socket,
1175 sender: listener_sender,
1176 },
1177 Stream {
1178 auditor: self.auditor.clone(),
1179 me: dialer,
1180 peer: socket,
1181 receiver: dialer_receiver,
1182 },
1183 ))
1184 }
1185}
1186
1187impl crate::Network<Listener, Sink, Stream> for Context {
1188 async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1189 self.networking.bind(socket)
1190 }
1191
1192 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1193 self.networking.dial(socket).await
1194 }
1195}
1196
/// Listener bound to an address on the mock network.
pub struct Listener {
    /// Runtime metrics, passed on to accepted sinks.
    metrics: Arc<Metrics>,
    /// Auditor that records accepted connections.
    auditor: Arc<Auditor>,
    /// Address this listener is bound to.
    address: SocketAddr,
    /// Incoming connections delivered by dialers.
    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
}
1204
1205impl crate::Listener<Sink, Stream> for Listener {
1206 async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1207 let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1208 self.auditor.accept(self.address, socket);
1209 Ok((
1210 socket,
1211 Sink {
1212 metrics: self.metrics.clone(),
1213 auditor: self.auditor.clone(),
1214 me: self.address,
1215 peer: socket,
1216 sender,
1217 },
1218 Stream {
1219 auditor: self.auditor.clone(),
1220 me: self.address,
1221 peer: socket,
1222 receiver,
1223 },
1224 ))
1225 }
1226}
1227
/// Sending half of a mock connection.
pub struct Sink {
    /// Runtime metrics (network bandwidth is recorded on send).
    metrics: Arc<Metrics>,
    /// Auditor that records sent messages.
    auditor: Arc<Auditor>,
    /// Local address of this end of the connection.
    me: SocketAddr,
    /// Address of the remote end.
    peer: SocketAddr,
    /// Underlying mock sink.
    sender: mocks::Sink,
}
1236
1237impl crate::Sink for Sink {
1238 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1239 self.auditor.send(self.me, self.peer, msg);
1240 self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1241 self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1242 Ok(())
1243 }
1244}
1245
/// Receiving half of a mock connection.
pub struct Stream {
    /// Auditor that records received messages.
    auditor: Arc<Auditor>,
    /// Local address of this end of the connection.
    me: SocketAddr,
    /// Address of the remote end.
    peer: SocketAddr,
    /// Underlying mock stream.
    receiver: mocks::Stream,
}
1253
1254impl crate::Stream for Stream {
1255 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1256 self.receiver
1257 .recv(buf)
1258 .await
1259 .map_err(|_| Error::RecvFailed)?;
1260 self.auditor.recv(self.me, self.peer, buf);
1261 Ok(())
1262 }
1263}
1264
1265impl RngCore for Context {
1266 fn next_u32(&mut self) -> u32 {
1267 self.executor.auditor.rand("next_u32".to_string());
1268 self.executor.rng.lock().unwrap().next_u32()
1269 }
1270
1271 fn next_u64(&mut self) -> u64 {
1272 self.executor.auditor.rand("next_u64".to_string());
1273 self.executor.rng.lock().unwrap().next_u64()
1274 }
1275
1276 fn fill_bytes(&mut self, dest: &mut [u8]) {
1277 self.executor.auditor.rand("fill_bytes".to_string());
1278 self.executor.rng.lock().unwrap().fill_bytes(dest)
1279 }
1280
1281 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1282 self.executor.auditor.rand("try_fill_bytes".to_string());
1283 self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1284 }
1285}
1286
1287impl CryptoRng for Context {}
1288
/// Handle to a blob held in memory by the deterministic runtime.
pub struct Blob {
    /// Runtime used for auditing and metrics.
    executor: Arc<Executor>,

    /// Partition the blob belongs to.
    partition: String,
    /// Name of the blob within its partition.
    name: Vec<u8>,

    /// Blob bytes, shared between clones of this handle.
    content: Arc<RwLock<Vec<u8>>>,
}
1301
1302impl Blob {
1303 fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
1304 executor.metrics.open_blobs.inc();
1305 Self {
1306 executor,
1307 partition,
1308 name: name.into(),
1309 content: Arc::new(RwLock::new(content)),
1310 }
1311 }
1312}
1313
1314impl Clone for Blob {
1315 fn clone(&self) -> Self {
1316 self.executor.metrics.open_blobs.inc();
1318 Self {
1319 executor: self.executor.clone(),
1320 partition: self.partition.clone(),
1321 name: self.name.clone(),
1322 content: self.content.clone(),
1323 }
1324 }
1325}
1326
1327impl crate::Storage<Blob> for Context {
1328 async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
1329 self.executor.auditor.open(partition, name);
1330 let mut partitions = self.executor.partitions.lock().unwrap();
1331 let partition_entry = partitions.entry(partition.into()).or_default();
1332 let content = partition_entry.entry(name.into()).or_default();
1333 Ok(Blob::new(
1334 self.executor.clone(),
1335 partition.into(),
1336 name,
1337 content.clone(),
1338 ))
1339 }
1340
1341 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1342 self.executor.auditor.remove(partition, name);
1343 let mut partitions = self.executor.partitions.lock().unwrap();
1344 match name {
1345 Some(name) => {
1346 partitions
1347 .get_mut(partition)
1348 .ok_or(Error::PartitionMissing(partition.into()))?
1349 .remove(name)
1350 .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
1351 }
1352 None => {
1353 partitions
1354 .remove(partition)
1355 .ok_or(Error::PartitionMissing(partition.into()))?;
1356 }
1357 }
1358 Ok(())
1359 }
1360
1361 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1362 self.executor.auditor.scan(partition);
1363 let partitions = self.executor.partitions.lock().unwrap();
1364 let partition = partitions
1365 .get(partition)
1366 .ok_or(Error::PartitionMissing(partition.into()))?;
1367 let mut results = Vec::with_capacity(partition.len());
1368 for name in partition.keys() {
1369 results.push(name.clone());
1370 }
1371 results.sort(); Ok(results)
1373 }
1374}
1375
1376impl crate::Blob for Blob {
1377 async fn len(&self) -> Result<u64, Error> {
1378 self.executor.auditor.len(&self.partition, &self.name);
1379 let content = self.content.read().unwrap();
1380 Ok(content.len() as u64)
1381 }
1382
1383 async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
1384 let buf_len = buf.len();
1385 self.executor
1386 .auditor
1387 .read_at(&self.partition, &self.name, buf_len, offset);
1388 let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1389 let content = self.content.read().unwrap();
1390 let content_len = content.len();
1391 if offset + buf_len > content_len {
1392 return Err(Error::BlobInsufficientLength);
1393 }
1394 buf.copy_from_slice(&content[offset..offset + buf_len]);
1395 self.executor.metrics.storage_reads.inc();
1396 self.executor
1397 .metrics
1398 .storage_read_bandwidth
1399 .inc_by(buf_len as u64);
1400 Ok(())
1401 }
1402
1403 async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
1404 self.executor
1405 .auditor
1406 .write_at(&self.partition, &self.name, buf, offset);
1407 let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1408 let mut content = self.content.write().unwrap();
1409 let required = offset + buf.len();
1410 if required > content.len() {
1411 content.resize(required, 0);
1412 }
1413 content[offset..offset + buf.len()].copy_from_slice(buf);
1414 self.executor.metrics.storage_writes.inc();
1415 self.executor
1416 .metrics
1417 .storage_write_bandwidth
1418 .inc_by(buf.len() as u64);
1419 Ok(())
1420 }
1421
1422 async fn truncate(&self, len: u64) -> Result<(), Error> {
1423 self.executor
1424 .auditor
1425 .truncate(&self.partition, &self.name, len);
1426 let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
1427 let mut content = self.content.write().unwrap();
1428 content.truncate(len);
1429 Ok(())
1430 }
1431
1432 async fn sync(&self) -> Result<(), Error> {
1433 let new_content = self.content.read().unwrap().clone();
1438
1439 self.executor.auditor.sync(&self.partition, &self.name);
1441 let mut partitions = self.executor.partitions.lock().unwrap();
1442 let partition = partitions
1443 .get_mut(&self.partition)
1444 .ok_or(Error::PartitionMissing(self.partition.clone()))?;
1445 let content = partition
1446 .get_mut(&self.name)
1447 .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
1448 *content = new_content;
1449 Ok(())
1450 }
1451
1452 async fn close(self) -> Result<(), Error> {
1453 self.executor.auditor.close(&self.partition, &self.name);
1454 self.sync().await?;
1455 Ok(())
1456 }
1457}
1458
1459impl Drop for Blob {
1460 fn drop(&mut self) {
1461 self.executor.metrics.open_blobs.dec();
1462 }
1463}
1464
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared task workload on an executor seeded with `seed` and
    /// returns the auditor state together with the observed message order.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        (auditor.state(), messages)
    }

    /// Re-running any seed must reproduce its earlier output exactly.
    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output of every seed.
        let baseline: Vec<(String, Vec<usize>)> = (0..1000).map(run_with_seed).collect();

        // Second pass: each seed must match what it produced before.
        for (seed, expected) in (0..1000u64).zip(&baseline) {
            let actual = run_with_seed(seed);
            assert_eq!(actual, *expected);
        }
    }

    /// Two different seeds are expected to produce different outputs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let first = run_with_seed(12345);
        let second = run_with_seed(54321);
        assert_ne!(first, second);
    }

    /// Alarms must pop from the heap earliest-first.
    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        let at = |secs: u64| now + Duration::new(secs, 0);

        // Insert out of order, including a duplicate deadline.
        let mut heap = BinaryHeap::new();
        for secs in [10, 5, 15, 5] {
            heap.push(Alarm {
                time: at(secs),
                waker: noop_waker(),
            });
        }

        // Drain the heap and record the order in which deadlines come out.
        let sorted_times: Vec<SystemTime> = std::iter::from_fn(|| heap.pop())
            .map(|alarm| alarm.time)
            .collect();
        assert_eq!(sorted_times, vec![at(5), at(5), at(10), at(15)]);
    }

    /// A task that never finishes must trip the configured runtime timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let limit = Duration::from_secs(10);
        let (executor, context, _) = Executor::timed(limit);
        executor.start(async move {
            let tick = Duration::from_secs(1);
            loop {
                context.sleep(tick).await;
            }
        });
    }

    /// A timeout combined with a zero cycle duration must be rejected.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::ZERO),
            cycle: Duration::ZERO,
            ..Config::default()
        };
        Executor::init(cfg);
    }

    /// Data that was synced before recovery must be readable afterwards.
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write and sync a blob in the first runtime
        let writer = context1.clone();
        let payload = data.clone();
        executor1.start(async move {
            let blob = writer.open(partition, name).await.unwrap();
            blob.write_at(&payload, 0).await.unwrap();
            blob.sync().await.unwrap();
        });
        let state_before = auditor1.state();

        // Recover into a second runtime; the auditor state must carry over
        let (executor2, context2, auditor2) = context1.recover();
        let state_after = auditor2.state();
        assert_eq!(state_before, state_after);

        // The synced bytes must be visible in the recovered runtime
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    /// Data that was written but never synced must be gone after recovery.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write to a blob without syncing it
        let writer = context1.clone();
        executor1.start(async move {
            let blob = writer.open(partition, name).await.unwrap();
            blob.write_at(&data, 0).await.unwrap();
        });

        // Recover into a second runtime
        let (executor2, context2, _) = context1.recover();

        // The blob must come back empty
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Recovering before the executor has run must panic.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let (_, context, _) = Executor::default();

        // The executor was never started, so recovery is not allowed yet
        context.recover();
    }

    /// Recovering the same runtime a second time must panic.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let (executor, context, _) = Executor::default();

        // Finish execution so the first recovery is legal
        executor.start(async move {});

        // Keep a second handle around, then recover through the first one
        let second_handle = context.clone();
        context.recover();

        // Recovering again through the surviving handle must fail
        second_handle.recover();
    }
}