1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11 iouring,
12 network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15 network::metered::Network as MeteredNetwork,
16 process::metered::Metrics as MeteredProcess,
17 signal::Signal,
18 storage::metered::Storage as MeteredStorage,
19 telemetry::metrics::task::Label,
20 utils::{add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard},
21 BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf,
22 Spawner as _, StreamOf, METRICS_PREFIX,
23};
24use commonware_macros::{select, stability};
25#[stability(BETA)]
26use commonware_parallel::ThreadPool;
27use commonware_utils::sync::Mutex;
28use futures::{future::BoxFuture, FutureExt};
29use governor::clock::{Clock as GClock, ReasonablyRealtime};
30use prometheus_client::{
31 metrics::{counter::Counter, family::Family, gauge::Gauge},
32 registry::{Metric, Registry as PrometheusRegistry},
33};
34use rand::{rngs::OsRng, CryptoRng, RngCore};
35#[stability(BETA)]
36use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
37use std::{
38 borrow::Cow,
39 env,
40 future::Future,
41 net::{IpAddr, SocketAddr},
42 num::NonZeroUsize,
43 path::PathBuf,
44 sync::Arc,
45 thread,
46 time::{Duration, SystemTime},
47};
48use tokio::runtime::{Builder, Runtime};
49use tracing::{info_span, Instrument};
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51
/// Value passed as `iouring::Config::size` for the io_uring network backend
/// (reduced under `cfg(test)`).
#[cfg(all(feature = "iouring-network", test))]
const IOURING_NETWORK_SIZE: u32 = 128;

/// Value passed as `iouring::Config::size` for the io_uring network backend.
#[cfg(all(feature = "iouring-network", not(test)))]
const IOURING_NETWORK_SIZE: u32 = 1024;
62
/// Task-level metrics registered by the runtime (see `Metrics::init`).
#[derive(Debug)]
struct Metrics {
    /// Counter per task label, registered as `tasks_spawned`.
    tasks_spawned: Family<Label, Counter>,
    /// Gauge per task label, registered as `tasks_running`.
    tasks_running: Family<Label, Gauge>,
}
68
69impl Metrics {
70 pub fn init(registry: &mut PrometheusRegistry) -> Self {
71 let metrics = Self {
72 tasks_spawned: Family::default(),
73 tasks_running: Family::default(),
74 };
75 registry.register(
76 "tasks_spawned",
77 "Total number of tasks spawned",
78 metrics.tasks_spawned.clone(),
79 );
80 registry.register(
81 "tasks_running",
82 "Number of tasks currently running",
83 metrics.tasks_running.clone(),
84 );
85 metrics
86 }
87}
88
/// Socket-level settings forwarded to whichever network backend is compiled in.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// Forwarded as the backend's `tcp_nodelay` setting (`None` presumably leaves the
    /// OS default untouched — confirm in the backend).
    tcp_nodelay: Option<bool>,

    /// Forwarded as the backend's `so_linger` setting.
    so_linger: Option<Duration>,

    /// Used as both the read and the write timeout by the Tokio backend, and as the
    /// per-operation and shutdown timeout by the io_uring backend.
    read_write_timeout: Duration,
}
112
113impl Default for NetworkConfig {
114 fn default() -> Self {
115 Self {
116 tcp_nodelay: Some(true),
117 so_linger: Some(Duration::ZERO),
118 read_write_timeout: Duration::from_secs(60),
119 }
120 }
121}
122
/// Configuration for the Tokio-backed runtime, consumed by `Runner::start`.
#[derive(Clone)]
pub struct Config {
    /// Number of worker threads for the Tokio multi-thread runtime.
    worker_threads: usize,

    /// Maximum number of Tokio blocking threads.
    max_blocking_threads: usize,

    /// Passed to `Panicker::new` to decide whether task panics are caught.
    catch_panics: bool,

    /// Root directory handed to the storage backend.
    storage_directory: PathBuf,

    /// Maximum buffer size given to the Tokio storage backend (`TokioStorageConfig::new`);
    /// the io_uring storage configuration built in `start` does not use it.
    maximum_buffer_size: usize,

    /// Socket settings forwarded to the network backend.
    network_cfg: NetworkConfig,

    /// Configuration of the buffer pool used for network I/O.
    network_buffer_pool_cfg: BufferPoolConfig,

    /// Configuration of the buffer pool used for storage I/O.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
162
163impl Config {
164 pub fn new() -> Self {
166 let rng = OsRng.next_u64();
167 let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
168 Self {
169 worker_threads: 2,
170 max_blocking_threads: 512,
171 catch_panics: false,
172 storage_directory,
173 maximum_buffer_size: 2 * 1024 * 1024, network_cfg: NetworkConfig::default(),
175 network_buffer_pool_cfg: BufferPoolConfig::for_network(),
176 storage_buffer_pool_cfg: BufferPoolConfig::for_storage(),
177 }
178 }
179
180 pub const fn with_worker_threads(mut self, n: usize) -> Self {
183 self.worker_threads = n;
184 self
185 }
186 pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
188 self.max_blocking_threads = n;
189 self
190 }
191 pub const fn with_catch_panics(mut self, b: bool) -> Self {
193 self.catch_panics = b;
194 self
195 }
196 pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
198 self.network_cfg.read_write_timeout = d;
199 self
200 }
201 pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
203 self.network_cfg.tcp_nodelay = n;
204 self
205 }
206 pub const fn with_so_linger(mut self, l: Option<Duration>) -> Self {
208 self.network_cfg.so_linger = l;
209 self
210 }
211 pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
213 self.storage_directory = p.into();
214 self
215 }
216 pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
218 self.maximum_buffer_size = n;
219 self
220 }
221 pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
223 self.network_buffer_pool_cfg = cfg;
224 self
225 }
226 pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
228 self.storage_buffer_pool_cfg = cfg;
229 self
230 }
231
232 pub const fn worker_threads(&self) -> usize {
235 self.worker_threads
236 }
237 pub const fn max_blocking_threads(&self) -> usize {
239 self.max_blocking_threads
240 }
241 pub const fn catch_panics(&self) -> bool {
243 self.catch_panics
244 }
245 pub const fn read_write_timeout(&self) -> Duration {
247 self.network_cfg.read_write_timeout
248 }
249 pub const fn tcp_nodelay(&self) -> Option<bool> {
251 self.network_cfg.tcp_nodelay
252 }
253 pub const fn so_linger(&self) -> Option<Duration> {
255 self.network_cfg.so_linger
256 }
257 pub const fn storage_directory(&self) -> &PathBuf {
259 &self.storage_directory
260 }
261 pub const fn maximum_buffer_size(&self) -> usize {
263 self.maximum_buffer_size
264 }
265 pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
267 &self.network_buffer_pool_cfg
268 }
269 pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
271 &self.storage_buffer_pool_cfg
272 }
273}
274
275impl Default for Config {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
/// Runtime state shared (via `Arc`) by every [`Context`] created from one `Runner::start`.
pub struct Executor {
    /// Metrics registry; locked to register metrics, create/remove scopes, and encode.
    registry: Mutex<Registry>,
    /// Task metrics (`tasks_spawned` / `tasks_running`).
    metrics: Arc<Metrics>,
    /// The Tokio multi-thread runtime that executes tasks.
    runtime: Runtime,
    /// Shutdown coordination used by `stop` and `stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic handler cloned into each spawned task's handle.
    panicker: Panicker,
}
289
/// Entry point for the Tokio-backed runtime; holds the [`Config`] applied by `start`.
pub struct Runner {
    /// Configuration used when the runtime is started.
    cfg: Config,
}
294
295impl Default for Runner {
296 fn default() -> Self {
297 Self::new(Config::default())
298 }
299}
300
301impl Runner {
302 pub const fn new(cfg: Config) -> Self {
304 Self { cfg }
305 }
306}
307
308impl crate::Runner for Runner {
309 type Context = Context;
310
311 fn start<F, Fut>(self, f: F) -> Fut::Output
312 where
313 F: FnOnce(Self::Context) -> Fut,
314 Fut: Future,
315 {
316 let mut registry = Registry::new();
318 let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
319
320 let metrics = Arc::new(Metrics::init(runtime_registry));
322 let runtime = Builder::new_multi_thread()
323 .worker_threads(self.cfg.worker_threads)
324 .max_blocking_threads(self.cfg.max_blocking_threads)
325 .enable_all()
326 .build()
327 .expect("failed to create Tokio runtime");
328
329 let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
331
332 let process = MeteredProcess::init(runtime_registry);
337 runtime.spawn(process.collect(tokio::time::sleep));
338
339 let network_buffer_pool = BufferPool::new(
341 self.cfg.network_buffer_pool_cfg.clone(),
342 runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
343 );
344 let storage_buffer_pool = BufferPool::new(
345 self.cfg.storage_buffer_pool_cfg.clone(),
346 runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
347 );
348
349 cfg_if::cfg_if! {
351 if #[cfg(feature = "iouring-storage")] {
352 let iouring_registry =
353 runtime_registry.sub_registry_with_prefix("iouring_storage");
354 let storage = MeteredStorage::new(
355 IoUringStorage::start(
356 IoUringConfig {
357 storage_directory: self.cfg.storage_directory.clone(),
358 iouring_config: Default::default(),
359 },
360 iouring_registry,
361 storage_buffer_pool.clone(),
362 ),
363 runtime_registry,
364 );
365 } else {
366 let storage = MeteredStorage::new(
367 TokioStorage::new(
368 TokioStorageConfig::new(
369 self.cfg.storage_directory.clone(),
370 self.cfg.maximum_buffer_size,
371 ),
372 storage_buffer_pool.clone(),
373 ),
374 runtime_registry,
375 );
376 }
377 }
378
379 cfg_if::cfg_if! {
381 if #[cfg(feature = "iouring-network")] {
382 let iouring_registry =
383 runtime_registry.sub_registry_with_prefix("iouring_network");
384 let config = IoUringNetworkConfig {
385 tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
386 so_linger: self.cfg.network_cfg.so_linger,
387 iouring_config: iouring::Config {
388 size: IOURING_NETWORK_SIZE,
390 op_timeout: Some(self.cfg.network_cfg.read_write_timeout),
391 shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
392 ..Default::default()
393 },
394 ..Default::default()
395 };
396 let network = MeteredNetwork::new(
397 IoUringNetwork::start(
398 config,
399 iouring_registry,
400 network_buffer_pool.clone(),
401 )
402 .unwrap(),
403 runtime_registry,
404 );
405 } else {
406 let config = TokioNetworkConfig::default()
407 .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
408 .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
409 .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
410 .with_so_linger(self.cfg.network_cfg.so_linger);
411 let network = MeteredNetwork::new(
412 TokioNetwork::new(config, network_buffer_pool.clone()),
413 runtime_registry,
414 );
415 }
416 }
417
418 let executor = Arc::new(Executor {
420 registry: Mutex::new(registry),
421 metrics,
422 runtime,
423 shutdown: Mutex::new(Stopper::default()),
424 panicker,
425 });
426
427 let label = Label::root();
429 executor.metrics.tasks_spawned.get_or_create(&label).inc();
430 let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
431
432 let context = Context {
434 storage,
435 name: label.name(),
436 attributes: Vec::new(),
437 scope: None,
438 executor: executor.clone(),
439 network,
440 network_buffer_pool,
441 storage_buffer_pool,
442 tree: Tree::root(),
443 execution: Execution::default(),
444 instrumented: false,
445 };
446 let output = executor.runtime.block_on(panicked.interrupt(f(context)));
447 gauge.dec();
448
449 output
450 }
451}
452
453cfg_if::cfg_if! {
454 if #[cfg(feature = "iouring-storage")] {
455 type Storage = MeteredStorage<IoUringStorage>;
456 } else {
457 type Storage = MeteredStorage<TokioStorage>;
458 }
459}
460
461cfg_if::cfg_if! {
462 if #[cfg(feature = "iouring-network")] {
463 type Network = MeteredNetwork<IoUringNetwork>;
464 } else {
465 type Network = MeteredNetwork<TokioNetwork>;
466 }
467}
468
/// Handle given to tasks.
///
/// Carries the task's label, metric attributes and scope, the shared [`Executor`],
/// the storage/network backends, and the buffer pools.
pub struct Context {
    /// Label of this context; extended by `with_label` and used to prefix metric names.
    name: String,
    /// Key/value pairs applied as metric labels on registration and as span
    /// attributes on instrumented tasks.
    attributes: Vec<(String, String)>,
    /// Metrics scope created by `with_scope`, if any; metrics registered through
    /// this context are placed in it.
    scope: Option<Arc<ScopeGuard>>,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Metered storage backend.
    storage: Storage,
    /// Metered network backend.
    network: Network,
    /// Buffer pool for network I/O.
    network_buffer_pool: BufferPool,
    /// Buffer pool for storage I/O.
    storage_buffer_pool: BufferPool,
    /// This context's node in the supervision tree; spawned tasks get child nodes of it.
    tree: Arc<Tree>,
    /// Execution mode for the next `spawn`; reset to the default by `spawn` and `clone`.
    execution: Execution,
    /// Whether the next `spawn` runs inside a tracing span; reset by `spawn` and `clone`.
    instrumented: bool,
}
485
486impl Clone for Context {
487 fn clone(&self) -> Self {
488 let (child, _) = Tree::child(&self.tree);
489 Self {
490 name: self.name.clone(),
491 attributes: self.attributes.clone(),
492 scope: self.scope.clone(),
493 executor: self.executor.clone(),
494 storage: self.storage.clone(),
495 network: self.network.clone(),
496 network_buffer_pool: self.network_buffer_pool.clone(),
497 storage_buffer_pool: self.storage_buffer_pool.clone(),
498 tree: child,
499 execution: Execution::default(),
500 instrumented: false,
501 }
502 }
503}
504
505impl Context {
506 fn metrics(&self) -> &Metrics {
508 &self.executor.metrics
509 }
510}
511
impl crate::Spawner for Context {
    /// Marks the next `spawn` to run on its own OS thread (see `spawn`).
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Marks the next `spawn` to run on the shared runtime; with `blocking = true`
    /// it goes to Tokio's blocking pool.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Marks the next `spawn` to run inside an `info_span!("task", ...)` span.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawns `f(child_context)` according to the execution mode set on `self`.
    ///
    /// - `Execution::Dedicated`: a new OS thread blocks on the task via the runtime
    ///   handle.
    /// - `Execution::Shared(true)`: Tokio's blocking pool blocks on the task.
    /// - Otherwise: the task is spawned onto the runtime's async workers.
    ///
    /// If the parent tree node reports itself aborted, a closed handle is returned
    /// and nothing is spawned.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // `spawn_metrics!` is defined elsewhere; presumably it records the spawn and
        // yields the task label plus a running-task metric handle — confirm.
        let (label, metric) = spawn_metrics!(self);

        let parent = Arc::clone(&self.tree);
        // Capture this spawn's settings, then reset them so the context handed to
        // the task starts with defaults.
        let past = self.execution;
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        let executor = self.executor.clone();
        // Box so both branches (with and without a tracing span) share one type.
        let future: BoxFuture<'_, T> = if is_instrumented {
            let span = info_span!("task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            f(self).instrument(span).boxed()
        } else {
            f(self).boxed()
        };
        // `f` is now the wrapped future to execute; `handle` is returned to the caller.
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );

        if matches!(past, Execution::Dedicated) {
            thread::spawn({
                let handle = executor.runtime.handle().clone();
                move || {
                    handle.block_on(f);
                }
            });
        } else if matches!(past, Execution::Shared(true)) {
            executor.runtime.spawn_blocking({
                let handle = executor.runtime.handle().clone();
                move || {
                    handle.block_on(f);
                }
            });
        } else {
            executor.runtime.spawn(f);
        }

        // Register the task's aborter (if any) with the parent tree node, presumably
        // so aborting the parent also aborts this task.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Requests shutdown with `value` and waits for it to resolve.
    ///
    /// Returns `Err(Error::Closed)` if the stop future resolves to an error, and
    /// `Err(Error::Timeout)` if `timeout` is `Some` and elapses first.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        // The lock guard is dropped at the end of this block, so it is not held
        // across the `select!` below.
        let stop_resolved = {
            let mut shutdown = self.executor.shutdown.lock();
            shutdown.stop(value)
        };

        // With no timeout, race against a future that never completes.
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    /// Returns the shutdown signal from the shared `Stopper`.
    fn stopped(&self) -> Signal {
        self.executor.shutdown.lock().stopped()
    }
}
619
620#[stability(BETA)]
621impl crate::ThreadPooler for Context {
622 fn create_thread_pool(
623 &self,
624 concurrency: NonZeroUsize,
625 ) -> Result<ThreadPool, ThreadPoolBuildError> {
626 ThreadPoolBuilder::new()
627 .num_threads(concurrency.get())
628 .spawn_handler(move |thread| {
629 self.with_label("rayon_thread")
632 .dedicated()
633 .spawn(move |_| async move { thread.run() });
634 Ok(())
635 })
636 .build()
637 .map(Arc::new)
638 }
639}
640
641impl crate::Metrics for Context {
642 fn with_label(&self, label: &str) -> Self {
643 let name = {
645 let prefix = self.name.clone();
646 if prefix.is_empty() {
647 label.to_string()
648 } else {
649 format!("{prefix}_{label}")
650 }
651 };
652 Self {
653 name,
654 ..self.clone()
655 }
656 }
657
658 fn label(&self) -> String {
659 self.name.clone()
660 }
661
662 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
663 let name = name.into();
664 let prefixed_name = {
665 let prefix = &self.name;
666 if prefix.is_empty() {
667 name
668 } else {
669 format!("{}_{}", *prefix, name)
670 }
671 };
672
673 let mut registry = self.executor.registry.lock();
675 let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
676 let sub_registry = self
677 .attributes
678 .iter()
679 .fold(scoped, |reg, (k, v): &(String, String)| {
680 reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
681 });
682 sub_registry.register(prefixed_name, help, metric);
683 }
684
685 fn encode(&self) -> String {
686 self.executor.registry.lock().encode()
687 }
688
689 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
690 let mut attributes = self.attributes.clone();
691 add_attribute(&mut attributes, key, value);
692 Self {
693 attributes,
694 ..self.clone()
695 }
696 }
697
698 fn with_scope(&self) -> Self {
699 if self.scope.is_some() {
701 return self.clone();
702 }
703
704 let executor = self.executor.clone();
707 let scope_id = executor.registry.lock().create_scope();
708 let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
709 executor.registry.lock().remove_scope(id);
710 }));
711 Self {
712 scope: Some(guard),
713 ..self.clone()
714 }
715 }
716}
717
718impl Clock for Context {
719 fn current(&self) -> SystemTime {
720 SystemTime::now()
721 }
722
723 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
724 tokio::time::sleep(duration)
725 }
726
727 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
728 let now = SystemTime::now();
729 let duration_until_deadline = deadline.duration_since(now).unwrap_or_else(|_| {
730 Duration::from_secs(0)
732 });
733 let target_instant = tokio::time::Instant::now() + duration_until_deadline;
734 tokio::time::sleep_until(target_instant)
735 }
736}
737
#[cfg(feature = "external")]
impl Pacer for Context {
    /// This runtime does not pace futures: `future` is returned unchanged and
    /// `_latency` is ignored.
    fn pace<'a, F, T>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        T: Send + 'a,
        F: Future<Output = T> + Send + 'a,
    {
        future
    }
}
753
754impl GClock for Context {
755 type Instant = SystemTime;
756
757 fn now(&self) -> Self::Instant {
758 self.current()
759 }
760}
761
762impl ReasonablyRealtime for Context {}
763
764impl crate::Network for Context {
765 type Listener = <Network as crate::Network>::Listener;
766
767 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
768 self.network.bind(socket).await
769 }
770
771 async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
772 self.network.dial(socket).await
773 }
774}
775
776impl crate::Resolver for Context {
777 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
778 let addrs = tokio::net::lookup_host(format!("{host}:0"))
784 .await
785 .map_err(|e| Error::ResolveFailed(e.to_string()))?;
786 Ok(addrs.map(|addr| addr.ip()).collect())
787 }
788}
789
790impl RngCore for Context {
791 fn next_u32(&mut self) -> u32 {
792 OsRng.next_u32()
793 }
794
795 fn next_u64(&mut self) -> u64 {
796 OsRng.next_u64()
797 }
798
799 fn fill_bytes(&mut self, dest: &mut [u8]) {
800 OsRng.fill_bytes(dest);
801 }
802
803 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
804 OsRng.try_fill_bytes(dest)
805 }
806}
807
808impl CryptoRng for Context {}
809
810impl crate::Storage for Context {
811 type Blob = <Storage as crate::Storage>::Blob;
812
813 async fn open_versioned(
814 &self,
815 partition: &str,
816 name: &[u8],
817 versions: std::ops::RangeInclusive<u16>,
818 ) -> Result<(Self::Blob, u64, u16), Error> {
819 self.storage.open_versioned(partition, name, versions).await
820 }
821
822 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
823 self.storage.remove(partition, name).await
824 }
825
826 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
827 self.storage.scan(partition).await
828 }
829}
830
831impl crate::BufferPooler for Context {
832 fn network_buffer_pool(&self) -> &BufferPool {
833 &self.network_buffer_pool
834 }
835
836 fn storage_buffer_pool(&self) -> &BufferPool {
837 &self.storage_buffer_pool
838 }
839}