1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11 iouring,
12 network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15 network::metered::Network as MeteredNetwork,
16 process::metered::Metrics as MeteredProcess,
17 signal::Signal,
18 storage::metered::Storage as MeteredStorage,
19 telemetry::metrics::task::Label,
20 utils::{signal::Stopper, supervision::Tree, Panicker},
21 Clock, Error, Execution, Handle, SinkOf, StreamOf, METRICS_PREFIX,
22};
23use commonware_macros::select;
24use futures::{future::BoxFuture, FutureExt};
25use governor::clock::{Clock as GClock, ReasonablyRealtime};
26use prometheus_client::{
27 encoding::text::encode,
28 metrics::{counter::Counter, family::Family, gauge::Gauge},
29 registry::{Metric, Registry},
30};
31use rand::{rngs::OsRng, CryptoRng, RngCore};
32use std::{
33 env,
34 future::Future,
35 net::SocketAddr,
36 path::PathBuf,
37 sync::{Arc, Mutex},
38 thread,
39 time::{Duration, SystemTime},
40};
41use tokio::runtime::{Builder, Runtime};
42use tracing::{info_span, Instrument};
43
/// Value used for the `size` field of the `iouring::Config` built for the io_uring
/// network backend.
// NOTE(review): presumably the ring's queue depth; confirm against `iouring::Config`.
#[cfg(feature = "iouring-network")]
const IOURING_NETWORK_SIZE: u32 = 1024;

/// Value used for the `force_poll` field of the `iouring::Config` built for the io_uring
/// network backend (100 milliseconds).
#[cfg(feature = "iouring-network")]
const IOURING_NETWORK_FORCE_POLL: Duration = Duration::from_millis(100);
48
49#[derive(Debug)]
50struct Metrics {
51 tasks_spawned: Family<Label, Counter>,
52 tasks_running: Family<Label, Gauge>,
53}
54
55impl Metrics {
56 pub fn init(registry: &mut Registry) -> Self {
57 let metrics = Self {
58 tasks_spawned: Family::default(),
59 tasks_running: Family::default(),
60 };
61 registry.register(
62 "tasks_spawned",
63 "Total number of tasks spawned",
64 metrics.tasks_spawned.clone(),
65 );
66 registry.register(
67 "tasks_running",
68 "Number of tasks currently running",
69 metrics.tasks_running.clone(),
70 );
71 metrics
72 }
73}
74
/// Network settings carried by the runtime [`Config`].
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// Optional `TCP_NODELAY` setting handed to the network backend.
    // NOTE(review): `None` presumably leaves the OS default untouched; confirm in the backend.
    tcp_nodelay: Option<bool>,

    /// Timeout handed to the network backend for both reads and writes.
    read_write_timeout: Duration,
}

impl Default for NetworkConfig {
    /// Defaults: no explicit `TCP_NODELAY` setting and a 60 second read/write timeout.
    fn default() -> Self {
        let read_write_timeout = Duration::from_secs(60);
        Self {
            tcp_nodelay: None,
            read_write_timeout,
        }
    }
}
93
94#[derive(Clone)]
96pub struct Config {
97 worker_threads: usize,
103
104 max_blocking_threads: usize,
112
113 catch_panics: bool,
115
116 storage_directory: PathBuf,
118
119 maximum_buffer_size: usize,
123
124 network_cfg: NetworkConfig,
126}
127
128impl Config {
129 pub fn new() -> Self {
131 let rng = OsRng.next_u64();
132 let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
133 Self {
134 worker_threads: 2,
135 max_blocking_threads: 512,
136 catch_panics: false,
137 storage_directory,
138 maximum_buffer_size: 2 * 1024 * 1024, network_cfg: NetworkConfig::default(),
140 }
141 }
142
143 pub fn with_worker_threads(mut self, n: usize) -> Self {
146 self.worker_threads = n;
147 self
148 }
149 pub fn with_max_blocking_threads(mut self, n: usize) -> Self {
151 self.max_blocking_threads = n;
152 self
153 }
154 pub fn with_catch_panics(mut self, b: bool) -> Self {
156 self.catch_panics = b;
157 self
158 }
159 pub fn with_read_write_timeout(mut self, d: Duration) -> Self {
161 self.network_cfg.read_write_timeout = d;
162 self
163 }
164 pub fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
166 self.network_cfg.tcp_nodelay = n;
167 self
168 }
169 pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
171 self.storage_directory = p.into();
172 self
173 }
174 pub fn with_maximum_buffer_size(mut self, n: usize) -> Self {
176 self.maximum_buffer_size = n;
177 self
178 }
179
180 pub fn worker_threads(&self) -> usize {
183 self.worker_threads
184 }
185 pub fn max_blocking_threads(&self) -> usize {
187 self.max_blocking_threads
188 }
189 pub fn catch_panics(&self) -> bool {
191 self.catch_panics
192 }
193 pub fn read_write_timeout(&self) -> Duration {
195 self.network_cfg.read_write_timeout
196 }
197 pub fn tcp_nodelay(&self) -> Option<bool> {
199 self.network_cfg.tcp_nodelay
200 }
201 pub fn storage_directory(&self) -> &PathBuf {
203 &self.storage_directory
204 }
205 pub fn maximum_buffer_size(&self) -> usize {
207 self.maximum_buffer_size
208 }
209}
210
211impl Default for Config {
212 fn default() -> Self {
213 Self::new()
214 }
215}
216
217pub struct Executor {
219 registry: Mutex<Registry>,
220 metrics: Arc<Metrics>,
221 runtime: Runtime,
222 shutdown: Mutex<Stopper>,
223 panicker: Panicker,
224}
225
226pub struct Runner {
228 cfg: Config,
229}
230
231impl Default for Runner {
232 fn default() -> Self {
233 Self::new(Config::default())
234 }
235}
236
237impl Runner {
238 pub fn new(cfg: Config) -> Self {
240 Self { cfg }
241 }
242}
243
244impl crate::Runner for Runner {
245 type Context = Context;
246
247 fn start<F, Fut>(self, f: F) -> Fut::Output
248 where
249 F: FnOnce(Self::Context) -> Fut,
250 Fut: Future,
251 {
252 let mut registry = Registry::default();
254 let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
255
256 let metrics = Arc::new(Metrics::init(runtime_registry));
258 let runtime = Builder::new_multi_thread()
259 .worker_threads(self.cfg.worker_threads)
260 .max_blocking_threads(self.cfg.max_blocking_threads)
261 .enable_all()
262 .build()
263 .expect("failed to create Tokio runtime");
264
265 let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
267
268 let process = MeteredProcess::init(runtime_registry);
273 runtime.spawn(process.collect(tokio::time::sleep));
274
275 cfg_if::cfg_if! {
277 if #[cfg(feature = "iouring-storage")] {
278 let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_storage");
279 let storage = MeteredStorage::new(
280 IoUringStorage::start(IoUringConfig {
281 storage_directory: self.cfg.storage_directory.clone(),
282 iouring_config: Default::default(),
283 }, iouring_registry),
284 runtime_registry,
285 );
286 } else {
287 let storage = MeteredStorage::new(
288 TokioStorage::new(TokioStorageConfig::new(
289 self.cfg.storage_directory.clone(),
290 self.cfg.maximum_buffer_size,
291 )),
292 runtime_registry,
293 );
294 }
295 }
296
297 cfg_if::cfg_if! {
299 if #[cfg(feature = "iouring-network")] {
300 let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_network");
301 let config = IoUringNetworkConfig {
302 tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
303 iouring_config: iouring::Config {
304 size: IOURING_NETWORK_SIZE,
306 op_timeout: Some(self.cfg.network_cfg.read_write_timeout),
307 force_poll: IOURING_NETWORK_FORCE_POLL,
308 shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
309 ..Default::default()
310 },
311 };
312 let network = MeteredNetwork::new(
313 IoUringNetwork::start(config, iouring_registry).unwrap(),
314 runtime_registry,
315 );
316 } else {
317 let config = TokioNetworkConfig::default().with_read_timeout(self.cfg.network_cfg.read_write_timeout)
318 .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
319 .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay);
320 let network = MeteredNetwork::new(
321 TokioNetwork::from(config),
322 runtime_registry,
323 );
324 }
325 }
326
327 let executor = Arc::new(Executor {
329 registry: Mutex::new(registry),
330 metrics,
331 runtime,
332 shutdown: Mutex::new(Stopper::default()),
333 panicker,
334 });
335
336 let label = Label::root();
338 executor.metrics.tasks_spawned.get_or_create(&label).inc();
339 let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
340
341 let context = Context {
343 storage,
344 name: label.name(),
345 executor: executor.clone(),
346 network,
347 tree: Tree::root(),
348 execution: Execution::default(),
349 instrumented: false,
350 };
351 let output = executor.runtime.block_on(panicked.interrupt(f(context)));
352 gauge.dec();
353
354 output
355 }
356}
357
358cfg_if::cfg_if! {
359 if #[cfg(feature = "iouring-storage")] {
360 type Storage = MeteredStorage<IoUringStorage>;
361 } else {
362 type Storage = MeteredStorage<TokioStorage>;
363 }
364}
365
366cfg_if::cfg_if! {
367 if #[cfg(feature = "iouring-network")] {
368 type Network = MeteredNetwork<IoUringNetwork>;
369 } else {
370 type Network = MeteredNetwork<TokioNetwork>;
371 }
372}
373
374pub struct Context {
378 name: String,
379 executor: Arc<Executor>,
380 storage: Storage,
381 network: Network,
382 tree: Arc<Tree>,
383 execution: Execution,
384 instrumented: bool,
385}
386
387impl Clone for Context {
388 fn clone(&self) -> Self {
389 let (child, _) = Tree::child(&self.tree);
390 Self {
391 name: self.name.clone(),
392 executor: self.executor.clone(),
393 storage: self.storage.clone(),
394 network: self.network.clone(),
395
396 tree: child,
397 execution: Execution::default(),
398 instrumented: false,
399 }
400 }
401}
402
403impl Context {
404 fn metrics(&self) -> &Metrics {
406 &self.executor.metrics
407 }
408}
409
410impl crate::Spawner for Context {
411 fn dedicated(mut self) -> Self {
412 self.execution = Execution::Dedicated;
413 self
414 }
415
416 fn shared(mut self, blocking: bool) -> Self {
417 self.execution = Execution::Shared(blocking);
418 self
419 }
420
421 fn instrumented(mut self) -> Self {
422 self.instrumented = true;
423 self
424 }
425
426 fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
427 where
428 F: FnOnce(Self) -> Fut + Send + 'static,
429 Fut: Future<Output = T> + Send + 'static,
430 T: Send + 'static,
431 {
432 let (label, metric) = spawn_metrics!(self);
434
435 let parent = Arc::clone(&self.tree);
437 let past = self.execution;
438 let is_instrumented = self.instrumented;
439 self.execution = Execution::default();
440 self.instrumented = false;
441 let (child, aborted) = Tree::child(&parent);
442 if aborted {
443 return Handle::closed(metric);
444 }
445 self.tree = child;
446
447 let executor = self.executor.clone();
449 let future: BoxFuture<T> = if is_instrumented {
450 f(self)
451 .instrument(info_span!("task", name = %label.name()))
452 .boxed()
453 } else {
454 f(self).boxed()
455 };
456 let (f, handle) = Handle::init(
457 future,
458 metric,
459 executor.panicker.clone(),
460 Arc::clone(&parent),
461 );
462
463 if matches!(past, Execution::Dedicated) {
464 thread::spawn({
465 let handle = executor.runtime.handle().clone();
467 move || {
468 handle.block_on(f);
469 }
470 });
471 } else if matches!(past, Execution::Shared(true)) {
472 executor.runtime.spawn_blocking({
473 let handle = executor.runtime.handle().clone();
475 move || {
476 handle.block_on(f);
477 }
478 });
479 } else {
480 executor.runtime.spawn(f);
481 }
482
483 if let Some(aborter) = handle.aborter() {
485 parent.register(aborter);
486 }
487
488 handle
489 }
490
491 async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
492 let stop_resolved = {
493 let mut shutdown = self.executor.shutdown.lock().unwrap();
494 shutdown.stop(value)
495 };
496
497 let timeout_future = match timeout {
499 Some(duration) => futures::future::Either::Left(self.sleep(duration)),
500 None => futures::future::Either::Right(futures::future::pending()),
501 };
502 select! {
503 result = stop_resolved => {
504 result.map_err(|_| Error::Closed)?;
505 Ok(())
506 },
507 _ = timeout_future => {
508 Err(Error::Timeout)
509 }
510 }
511 }
512
513 fn stopped(&self) -> Signal {
514 self.executor.shutdown.lock().unwrap().stopped()
515 }
516}
517
518impl crate::Metrics for Context {
519 fn with_label(&self, label: &str) -> Self {
520 let name = {
521 let prefix = self.name.clone();
522 if prefix.is_empty() {
523 label.to_string()
524 } else {
525 format!("{prefix}_{label}")
526 }
527 };
528 assert!(
529 !name.starts_with(METRICS_PREFIX),
530 "using runtime label is not allowed"
531 );
532 Self {
533 name,
534 ..self.clone()
535 }
536 }
537
538 fn label(&self) -> String {
539 self.name.clone()
540 }
541
542 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
543 let name = name.into();
544 let prefixed_name = {
545 let prefix = &self.name;
546 if prefix.is_empty() {
547 name
548 } else {
549 format!("{}_{}", *prefix, name)
550 }
551 };
552 self.executor
553 .registry
554 .lock()
555 .unwrap()
556 .register(prefixed_name, help, metric)
557 }
558
559 fn encode(&self) -> String {
560 let mut buffer = String::new();
561 encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
562 buffer
563 }
564}
565
566impl Clock for Context {
567 fn current(&self) -> SystemTime {
568 SystemTime::now()
569 }
570
571 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
572 tokio::time::sleep(duration)
573 }
574
575 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
576 let now = SystemTime::now();
577 let duration_until_deadline = match deadline.duration_since(now) {
578 Ok(duration) => duration,
579 Err(_) => Duration::from_secs(0), };
581 let target_instant = tokio::time::Instant::now() + duration_until_deadline;
582 tokio::time::sleep_until(target_instant)
583 }
584}
585
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Returns `future` unchanged; this runtime applies no pacing and ignores `_latency`.
    fn pace<'a, F: Future<Output = T> + Send + 'a, T: Send + 'a>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a {
        future
    }
}
601
602impl GClock for Context {
603 type Instant = SystemTime;
604
605 fn now(&self) -> Self::Instant {
606 self.current()
607 }
608}
609
610impl ReasonablyRealtime for Context {}
611
612impl crate::Network for Context {
613 type Listener = <Network as crate::Network>::Listener;
614
615 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
616 self.network.bind(socket).await
617 }
618
619 async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
620 self.network.dial(socket).await
621 }
622}
623
624impl RngCore for Context {
625 fn next_u32(&mut self) -> u32 {
626 OsRng.next_u32()
627 }
628
629 fn next_u64(&mut self) -> u64 {
630 OsRng.next_u64()
631 }
632
633 fn fill_bytes(&mut self, dest: &mut [u8]) {
634 OsRng.fill_bytes(dest);
635 }
636
637 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
638 OsRng.try_fill_bytes(dest)
639 }
640}
641
642impl CryptoRng for Context {}
643
644impl crate::Storage for Context {
645 type Blob = <Storage as crate::Storage>::Blob;
646
647 async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
648 self.storage.open(partition, name).await
649 }
650
651 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
652 self.storage.remove(partition, name).await
653 }
654
655 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
656 self.storage.scan(partition).await
657 }
658}