1use crate::{utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
2use commonware_utils::{from_hex, hex};
3use governor::clock::{Clock as GClock, ReasonablyRealtime};
4use prometheus_client::{
5 encoding::{text::encode, EncodeLabelSet},
6 metrics::{counter::Counter, family::Family, gauge::Gauge},
7 registry::{Metric, Registry},
8};
9use rand::{rngs::OsRng, CryptoRng, RngCore};
10use std::{
11 env,
12 future::Future,
13 io::{self, SeekFrom},
14 net::SocketAddr,
15 path::PathBuf,
16 sync::{Arc, Mutex},
17 time::{Duration, SystemTime},
18};
19use tokio::{
20 fs,
21 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
22 net::{tcp::OwnedReadHalf, tcp::OwnedWriteHalf, TcpListener, TcpStream},
23 runtime::{Builder, Runtime},
24 sync::Mutex as AsyncMutex,
25 time::timeout,
26};
27use tracing::warn;
28
29#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
30struct Work {
31 label: String,
32}
33
34#[derive(Debug)]
35struct Metrics {
36 tasks_spawned: Family<Work, Counter>,
37 tasks_running: Family<Work, Gauge>,
38 blocking_tasks_spawned: Family<Work, Counter>,
39 blocking_tasks_running: Family<Work, Gauge>,
40
41 inbound_connections: Counter,
44 outbound_connections: Counter,
45 inbound_bandwidth: Counter,
46 outbound_bandwidth: Counter,
47
48 open_blobs: Gauge,
49 storage_reads: Counter,
50 storage_read_bytes: Counter,
51 storage_writes: Counter,
52 storage_write_bytes: Counter,
53}
54
55impl Metrics {
56 pub fn init(registry: &mut Registry) -> Self {
57 let metrics = Self {
58 tasks_spawned: Family::default(),
59 tasks_running: Family::default(),
60 blocking_tasks_spawned: Family::default(),
61 blocking_tasks_running: Family::default(),
62 inbound_connections: Counter::default(),
63 outbound_connections: Counter::default(),
64 inbound_bandwidth: Counter::default(),
65 outbound_bandwidth: Counter::default(),
66 open_blobs: Gauge::default(),
67 storage_reads: Counter::default(),
68 storage_read_bytes: Counter::default(),
69 storage_writes: Counter::default(),
70 storage_write_bytes: Counter::default(),
71 };
72 registry.register(
73 "tasks_spawned",
74 "Total number of tasks spawned",
75 metrics.tasks_spawned.clone(),
76 );
77 registry.register(
78 "tasks_running",
79 "Number of tasks currently running",
80 metrics.tasks_running.clone(),
81 );
82 registry.register(
83 "blocking_tasks_spawned",
84 "Total number of blocking tasks spawned",
85 metrics.blocking_tasks_spawned.clone(),
86 );
87 registry.register(
88 "blocking_tasks_running",
89 "Number of blocking tasks currently running",
90 metrics.blocking_tasks_running.clone(),
91 );
92 registry.register(
93 "inbound_connections",
94 "Number of connections created by dialing us",
95 metrics.inbound_connections.clone(),
96 );
97 registry.register(
98 "outbound_connections",
99 "Number of connections created by dialing others",
100 metrics.outbound_connections.clone(),
101 );
102 registry.register(
103 "inbound_bandwidth",
104 "Bandwidth used by receiving data from others",
105 metrics.inbound_bandwidth.clone(),
106 );
107 registry.register(
108 "outbound_bandwidth",
109 "Bandwidth used by sending data to others",
110 metrics.outbound_bandwidth.clone(),
111 );
112 registry.register(
113 "open_blobs",
114 "Number of open blobs",
115 metrics.open_blobs.clone(),
116 );
117 registry.register(
118 "storage_reads",
119 "Total number of disk reads",
120 metrics.storage_reads.clone(),
121 );
122 registry.register(
123 "storage_read_bytes",
124 "Total amount of data read from disk",
125 metrics.storage_read_bytes.clone(),
126 );
127 registry.register(
128 "storage_writes",
129 "Total number of disk writes",
130 metrics.storage_writes.clone(),
131 );
132 registry.register(
133 "storage_write_bytes",
134 "Total amount of data written to disk",
135 metrics.storage_write_bytes.clone(),
136 );
137 metrics
138 }
139}
140
/// Configuration for the Tokio-backed runtime.
#[derive(Clone)]
pub struct Config {
    /// Number of worker threads given to the Tokio runtime builder.
    pub worker_threads: usize,

    /// Upper bound on blocking threads given to the Tokio runtime builder.
    pub max_blocking_threads: usize,

    /// Forwarded to every task `Handle` when a task is spawned.
    // NOTE(review): presumably makes a panicking task surface as an error on its handle
    // instead of propagating; confirm against `Handle::init`.
    pub catch_panics: bool,

    /// Maximum time a single `Stream::recv` call may take before it fails with a timeout.
    pub read_timeout: Duration,

    /// Maximum time a single `Sink::send` call may take before it fails with a timeout.
    pub write_timeout: Duration,

    /// When `Some(value)`, `set_nodelay(value)` is called on every dialed or accepted
    /// connection; when `None`, the socket option is left untouched.
    pub tcp_nodelay: Option<bool>,

    /// Directory under which every partition (and therefore every blob) is stored.
    pub storage_directory: PathBuf,

    /// Maximum buffer size applied to each blob's file handle when it is opened.
    pub maximum_buffer_size: usize,
}
189
190impl Default for Config {
191 fn default() -> Self {
192 let rng = OsRng.next_u64();
194 let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{}", rng));
195
196 Self {
198 worker_threads: 2,
199 max_blocking_threads: 512,
200 catch_panics: true,
201 read_timeout: Duration::from_secs(60),
202 write_timeout: Duration::from_secs(30),
203 tcp_nodelay: None,
204 storage_directory,
205 maximum_buffer_size: 2 * 1024 * 1024, }
207 }
208}
209
210pub struct Executor {
212 cfg: Config,
213 registry: Mutex<Registry>,
214 metrics: Arc<Metrics>,
215 runtime: Runtime,
216 fs: AsyncMutex<()>,
217 signaler: Mutex<Signaler>,
218 signal: Signal,
219}
220
221impl Executor {
222 pub fn init(cfg: Config) -> (Runner, Context) {
224 let mut registry = Registry::default();
226 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
227
228 let metrics = Arc::new(Metrics::init(runtime_registry));
230 let runtime = Builder::new_multi_thread()
231 .worker_threads(cfg.worker_threads)
232 .max_blocking_threads(cfg.max_blocking_threads)
233 .enable_all()
234 .build()
235 .expect("failed to create Tokio runtime");
236 let (signaler, signal) = Signaler::new();
237 let executor = Arc::new(Self {
238 cfg,
239 registry: Mutex::new(registry),
240 metrics,
241 runtime,
242 fs: AsyncMutex::new(()),
243 signaler: Mutex::new(signaler),
244 signal,
245 });
246 (
247 Runner {
248 executor: executor.clone(),
249 },
250 Context {
251 label: String::new(),
252 spawned: false,
253 executor,
254 },
255 )
256 }
257
258 #[allow(clippy::should_implement_trait)]
261 pub fn default() -> (Runner, Context) {
262 Self::init(Config::default())
263 }
264}
265
266pub struct Runner {
268 executor: Arc<Executor>,
269}
270
271impl crate::Runner for Runner {
272 fn start<F>(self, f: F) -> F::Output
273 where
274 F: Future + Send + 'static,
275 F::Output: Send + 'static,
276 {
277 self.executor.runtime.block_on(f)
278 }
279}
280
281pub struct Context {
285 label: String,
286 spawned: bool,
287 executor: Arc<Executor>,
288}
289
290impl Clone for Context {
291 fn clone(&self) -> Self {
292 Self {
293 label: self.label.clone(),
294 spawned: false,
295 executor: self.executor.clone(),
296 }
297 }
298}
299
300impl crate::Spawner for Context {
301 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
302 where
303 F: FnOnce(Self) -> Fut + Send + 'static,
304 Fut: Future<Output = T> + Send + 'static,
305 T: Send + 'static,
306 {
307 assert!(!self.spawned, "already spawned");
309
310 let work = Work {
312 label: self.label.clone(),
313 };
314 self.executor
315 .metrics
316 .tasks_spawned
317 .get_or_create(&work)
318 .inc();
319 let gauge = self
320 .executor
321 .metrics
322 .tasks_running
323 .get_or_create(&work)
324 .clone();
325
326 let catch_panics = self.executor.cfg.catch_panics;
328 let executor = self.executor.clone();
329 let future = f(self);
330 let (f, handle) = Handle::init(future, gauge, catch_panics);
331
332 executor.runtime.spawn(f);
334 handle
335 }
336
337 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
338 where
339 F: Future<Output = T> + Send + 'static,
340 T: Send + 'static,
341 {
342 assert!(!self.spawned, "already spawned");
344 self.spawned = true;
345
346 let work = Work {
348 label: self.label.clone(),
349 };
350 self.executor
351 .metrics
352 .tasks_spawned
353 .get_or_create(&work)
354 .inc();
355 let gauge = self
356 .executor
357 .metrics
358 .tasks_running
359 .get_or_create(&work)
360 .clone();
361
362 let executor = self.executor.clone();
364 move |f: F| {
365 let (f, handle) = Handle::init(f, gauge, executor.cfg.catch_panics);
366
367 executor.runtime.spawn(f);
369 handle
370 }
371 }
372
373 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
374 where
375 F: FnOnce() -> T + Send + 'static,
376 T: Send + 'static,
377 {
378 assert!(!self.spawned, "already spawned");
380
381 let work = Work {
383 label: self.label.clone(),
384 };
385 self.executor
386 .metrics
387 .blocking_tasks_spawned
388 .get_or_create(&work)
389 .inc();
390 let gauge = self
391 .executor
392 .metrics
393 .blocking_tasks_running
394 .get_or_create(&work)
395 .clone();
396
397 let (f, handle) = Handle::init_blocking(f, gauge, self.executor.cfg.catch_panics);
399
400 self.executor.runtime.spawn_blocking(f);
402 handle
403 }
404
405 fn stop(&self, value: i32) {
406 self.executor.signaler.lock().unwrap().signal(value);
407 }
408
409 fn stopped(&self) -> Signal {
410 self.executor.signal.clone()
411 }
412}
413
414impl crate::Metrics for Context {
415 fn with_label(&self, label: &str) -> Self {
416 let label = {
417 let prefix = self.label.clone();
418 if prefix.is_empty() {
419 label.to_string()
420 } else {
421 format!("{}_{}", prefix, label)
422 }
423 };
424 assert!(
425 !label.starts_with(METRICS_PREFIX),
426 "using runtime label is not allowed"
427 );
428 Self {
429 label,
430 spawned: false,
431 executor: self.executor.clone(),
432 }
433 }
434
435 fn label(&self) -> String {
436 self.label.clone()
437 }
438
439 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
440 let name = name.into();
441 let prefixed_name = {
442 let prefix = &self.label;
443 if prefix.is_empty() {
444 name
445 } else {
446 format!("{}_{}", *prefix, name)
447 }
448 };
449 self.executor
450 .registry
451 .lock()
452 .unwrap()
453 .register(prefixed_name, help, metric)
454 }
455
456 fn encode(&self) -> String {
457 let mut buffer = String::new();
458 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
459 buffer
460 }
461}
462
463impl Clock for Context {
464 fn current(&self) -> SystemTime {
465 SystemTime::now()
466 }
467
468 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
469 tokio::time::sleep(duration)
470 }
471
472 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
473 let now = SystemTime::now();
474 let duration_until_deadline = match deadline.duration_since(now) {
475 Ok(duration) => duration,
476 Err(_) => Duration::from_secs(0), };
478 let target_instant = tokio::time::Instant::now() + duration_until_deadline;
479 tokio::time::sleep_until(target_instant)
480 }
481}
482
483impl GClock for Context {
484 type Instant = SystemTime;
485
486 fn now(&self) -> Self::Instant {
487 self.current()
488 }
489}
490
491impl ReasonablyRealtime for Context {}
492
493impl crate::Network<Listener, Sink, Stream> for Context {
494 async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
495 TcpListener::bind(socket)
496 .await
497 .map_err(|_| Error::BindFailed)
498 .map(|listener| Listener {
499 context: self.clone(),
500 listener,
501 })
502 }
503
504 async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
505 let stream = TcpStream::connect(socket)
507 .await
508 .map_err(|_| Error::ConnectionFailed)?;
509 self.executor.metrics.outbound_connections.inc();
510
511 if let Some(tcp_nodelay) = self.executor.cfg.tcp_nodelay {
513 if let Err(err) = stream.set_nodelay(tcp_nodelay) {
514 warn!(?err, "failed to set TCP_NODELAY");
515 }
516 }
517
518 let context = self.clone();
520 let (stream, sink) = stream.into_split();
521 Ok((
522 Sink {
523 context: context.clone(),
524 sink,
525 },
526 Stream { context, stream },
527 ))
528 }
529}
530
531pub struct Listener {
533 context: Context,
534 listener: TcpListener,
535}
536
537impl crate::Listener<Sink, Stream> for Listener {
538 async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
539 let (stream, addr) = self.listener.accept().await.map_err(|_| Error::Closed)?;
541 self.context.executor.metrics.inbound_connections.inc();
542
543 if let Some(tcp_nodelay) = self.context.executor.cfg.tcp_nodelay {
545 if let Err(err) = stream.set_nodelay(tcp_nodelay) {
546 warn!(?err, "failed to set TCP_NODELAY");
547 }
548 }
549
550 let context = self.context.clone();
552 let (stream, sink) = stream.into_split();
553 Ok((
554 addr,
555 Sink {
556 context: context.clone(),
557 sink,
558 },
559 Stream { context, stream },
560 ))
561 }
562}
563
564impl axum::serve::Listener for Listener {
565 type Io = TcpStream;
566 type Addr = SocketAddr;
567
568 async fn accept(&mut self) -> (Self::Io, Self::Addr) {
569 let (stream, addr) = self.listener.accept().await.unwrap();
570 (stream, addr)
571 }
572
573 fn local_addr(&self) -> io::Result<Self::Addr> {
574 self.listener.local_addr()
575 }
576}
577
578pub struct Sink {
580 context: Context,
581 sink: OwnedWriteHalf,
582}
583
584impl crate::Sink for Sink {
585 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
586 let len = msg.len();
587 timeout(
588 self.context.executor.cfg.write_timeout,
589 self.sink.write_all(msg),
590 )
591 .await
592 .map_err(|_| Error::Timeout)?
593 .map_err(|_| Error::SendFailed)?;
594 self.context
595 .executor
596 .metrics
597 .outbound_bandwidth
598 .inc_by(len as u64);
599 Ok(())
600 }
601}
602
603pub struct Stream {
605 context: Context,
606 stream: OwnedReadHalf,
607}
608
609impl crate::Stream for Stream {
610 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
611 timeout(
613 self.context.executor.cfg.read_timeout,
614 self.stream.read_exact(buf),
615 )
616 .await
617 .map_err(|_| Error::Timeout)?
618 .map_err(|_| Error::RecvFailed)?;
619
620 self.context
622 .executor
623 .metrics
624 .inbound_bandwidth
625 .inc_by(buf.len() as u64);
626
627 Ok(())
628 }
629}
630
631impl RngCore for Context {
632 fn next_u32(&mut self) -> u32 {
633 OsRng.next_u32()
634 }
635
636 fn next_u64(&mut self) -> u64 {
637 OsRng.next_u64()
638 }
639
640 fn fill_bytes(&mut self, dest: &mut [u8]) {
641 OsRng.fill_bytes(dest);
642 }
643
644 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
645 OsRng.try_fill_bytes(dest)
646 }
647}
648
649impl CryptoRng for Context {}
650
651pub struct Blob {
653 metrics: Arc<Metrics>,
654
655 partition: String,
656 name: Vec<u8>,
657
658 file: Arc<AsyncMutex<(fs::File, u64)>>,
665}
666
667impl Blob {
668 fn new(
669 metrics: Arc<Metrics>,
670 partition: String,
671 name: &[u8],
672 file: fs::File,
673 len: u64,
674 ) -> Self {
675 metrics.open_blobs.inc();
676 Self {
677 metrics,
678 partition,
679 name: name.into(),
680 file: Arc::new(AsyncMutex::new((file, len))),
681 }
682 }
683}
684
685impl Clone for Blob {
686 fn clone(&self) -> Self {
687 self.metrics.open_blobs.inc();
689 Self {
690 metrics: self.metrics.clone(),
691 partition: self.partition.clone(),
692 name: self.name.clone(),
693 file: self.file.clone(),
694 }
695 }
696}
697
698impl crate::Storage<Blob> for Context {
699 async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
700 let _guard = self.executor.fs.lock().await;
702
703 let path = self
705 .executor
706 .cfg
707 .storage_directory
708 .join(partition)
709 .join(hex(name));
710 let parent = match path.parent() {
711 Some(parent) => parent,
712 None => return Err(Error::PartitionCreationFailed(partition.into())),
713 };
714
715 fs::create_dir_all(parent)
717 .await
718 .map_err(|_| Error::PartitionCreationFailed(partition.into()))?;
719
720 let mut file = fs::OpenOptions::new()
722 .read(true)
723 .write(true)
724 .create(true)
725 .truncate(false)
726 .open(&path)
727 .await
728 .map_err(|_| Error::BlobOpenFailed(partition.into(), hex(name)))?;
729
730 file.set_max_buf_size(self.executor.cfg.maximum_buffer_size);
732
733 let len = file.metadata().await.map_err(|_| Error::ReadFailed)?.len();
735
736 Ok(Blob::new(
738 self.executor.metrics.clone(),
739 partition.into(),
740 name,
741 file,
742 len,
743 ))
744 }
745
746 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
747 let _guard = self.executor.fs.lock().await;
749
750 let path = self.executor.cfg.storage_directory.join(partition);
752 if let Some(name) = name {
753 let blob_path = path.join(hex(name));
754 fs::remove_file(blob_path)
755 .await
756 .map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?;
757 } else {
758 fs::remove_dir_all(path)
759 .await
760 .map_err(|_| Error::PartitionMissing(partition.into()))?;
761 }
762 Ok(())
763 }
764
765 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
766 let _guard = self.executor.fs.lock().await;
768
769 let path = self.executor.cfg.storage_directory.join(partition);
771 let mut entries = fs::read_dir(path)
772 .await
773 .map_err(|_| Error::PartitionMissing(partition.into()))?;
774 let mut blobs = Vec::new();
775 while let Some(entry) = entries.next_entry().await.map_err(|_| Error::ReadFailed)? {
776 let file_type = entry.file_type().await.map_err(|_| Error::ReadFailed)?;
777 if !file_type.is_file() {
778 return Err(Error::PartitionCorrupt(partition.into()));
779 }
780 if let Some(name) = entry.file_name().to_str() {
781 let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?;
782 blobs.push(name);
783 }
784 }
785 Ok(blobs)
786 }
787}
788
789impl crate::Blob for Blob {
790 async fn len(&self) -> Result<u64, Error> {
791 let (_, len) = *self.file.lock().await;
792 Ok(len)
793 }
794
795 async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
796 let mut file = self.file.lock().await;
798 if offset + buf.len() as u64 > file.1 {
799 return Err(Error::BlobInsufficientLength);
800 }
801
802 file.0
804 .seek(SeekFrom::Start(offset))
805 .await
806 .map_err(|_| Error::ReadFailed)?;
807 file.0
808 .read_exact(buf)
809 .await
810 .map_err(|_| Error::ReadFailed)?;
811 self.metrics.storage_reads.inc();
812 self.metrics.storage_read_bytes.inc_by(buf.len() as u64);
813 Ok(())
814 }
815
816 async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
817 let mut file = self.file.lock().await;
819 file.0
820 .seek(SeekFrom::Start(offset))
821 .await
822 .map_err(|_| Error::WriteFailed)?;
823 file.0
824 .write_all(buf)
825 .await
826 .map_err(|_| Error::WriteFailed)?;
827
828 let max_len = offset + buf.len() as u64;
830 if max_len > file.1 {
831 file.1 = max_len;
832 }
833 self.metrics.storage_writes.inc();
834 self.metrics.storage_write_bytes.inc_by(buf.len() as u64);
835 Ok(())
836 }
837
838 async fn truncate(&self, len: u64) -> Result<(), Error> {
839 let mut file = self.file.lock().await;
841 file.0
842 .set_len(len)
843 .await
844 .map_err(|_| Error::BlobTruncateFailed(self.partition.clone(), hex(&self.name)))?;
845
846 file.1 = len;
848 Ok(())
849 }
850
851 async fn sync(&self) -> Result<(), Error> {
852 let file = self.file.lock().await;
853 file.0
854 .sync_all()
855 .await
856 .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))
857 }
858
859 async fn close(self) -> Result<(), Error> {
860 let mut file = self.file.lock().await;
861 file.0
862 .sync_all()
863 .await
864 .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))?;
865 file.0
866 .shutdown()
867 .await
868 .map_err(|_| Error::BlobCloseFailed(self.partition.clone(), hex(&self.name)))
869 }
870}
871
872impl Drop for Blob {
873 fn drop(&mut self) {
874 self.metrics.open_blobs.dec();
875 }
876}