1#![allow(clippy::missing_panics_doc)]
2use std::any::{Any, TypeId};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
5
6use async_channel::{Receiver, Sender, unbounded};
7
8use crate::Handle;
9use crate::system::{FnExec, Id, System, SystemCommand};
10
11thread_local!(
12 static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
13 static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
14);
15
16pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
18pub(super) enum ArbiterCommand {
19 Stop,
20 Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
21 ExecuteFn(Box<dyn FnExec>),
22}
23
24pub struct Arbiter {
30 id: usize,
31 pub(crate) sys_id: usize,
32 name: Arc<String>,
33 pub(crate) hnd: Option<Handle>,
34 pub(crate) sender: Sender<ArbiterCommand>,
35 thread_handle: Option<thread::JoinHandle<()>>,
36}
37
38impl fmt::Debug for Arbiter {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 write!(f, "Arbiter({:?})", self.name.as_ref())
41 }
42}
43
44impl Default for Arbiter {
45 fn default() -> Arbiter {
46 Arbiter::new()
47 }
48}
49
50impl Clone for Arbiter {
51 fn clone(&self) -> Self {
52 Self {
53 id: self.id,
54 sys_id: self.sys_id,
55 name: self.name.clone(),
56 sender: self.sender.clone(),
57 hnd: self.hnd.clone(),
58 thread_handle: None,
59 }
60 }
61}
62
63impl Arbiter {
64 #[allow(clippy::borrowed_box)]
65 pub(super) fn new_system(sys_id: usize, name: String) -> (Self, ArbiterController) {
66 let (tx, rx) = unbounded();
67
68 let arb = Arbiter::with_sender(sys_id, 0, Arc::new(name), tx);
69 ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70 STORAGE.with(|cell| cell.borrow_mut().clear());
71
72 (arb, ArbiterController { rx, stop: None })
73 }
74
75 pub(super) fn dummy() -> Self {
76 Arbiter {
77 id: 0,
78 hnd: None,
79 name: String::new().into(),
80 sys_id: 0,
81 sender: unbounded().0,
82 thread_handle: None,
83 }
84 }
85
86 pub fn current() -> Arbiter {
92 ADDR.with(|cell| match *cell.borrow() {
93 Some(ref addr) => addr.clone(),
94 None => panic!("Arbiter is not running"),
95 })
96 }
97
98 pub(crate) fn set_current(&self) {
99 ADDR.with(|cell| {
100 *cell.borrow_mut() = Some(self.clone());
101 });
102 }
103
104 pub fn stop(&self) {
106 let _ = self.sender.try_send(ArbiterCommand::Stop);
107 }
108
109 pub fn new() -> Arbiter {
112 let id = COUNT.load(Ordering::Relaxed) + 1;
113 Arbiter::with_name(format!("{}:arb:{}", System::current().name(), id))
114 }
115
116 pub fn with_name(name: String) -> Arbiter {
120 let id = COUNT.fetch_add(1, Ordering::Relaxed);
121 let sys = System::current();
122 let name2 = Arc::new(name.clone());
123 let config = sys.config();
124 let (arb_tx, arb_rx) = unbounded();
125 let arb_tx2 = arb_tx.clone();
126
127 let builder = if sys.config().stack_size > 0 {
128 thread::Builder::new()
129 .name(name)
130 .stack_size(sys.config().stack_size)
131 } else {
132 thread::Builder::new().name(name)
133 };
134
135 let name = name2.clone();
136 let sys_id = sys.id();
137 let (arb_hnd_tx, arb_hnd_rx) = oneshot::channel();
138
139 let handle = builder
140 .spawn(move || {
141 log::info!("Starting {name2:?} arbiter");
142
143 let (stop, stop_rx) = oneshot::channel();
144 STORAGE.with(|cell| cell.borrow_mut().clear());
145
146 System::set_current(sys.clone());
147
148 crate::driver::block_on(config.runner.as_ref(), async move {
149 let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
150 arb_hnd_tx
151 .send(arb.hnd.clone())
152 .expect("Controller thread has gone");
153
154 crate::spawn(
156 ArbiterController {
157 stop: Some(stop),
158 rx: arb_rx,
159 }
160 .run(),
161 );
162 ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
163
164 let _ = sys
166 .sys()
167 .try_send(SystemCommand::RegisterArbiter(Id(id), arb));
168
169 let _ = stop_rx.await;
171 });
172
173 let _ = System::current()
175 .sys()
176 .try_send(SystemCommand::UnregisterArbiter(Id(id)));
177
178 STORAGE.with(|cell| cell.borrow_mut().clear());
179 })
180 .unwrap_or_else(|err| {
181 panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
182 });
183
184 let hnd = arb_hnd_rx.recv().expect("Could not start new arbiter");
185
186 Arbiter {
187 id,
188 hnd,
189 name,
190 sys_id: sys_id.0,
191 sender: arb_tx2,
192 thread_handle: Some(handle),
193 }
194 }
195
196 fn with_sender(
197 sys_id: usize,
198 id: usize,
199 name: Arc<String>,
200 sender: Sender<ArbiterCommand>,
201 ) -> Self {
202 #[cfg(feature = "tokio")]
203 let hnd = { Handle::new(sender.clone()) };
204
205 #[cfg(feature = "compio")]
206 let hnd = { Handle::new(sender.clone()) };
207
208 #[cfg(all(not(feature = "compio"), not(feature = "tokio")))]
209 let hnd = { Handle::current() };
210
211 Self {
212 id,
213 sys_id,
214 name,
215 sender,
216 hnd: Some(hnd),
217 thread_handle: None,
218 }
219 }
220
221 pub fn id(&self) -> Id {
223 Id(self.id)
224 }
225
226 pub fn name(&self) -> &str {
228 self.name.as_ref()
229 }
230
231 #[inline]
232 pub fn handle(&self) -> &Handle {
234 self.hnd.as_ref().unwrap()
235 }
236
237 #[doc(hidden)]
238 #[deprecated(since = "3.8.0", note = "use `ntex_rt::spawn()`")]
239 pub fn spawn<F>(&self, future: F)
241 where
242 F: Future<Output = ()> + Send + 'static,
243 {
244 let _ = self
245 .sender
246 .try_send(ArbiterCommand::Execute(Box::pin(future)));
247 }
248
249 #[doc(hidden)]
250 #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
251 pub fn spawn_with<F, R, O>(
254 &self,
255 f: F,
256 ) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
257 where
258 F: FnOnce() -> R + Send + 'static,
259 R: Future<Output = O> + 'static,
260 O: Send + 'static,
261 {
262 let (tx, rx) = oneshot::async_channel();
263 let _ = self
264 .sender
265 .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
266 crate::spawn(async move {
267 let _ = tx.send(f().await);
268 });
269 })));
270 rx
271 }
272
273 #[doc(hidden)]
274 #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
275 pub fn exec<F, R>(
279 &self,
280 f: F,
281 ) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
282 where
283 F: FnOnce() -> R + Send + 'static,
284 R: Send + 'static,
285 {
286 let (tx, rx) = oneshot::async_channel();
287 let _ = self
288 .sender
289 .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
290 let _ = tx.send(f());
291 })));
292 rx
293 }
294
295 #[doc(hidden)]
296 #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
297 pub fn exec_fn<F>(&self, f: F)
300 where
301 F: FnOnce() + Send + 'static,
302 {
303 let _ = self
304 .sender
305 .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
306 f();
307 })));
308 }
309
310 #[doc(hidden)]
311 #[deprecated(since = "3.8.0", note = "use `ntex_rt::set_item()`")]
312 pub fn set_item<T: 'static>(item: T) {
314 set_item(item);
315 }
316
317 #[doc(hidden)]
318 #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
319 pub fn contains_item<T: 'static>() -> bool {
321 STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
322 }
323
324 #[doc(hidden)]
325 #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
326 pub fn get_item<T: 'static, F, R>(f: F) -> R
332 where
333 F: FnOnce(&T) -> R,
334 {
335 STORAGE.with(move |cell| {
336 let mut st = cell.borrow_mut();
337 let item = st
338 .get_mut(&TypeId::of::<T>())
339 .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
340 .unwrap();
341 f(item)
342 })
343 }
344
345 pub fn get_value<T, F>(f: F) -> T
347 where
348 T: Clone + 'static,
349 F: FnOnce() -> T,
350 {
351 STORAGE.with(move |cell| {
352 let mut st = cell.borrow_mut();
353 if let Some(boxed) = st.get(&TypeId::of::<T>())
354 && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
355 {
356 return val.clone();
357 }
358 let val = f();
359 st.insert(TypeId::of::<T>(), Box::new(val.clone()));
360 val
361 })
362 }
363
364 pub fn join(&mut self) -> thread::Result<()> {
366 if let Some(thread_handle) = self.thread_handle.take() {
367 thread_handle.join()
368 } else {
369 Ok(())
370 }
371 }
372}
373
374impl Eq for Arbiter {}
375
376impl PartialEq for Arbiter {
377 fn eq(&self, other: &Self) -> bool {
378 self.id == other.id && self.sys_id == other.sys_id
379 }
380}
381
382pub(crate) struct ArbiterController {
383 stop: Option<oneshot::Sender<i32>>,
384 rx: Receiver<ArbiterCommand>,
385}
386
387impl Drop for ArbiterController {
388 fn drop(&mut self) {
389 if thread::panicking() {
390 if System::current().stop_on_panic() {
391 eprintln!("Panic in Arbiter thread, shutting down system.");
392 System::current().stop_with_code(1);
393 } else {
394 eprintln!("Panic in Arbiter thread.");
395 }
396 }
397 }
398}
399
400impl ArbiterController {
401 pub(super) async fn run(mut self) {
402 loop {
403 match self.rx.recv().await {
404 Ok(ArbiterCommand::Stop) => {
405 if let Some(stop) = self.stop.take() {
406 let _ = stop.send(0);
407 }
408 break;
409 }
410 Ok(ArbiterCommand::Execute(fut)) => {
411 crate::spawn(fut);
412 }
413 Ok(ArbiterCommand::ExecuteFn(f)) => {
414 f.call_box();
415 }
416 Err(_) => break,
417 }
418 }
419 }
420}
421
422pub fn set_item<T: 'static>(item: T) {
424 STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
425}
426
427pub fn get_item<T: Clone + 'static>() -> Option<T> {
429 STORAGE.with(move |cell| {
430 cell.borrow()
431 .get(&TypeId::of::<T>())
432 .and_then(|boxed| boxed.downcast_ref())
433 .cloned()
434 })
435}