1use core::panic;
21use std::{
22 borrow::Borrow,
23 collections::HashMap,
24 env,
25 future::Future,
26 ops::Deref,
27 sync::{
28 atomic::{AtomicUsize, Ordering},
29 OnceLock,
30 },
31 time::Duration,
32};
33
34use lazy_static::lazy_static;
35use serde::Deserialize;
36use tokio::{
37 runtime::{Handle, Runtime, RuntimeFlavor},
38 task::JoinHandle,
39};
40use zenoh_macros::{GenericRuntimeParam, RegisterParam};
41use zenoh_result::ZResult as Result;
42
/// The string `"ZENOH_RUNTIME"`; going by its name, the environment variable
/// through which the runtimes are configured.
// NOTE(review): nothing in this file reads the constant directly — presumably
// the `zenoh_macros` derives consume it; confirm there.
pub const ZENOH_RUNTIME_ENV: &str = "ZENOH_RUNTIME";
44
/// Settings used to build the Tokio runtime behind a single [`ZRuntime`].
///
/// When deserialized, absent fields take their [`Default`] value
/// (`#[serde(default)]`) and unknown fields are rejected
/// (`#[serde(deny_unknown_fields)]`).
#[derive(Deserialize, Debug, GenericRuntimeParam)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeParam {
    /// Number of worker threads; forwarded to Tokio's
    /// `Builder::worker_threads` by [`RuntimeParam::build`].
    pub worker_threads: usize,
    /// Upper bound on blocking threads; forwarded to Tokio's
    /// `Builder::max_blocking_threads` by [`RuntimeParam::build`].
    pub max_blocking_threads: usize,
    /// When `Some`, `ZRuntimePool::get` resolves the owning runtime to this
    /// other [`ZRuntime`] instead of the one that was asked for.
    pub handover: Option<ZRuntime>,
}
56
57impl Default for RuntimeParam {
58 fn default() -> Self {
59 Self {
60 worker_threads: 1,
61 max_blocking_threads: 50,
62 handover: None,
63 }
64 }
65}
66
67impl RuntimeParam {
68 pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
69 let rt = tokio::runtime::Builder::new_multi_thread()
70 .worker_threads(self.worker_threads)
71 .max_blocking_threads(self.max_blocking_threads)
72 .enable_io()
73 .enable_time()
74 .thread_name_fn(move || {
75 let id = ZRUNTIME_INDEX
76 .get(&zrt)
77 .unwrap()
78 .fetch_add(1, Ordering::SeqCst);
79 format!("{zrt}-{id}")
80 })
81 .build()?;
82 Ok(rt)
83 }
84}
85
/// Identifier of one of the Tokio runtimes held in `ZRUNTIME_POOL`.
///
/// A `ZRuntime` dereferences to the [`Handle`] of its runtime, which
/// `ZRuntimePool::get` creates on first use. Each variant deserializes from
/// the short name given in its `#[serde(rename = ...)]` attribute.
// NOTE(review): the `#[param(...)]` attributes are consumed by the
// `RegisterParam` derive — presumably they set each variant's default
// `RuntimeParam` values; confirm in `zenoh_macros`.
#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)]
#[param(RuntimeParam)]
pub enum ZRuntime {
    /// Serialized name `"app"`; declared with `worker_threads = 1`.
    #[serde(rename = "app")]
    #[param(worker_threads = 1)]
    Application,

    /// Serialized name `"acc"`; declared with `worker_threads = 1`.
    #[serde(rename = "acc")]
    #[param(worker_threads = 1)]
    Acceptor,

    /// Serialized name `"tx"`; declared with `worker_threads = 1`.
    #[serde(rename = "tx")]
    #[param(worker_threads = 1)]
    TX,

    /// Serialized name `"rx"`; declared with `worker_threads = 2`.
    #[serde(rename = "rx")]
    #[param(worker_threads = 2)]
    RX,

    /// Serialized name `"net"`; declared with `worker_threads = 1`.
    #[serde(rename = "net")]
    #[param(worker_threads = 1)]
    Net,
}
129
130impl ZRuntime {
131 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
132 where
133 F: Future + Send + 'static,
134 F::Output: Send + 'static,
135 {
136 #[cfg(feature = "tracing-instrument")]
137 let future = tracing::Instrument::instrument(future, tracing::Span::current());
138
139 self.deref().spawn(future)
140 }
141
142 pub fn block_in_place<F, R>(&self, f: F) -> R
143 where
144 F: Future<Output = R>,
145 {
146 match Handle::try_current() {
147 Ok(handle) => {
148 if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
149 panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`");
150 }
151 }
152 Err(e) => {
153 if e.is_thread_local_destroyed() {
154 panic!("The Thread Local Storage inside Tokio is destroyed. This might happen when Zenoh API is called at process exit, e.g. in the atexit handler. Calling the Zenoh API at process exit is not supported and should be avoided.");
155 }
156 }
157 }
158
159 #[cfg(feature = "tracing-instrument")]
160 let f = tracing::Instrument::instrument(f, tracing::Span::current());
161
162 tokio::task::block_in_place(move || self.block_on(f))
163 }
164}
165
166impl Deref for ZRuntime {
167 type Target = Handle;
168 fn deref(&self) -> &Self::Target {
169 ZRUNTIME_POOL.get(self)
170 }
171}
172
lazy_static! {
    /// Global pool with one (initially empty) runtime slot for every value
    /// yielded by `ZRuntime::iter()`.
    pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new();
    /// One counter, starting at 0, for every value yielded by
    /// `ZRuntime::iter()`; `RuntimeParam::build` uses it to number the
    /// threads of each runtime.
    pub static ref ZRUNTIME_INDEX: HashMap<ZRuntime, AtomicUsize> = ZRuntime::iter()
        .map(|zrt| (zrt, AtomicUsize::new(0)))
        .collect();
}
179
/// Guard whose `Drop` explicitly destroys the values stored in the
/// `ZRUNTIME_POOL` and `ZRUNTIME_INDEX` statics.
// NOTE(review): presumably intended to be held for the lifetime of the
// program and dropped once at exit — confirm against the callers.
pub struct ZRuntimePoolGuard;

impl Drop for ZRuntimePoolGuard {
    /// Drops the contents of both global statics in place.
    fn drop(&mut self) {
        // SAFETY: NOTE(review) — not established within this file.
        // `ptr::read` makes a bitwise copy of each static's value and the copy
        // is dropped immediately: for the pool this runs `ZRuntimePool::drop`
        // (shutting the runtimes down), and for both it releases the maps'
        // heap storage. The statics still hold the old bits afterwards, so
        // this is only sound if neither static is accessed again and this
        // destructor runs at most once.
        unsafe {
            std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read());
            std::mem::drop(
                (ZRUNTIME_INDEX.deref() as *const HashMap<ZRuntime, AtomicUsize>).read(),
            );
        }
    }
}
193
194pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);
195
196impl ZRuntimePool {
197 fn new() -> Self {
198 Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect())
199 }
200
201 pub fn get(&self, zrt: &ZRuntime) -> &Handle {
202 let param: &RuntimeParam = zrt.borrow();
205 let zrt = match param.handover {
206 Some(handover) => handover,
207 None => *zrt,
208 };
209
210 self.0
211 .get(&zrt)
212 .unwrap_or_else(|| panic!("The hashmap should contains {zrt} after initialization"))
213 .get_or_init(|| {
214 zrt.init()
215 .unwrap_or_else(|_| panic!("Failed to init {zrt}"))
216 })
217 .handle()
218 }
219}
220
221impl Drop for ZRuntimePool {
223 fn drop(&mut self) {
224 let handles: Vec<_> = self
225 .0
226 .drain()
227 .filter_map(|(_name, mut rt)| {
228 rt.take()
229 .map(|r| std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1))))
230 })
231 .collect();
232
233 for hd in handles {
234 let _ = hd.join();
235 }
236 }
237}
238
239#[should_panic(expected = "Zenoh runtime doesn't support")]
240#[tokio::test]
241async fn block_in_place_fail_test() {
242 use crate::ZRuntime;
243 ZRuntime::TX.block_in_place(async { println!("Done") });
244}