zenoh_runtime/
lib.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
20use core::panic;
21use std::{
22    borrow::Borrow,
23    collections::HashMap,
24    env,
25    future::Future,
26    ops::Deref,
27    sync::{
28        atomic::{AtomicUsize, Ordering},
29        OnceLock,
30    },
31    time::Duration,
32};
33
34use lazy_static::lazy_static;
35use serde::Deserialize;
36use tokio::{
37    runtime::{Handle, Runtime, RuntimeFlavor},
38    task::JoinHandle,
39};
40use zenoh_macros::{GenericRuntimeParam, RegisterParam};
41use zenoh_result::ZResult as Result;
42
/// Name of the environment variable that carries the ZRuntime configuration.
43pub const ZENOH_RUNTIME_ENV: &str = "ZENOH_RUNTIME";
44
45/// Available parameters to configure the ZRuntime.
46#[derive(Deserialize, Debug, GenericRuntimeParam)]
47#[serde(deny_unknown_fields, default)]
48pub struct RuntimeParam {
49    /// Number of async worker threads. At least one.
50    pub worker_threads: usize,
51    /// Number of maximal worker threads for blocking tasks. At least one.
52    pub max_blocking_threads: usize,
53    /// Hand over one ZRuntime to another one.
54    pub handover: Option<ZRuntime>,
55}
56
57impl Default for RuntimeParam {
58    fn default() -> Self {
59        Self {
60            worker_threads: 1,
61            max_blocking_threads: 50,
62            handover: None,
63        }
64    }
65}
66
67impl RuntimeParam {
68    pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
69        let rt = tokio::runtime::Builder::new_multi_thread()
70            .worker_threads(self.worker_threads)
71            .max_blocking_threads(self.max_blocking_threads)
72            .enable_io()
73            .enable_time()
74            .thread_name_fn(move || {
75                let id = ZRUNTIME_INDEX
76                    .get(&zrt)
77                    .unwrap()
78                    .fetch_add(1, Ordering::SeqCst);
79                format!("{zrt}-{id}")
80            })
81            .build()?;
82        Ok(rt)
83    }
84}
85
86/// [`ZRuntime`], the access point of manipulate runtimes within zenoh.
87/// The runtime parameter can be configured by setting the environmental variable [`ZENOH_RUNTIME_ENV`].
88/// The parsing syntax use [RON](https://github.com/ron-rs/ron). An example configuration looks
89/// like
90///
91/// ```console
92/// ZENOH_RUNTIME='(
93///   rx: (handover: app),
94///   acc: (handover: app),
95///   app: (worker_threads: 2),
96///   tx: (max_blocking_threads: 1)
97/// )'
98/// ```
99/// Note: The runtime parameter takes effect at the beginning of the zenoh process and no longer be
100/// changed after the initialization.
101#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)]
102#[param(RuntimeParam)]
103pub enum ZRuntime {
104    /// Renamed to app. Default param: worker_threads = 1.
105    #[serde(rename = "app")]
106    #[param(worker_threads = 1)]
107    Application,
108
109    /// Renamed to acc. Default param: worker_threads = 1.
110    #[serde(rename = "acc")]
111    #[param(worker_threads = 1)]
112    Acceptor,
113
114    /// Renamed to tx. Default param: worker_threads = 1.
115    #[serde(rename = "tx")]
116    #[param(worker_threads = 1)]
117    TX,
118
119    /// Renamed to rx. Default param: worker_threads = 2.
120    #[serde(rename = "rx")]
121    #[param(worker_threads = 2)]
122    RX,
123
124    /// Renamed to net. Default param: worker_threads = 1.
125    #[serde(rename = "net")]
126    #[param(worker_threads = 1)]
127    Net,
128}
129
130impl ZRuntime {
131    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
132    where
133        F: Future + Send + 'static,
134        F::Output: Send + 'static,
135    {
136        #[cfg(feature = "tracing-instrument")]
137        let future = tracing::Instrument::instrument(future, tracing::Span::current());
138
139        self.deref().spawn(future)
140    }
141
142    pub fn block_in_place<F, R>(&self, f: F) -> R
143    where
144        F: Future<Output = R>,
145    {
146        match Handle::try_current() {
147            Ok(handle) => {
148                if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
149                    panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`");
150                }
151            }
152            Err(e) => {
153                if e.is_thread_local_destroyed() {
154                    panic!("The Thread Local Storage inside Tokio is destroyed. This might happen when Zenoh API is called at process exit, e.g. in the atexit handler. Calling the Zenoh API at process exit is not supported and should be avoided.");
155                }
156            }
157        }
158
159        #[cfg(feature = "tracing-instrument")]
160        let f = tracing::Instrument::instrument(f, tracing::Span::current());
161
162        tokio::task::block_in_place(move || self.block_on(f))
163    }
164}
165
166impl Deref for ZRuntime {
167    type Target = Handle;
168    fn deref(&self) -> &Self::Target {
169        ZRUNTIME_POOL.get(self)
170    }
171}
172
173lazy_static! {
174    pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new();
175    pub static ref ZRUNTIME_INDEX: HashMap<ZRuntime, AtomicUsize> = ZRuntime::iter()
176        .map(|zrt| (zrt, AtomicUsize::new(0)))
177        .collect();
178}
179
180// A runtime guard used to explicitly drop the static variables that Rust doesn't drop by default
181pub struct ZRuntimePoolGuard;
182
183impl Drop for ZRuntimePoolGuard {
184    fn drop(&mut self) {
185        unsafe {
186            std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read());
187            std::mem::drop(
188                (ZRUNTIME_INDEX.deref() as *const HashMap<ZRuntime, AtomicUsize>).read(),
189            );
190        }
191    }
192}
193
194pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);
195
196impl ZRuntimePool {
197    fn new() -> Self {
198        Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect())
199    }
200
201    pub fn get(&self, zrt: &ZRuntime) -> &Handle {
202        // Although the ZRuntime is called to use `zrt`, it may be handed over to another one
203        // specified via the environmental variable.
204        let param: &RuntimeParam = zrt.borrow();
205        let zrt = match param.handover {
206            Some(handover) => handover,
207            None => *zrt,
208        };
209
210        self.0
211            .get(&zrt)
212            .unwrap_or_else(|| panic!("The hashmap should contains {zrt} after initialization"))
213            .get_or_init(|| {
214                zrt.init()
215                    .unwrap_or_else(|_| panic!("Failed to init {zrt}"))
216            })
217            .handle()
218    }
219}
220
221// If there are any blocking tasks spawned by ZRuntimes, the function will block until they return.
222impl Drop for ZRuntimePool {
223    fn drop(&mut self) {
224        let handles: Vec<_> = self
225            .0
226            .drain()
227            .filter_map(|(_name, mut rt)| {
228                rt.take()
229                    .map(|r| std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1))))
230            })
231            .collect();
232
233        for hd in handles {
234            let _ = hd.join();
235        }
236    }
237}
238
239#[should_panic(expected = "Zenoh runtime doesn't support")]
240#[tokio::test]
241async fn block_in_place_fail_test() {
242    use crate::ZRuntime;
243    ZRuntime::TX.block_in_place(async { println!("Done") });
244}