Skip to main content

commonware_runtime/tokio/
runtime.rs

1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9use crate::{
10    child_label,
11    network::metered::Network as MeteredNetwork,
12    prefixed_name,
13    process::metered::Metrics as MeteredProcess,
14    signal::Signal,
15    storage::metered::Storage as MeteredStorage,
16    telemetry::metrics::{
17        add_attribute, raw, task::Label, validate_label, CounterFamily, GaugeFamily, Metric,
18        Register, Registered, Registry,
19    },
20    utils::{self, signal::Stopper, supervision::Tree, Panicker},
21    BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Name, SinkOf, Spawner as _,
22    StreamOf, Supervisor as _, METRICS_PREFIX,
23};
24#[cfg(feature = "iouring-network")]
25use crate::{
26    iouring,
27    network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
28};
29use commonware_macros::{select, stability};
30#[stability(BETA)]
31use commonware_parallel::ThreadPool;
32use commonware_utils::{sync::Mutex, NZUsize};
33use futures::future::Either;
34use governor::clock::{Clock as GClock, ReasonablyRealtime};
35use rand::{rngs::OsRng, CryptoRng, RngCore};
36#[stability(BETA)]
37use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
38use std::{
39    env,
40    future::Future,
41    net::{IpAddr, SocketAddr},
42    num::NonZeroUsize,
43    path::PathBuf,
44    sync::Arc,
45    time::{Duration, SystemTime},
46};
47use tokio::runtime::{Builder, Runtime};
48use tracing::{info_span, Instrument};
49use tracing_opentelemetry::OpenTelemetrySpanExt;
50
// Ring size handed to the io_uring network backend.
//
// Test builds use a smaller ring to reduce `io_uring_setup` failures
// under parallel test load due to mlock/resource limits.
#[cfg(all(feature = "iouring-network", test))]
const IOURING_NETWORK_SIZE: u32 = 128;
#[cfg(all(feature = "iouring-network", not(test)))]
const IOURING_NETWORK_SIZE: u32 = 1024;
61
62#[derive(Debug)]
63struct Metrics {
64    tasks_spawned: CounterFamily<Label>,
65    tasks_running: GaugeFamily<Label>,
66}
67
68impl Metrics {
69    pub fn init(registry: &mut impl Register) -> Self {
70        Self {
71            tasks_spawned: registry.register(
72                "tasks_spawned",
73                "Total number of tasks spawned",
74                raw::Family::default(),
75            ),
76            tasks_running: registry.register(
77                "tasks_running",
78                "Number of tasks currently running",
79                raw::Family::default(),
80            ),
81        }
82    }
83}
84
/// Socket-level settings applied by the runtime's network backend.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// When `Some`, `TCP_NODELAY` is explicitly set to the given value on the
    /// socket. When `None`, the system default is left in place.
    ///
    /// Defaults to `Some(true)`.
    tcp_nodelay: Option<bool>,

    /// Whether to set `SO_LINGER` to zero on the socket.
    ///
    /// When enabled, closing the socket causes an immediate RST and avoids the
    /// `TIME_WAIT` state. This is useful in adversarial environments, where
    /// socket resources should be reclaimed immediately after closing
    /// connections to misbehaving peers.
    ///
    /// Defaults to `true`.
    zero_linger: bool,

    /// Read/write timeout for network operations.
    ///
    /// The timeout bounds the full `Sink::send` and `Stream::recv` calls rather
    /// than each individual socket syscall. Larger batched writes may therefore
    /// require a larger timeout.
    ///
    /// Defaults to 60 seconds.
    read_write_timeout: Duration,
}

impl Default for NetworkConfig {
    fn default() -> Self {
        let tcp_nodelay = Some(true);
        let zero_linger = true;
        let read_write_timeout = Duration::from_secs(60);
        Self {
            tcp_nodelay,
            zero_linger,
            read_write_timeout,
        }
    }
}
120
121/// Configuration for the `tokio` runtime.
122#[derive(Clone)]
123pub struct Config {
124    /// Number of threads to use for handling async tasks.
125    ///
126    /// Worker threads are always active (waiting for work).
127    ///
128    /// Tokio sets the default value to the number of logical CPUs.
129    worker_threads: usize,
130
131    /// Number of scheduler ticks between global queue polls.
132    ///
133    /// When unset, Tokio uses its default behavior for the multi-thread
134    /// scheduler. Smaller values reduce the delay before tasks woken from
135    /// outside a worker, such as io_uring completion notifications, are polled
136    /// from the global queue again.
137    global_queue_interval: Option<u32>,
138
139    /// Maximum number of threads to use for blocking tasks.
140    ///
141    /// Unlike worker threads, blocking threads are created as needed and
142    /// exit if left idle for too long.
143    ///
144    /// Tokio sets the default value to 512 to avoid hanging on lower-level
145    /// operations that require blocking (like `fs` and writing to `Stdout`).
146    max_blocking_threads: usize,
147
148    /// Stack size to use for runtime-owned threads.
149    ///
150    /// Defaults to the system stack size when the current platform exposes it,
151    /// and otherwise falls back to Rust's default spawned-thread stack size.
152    ///
153    /// See [utils::thread::system_thread_stack_size].
154    thread_stack_size: usize,
155
156    /// Whether or not to catch panics.
157    catch_panics: bool,
158
159    /// Base directory for all storage operations.
160    storage_directory: PathBuf,
161
162    /// Maximum buffer size for operations on blobs.
163    ///
164    /// Tokio sets the default value to 2MB.
165    maximum_buffer_size: usize,
166
167    /// Network configuration.
168    network_cfg: NetworkConfig,
169
170    /// Explicit buffer pool configuration for network I/O, if provided.
171    network_buffer_pool_cfg: Option<BufferPoolConfig>,
172
173    /// Explicit buffer pool configuration for storage I/O, if provided.
174    storage_buffer_pool_cfg: Option<BufferPoolConfig>,
175}
176
177impl Config {
178    /// Returns a new [Config] with default values.
179    pub fn new() -> Self {
180        let rng = OsRng.next_u64();
181        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
182        Self {
183            worker_threads: 2,
184            global_queue_interval: None,
185            max_blocking_threads: 512,
186            thread_stack_size: utils::thread::system_thread_stack_size(),
187            catch_panics: false,
188            storage_directory,
189            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
190            network_cfg: NetworkConfig::default(),
191            network_buffer_pool_cfg: None,
192            storage_buffer_pool_cfg: None,
193        }
194    }
195
196    // Setters
197    /// See [Config]
198    pub const fn with_worker_threads(mut self, n: usize) -> Self {
199        self.worker_threads = n;
200        self
201    }
202    /// See [Config]
203    pub const fn with_global_queue_interval(mut self, n: u32) -> Self {
204        self.global_queue_interval = Some(n);
205        self
206    }
207    /// See [Config]
208    pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
209        self.max_blocking_threads = n;
210        self
211    }
212    /// See [Config]
213    pub const fn with_thread_stack_size(mut self, n: usize) -> Self {
214        self.thread_stack_size = n;
215        self
216    }
217    /// See [Config]
218    pub const fn with_catch_panics(mut self, b: bool) -> Self {
219        self.catch_panics = b;
220        self
221    }
222    /// See [Config]
223    pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
224        self.network_cfg.read_write_timeout = d;
225        self
226    }
227    /// See [Config]
228    pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
229        self.network_cfg.tcp_nodelay = n;
230        self
231    }
232    /// See [Config]
233    pub const fn with_zero_linger(mut self, l: bool) -> Self {
234        self.network_cfg.zero_linger = l;
235        self
236    }
237    /// See [Config]
238    pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
239        self.storage_directory = p.into();
240        self
241    }
242    /// See [Config]
243    pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
244        self.maximum_buffer_size = n;
245        self
246    }
247    /// See [Config]
248    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
249        self.network_buffer_pool_cfg = Some(cfg);
250        self
251    }
252    /// See [Config]
253    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
254        self.storage_buffer_pool_cfg = Some(cfg);
255        self
256    }
257
258    // Getters
259    /// See [Config]
260    pub const fn worker_threads(&self) -> usize {
261        self.worker_threads
262    }
263    /// See [Config]
264    pub const fn global_queue_interval(&self) -> Option<u32> {
265        self.global_queue_interval
266    }
267    /// See [Config]
268    pub const fn max_blocking_threads(&self) -> usize {
269        self.max_blocking_threads
270    }
271    /// See [Config]
272    pub const fn thread_stack_size(&self) -> usize {
273        self.thread_stack_size
274    }
275    /// See [Config]
276    pub const fn catch_panics(&self) -> bool {
277        self.catch_panics
278    }
279    /// See [Config]
280    pub const fn read_write_timeout(&self) -> Duration {
281        self.network_cfg.read_write_timeout
282    }
283    /// See [Config]
284    pub const fn tcp_nodelay(&self) -> Option<bool> {
285        self.network_cfg.tcp_nodelay
286    }
287    /// See [Config]
288    pub const fn zero_linger(&self) -> bool {
289        self.network_cfg.zero_linger
290    }
291    /// See [Config]
292    pub const fn storage_directory(&self) -> &PathBuf {
293        &self.storage_directory
294    }
295    /// See [Config]
296    pub const fn maximum_buffer_size(&self) -> usize {
297        self.maximum_buffer_size
298    }
299
300    /// Returns the network buffer pool config, deriving pool parallelism from
301    /// `worker_threads` if not explicitly configured.
302    fn resolved_network_buffer_pool_config(&self) -> BufferPoolConfig {
303        self.network_buffer_pool_cfg.clone().unwrap_or_else(|| {
304            BufferPoolConfig::for_network().with_parallelism(NZUsize!(self.worker_threads))
305        })
306    }
307
308    /// Returns the storage buffer pool config, deriving pool parallelism from
309    /// `worker_threads` if not explicitly configured.
310    fn resolved_storage_buffer_pool_config(&self) -> BufferPoolConfig {
311        self.storage_buffer_pool_cfg.clone().unwrap_or_else(|| {
312            BufferPoolConfig::for_storage().with_parallelism(NZUsize!(self.worker_threads))
313        })
314    }
315}
316
317impl Default for Config {
318    fn default() -> Self {
319        Self::new()
320    }
321}
322
323/// Runtime based on [Tokio](https://tokio.rs).
324pub struct Executor {
325    registry: Registry,
326    metrics: Arc<Metrics>,
327    runtime: Runtime,
328    shutdown: Mutex<Stopper>,
329    panicker: Panicker,
330    thread_stack_size: usize,
331}
332
333/// Implementation of [crate::Runner] for the `tokio` runtime.
334pub struct Runner {
335    cfg: Config,
336}
337
338impl Default for Runner {
339    fn default() -> Self {
340        Self::new(Config::default())
341    }
342}
343
344impl Runner {
345    /// Initialize a new `tokio` runtime with the given number of threads.
346    pub const fn new(cfg: Config) -> Self {
347        Self { cfg }
348    }
349}
350
351impl crate::Runner for Runner {
352    type Context = Context;
353
354    fn start<F, Fut>(self, f: F) -> Fut::Output
355    where
356        F: FnOnce(Self::Context) -> Fut,
357        Fut: Future,
358    {
359        // Create a new registry
360        let mut registry = Registry::new();
361        let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);
362
363        // Initialize runtime
364        let metrics = Arc::new(Metrics::init(&mut runtime_registry));
365        let mut builder = Builder::new_multi_thread();
366        builder
367            .worker_threads(self.cfg.worker_threads)
368            .max_blocking_threads(self.cfg.max_blocking_threads)
369            .thread_stack_size(self.cfg.thread_stack_size)
370            .enable_all();
371        if let Some(global_queue_interval) = self.cfg.global_queue_interval {
372            builder.global_queue_interval(global_queue_interval);
373        }
374        let runtime = builder.build().expect("failed to create Tokio runtime");
375
376        // Initialize panicker
377        let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
378
379        // Collect process metrics.
380        //
381        // We prefer to collect process metrics outside of `Context` because
382        // we are using `runtime_registry` rather than the one provided by `Context`.
383        let process = MeteredProcess::init(&mut runtime_registry);
384        runtime.spawn(process.collect(tokio::time::sleep));
385
386        // Initialize buffer pools
387        let network_buffer_pool = BufferPool::new(
388            self.cfg.resolved_network_buffer_pool_config(),
389            &mut runtime_registry.sub_registry("network_buffer_pool"),
390        );
391        let storage_buffer_pool = BufferPool::new(
392            self.cfg.resolved_storage_buffer_pool_config(),
393            &mut runtime_registry.sub_registry("storage_buffer_pool"),
394        );
395
396        // Initialize storage
397        cfg_if::cfg_if! {
398            if #[cfg(feature = "iouring-storage")] {
399                let mut iouring_registry = runtime_registry.sub_registry("iouring_storage");
400                let storage = MeteredStorage::new(
401                    IoUringStorage::start(
402                        IoUringConfig {
403                            storage_directory: self.cfg.storage_directory.clone(),
404                            iouring_config: Default::default(),
405                            thread_stack_size: self.cfg.thread_stack_size,
406                        },
407                        &mut iouring_registry,
408                        storage_buffer_pool.clone(),
409                    ),
410                    &mut runtime_registry,
411                );
412            } else {
413                let storage = MeteredStorage::new(
414                    TokioStorage::new(
415                        TokioStorageConfig::new(
416                            self.cfg.storage_directory.clone(),
417                            self.cfg.maximum_buffer_size,
418                        ),
419                        storage_buffer_pool.clone(),
420                    ),
421                    &mut runtime_registry,
422                );
423            }
424        }
425
426        // Initialize network
427        cfg_if::cfg_if! {
428            if #[cfg(feature = "iouring-network")] {
429                let mut iouring_registry = runtime_registry.sub_registry("iouring_network");
430                let config = IoUringNetworkConfig {
431                    tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
432                    zero_linger: self.cfg.network_cfg.zero_linger,
433                    read_write_timeout: self.cfg.network_cfg.read_write_timeout,
434                    iouring_config: iouring::Config {
435                        // TODO (#1045): make `IOURING_NETWORK_SIZE` configurable
436                        size: IOURING_NETWORK_SIZE,
437                        max_request_timeout: self.cfg.network_cfg.read_write_timeout,
438                        shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
439                        ..Default::default()
440                    },
441                    thread_stack_size: self.cfg.thread_stack_size,
442                    ..Default::default()
443                };
444                let network = MeteredNetwork::new(
445                    IoUringNetwork::start(
446                        config,
447                        &mut iouring_registry,
448                        network_buffer_pool.clone(),
449                    )
450                    .unwrap(),
451                    &mut runtime_registry,
452                );
453            } else {
454                let config = TokioNetworkConfig::default()
455                    .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
456                    .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
457                    .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
458                    .with_zero_linger(self.cfg.network_cfg.zero_linger);
459                let network = MeteredNetwork::new(
460                    TokioNetwork::new(config, network_buffer_pool.clone()),
461                    &mut runtime_registry,
462                );
463            }
464        }
465
466        // Initialize executor
467        let executor = Arc::new(Executor {
468            registry,
469            metrics,
470            runtime,
471            shutdown: Mutex::new(Stopper::default()),
472            panicker,
473            thread_stack_size: self.cfg.thread_stack_size,
474        });
475
476        // Get metrics
477        let label = Label::root();
478        executor.metrics.tasks_spawned.get_or_create(&label).inc();
479        let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
480
481        // Run the future
482        let context = Context {
483            storage,
484            name: label.name(),
485            attributes: Vec::new(),
486            executor: executor.clone(),
487            network,
488            network_buffer_pool,
489            storage_buffer_pool,
490            tree: Tree::root(),
491            execution: Execution::default(),
492            traced: false,
493        };
494        let output = executor.runtime.block_on(panicked.interrupt(f(context)));
495        gauge.dec();
496
497        output
498    }
499}
500
501cfg_if::cfg_if! {
502    if #[cfg(feature = "iouring-storage")] {
503        type Storage = MeteredStorage<IoUringStorage>;
504    } else {
505        type Storage = MeteredStorage<TokioStorage>;
506    }
507}
508
509cfg_if::cfg_if! {
510    if #[cfg(feature = "iouring-network")] {
511        type Network = MeteredNetwork<IoUringNetwork>;
512    } else {
513        type Network = MeteredNetwork<TokioNetwork>;
514    }
515}
516
517/// Implementation of [crate::Spawner], [crate::Clock],
518/// [crate::Network], and [crate::Storage] for the `tokio`
519/// runtime.
520pub struct Context {
521    name: String,
522    attributes: Vec<(String, String)>,
523    executor: Arc<Executor>,
524    storage: Storage,
525    network: Network,
526    network_buffer_pool: BufferPool,
527    storage_buffer_pool: BufferPool,
528    tree: Arc<Tree>,
529    execution: Execution,
530    traced: bool,
531}
532
533impl Context {
534    /// Access the [Metrics] of the runtime.
535    fn metrics(&self) -> &Metrics {
536        &self.executor.metrics
537    }
538}
539
540impl crate::Spawner for Context {
541    fn dedicated(mut self) -> Self {
542        self.execution = Execution::Dedicated;
543        self
544    }
545
546    fn shared(mut self, blocking: bool) -> Self {
547        self.execution = Execution::Shared(blocking);
548        self
549    }
550
551    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
552    where
553        F: FnOnce(Self) -> Fut + Send + 'static,
554        Fut: Future<Output = T> + Send + 'static,
555        T: Send + 'static,
556    {
557        // Get metrics
558        let (label, metric) = spawn_metrics!(self);
559
560        // Track supervision before resetting configuration
561        let parent = Arc::clone(&self.tree);
562        let past = self.execution;
563        let traced = self.traced;
564        self.execution = Execution::default();
565        self.traced = false;
566        let (child, aborted) = Tree::child(&parent);
567        if aborted {
568            return Handle::closed(metric);
569        }
570        self.tree = child;
571
572        // Spawn the task
573        let executor = self.executor.clone();
574        let future = if traced {
575            let span = info_span!("task", name = %label.name());
576            for (key, value) in &self.attributes {
577                span.set_attribute(key.clone(), value.clone());
578            }
579            Either::Left(f(self).instrument(span))
580        } else {
581            Either::Right(f(self))
582        };
583        let (f, handle) = Handle::init(
584            future,
585            metric,
586            executor.panicker.clone(),
587            Arc::clone(&parent),
588        );
589
590        if matches!(past, Execution::Dedicated) {
591            utils::thread::spawn(executor.thread_stack_size, {
592                // Ensure the task can access the tokio runtime
593                let handle = executor.runtime.handle().clone();
594                move || {
595                    handle.block_on(f);
596                }
597            });
598        } else if matches!(past, Execution::Shared(true)) {
599            executor.runtime.spawn_blocking({
600                // Ensure the task can access the tokio runtime
601                let handle = executor.runtime.handle().clone();
602                move || {
603                    handle.block_on(f);
604                }
605            });
606        } else {
607            executor.runtime.spawn(f);
608        }
609
610        // Register the task on the parent
611        if let Some(aborter) = handle.aborter() {
612            parent.register(aborter);
613        }
614
615        handle
616    }
617
618    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
619        let stop_resolved = {
620            let mut shutdown = self.executor.shutdown.lock();
621            shutdown.stop(value)
622        };
623
624        // Wait for all tasks to complete or the timeout to fire
625        let timeout_future = timeout.map_or_else(
626            || futures::future::Either::Right(futures::future::pending()),
627            |duration| futures::future::Either::Left(self.sleep(duration)),
628        );
629        select! {
630            result = stop_resolved => {
631                result.map_err(|_| Error::Closed)?;
632                Ok(())
633            },
634            _ = timeout_future => Err(Error::Timeout),
635        }
636    }
637
638    fn stopped(&self) -> Signal {
639        self.executor.shutdown.lock().stopped()
640    }
641}
642
643#[stability(BETA)]
644impl crate::ThreadPooler for Context {
645    fn create_thread_pool(
646        &self,
647        concurrency: NonZeroUsize,
648    ) -> Result<ThreadPool, ThreadPoolBuildError> {
649        ThreadPoolBuilder::new()
650            .num_threads(concurrency.get())
651            .spawn_handler(move |thread| {
652                // Tasks spawned in a thread pool are expected to run longer than any single
653                // task and thus should be provisioned as a dedicated thread.
654                self.child("rayon_thread")
655                    .dedicated()
656                    .spawn(move |_| async move { thread.run() });
657                Ok(())
658            })
659            .build()
660            .map(Arc::new)
661    }
662}
663
664impl crate::Supervisor for Context {
665    fn child(&self, label: &'static str) -> Self {
666        let (tree, _) = Tree::child(&self.tree);
667        Self {
668            name: child_label(&self.name, label),
669            attributes: self.attributes.clone(),
670            executor: self.executor.clone(),
671            storage: self.storage.clone(),
672            network: self.network.clone(),
673            network_buffer_pool: self.network_buffer_pool.clone(),
674            storage_buffer_pool: self.storage_buffer_pool.clone(),
675            tree,
676            execution: Execution::default(),
677            traced: false,
678        }
679    }
680
681    fn with_attribute(mut self, key: &'static str, value: impl std::fmt::Display) -> Self {
682        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
683        validate_label(key);
684
685        // Add the attribute to the list of attributes
686        add_attribute(&mut self.attributes, key, value);
687        self
688    }
689
690    fn name(&self) -> Name {
691        Name {
692            label: self.name.clone(),
693            attributes: self.attributes.clone(),
694        }
695    }
696}
697
698impl crate::Tracing for Context {
699    fn with_span(mut self) -> Self {
700        self.traced = true;
701        self
702    }
703}
704
705impl crate::Metrics for Context {
706    fn register<N: Into<String>, H: Into<String>, M: Metric>(
707        &self,
708        name: N,
709        help: H,
710        metric: M,
711    ) -> Registered<M> {
712        let name = name.into();
713        let help = help.into();
714        let metric = Arc::new(metric);
715        self.executor.registry.register(
716            prefixed_name(&self.name, &name),
717            help,
718            self.attributes.clone(),
719            metric,
720        )
721    }
722
723    fn encode(&self) -> String {
724        self.executor.registry.encode()
725    }
726}
727
728impl Clock for Context {
729    fn current(&self) -> SystemTime {
730        SystemTime::now()
731    }
732
733    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
734        tokio::time::sleep(duration)
735    }
736
737    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
738        let duration_until_deadline = deadline.duration_since(self.current()).unwrap_or_default();
739        tokio::time::sleep(duration_until_deadline)
740    }
741}
742
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Returns `future` unchanged; the requested latency is ignored by this
    /// runtime.
    fn pace<'a, F, T>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // No pacing is applied: hand the future straight back to the caller
        future
    }
}
758
759impl GClock for Context {
760    type Instant = SystemTime;
761
762    fn now(&self) -> Self::Instant {
763        self.current()
764    }
765}
766
767impl ReasonablyRealtime for Context {}
768
769impl crate::Network for Context {
770    type Listener = <Network as crate::Network>::Listener;
771
772    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
773        self.network.bind(socket).await
774    }
775
776    async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
777        self.network.dial(socket).await
778    }
779}
780
781impl crate::Resolver for Context {
782    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
783        // Uses the host's DNS configuration (e.g. /etc/resolv.conf on Unix,
784        // registry on Windows). This delegates to the system's libc resolver.
785        //
786        // The `:0` port is required by lookup_host's API but is not used
787        // for DNS resolution.
788        let addrs = tokio::net::lookup_host(format!("{host}:0"))
789            .await
790            .map_err(|e| Error::ResolveFailed(e.to_string()))?;
791        Ok(addrs.map(|addr| addr.ip()).collect())
792    }
793}
794
795impl RngCore for Context {
796    fn next_u32(&mut self) -> u32 {
797        OsRng.next_u32()
798    }
799
800    fn next_u64(&mut self) -> u64 {
801        OsRng.next_u64()
802    }
803
804    fn fill_bytes(&mut self, dest: &mut [u8]) {
805        OsRng.fill_bytes(dest);
806    }
807
808    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
809        OsRng.try_fill_bytes(dest)
810    }
811}
812
813impl CryptoRng for Context {}
814
815impl crate::Storage for Context {
816    type Blob = <Storage as crate::Storage>::Blob;
817
818    async fn open_versioned(
819        &self,
820        partition: &str,
821        name: &[u8],
822        versions: std::ops::RangeInclusive<u16>,
823    ) -> Result<(Self::Blob, u64, u16), Error> {
824        self.storage.open_versioned(partition, name, versions).await
825    }
826
827    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
828        self.storage.remove(partition, name).await
829    }
830
831    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
832        self.storage.scan(partition).await
833    }
834}
835
836impl crate::BufferPooler for Context {
837    fn network_buffer_pool(&self) -> &BufferPool {
838        &self.network_buffer_pool
839    }
840
841    fn storage_buffer_pool(&self) -> &BufferPool {
842        &self.storage_buffer_pool
843    }
844}
845
#[cfg(test)]
mod tests {
    use super::*;

    /// Without explicit pool configs, both pools take their parallelism from
    /// the worker thread count and keep their preset thread cache settings.
    #[test]
    fn test_worker_threads_updates_default_buffer_pool_parallelism() {
        let cfg = Config::new().with_worker_threads(8);
        assert_eq!(cfg.worker_threads, 8);

        let expected_parallelism = NZUsize!(8);

        let network = cfg.resolved_network_buffer_pool_config();
        let network_preset = BufferPoolConfig::for_network();
        assert_eq!(network.parallelism, expected_parallelism);
        assert_eq!(
            network.thread_cache_config,
            network_preset.thread_cache_config
        );

        let storage = cfg.resolved_storage_buffer_pool_config();
        let storage_preset = BufferPoolConfig::for_storage();
        assert_eq!(storage.parallelism, expected_parallelism);
        assert_eq!(
            storage.thread_cache_config,
            storage_preset.thread_cache_config
        );
    }

    /// A fresh config uses the system thread stack size.
    #[test]
    fn test_default_thread_stack_size_uses_system_default() {
        let expected = utils::thread::system_thread_stack_size();
        let cfg = Config::new();
        assert_eq!(cfg.thread_stack_size(), expected);
    }

    /// An explicit stack size replaces the default.
    #[test]
    fn test_thread_stack_size_override() {
        let cfg = Config::new().with_thread_stack_size(4 * 1024 * 1024);
        assert_eq!(cfg.thread_stack_size(), 4 * 1024 * 1024);
    }

    /// Explicit pool configs are returned as provided, regardless of the
    /// worker thread count.
    #[test]
    fn test_explicit_buffer_pool_configs_override_worker_threads() {
        let explicit_network = BufferPoolConfig::for_network().with_parallelism(NZUsize!(2));
        let explicit_storage = BufferPoolConfig::for_storage().with_thread_cache_disabled();

        // Order does not matter -- explicit configs always win.
        let cfg = Config::new()
            .with_network_buffer_pool_config(explicit_network)
            .with_worker_threads(8)
            .with_storage_buffer_pool_config(explicit_storage);

        let network = cfg.resolved_network_buffer_pool_config();
        assert_eq!(network.parallelism, NZUsize!(2));
        assert_eq!(
            network.thread_cache_config,
            BufferPoolConfig::for_network().thread_cache_config
        );

        let storage = cfg.resolved_storage_buffer_pool_config();
        assert_eq!(storage.parallelism, NZUsize!(1));
        assert_eq!(
            storage.thread_cache_config,
            BufferPoolConfig::for_storage()
                .with_thread_cache_disabled()
                .thread_cache_config
        );
    }
}