Skip to main content

commonware_runtime/tokio/
runtime.rs

1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11    iouring,
12    network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15    network::metered::Network as MeteredNetwork,
16    process::metered::Metrics as MeteredProcess,
17    signal::Signal,
18    storage::metered::Storage as MeteredStorage,
19    telemetry::metrics::task::Label,
20    utils::{add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard},
21    BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf,
22    Spawner as _, StreamOf, METRICS_PREFIX,
23};
24use commonware_macros::{select, stability};
25#[stability(BETA)]
26use commonware_parallel::ThreadPool;
27use commonware_utils::sync::Mutex;
28use futures::{future::BoxFuture, FutureExt};
29use governor::clock::{Clock as GClock, ReasonablyRealtime};
30use prometheus_client::{
31    metrics::{counter::Counter, family::Family, gauge::Gauge},
32    registry::{Metric, Registry as PrometheusRegistry},
33};
34use rand::{rngs::OsRng, CryptoRng, RngCore};
35#[stability(BETA)]
36use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
37use std::{
38    borrow::Cow,
39    env,
40    future::Future,
41    net::{IpAddr, SocketAddr},
42    num::NonZeroUsize,
43    path::PathBuf,
44    sync::Arc,
45    thread,
46    time::{Duration, SystemTime},
47};
48use tokio::runtime::{Builder, Runtime};
49use tracing::{info_span, Instrument};
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51
52#[cfg(feature = "iouring-network")]
53cfg_if::cfg_if! {
54    if #[cfg(test)] {
55        // Use a smaller ring in tests to reduce `io_uring_setup` failures
56        // under parallel test load due to mlock/resource limits.
57        const IOURING_NETWORK_SIZE: u32 = 128;
58    } else {
59        const IOURING_NETWORK_SIZE: u32 = 1024;
60    }
61}
62
63#[derive(Debug)]
64struct Metrics {
65    tasks_spawned: Family<Label, Counter>,
66    tasks_running: Family<Label, Gauge>,
67}
68
69impl Metrics {
70    pub fn init(registry: &mut PrometheusRegistry) -> Self {
71        let metrics = Self {
72            tasks_spawned: Family::default(),
73            tasks_running: Family::default(),
74        };
75        registry.register(
76            "tasks_spawned",
77            "Total number of tasks spawned",
78            metrics.tasks_spawned.clone(),
79        );
80        registry.register(
81            "tasks_running",
82            "Number of tasks currently running",
83            metrics.tasks_running.clone(),
84        );
85        metrics
86    }
87}
88
89#[derive(Clone, Debug)]
90pub struct NetworkConfig {
91    /// If Some, explicitly sets TCP_NODELAY on the socket.
92    /// Otherwise uses system default.
93    ///
94    /// Defaults to `Some(true)`.
95    tcp_nodelay: Option<bool>,
96
97    /// Whether or not to set the `SO_LINGER` socket option.
98    ///
99    /// When `None`, the system default is used. When
100    /// `Some(duration)`, `SO_LINGER` is enabled with the given timeout.
101    /// `Some(Duration::ZERO)` causes an immediate RST on close, avoiding
102    /// `TIME_WAIT` state. This is useful in adversarial environments to
103    /// reclaim socket resources immediately when closing connections to
104    /// misbehaving peers.
105    ///
106    /// Defaults to `Some(Duration::ZERO)`.
107    so_linger: Option<Duration>,
108
109    /// Read/write timeout for network operations.
110    read_write_timeout: Duration,
111}
112
113impl Default for NetworkConfig {
114    fn default() -> Self {
115        Self {
116            tcp_nodelay: Some(true),
117            so_linger: Some(Duration::ZERO),
118            read_write_timeout: Duration::from_secs(60),
119        }
120    }
121}
122
123/// Configuration for the `tokio` runtime.
124#[derive(Clone)]
125pub struct Config {
126    /// Number of threads to use for handling async tasks.
127    ///
128    /// Worker threads are always active (waiting for work).
129    ///
130    /// Tokio sets the default value to the number of logical CPUs.
131    worker_threads: usize,
132
133    /// Maximum number of threads to use for blocking tasks.
134    ///
135    /// Unlike worker threads, blocking threads are created as needed and
136    /// exit if left idle for too long.
137    ///
138    /// Tokio sets the default value to 512 to avoid hanging on lower-level
139    /// operations that require blocking (like `fs` and writing to `Stdout`).
140    max_blocking_threads: usize,
141
142    /// Whether or not to catch panics.
143    catch_panics: bool,
144
145    /// Base directory for all storage operations.
146    storage_directory: PathBuf,
147
148    /// Maximum buffer size for operations on blobs.
149    ///
150    /// Tokio sets the default value to 2MB.
151    maximum_buffer_size: usize,
152
153    /// Network configuration.
154    network_cfg: NetworkConfig,
155
156    /// Buffer pool configuration for network I/O.
157    network_buffer_pool_cfg: BufferPoolConfig,
158
159    /// Buffer pool configuration for storage I/O.
160    storage_buffer_pool_cfg: BufferPoolConfig,
161}
162
163impl Config {
164    /// Returns a new [Config] with default values.
165    pub fn new() -> Self {
166        let rng = OsRng.next_u64();
167        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
168        Self {
169            worker_threads: 2,
170            max_blocking_threads: 512,
171            catch_panics: false,
172            storage_directory,
173            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
174            network_cfg: NetworkConfig::default(),
175            network_buffer_pool_cfg: BufferPoolConfig::for_network(),
176            storage_buffer_pool_cfg: BufferPoolConfig::for_storage(),
177        }
178    }
179
180    // Setters
181    /// See [Config]
182    pub const fn with_worker_threads(mut self, n: usize) -> Self {
183        self.worker_threads = n;
184        self
185    }
186    /// See [Config]
187    pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
188        self.max_blocking_threads = n;
189        self
190    }
191    /// See [Config]
192    pub const fn with_catch_panics(mut self, b: bool) -> Self {
193        self.catch_panics = b;
194        self
195    }
196    /// See [Config]
197    pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
198        self.network_cfg.read_write_timeout = d;
199        self
200    }
201    /// See [Config]
202    pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
203        self.network_cfg.tcp_nodelay = n;
204        self
205    }
206    /// See [Config]
207    pub const fn with_so_linger(mut self, l: Option<Duration>) -> Self {
208        self.network_cfg.so_linger = l;
209        self
210    }
211    /// See [Config]
212    pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
213        self.storage_directory = p.into();
214        self
215    }
216    /// See [Config]
217    pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
218        self.maximum_buffer_size = n;
219        self
220    }
221    /// See [Config]
222    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
223        self.network_buffer_pool_cfg = cfg;
224        self
225    }
226    /// See [Config]
227    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
228        self.storage_buffer_pool_cfg = cfg;
229        self
230    }
231
232    // Getters
233    /// See [Config]
234    pub const fn worker_threads(&self) -> usize {
235        self.worker_threads
236    }
237    /// See [Config]
238    pub const fn max_blocking_threads(&self) -> usize {
239        self.max_blocking_threads
240    }
241    /// See [Config]
242    pub const fn catch_panics(&self) -> bool {
243        self.catch_panics
244    }
245    /// See [Config]
246    pub const fn read_write_timeout(&self) -> Duration {
247        self.network_cfg.read_write_timeout
248    }
249    /// See [Config]
250    pub const fn tcp_nodelay(&self) -> Option<bool> {
251        self.network_cfg.tcp_nodelay
252    }
253    /// See [Config]
254    pub const fn so_linger(&self) -> Option<Duration> {
255        self.network_cfg.so_linger
256    }
257    /// See [Config]
258    pub const fn storage_directory(&self) -> &PathBuf {
259        &self.storage_directory
260    }
261    /// See [Config]
262    pub const fn maximum_buffer_size(&self) -> usize {
263        self.maximum_buffer_size
264    }
265    /// See [Config]
266    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
267        &self.network_buffer_pool_cfg
268    }
269    /// See [Config]
270    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
271        &self.storage_buffer_pool_cfg
272    }
273}
274
275impl Default for Config {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281/// Runtime based on [Tokio](https://tokio.rs).
282pub struct Executor {
283    registry: Mutex<Registry>,
284    metrics: Arc<Metrics>,
285    runtime: Runtime,
286    shutdown: Mutex<Stopper>,
287    panicker: Panicker,
288}
289
290/// Implementation of [crate::Runner] for the `tokio` runtime.
291pub struct Runner {
292    cfg: Config,
293}
294
295impl Default for Runner {
296    fn default() -> Self {
297        Self::new(Config::default())
298    }
299}
300
301impl Runner {
302    /// Initialize a new `tokio` runtime with the given number of threads.
303    pub const fn new(cfg: Config) -> Self {
304        Self { cfg }
305    }
306}
307
308impl crate::Runner for Runner {
309    type Context = Context;
310
311    fn start<F, Fut>(self, f: F) -> Fut::Output
312    where
313        F: FnOnce(Self::Context) -> Fut,
314        Fut: Future,
315    {
316        // Create a new registry
317        let mut registry = Registry::new();
318        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
319
320        // Initialize runtime
321        let metrics = Arc::new(Metrics::init(runtime_registry));
322        let runtime = Builder::new_multi_thread()
323            .worker_threads(self.cfg.worker_threads)
324            .max_blocking_threads(self.cfg.max_blocking_threads)
325            .enable_all()
326            .build()
327            .expect("failed to create Tokio runtime");
328
329        // Initialize panicker
330        let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
331
332        // Collect process metrics.
333        //
334        // We prefer to collect process metrics outside of `Context` because
335        // we are using `runtime_registry` rather than the one provided by `Context`.
336        let process = MeteredProcess::init(runtime_registry);
337        runtime.spawn(process.collect(tokio::time::sleep));
338
339        // Initialize buffer pools
340        let network_buffer_pool = BufferPool::new(
341            self.cfg.network_buffer_pool_cfg.clone(),
342            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
343        );
344        let storage_buffer_pool = BufferPool::new(
345            self.cfg.storage_buffer_pool_cfg.clone(),
346            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
347        );
348
349        // Initialize storage
350        cfg_if::cfg_if! {
351            if #[cfg(feature = "iouring-storage")] {
352                let iouring_registry =
353                    runtime_registry.sub_registry_with_prefix("iouring_storage");
354                let storage = MeteredStorage::new(
355                    IoUringStorage::start(
356                        IoUringConfig {
357                            storage_directory: self.cfg.storage_directory.clone(),
358                            iouring_config: Default::default(),
359                        },
360                        iouring_registry,
361                        storage_buffer_pool.clone(),
362                    ),
363                    runtime_registry,
364                );
365            } else {
366                let storage = MeteredStorage::new(
367                    TokioStorage::new(
368                        TokioStorageConfig::new(
369                            self.cfg.storage_directory.clone(),
370                            self.cfg.maximum_buffer_size,
371                        ),
372                        storage_buffer_pool.clone(),
373                    ),
374                    runtime_registry,
375                );
376            }
377        }
378
379        // Initialize network
380        cfg_if::cfg_if! {
381            if #[cfg(feature = "iouring-network")] {
382                let iouring_registry =
383                    runtime_registry.sub_registry_with_prefix("iouring_network");
384                let config = IoUringNetworkConfig {
385                    tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
386                    so_linger: self.cfg.network_cfg.so_linger,
387                    iouring_config: iouring::Config {
388                        // TODO (#1045): make `IOURING_NETWORK_SIZE` configurable
389                        size: IOURING_NETWORK_SIZE,
390                        op_timeout: Some(self.cfg.network_cfg.read_write_timeout),
391                        shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
392                        ..Default::default()
393                    },
394                    ..Default::default()
395                };
396                let network = MeteredNetwork::new(
397                    IoUringNetwork::start(
398                        config,
399                        iouring_registry,
400                        network_buffer_pool.clone(),
401                    )
402                    .unwrap(),
403                    runtime_registry,
404                );
405            } else {
406                let config = TokioNetworkConfig::default()
407                    .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
408                    .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
409                    .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
410                    .with_so_linger(self.cfg.network_cfg.so_linger);
411                let network = MeteredNetwork::new(
412                    TokioNetwork::new(config, network_buffer_pool.clone()),
413                    runtime_registry,
414                );
415            }
416        }
417
418        // Initialize executor
419        let executor = Arc::new(Executor {
420            registry: Mutex::new(registry),
421            metrics,
422            runtime,
423            shutdown: Mutex::new(Stopper::default()),
424            panicker,
425        });
426
427        // Get metrics
428        let label = Label::root();
429        executor.metrics.tasks_spawned.get_or_create(&label).inc();
430        let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
431
432        // Run the future
433        let context = Context {
434            storage,
435            name: label.name(),
436            attributes: Vec::new(),
437            scope: None,
438            executor: executor.clone(),
439            network,
440            network_buffer_pool,
441            storage_buffer_pool,
442            tree: Tree::root(),
443            execution: Execution::default(),
444            instrumented: false,
445        };
446        let output = executor.runtime.block_on(panicked.interrupt(f(context)));
447        gauge.dec();
448
449        output
450    }
451}
452
453cfg_if::cfg_if! {
454    if #[cfg(feature = "iouring-storage")] {
455        type Storage = MeteredStorage<IoUringStorage>;
456    } else {
457        type Storage = MeteredStorage<TokioStorage>;
458    }
459}
460
461cfg_if::cfg_if! {
462    if #[cfg(feature = "iouring-network")] {
463        type Network = MeteredNetwork<IoUringNetwork>;
464    } else {
465        type Network = MeteredNetwork<TokioNetwork>;
466    }
467}
468
469/// Implementation of [crate::Spawner], [crate::Clock],
470/// [crate::Network], and [crate::Storage] for the `tokio`
471/// runtime.
472pub struct Context {
473    name: String,
474    attributes: Vec<(String, String)>,
475    scope: Option<Arc<ScopeGuard>>,
476    executor: Arc<Executor>,
477    storage: Storage,
478    network: Network,
479    network_buffer_pool: BufferPool,
480    storage_buffer_pool: BufferPool,
481    tree: Arc<Tree>,
482    execution: Execution,
483    instrumented: bool,
484}
485
486impl Clone for Context {
487    fn clone(&self) -> Self {
488        let (child, _) = Tree::child(&self.tree);
489        Self {
490            name: self.name.clone(),
491            attributes: self.attributes.clone(),
492            scope: self.scope.clone(),
493            executor: self.executor.clone(),
494            storage: self.storage.clone(),
495            network: self.network.clone(),
496            network_buffer_pool: self.network_buffer_pool.clone(),
497            storage_buffer_pool: self.storage_buffer_pool.clone(),
498            tree: child,
499            execution: Execution::default(),
500            instrumented: false,
501        }
502    }
503}
504
505impl Context {
506    /// Access the [Metrics] of the runtime.
507    fn metrics(&self) -> &Metrics {
508        &self.executor.metrics
509    }
510}
511
512impl crate::Spawner for Context {
513    fn dedicated(mut self) -> Self {
514        self.execution = Execution::Dedicated;
515        self
516    }
517
518    fn shared(mut self, blocking: bool) -> Self {
519        self.execution = Execution::Shared(blocking);
520        self
521    }
522
523    fn instrumented(mut self) -> Self {
524        self.instrumented = true;
525        self
526    }
527
528    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
529    where
530        F: FnOnce(Self) -> Fut + Send + 'static,
531        Fut: Future<Output = T> + Send + 'static,
532        T: Send + 'static,
533    {
534        // Get metrics
535        let (label, metric) = spawn_metrics!(self);
536
537        // Track supervision before resetting configuration
538        let parent = Arc::clone(&self.tree);
539        let past = self.execution;
540        let is_instrumented = self.instrumented;
541        self.execution = Execution::default();
542        self.instrumented = false;
543        let (child, aborted) = Tree::child(&parent);
544        if aborted {
545            return Handle::closed(metric);
546        }
547        self.tree = child;
548
549        // Spawn the task
550        let executor = self.executor.clone();
551        let future: BoxFuture<'_, T> = if is_instrumented {
552            let span = info_span!("task", name = %label.name());
553            for (key, value) in &self.attributes {
554                span.set_attribute(key.clone(), value.clone());
555            }
556            f(self).instrument(span).boxed()
557        } else {
558            f(self).boxed()
559        };
560        let (f, handle) = Handle::init(
561            future,
562            metric,
563            executor.panicker.clone(),
564            Arc::clone(&parent),
565        );
566
567        if matches!(past, Execution::Dedicated) {
568            thread::spawn({
569                // Ensure the task can access the tokio runtime
570                let handle = executor.runtime.handle().clone();
571                move || {
572                    handle.block_on(f);
573                }
574            });
575        } else if matches!(past, Execution::Shared(true)) {
576            executor.runtime.spawn_blocking({
577                // Ensure the task can access the tokio runtime
578                let handle = executor.runtime.handle().clone();
579                move || {
580                    handle.block_on(f);
581                }
582            });
583        } else {
584            executor.runtime.spawn(f);
585        }
586
587        // Register the task on the parent
588        if let Some(aborter) = handle.aborter() {
589            parent.register(aborter);
590        }
591
592        handle
593    }
594
595    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
596        let stop_resolved = {
597            let mut shutdown = self.executor.shutdown.lock();
598            shutdown.stop(value)
599        };
600
601        // Wait for all tasks to complete or the timeout to fire
602        let timeout_future = timeout.map_or_else(
603            || futures::future::Either::Right(futures::future::pending()),
604            |duration| futures::future::Either::Left(self.sleep(duration)),
605        );
606        select! {
607            result = stop_resolved => {
608                result.map_err(|_| Error::Closed)?;
609                Ok(())
610            },
611            _ = timeout_future => Err(Error::Timeout),
612        }
613    }
614
615    fn stopped(&self) -> Signal {
616        self.executor.shutdown.lock().stopped()
617    }
618}
619
620#[stability(BETA)]
621impl crate::ThreadPooler for Context {
622    fn create_thread_pool(
623        &self,
624        concurrency: NonZeroUsize,
625    ) -> Result<ThreadPool, ThreadPoolBuildError> {
626        ThreadPoolBuilder::new()
627            .num_threads(concurrency.get())
628            .spawn_handler(move |thread| {
629                // Tasks spawned in a thread pool are expected to run longer than any single
630                // task and thus should be provisioned as a dedicated thread.
631                self.with_label("rayon_thread")
632                    .dedicated()
633                    .spawn(move |_| async move { thread.run() });
634                Ok(())
635            })
636            .build()
637            .map(Arc::new)
638    }
639}
640
641impl crate::Metrics for Context {
642    fn with_label(&self, label: &str) -> Self {
643        // Construct the full label name
644        let name = {
645            let prefix = self.name.clone();
646            if prefix.is_empty() {
647                label.to_string()
648            } else {
649                format!("{prefix}_{label}")
650            }
651        };
652        Self {
653            name,
654            ..self.clone()
655        }
656    }
657
658    fn label(&self) -> String {
659        self.name.clone()
660    }
661
662    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
663        let name = name.into();
664        let prefixed_name = {
665            let prefix = &self.name;
666            if prefix.is_empty() {
667                name
668            } else {
669                format!("{}_{}", *prefix, name)
670            }
671        };
672
673        // Route to the appropriate registry (root or scoped)
674        let mut registry = self.executor.registry.lock();
675        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
676        let sub_registry = self
677            .attributes
678            .iter()
679            .fold(scoped, |reg, (k, v): &(String, String)| {
680                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
681            });
682        sub_registry.register(prefixed_name, help, metric);
683    }
684
685    fn encode(&self) -> String {
686        self.executor.registry.lock().encode()
687    }
688
689    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
690        let mut attributes = self.attributes.clone();
691        add_attribute(&mut attributes, key, value);
692        Self {
693            attributes,
694            ..self.clone()
695        }
696    }
697
698    fn with_scope(&self) -> Self {
699        // If already scoped, inherit the existing scope
700        if self.scope.is_some() {
701            return self.clone();
702        }
703
704        // RAII guard removes the scoped registry when all clones drop.
705        // Closure is infallible to avoid panicking in Drop.
706        let executor = self.executor.clone();
707        let scope_id = executor.registry.lock().create_scope();
708        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
709            executor.registry.lock().remove_scope(id);
710        }));
711        Self {
712            scope: Some(guard),
713            ..self.clone()
714        }
715    }
716}
717
718impl Clock for Context {
719    fn current(&self) -> SystemTime {
720        SystemTime::now()
721    }
722
723    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
724        tokio::time::sleep(duration)
725    }
726
727    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
728        let now = SystemTime::now();
729        let duration_until_deadline = deadline.duration_since(now).unwrap_or_else(|_| {
730            // Deadline is in the past
731            Duration::from_secs(0)
732        });
733        let target_instant = tokio::time::Instant::now() + duration_until_deadline;
734        tokio::time::sleep_until(target_instant)
735    }
736}
737
738#[cfg(feature = "external")]
739impl Pacer for Context {
740    fn pace<'a, F, T>(
741        &'a self,
742        _latency: Duration,
743        future: F,
744    ) -> impl Future<Output = T> + Send + 'a
745    where
746        F: Future<Output = T> + Send + 'a,
747        T: Send + 'a,
748    {
749        // Execute the future immediately
750        future
751    }
752}
753
754impl GClock for Context {
755    type Instant = SystemTime;
756
757    fn now(&self) -> Self::Instant {
758        self.current()
759    }
760}
761
762impl ReasonablyRealtime for Context {}
763
764impl crate::Network for Context {
765    type Listener = <Network as crate::Network>::Listener;
766
767    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
768        self.network.bind(socket).await
769    }
770
771    async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
772        self.network.dial(socket).await
773    }
774}
775
776impl crate::Resolver for Context {
777    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
778        // Uses the host's DNS configuration (e.g. /etc/resolv.conf on Unix,
779        // registry on Windows). This delegates to the system's libc resolver.
780        //
781        // The `:0` port is required by lookup_host's API but is not used
782        // for DNS resolution.
783        let addrs = tokio::net::lookup_host(format!("{host}:0"))
784            .await
785            .map_err(|e| Error::ResolveFailed(e.to_string()))?;
786        Ok(addrs.map(|addr| addr.ip()).collect())
787    }
788}
789
790impl RngCore for Context {
791    fn next_u32(&mut self) -> u32 {
792        OsRng.next_u32()
793    }
794
795    fn next_u64(&mut self) -> u64 {
796        OsRng.next_u64()
797    }
798
799    fn fill_bytes(&mut self, dest: &mut [u8]) {
800        OsRng.fill_bytes(dest);
801    }
802
803    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
804        OsRng.try_fill_bytes(dest)
805    }
806}
807
808impl CryptoRng for Context {}
809
810impl crate::Storage for Context {
811    type Blob = <Storage as crate::Storage>::Blob;
812
813    async fn open_versioned(
814        &self,
815        partition: &str,
816        name: &[u8],
817        versions: std::ops::RangeInclusive<u16>,
818    ) -> Result<(Self::Blob, u64, u16), Error> {
819        self.storage.open_versioned(partition, name, versions).await
820    }
821
822    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
823        self.storage.remove(partition, name).await
824    }
825
826    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
827        self.storage.scan(partition).await
828    }
829}
830
831impl crate::BufferPooler for Context {
832    fn network_buffer_pool(&self) -> &BufferPool {
833        &self.network_buffer_pool
834    }
835
836    fn storage_buffer_pool(&self) -> &BufferPool {
837        &self.storage_buffer_pool
838    }
839}