commonware_runtime/tokio/runtime.rs

1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11    iouring,
12    network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15    network::metered::Network as MeteredNetwork,
16    process::metered::Metrics as MeteredProcess,
17    signal::Signal,
18    storage::metered::Storage as MeteredStorage,
19    telemetry::metrics::task::Label,
20    utils::{signal::Stopper, supervision::Tree, Panicker},
21    Clock, Error, Execution, Handle, SinkOf, StreamOf, METRICS_PREFIX,
22};
23use commonware_macros::select;
24use futures::{future::BoxFuture, FutureExt};
25use governor::clock::{Clock as GClock, ReasonablyRealtime};
26use prometheus_client::{
27    encoding::text::encode,
28    metrics::{counter::Counter, family::Family, gauge::Gauge},
29    registry::{Metric, Registry},
30};
31use rand::{rngs::OsRng, CryptoRng, RngCore};
32use std::{
33    env,
34    future::Future,
35    net::SocketAddr,
36    path::PathBuf,
37    sync::{Arc, Mutex},
38    thread,
39    time::{Duration, SystemTime},
40};
41use tokio::runtime::{Builder, Runtime};
42use tracing::{info_span, Instrument};
43
/// Size passed as `iouring::Config::size` for the io_uring instance that backs
/// networking (see TODO #1045 in `start` about making this configurable).
#[cfg(feature = "iouring-network")]
const IOURING_NETWORK_SIZE: u32 = 1024;
/// Value passed as `iouring::Config::force_poll` for the networking io_uring
/// instance.
#[cfg(feature = "iouring-network")]
const IOURING_NETWORK_FORCE_POLL: Duration = Duration::from_millis(100);
48
49#[derive(Debug)]
50struct Metrics {
51    tasks_spawned: Family<Label, Counter>,
52    tasks_running: Family<Label, Gauge>,
53}
54
55impl Metrics {
56    pub fn init(registry: &mut Registry) -> Self {
57        let metrics = Self {
58            tasks_spawned: Family::default(),
59            tasks_running: Family::default(),
60        };
61        registry.register(
62            "tasks_spawned",
63            "Total number of tasks spawned",
64            metrics.tasks_spawned.clone(),
65        );
66        registry.register(
67            "tasks_running",
68            "Number of tasks currently running",
69            metrics.tasks_running.clone(),
70        );
71        metrics
72    }
73}
74
/// Network settings forwarded to whichever network backend the runtime starts.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// `Some(v)` explicitly sets TCP_NODELAY to `v` on each socket; `None`
    /// leaves the system default in place.
    tcp_nodelay: Option<bool>,

    /// Timeout applied to network reads and writes.
    read_write_timeout: Duration,
}

impl Default for NetworkConfig {
    /// No explicit TCP_NODELAY setting and a 60 second read/write timeout.
    fn default() -> Self {
        const DEFAULT_READ_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
        Self {
            read_write_timeout: DEFAULT_READ_WRITE_TIMEOUT,
            tcp_nodelay: None,
        }
    }
}
93
94/// Configuration for the `tokio` runtime.
95#[derive(Clone)]
96pub struct Config {
97    /// Number of threads to use for handling async tasks.
98    ///
99    /// Worker threads are always active (waiting for work).
100    ///
101    /// Tokio sets the default value to the number of logical CPUs.
102    worker_threads: usize,
103
104    /// Maximum number of threads to use for blocking tasks.
105    ///
106    /// Unlike worker threads, blocking threads are created as needed and
107    /// exit if left idle for too long.
108    ///
109    /// Tokio sets the default value to 512 to avoid hanging on lower-level
110    /// operations that require blocking (like `fs` and writing to `Stdout`).
111    max_blocking_threads: usize,
112
113    /// Whether or not to catch panics.
114    catch_panics: bool,
115
116    /// Base directory for all storage operations.
117    storage_directory: PathBuf,
118
119    /// Maximum buffer size for operations on blobs.
120    ///
121    /// Tokio sets the default value to 2MB.
122    maximum_buffer_size: usize,
123
124    /// Network configuration.
125    network_cfg: NetworkConfig,
126}
127
128impl Config {
129    /// Returns a new [Config] with default values.
130    pub fn new() -> Self {
131        let rng = OsRng.next_u64();
132        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
133        Self {
134            worker_threads: 2,
135            max_blocking_threads: 512,
136            catch_panics: false,
137            storage_directory,
138            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
139            network_cfg: NetworkConfig::default(),
140        }
141    }
142
143    // Setters
144    /// See [Config]
145    pub fn with_worker_threads(mut self, n: usize) -> Self {
146        self.worker_threads = n;
147        self
148    }
149    /// See [Config]
150    pub fn with_max_blocking_threads(mut self, n: usize) -> Self {
151        self.max_blocking_threads = n;
152        self
153    }
154    /// See [Config]
155    pub fn with_catch_panics(mut self, b: bool) -> Self {
156        self.catch_panics = b;
157        self
158    }
159    /// See [Config]
160    pub fn with_read_write_timeout(mut self, d: Duration) -> Self {
161        self.network_cfg.read_write_timeout = d;
162        self
163    }
164    /// See [Config]
165    pub fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
166        self.network_cfg.tcp_nodelay = n;
167        self
168    }
169    /// See [Config]
170    pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
171        self.storage_directory = p.into();
172        self
173    }
174    /// See [Config]
175    pub fn with_maximum_buffer_size(mut self, n: usize) -> Self {
176        self.maximum_buffer_size = n;
177        self
178    }
179
180    // Getters
181    /// See [Config]
182    pub fn worker_threads(&self) -> usize {
183        self.worker_threads
184    }
185    /// See [Config]
186    pub fn max_blocking_threads(&self) -> usize {
187        self.max_blocking_threads
188    }
189    /// See [Config]
190    pub fn catch_panics(&self) -> bool {
191        self.catch_panics
192    }
193    /// See [Config]
194    pub fn read_write_timeout(&self) -> Duration {
195        self.network_cfg.read_write_timeout
196    }
197    /// See [Config]
198    pub fn tcp_nodelay(&self) -> Option<bool> {
199        self.network_cfg.tcp_nodelay
200    }
201    /// See [Config]
202    pub fn storage_directory(&self) -> &PathBuf {
203        &self.storage_directory
204    }
205    /// See [Config]
206    pub fn maximum_buffer_size(&self) -> usize {
207        self.maximum_buffer_size
208    }
209}
210
211impl Default for Config {
212    fn default() -> Self {
213        Self::new()
214    }
215}
216
/// Runtime based on [Tokio](https://tokio.rs).
///
/// One [Executor] is created per [Runner] start and shared (through an [Arc])
/// by every [Context] derived from the root context.
pub struct Executor {
    /// Root metrics registry; [Context] metric registration and encoding lock it.
    registry: Mutex<Registry>,
    /// Task spawn/running metrics, registered under the runtime prefix.
    metrics: Arc<Metrics>,
    /// Tokio runtime that runs the root future and spawned tasks.
    runtime: Runtime,
    /// Shutdown coordinator used by `stop` and `stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic handler, cloned into each spawned task's [Handle].
    panicker: Panicker,
}
225
226/// Implementation of [crate::Runner] for the `tokio` runtime.
227pub struct Runner {
228    cfg: Config,
229}
230
231impl Default for Runner {
232    fn default() -> Self {
233        Self::new(Config::default())
234    }
235}
236
237impl Runner {
238    /// Initialize a new `tokio` runtime with the given number of threads.
239    pub fn new(cfg: Config) -> Self {
240        Self { cfg }
241    }
242}
243
244impl crate::Runner for Runner {
245    type Context = Context;
246
247    fn start<F, Fut>(self, f: F) -> Fut::Output
248    where
249        F: FnOnce(Self::Context) -> Fut,
250        Fut: Future,
251    {
252        // Create a new registry
253        let mut registry = Registry::default();
254        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
255
256        // Initialize runtime
257        let metrics = Arc::new(Metrics::init(runtime_registry));
258        let runtime = Builder::new_multi_thread()
259            .worker_threads(self.cfg.worker_threads)
260            .max_blocking_threads(self.cfg.max_blocking_threads)
261            .enable_all()
262            .build()
263            .expect("failed to create Tokio runtime");
264
265        // Initialize panicker
266        let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
267
268        // Collect process metrics.
269        //
270        // We prefer to collect process metrics outside of `Context` because
271        // we are using `runtime_registry` rather than the one provided by `Context`.
272        let process = MeteredProcess::init(runtime_registry);
273        runtime.spawn(process.collect(tokio::time::sleep));
274
275        // Initialize storage
276        cfg_if::cfg_if! {
277            if #[cfg(feature = "iouring-storage")] {
278                let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_storage");
279                let storage = MeteredStorage::new(
280                    IoUringStorage::start(IoUringConfig {
281                        storage_directory: self.cfg.storage_directory.clone(),
282                        iouring_config: Default::default(),
283                    }, iouring_registry),
284                    runtime_registry,
285                );
286            } else {
287                let storage = MeteredStorage::new(
288                    TokioStorage::new(TokioStorageConfig::new(
289                        self.cfg.storage_directory.clone(),
290                        self.cfg.maximum_buffer_size,
291                    )),
292                    runtime_registry,
293                );
294            }
295        }
296
297        // Initialize network
298        cfg_if::cfg_if! {
299            if #[cfg(feature = "iouring-network")] {
300                let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_network");
301                let config = IoUringNetworkConfig {
302                    tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
303                    iouring_config: iouring::Config {
304                        // TODO (#1045): make `IOURING_NETWORK_SIZE` configurable
305                        size: IOURING_NETWORK_SIZE,
306                        op_timeout: Some(self.cfg.network_cfg.read_write_timeout),
307                        force_poll: IOURING_NETWORK_FORCE_POLL,
308                        shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
309                        ..Default::default()
310                    },
311                };
312                let network = MeteredNetwork::new(
313                    IoUringNetwork::start(config, iouring_registry).unwrap(),
314                runtime_registry,
315            );
316        } else {
317            let config = TokioNetworkConfig::default().with_read_timeout(self.cfg.network_cfg.read_write_timeout)
318                .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
319                .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay);
320                let network = MeteredNetwork::new(
321                    TokioNetwork::from(config),
322                    runtime_registry,
323                );
324            }
325        }
326
327        // Initialize executor
328        let executor = Arc::new(Executor {
329            registry: Mutex::new(registry),
330            metrics,
331            runtime,
332            shutdown: Mutex::new(Stopper::default()),
333            panicker,
334        });
335
336        // Get metrics
337        let label = Label::root();
338        executor.metrics.tasks_spawned.get_or_create(&label).inc();
339        let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
340
341        // Run the future
342        let context = Context {
343            storage,
344            name: label.name(),
345            executor: executor.clone(),
346            network,
347            tree: Tree::root(),
348            execution: Execution::default(),
349            instrumented: false,
350        };
351        let output = executor.runtime.block_on(panicked.interrupt(f(context)));
352        gauge.dec();
353
354        output
355    }
356}
357
358cfg_if::cfg_if! {
359    if #[cfg(feature = "iouring-storage")] {
360        type Storage = MeteredStorage<IoUringStorage>;
361    } else {
362        type Storage = MeteredStorage<TokioStorage>;
363    }
364}
365
366cfg_if::cfg_if! {
367    if #[cfg(feature = "iouring-network")] {
368        type Network = MeteredNetwork<IoUringNetwork>;
369    } else {
370        type Network = MeteredNetwork<TokioNetwork>;
371    }
372}
373
/// Implementation of [crate::Spawner], [crate::Clock], [crate::Metrics],
/// [crate::Network], and [crate::Storage] (as well as [RngCore] and
/// [CryptoRng]) for the `tokio` runtime.
pub struct Context {
    /// Metric-name prefix: extended by `with_label` and prepended by `register`.
    name: String,
    /// Executor shared by all contexts (Tokio runtime, registry, task metrics,
    /// shutdown coordinator, panicker).
    executor: Arc<Executor>,
    /// Metered storage backend selected at compile time.
    storage: Storage,
    /// Metered network backend selected at compile time.
    network: Network,
    /// This context's node in the supervision tree; `clone` and `spawn` create
    /// child nodes of it.
    tree: Arc<Tree>,
    /// How the next `spawn` runs its task; reset to the default by `spawn`.
    execution: Execution,
    /// Whether the next `spawn` wraps its future in a `task` tracing span;
    /// reset to `false` by `spawn`.
    instrumented: bool,
}
386
387impl Clone for Context {
388    fn clone(&self) -> Self {
389        let (child, _) = Tree::child(&self.tree);
390        Self {
391            name: self.name.clone(),
392            executor: self.executor.clone(),
393            storage: self.storage.clone(),
394            network: self.network.clone(),
395
396            tree: child,
397            execution: Execution::default(),
398            instrumented: false,
399        }
400    }
401}
402
403impl Context {
404    /// Access the [Metrics] of the runtime.
405    fn metrics(&self) -> &Metrics {
406        &self.executor.metrics
407    }
408}
409
410impl crate::Spawner for Context {
411    fn dedicated(mut self) -> Self {
412        self.execution = Execution::Dedicated;
413        self
414    }
415
416    fn shared(mut self, blocking: bool) -> Self {
417        self.execution = Execution::Shared(blocking);
418        self
419    }
420
421    fn instrumented(mut self) -> Self {
422        self.instrumented = true;
423        self
424    }
425
426    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
427    where
428        F: FnOnce(Self) -> Fut + Send + 'static,
429        Fut: Future<Output = T> + Send + 'static,
430        T: Send + 'static,
431    {
432        // Get metrics
433        let (label, metric) = spawn_metrics!(self);
434
435        // Track supervision before resetting configuration
436        let parent = Arc::clone(&self.tree);
437        let past = self.execution;
438        let is_instrumented = self.instrumented;
439        self.execution = Execution::default();
440        self.instrumented = false;
441        let (child, aborted) = Tree::child(&parent);
442        if aborted {
443            return Handle::closed(metric);
444        }
445        self.tree = child;
446
447        // Spawn the task
448        let executor = self.executor.clone();
449        let future: BoxFuture<T> = if is_instrumented {
450            f(self)
451                .instrument(info_span!("task", name = %label.name()))
452                .boxed()
453        } else {
454            f(self).boxed()
455        };
456        let (f, handle) = Handle::init(
457            future,
458            metric,
459            executor.panicker.clone(),
460            Arc::clone(&parent),
461        );
462
463        if matches!(past, Execution::Dedicated) {
464            thread::spawn({
465                // Ensure the task can access the tokio runtime
466                let handle = executor.runtime.handle().clone();
467                move || {
468                    handle.block_on(f);
469                }
470            });
471        } else if matches!(past, Execution::Shared(true)) {
472            executor.runtime.spawn_blocking({
473                // Ensure the task can access the tokio runtime
474                let handle = executor.runtime.handle().clone();
475                move || {
476                    handle.block_on(f);
477                }
478            });
479        } else {
480            executor.runtime.spawn(f);
481        }
482
483        // Register the task on the parent
484        if let Some(aborter) = handle.aborter() {
485            parent.register(aborter);
486        }
487
488        handle
489    }
490
491    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
492        let stop_resolved = {
493            let mut shutdown = self.executor.shutdown.lock().unwrap();
494            shutdown.stop(value)
495        };
496
497        // Wait for all tasks to complete or the timeout to fire
498        let timeout_future = match timeout {
499            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
500            None => futures::future::Either::Right(futures::future::pending()),
501        };
502        select! {
503            result = stop_resolved => {
504                result.map_err(|_| Error::Closed)?;
505                Ok(())
506            },
507            _ = timeout_future => {
508                Err(Error::Timeout)
509            }
510        }
511    }
512
513    fn stopped(&self) -> Signal {
514        self.executor.shutdown.lock().unwrap().stopped()
515    }
516}
517
518impl crate::Metrics for Context {
519    fn with_label(&self, label: &str) -> Self {
520        let name = {
521            let prefix = self.name.clone();
522            if prefix.is_empty() {
523                label.to_string()
524            } else {
525                format!("{prefix}_{label}")
526            }
527        };
528        assert!(
529            !name.starts_with(METRICS_PREFIX),
530            "using runtime label is not allowed"
531        );
532        Self {
533            name,
534            ..self.clone()
535        }
536    }
537
538    fn label(&self) -> String {
539        self.name.clone()
540    }
541
542    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
543        let name = name.into();
544        let prefixed_name = {
545            let prefix = &self.name;
546            if prefix.is_empty() {
547                name
548            } else {
549                format!("{}_{}", *prefix, name)
550            }
551        };
552        self.executor
553            .registry
554            .lock()
555            .unwrap()
556            .register(prefixed_name, help, metric)
557    }
558
559    fn encode(&self) -> String {
560        let mut buffer = String::new();
561        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
562        buffer
563    }
564}
565
566impl Clock for Context {
567    fn current(&self) -> SystemTime {
568        SystemTime::now()
569    }
570
571    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
572        tokio::time::sleep(duration)
573    }
574
575    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
576        let now = SystemTime::now();
577        let duration_until_deadline = match deadline.duration_since(now) {
578            Ok(duration) => duration,
579            Err(_) => Duration::from_secs(0), // Deadline is in the past
580        };
581        let target_instant = tokio::time::Instant::now() + duration_until_deadline;
582        tokio::time::sleep_until(target_instant)
583    }
584}
585
/// [Pacer] for [Context]: no pacing is applied and the `_latency` argument is
/// ignored.
#[cfg(feature = "external")]
impl Pacer for Context {
    fn pace<'a, F, T>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Return the future unchanged: no delay is introduced.
        future
    }
}
601
602impl GClock for Context {
603    type Instant = SystemTime;
604
605    fn now(&self) -> Self::Instant {
606        self.current()
607    }
608}
609
610impl ReasonablyRealtime for Context {}
611
/// Networking for [Context], delegating to the metered network backend
/// selected at compile time by the `iouring-network` feature.
impl crate::Network for Context {
    type Listener = <Network as crate::Network>::Listener;

    /// Bind a listener on `socket` through the underlying network backend.
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    /// Dial `socket` through the underlying network backend, returning its
    /// sink/stream pair.
    async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
623
624impl RngCore for Context {
625    fn next_u32(&mut self) -> u32 {
626        OsRng.next_u32()
627    }
628
629    fn next_u64(&mut self) -> u64 {
630        OsRng.next_u64()
631    }
632
633    fn fill_bytes(&mut self, dest: &mut [u8]) {
634        OsRng.fill_bytes(dest);
635    }
636
637    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
638        OsRng.try_fill_bytes(dest)
639    }
640}
641
642impl CryptoRng for Context {}
643
/// Storage for [Context], delegating to the metered storage backend selected
/// at compile time by the `iouring-storage` feature.
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    /// Open blob `name` in `partition` through the underlying backend.
    /// NOTE(review): the returned `u64` is presumably the blob's current
    /// length; confirm against the backend.
    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
        self.storage.open(partition, name).await
    }

    /// Remove blob `name` from `partition` (with `None`, presumably the whole
    /// partition; confirm against the backend).
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    /// List the blob names stored in `partition`.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}