Skip to main content

commonware_runtime/tokio/
runtime.rs

1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11    iouring,
12    network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15    network::metered::Network as MeteredNetwork,
16    process::metered::Metrics as MeteredProcess,
17    signal::Signal,
18    storage::metered::Storage as MeteredStorage,
19    telemetry::metrics::task::Label,
20    utils::{
21        self, add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard,
22    },
23    BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf,
24    Spawner as _, StreamOf, METRICS_PREFIX,
25};
26use commonware_macros::{select, stability};
27#[stability(BETA)]
28use commonware_parallel::ThreadPool;
29use commonware_utils::{sync::Mutex, NZUsize};
30use futures::future::Either;
31use governor::clock::{Clock as GClock, ReasonablyRealtime};
32use prometheus_client::{
33    metrics::{counter::Counter, family::Family, gauge::Gauge},
34    registry::{Metric, Registry as PrometheusRegistry},
35};
36use rand::{rngs::OsRng, CryptoRng, RngCore};
37#[stability(BETA)]
38use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
39use std::{
40    borrow::Cow,
41    env,
42    future::Future,
43    net::{IpAddr, SocketAddr},
44    num::NonZeroUsize,
45    path::PathBuf,
46    sync::Arc,
47    time::{Duration, SystemTime},
48};
49use tokio::runtime::{Builder, Runtime};
50use tracing::{info_span, Instrument};
51use tracing_opentelemetry::OpenTelemetrySpanExt;
52
53#[cfg(feature = "iouring-network")]
54cfg_if::cfg_if! {
55    if #[cfg(test)] {
56        // Use a smaller ring in tests to reduce `io_uring_setup` failures
57        // under parallel test load due to mlock/resource limits.
58        const IOURING_NETWORK_SIZE: u32 = 128;
59    } else {
60        const IOURING_NETWORK_SIZE: u32 = 1024;
61    }
62}
63
64#[derive(Debug)]
65struct Metrics {
66    tasks_spawned: Family<Label, Counter>,
67    tasks_running: Family<Label, Gauge>,
68}
69
70impl Metrics {
71    pub fn init(registry: &mut PrometheusRegistry) -> Self {
72        let metrics = Self {
73            tasks_spawned: Family::default(),
74            tasks_running: Family::default(),
75        };
76        registry.register(
77            "tasks_spawned",
78            "Total number of tasks spawned",
79            metrics.tasks_spawned.clone(),
80        );
81        registry.register(
82            "tasks_running",
83            "Number of tasks currently running",
84            metrics.tasks_running.clone(),
85        );
86        metrics
87    }
88}
89
90#[derive(Clone, Debug)]
91pub struct NetworkConfig {
92    /// If Some, explicitly sets TCP_NODELAY on the socket.
93    /// Otherwise uses system default.
94    ///
95    /// Defaults to `Some(true)`.
96    tcp_nodelay: Option<bool>,
97
98    /// Whether to set `SO_LINGER` to zero on the socket.
99    ///
100    /// When enabled, causes an immediate RST on close, avoiding
101    /// `TIME_WAIT` state. This is useful in adversarial environments to
102    /// reclaim socket resources immediately when closing connections to
103    /// misbehaving peers.
104    ///
105    /// Defaults to `true`.
106    zero_linger: bool,
107
108    /// Read/write timeout for network operations.
109    ///
110    /// Bounds the full `Sink::send` and `Stream::recv` calls rather than each
111    /// individual socket syscall. Larger
112    /// batched writes may therefore require a larger timeout.
113    read_write_timeout: Duration,
114}
115
116impl Default for NetworkConfig {
117    fn default() -> Self {
118        Self {
119            tcp_nodelay: Some(true),
120            zero_linger: true,
121            read_write_timeout: Duration::from_secs(60),
122        }
123    }
124}
125
126/// Configuration for the `tokio` runtime.
127#[derive(Clone)]
128pub struct Config {
129    /// Number of threads to use for handling async tasks.
130    ///
131    /// Worker threads are always active (waiting for work).
132    ///
133    /// Tokio sets the default value to the number of logical CPUs.
134    worker_threads: usize,
135
136    /// Maximum number of threads to use for blocking tasks.
137    ///
138    /// Unlike worker threads, blocking threads are created as needed and
139    /// exit if left idle for too long.
140    ///
141    /// Tokio sets the default value to 512 to avoid hanging on lower-level
142    /// operations that require blocking (like `fs` and writing to `Stdout`).
143    max_blocking_threads: usize,
144
145    /// Stack size to use for runtime-owned threads.
146    ///
147    /// Defaults to the system stack size when the current platform exposes it,
148    /// and otherwise falls back to Rust's default spawned-thread stack size.
149    ///
150    /// See [utils::thread::system_thread_stack_size].
151    thread_stack_size: usize,
152
153    /// Whether or not to catch panics.
154    catch_panics: bool,
155
156    /// Base directory for all storage operations.
157    storage_directory: PathBuf,
158
159    /// Maximum buffer size for operations on blobs.
160    ///
161    /// Tokio sets the default value to 2MB.
162    maximum_buffer_size: usize,
163
164    /// Network configuration.
165    network_cfg: NetworkConfig,
166
167    /// Explicit buffer pool configuration for network I/O, if provided.
168    network_buffer_pool_cfg: Option<BufferPoolConfig>,
169
170    /// Explicit buffer pool configuration for storage I/O, if provided.
171    storage_buffer_pool_cfg: Option<BufferPoolConfig>,
172}
173
174impl Config {
175    /// Returns a new [Config] with default values.
176    pub fn new() -> Self {
177        let rng = OsRng.next_u64();
178        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
179        Self {
180            worker_threads: 2,
181            max_blocking_threads: 512,
182            thread_stack_size: utils::thread::system_thread_stack_size(),
183            catch_panics: false,
184            storage_directory,
185            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
186            network_cfg: NetworkConfig::default(),
187            network_buffer_pool_cfg: None,
188            storage_buffer_pool_cfg: None,
189        }
190    }
191
192    // Setters
193    /// See [Config]
194    pub const fn with_worker_threads(mut self, n: usize) -> Self {
195        self.worker_threads = n;
196        self
197    }
198    /// See [Config]
199    pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
200        self.max_blocking_threads = n;
201        self
202    }
203    /// See [Config]
204    pub const fn with_thread_stack_size(mut self, n: usize) -> Self {
205        self.thread_stack_size = n;
206        self
207    }
208    /// See [Config]
209    pub const fn with_catch_panics(mut self, b: bool) -> Self {
210        self.catch_panics = b;
211        self
212    }
213    /// See [Config]
214    pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
215        self.network_cfg.read_write_timeout = d;
216        self
217    }
218    /// See [Config]
219    pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
220        self.network_cfg.tcp_nodelay = n;
221        self
222    }
223    /// See [Config]
224    pub const fn with_zero_linger(mut self, l: bool) -> Self {
225        self.network_cfg.zero_linger = l;
226        self
227    }
228    /// See [Config]
229    pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
230        self.storage_directory = p.into();
231        self
232    }
233    /// See [Config]
234    pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
235        self.maximum_buffer_size = n;
236        self
237    }
238    /// See [Config]
239    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
240        self.network_buffer_pool_cfg = Some(cfg);
241        self
242    }
243    /// See [Config]
244    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
245        self.storage_buffer_pool_cfg = Some(cfg);
246        self
247    }
248
249    // Getters
250    /// See [Config]
251    pub const fn worker_threads(&self) -> usize {
252        self.worker_threads
253    }
254    /// See [Config]
255    pub const fn max_blocking_threads(&self) -> usize {
256        self.max_blocking_threads
257    }
258    /// See [Config]
259    pub const fn thread_stack_size(&self) -> usize {
260        self.thread_stack_size
261    }
262    /// See [Config]
263    pub const fn catch_panics(&self) -> bool {
264        self.catch_panics
265    }
266    /// See [Config]
267    pub const fn read_write_timeout(&self) -> Duration {
268        self.network_cfg.read_write_timeout
269    }
270    /// See [Config]
271    pub const fn tcp_nodelay(&self) -> Option<bool> {
272        self.network_cfg.tcp_nodelay
273    }
274    /// See [Config]
275    pub const fn zero_linger(&self) -> bool {
276        self.network_cfg.zero_linger
277    }
278    /// See [Config]
279    pub const fn storage_directory(&self) -> &PathBuf {
280        &self.storage_directory
281    }
282    /// See [Config]
283    pub const fn maximum_buffer_size(&self) -> usize {
284        self.maximum_buffer_size
285    }
286
287    /// Returns the network buffer pool config, deriving thread-cache
288    /// parallelism from `worker_threads` if not explicitly configured.
289    fn resolved_network_buffer_pool_config(&self) -> BufferPoolConfig {
290        self.network_buffer_pool_cfg.clone().unwrap_or_else(|| {
291            BufferPoolConfig::for_network()
292                .with_thread_cache_for_parallelism(NZUsize!(self.worker_threads))
293        })
294    }
295
296    /// Returns the storage buffer pool config, deriving thread-cache
297    /// parallelism from `worker_threads` if not explicitly configured.
298    fn resolved_storage_buffer_pool_config(&self) -> BufferPoolConfig {
299        self.storage_buffer_pool_cfg.clone().unwrap_or_else(|| {
300            BufferPoolConfig::for_storage()
301                .with_thread_cache_for_parallelism(NZUsize!(self.worker_threads))
302        })
303    }
304}
305
306impl Default for Config {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312/// Runtime based on [Tokio](https://tokio.rs).
313pub struct Executor {
314    registry: Mutex<Registry>,
315    metrics: Arc<Metrics>,
316    runtime: Runtime,
317    shutdown: Mutex<Stopper>,
318    panicker: Panicker,
319    thread_stack_size: usize,
320}
321
322/// Implementation of [crate::Runner] for the `tokio` runtime.
323pub struct Runner {
324    cfg: Config,
325}
326
327impl Default for Runner {
328    fn default() -> Self {
329        Self::new(Config::default())
330    }
331}
332
333impl Runner {
334    /// Initialize a new `tokio` runtime with the given number of threads.
335    pub const fn new(cfg: Config) -> Self {
336        Self { cfg }
337    }
338}
339
340impl crate::Runner for Runner {
341    type Context = Context;
342
343    fn start<F, Fut>(self, f: F) -> Fut::Output
344    where
345        F: FnOnce(Self::Context) -> Fut,
346        Fut: Future,
347    {
348        // Create a new registry
349        let mut registry = Registry::new();
350        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
351
352        // Initialize runtime
353        let metrics = Arc::new(Metrics::init(runtime_registry));
354        let runtime = Builder::new_multi_thread()
355            .worker_threads(self.cfg.worker_threads)
356            .max_blocking_threads(self.cfg.max_blocking_threads)
357            .thread_stack_size(self.cfg.thread_stack_size)
358            .enable_all()
359            .build()
360            .expect("failed to create Tokio runtime");
361
362        // Initialize panicker
363        let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
364
365        // Collect process metrics.
366        //
367        // We prefer to collect process metrics outside of `Context` because
368        // we are using `runtime_registry` rather than the one provided by `Context`.
369        let process = MeteredProcess::init(runtime_registry);
370        runtime.spawn(process.collect(tokio::time::sleep));
371
372        // Initialize buffer pools
373        let network_buffer_pool = BufferPool::new(
374            self.cfg.resolved_network_buffer_pool_config(),
375            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
376        );
377        let storage_buffer_pool = BufferPool::new(
378            self.cfg.resolved_storage_buffer_pool_config(),
379            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
380        );
381
382        // Initialize storage
383        cfg_if::cfg_if! {
384            if #[cfg(feature = "iouring-storage")] {
385                let iouring_registry =
386                    runtime_registry.sub_registry_with_prefix("iouring_storage");
387                let storage = MeteredStorage::new(
388                    IoUringStorage::start(
389                        IoUringConfig {
390                            storage_directory: self.cfg.storage_directory.clone(),
391                            iouring_config: Default::default(),
392                            thread_stack_size: self.cfg.thread_stack_size,
393                        },
394                        iouring_registry,
395                        storage_buffer_pool.clone(),
396                    ),
397                    runtime_registry,
398                );
399            } else {
400                let storage = MeteredStorage::new(
401                    TokioStorage::new(
402                        TokioStorageConfig::new(
403                            self.cfg.storage_directory.clone(),
404                            self.cfg.maximum_buffer_size,
405                        ),
406                        storage_buffer_pool.clone(),
407                    ),
408                    runtime_registry,
409                );
410            }
411        }
412
413        // Initialize network
414        cfg_if::cfg_if! {
415            if #[cfg(feature = "iouring-network")] {
416                let iouring_registry =
417                    runtime_registry.sub_registry_with_prefix("iouring_network");
418                let config = IoUringNetworkConfig {
419                    tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
420                    zero_linger: self.cfg.network_cfg.zero_linger,
421                    read_write_timeout: self.cfg.network_cfg.read_write_timeout,
422                    iouring_config: iouring::Config {
423                        // TODO (#1045): make `IOURING_NETWORK_SIZE` configurable
424                        size: IOURING_NETWORK_SIZE,
425                        max_request_timeout: self.cfg.network_cfg.read_write_timeout,
426                        shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
427                        ..Default::default()
428                    },
429                    thread_stack_size: self.cfg.thread_stack_size,
430                    ..Default::default()
431                };
432                let network = MeteredNetwork::new(
433                    IoUringNetwork::start(
434                        config,
435                        iouring_registry,
436                        network_buffer_pool.clone(),
437                    )
438                    .unwrap(),
439                    runtime_registry,
440                );
441            } else {
442                let config = TokioNetworkConfig::default()
443                    .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
444                    .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
445                    .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
446                    .with_zero_linger(self.cfg.network_cfg.zero_linger);
447                let network = MeteredNetwork::new(
448                    TokioNetwork::new(config, network_buffer_pool.clone()),
449                    runtime_registry,
450                );
451            }
452        }
453
454        // Initialize executor
455        let executor = Arc::new(Executor {
456            registry: Mutex::new(registry),
457            metrics,
458            runtime,
459            shutdown: Mutex::new(Stopper::default()),
460            panicker,
461            thread_stack_size: self.cfg.thread_stack_size,
462        });
463
464        // Get metrics
465        let label = Label::root();
466        executor.metrics.tasks_spawned.get_or_create(&label).inc();
467        let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
468
469        // Run the future
470        let context = Context {
471            storage,
472            name: label.name(),
473            attributes: Vec::new(),
474            scope: None,
475            executor: executor.clone(),
476            network,
477            network_buffer_pool,
478            storage_buffer_pool,
479            tree: Tree::root(),
480            execution: Execution::default(),
481            traced: false,
482        };
483        let output = executor.runtime.block_on(panicked.interrupt(f(context)));
484        gauge.dec();
485
486        output
487    }
488}
489
490cfg_if::cfg_if! {
491    if #[cfg(feature = "iouring-storage")] {
492        type Storage = MeteredStorage<IoUringStorage>;
493    } else {
494        type Storage = MeteredStorage<TokioStorage>;
495    }
496}
497
498cfg_if::cfg_if! {
499    if #[cfg(feature = "iouring-network")] {
500        type Network = MeteredNetwork<IoUringNetwork>;
501    } else {
502        type Network = MeteredNetwork<TokioNetwork>;
503    }
504}
505
506/// Implementation of [crate::Spawner], [crate::Clock],
507/// [crate::Network], and [crate::Storage] for the `tokio`
508/// runtime.
509pub struct Context {
510    name: String,
511    attributes: Vec<(String, String)>,
512    scope: Option<Arc<ScopeGuard>>,
513    executor: Arc<Executor>,
514    storage: Storage,
515    network: Network,
516    network_buffer_pool: BufferPool,
517    storage_buffer_pool: BufferPool,
518    tree: Arc<Tree>,
519    execution: Execution,
520    traced: bool,
521}
522
523impl Clone for Context {
524    fn clone(&self) -> Self {
525        let (child, _) = Tree::child(&self.tree);
526        Self {
527            name: self.name.clone(),
528            attributes: self.attributes.clone(),
529            scope: self.scope.clone(),
530            executor: self.executor.clone(),
531            storage: self.storage.clone(),
532            network: self.network.clone(),
533            network_buffer_pool: self.network_buffer_pool.clone(),
534            storage_buffer_pool: self.storage_buffer_pool.clone(),
535            tree: child,
536            execution: Execution::default(),
537            traced: false,
538        }
539    }
540}
541
542impl Context {
543    /// Access the [Metrics] of the runtime.
544    fn metrics(&self) -> &Metrics {
545        &self.executor.metrics
546    }
547}
548
549impl crate::Spawner for Context {
550    fn dedicated(mut self) -> Self {
551        self.execution = Execution::Dedicated;
552        self
553    }
554
555    fn shared(mut self, blocking: bool) -> Self {
556        self.execution = Execution::Shared(blocking);
557        self
558    }
559
560    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
561    where
562        F: FnOnce(Self) -> Fut + Send + 'static,
563        Fut: Future<Output = T> + Send + 'static,
564        T: Send + 'static,
565    {
566        // Get metrics
567        let (label, metric) = spawn_metrics!(self);
568
569        // Track supervision before resetting configuration
570        let parent = Arc::clone(&self.tree);
571        let past = self.execution;
572        let traced = self.traced;
573        self.execution = Execution::default();
574        self.traced = false;
575        let (child, aborted) = Tree::child(&parent);
576        if aborted {
577            return Handle::closed(metric);
578        }
579        self.tree = child;
580
581        // Spawn the task
582        let executor = self.executor.clone();
583        let future = if traced {
584            let span = info_span!("task", name = %label.name());
585            for (key, value) in &self.attributes {
586                span.set_attribute(key.clone(), value.clone());
587            }
588            Either::Left(f(self).instrument(span))
589        } else {
590            Either::Right(f(self))
591        };
592        let (f, handle) = Handle::init(
593            future,
594            metric,
595            executor.panicker.clone(),
596            Arc::clone(&parent),
597        );
598
599        if matches!(past, Execution::Dedicated) {
600            utils::thread::spawn(executor.thread_stack_size, {
601                // Ensure the task can access the tokio runtime
602                let handle = executor.runtime.handle().clone();
603                move || {
604                    handle.block_on(f);
605                }
606            });
607        } else if matches!(past, Execution::Shared(true)) {
608            executor.runtime.spawn_blocking({
609                // Ensure the task can access the tokio runtime
610                let handle = executor.runtime.handle().clone();
611                move || {
612                    handle.block_on(f);
613                }
614            });
615        } else {
616            executor.runtime.spawn(f);
617        }
618
619        // Register the task on the parent
620        if let Some(aborter) = handle.aborter() {
621            parent.register(aborter);
622        }
623
624        handle
625    }
626
627    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
628        let stop_resolved = {
629            let mut shutdown = self.executor.shutdown.lock();
630            shutdown.stop(value)
631        };
632
633        // Wait for all tasks to complete or the timeout to fire
634        let timeout_future = timeout.map_or_else(
635            || futures::future::Either::Right(futures::future::pending()),
636            |duration| futures::future::Either::Left(self.sleep(duration)),
637        );
638        select! {
639            result = stop_resolved => {
640                result.map_err(|_| Error::Closed)?;
641                Ok(())
642            },
643            _ = timeout_future => Err(Error::Timeout),
644        }
645    }
646
647    fn stopped(&self) -> Signal {
648        self.executor.shutdown.lock().stopped()
649    }
650}
651
652#[stability(BETA)]
653impl crate::ThreadPooler for Context {
654    fn create_thread_pool(
655        &self,
656        concurrency: NonZeroUsize,
657    ) -> Result<ThreadPool, ThreadPoolBuildError> {
658        ThreadPoolBuilder::new()
659            .num_threads(concurrency.get())
660            .spawn_handler(move |thread| {
661                // Tasks spawned in a thread pool are expected to run longer than any single
662                // task and thus should be provisioned as a dedicated thread.
663                self.with_label("rayon_thread")
664                    .dedicated()
665                    .spawn(move |_| async move { thread.run() });
666                Ok(())
667            })
668            .build()
669            .map(Arc::new)
670    }
671}
672
673impl crate::Metrics for Context {
674    fn label(&self) -> String {
675        self.name.clone()
676    }
677
678    fn with_label(&self, label: &str) -> Self {
679        // Construct the full label name
680        let name = {
681            let prefix = self.name.clone();
682            if prefix.is_empty() {
683                label.to_string()
684            } else {
685                format!("{prefix}_{label}")
686            }
687        };
688        Self {
689            name,
690            ..self.clone()
691        }
692    }
693
694    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
695        let mut attributes = self.attributes.clone();
696        add_attribute(&mut attributes, key, value);
697        Self {
698            attributes,
699            ..self.clone()
700        }
701    }
702
703    fn with_scope(&self) -> Self {
704        // If already scoped, inherit the existing scope
705        if self.scope.is_some() {
706            return self.clone();
707        }
708
709        // RAII guard removes the scoped registry when all clones drop.
710        // Closure is infallible to avoid panicking in Drop.
711        let executor = self.executor.clone();
712        let scope_id = executor.registry.lock().create_scope();
713        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
714            executor.registry.lock().remove_scope(id);
715        }));
716        Self {
717            scope: Some(guard),
718            ..self.clone()
719        }
720    }
721
722    fn with_span(&self) -> Self {
723        Self {
724            traced: true,
725            ..self.clone()
726        }
727    }
728
729    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
730        let name = name.into();
731        let prefixed_name = {
732            let prefix = &self.name;
733            if prefix.is_empty() {
734                name
735            } else {
736                format!("{}_{}", *prefix, name)
737            }
738        };
739
740        // Route to the appropriate registry (root or scoped)
741        let mut registry = self.executor.registry.lock();
742        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
743        let sub_registry = self
744            .attributes
745            .iter()
746            .fold(scoped, |reg, (k, v): &(String, String)| {
747                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
748            });
749        sub_registry.register(prefixed_name, help, metric);
750    }
751
752    fn encode(&self) -> String {
753        self.executor.registry.lock().encode()
754    }
755}
756
757impl Clock for Context {
758    fn current(&self) -> SystemTime {
759        SystemTime::now()
760    }
761
762    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
763        tokio::time::sleep(duration)
764    }
765
766    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
767        let duration_until_deadline = deadline.duration_since(self.current()).unwrap_or_default();
768        tokio::time::sleep(duration_until_deadline)
769    }
770}
771
772#[cfg(feature = "external")]
773impl Pacer for Context {
774    fn pace<'a, F, T>(
775        &'a self,
776        _latency: Duration,
777        future: F,
778    ) -> impl Future<Output = T> + Send + 'a
779    where
780        F: Future<Output = T> + Send + 'a,
781        T: Send + 'a,
782    {
783        // Execute the future immediately
784        future
785    }
786}
787
788impl GClock for Context {
789    type Instant = SystemTime;
790
791    fn now(&self) -> Self::Instant {
792        self.current()
793    }
794}
795
796impl ReasonablyRealtime for Context {}
797
798impl crate::Network for Context {
799    type Listener = <Network as crate::Network>::Listener;
800
801    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
802        self.network.bind(socket).await
803    }
804
805    async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
806        self.network.dial(socket).await
807    }
808}
809
810impl crate::Resolver for Context {
811    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
812        // Uses the host's DNS configuration (e.g. /etc/resolv.conf on Unix,
813        // registry on Windows). This delegates to the system's libc resolver.
814        //
815        // The `:0` port is required by lookup_host's API but is not used
816        // for DNS resolution.
817        let addrs = tokio::net::lookup_host(format!("{host}:0"))
818            .await
819            .map_err(|e| Error::ResolveFailed(e.to_string()))?;
820        Ok(addrs.map(|addr| addr.ip()).collect())
821    }
822}
823
824impl RngCore for Context {
825    fn next_u32(&mut self) -> u32 {
826        OsRng.next_u32()
827    }
828
829    fn next_u64(&mut self) -> u64 {
830        OsRng.next_u64()
831    }
832
833    fn fill_bytes(&mut self, dest: &mut [u8]) {
834        OsRng.fill_bytes(dest);
835    }
836
837    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
838        OsRng.try_fill_bytes(dest)
839    }
840}
841
842impl CryptoRng for Context {}
843
844impl crate::Storage for Context {
845    type Blob = <Storage as crate::Storage>::Blob;
846
847    async fn open_versioned(
848        &self,
849        partition: &str,
850        name: &[u8],
851        versions: std::ops::RangeInclusive<u16>,
852    ) -> Result<(Self::Blob, u64, u16), Error> {
853        self.storage.open_versioned(partition, name, versions).await
854    }
855
856    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
857        self.storage.remove(partition, name).await
858    }
859
860    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
861        self.storage.scan(partition).await
862    }
863}
864
865impl crate::BufferPooler for Context {
866    fn network_buffer_pool(&self) -> &BufferPool {
867        &self.network_buffer_pool
868    }
869
870    fn storage_buffer_pool(&self) -> &BufferPool {
871        &self.storage_buffer_pool
872    }
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878
879    #[test]
880    fn test_worker_threads_updates_default_buffer_pool_parallelism() {
881        let cfg = Config::new().with_worker_threads(8);
882
883        assert_eq!(cfg.worker_threads, 8);
884        assert_eq!(
885            cfg.resolved_network_buffer_pool_config()
886                .thread_cache_config,
887            BufferPoolConfig::for_network()
888                .with_thread_cache_for_parallelism(NZUsize!(8))
889                .thread_cache_config
890        );
891        assert_eq!(
892            cfg.resolved_storage_buffer_pool_config()
893                .thread_cache_config,
894            BufferPoolConfig::for_storage()
895                .with_thread_cache_for_parallelism(NZUsize!(8))
896                .thread_cache_config
897        );
898    }
899
900    #[test]
901    fn test_default_thread_stack_size_uses_system_default() {
902        let cfg = Config::new();
903        assert_eq!(
904            cfg.thread_stack_size(),
905            utils::thread::system_thread_stack_size()
906        );
907    }
908
909    #[test]
910    fn test_thread_stack_size_override() {
911        let cfg = Config::new().with_thread_stack_size(4 * 1024 * 1024);
912        assert_eq!(cfg.thread_stack_size(), 4 * 1024 * 1024);
913    }
914
915    #[test]
916    fn test_explicit_buffer_pool_configs_override_worker_threads() {
917        // Order does not matter -- explicit configs always win.
918        let cfg = Config::new()
919            .with_network_buffer_pool_config(
920                BufferPoolConfig::for_network().with_thread_cache_for_parallelism(NZUsize!(2)),
921            )
922            .with_worker_threads(8)
923            .with_storage_buffer_pool_config(
924                BufferPoolConfig::for_storage().with_thread_cache_disabled(),
925            );
926
927        assert_eq!(
928            cfg.resolved_network_buffer_pool_config()
929                .thread_cache_config,
930            BufferPoolConfig::for_network()
931                .with_thread_cache_for_parallelism(NZUsize!(2))
932                .thread_cache_config
933        );
934        assert_eq!(
935            cfg.resolved_storage_buffer_pool_config()
936                .thread_cache_config,
937            BufferPoolConfig::for_storage()
938                .with_thread_cache_disabled()
939                .thread_cache_config
940        );
941    }
942}