1use std::any::{Any, TypeId};
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::task::{Context, Poll};
7use std::{fmt, thread};
8
9use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
10use futures::channel::oneshot::{channel, Canceled, Sender};
11use futures::{future, Future, FutureExt, Stream};
12
13use crate::runtime::Runtime;
14use crate::system::System;
15
16use copyless::BoxHelper;
17
18thread_local!(
19 static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
20 static RUNNING: Cell<bool> = Cell::new(false);
21 static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
22 static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
23);
24
25pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
26
27pub(crate) enum ArbiterCommand {
28 Stop,
29 Execute(Box<dyn Future<Output = ()> + Unpin + Send>),
30 ExecuteFn(Box<dyn FnExec>),
31}
32
33impl fmt::Debug for ArbiterCommand {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
37 ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
38 ArbiterCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"),
39 }
40 }
41}
42
43#[derive(Debug)]
44pub struct Arbiter {
48 sender: UnboundedSender<ArbiterCommand>,
49 thread_handle: Option<thread::JoinHandle<()>>,
50}
51
52impl Clone for Arbiter {
53 fn clone(&self) -> Self {
54 Self::with_sender(self.sender.clone())
55 }
56}
57
58impl Default for Arbiter {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64impl Arbiter {
65 pub(crate) fn new_system() -> Self {
66 let (tx, rx) = unbounded();
67
68 let arb = Arbiter::with_sender(tx);
69 ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70 RUNNING.with(|cell| cell.set(false));
71 STORAGE.with(|cell| cell.borrow_mut().clear());
72 Arbiter::spawn(ArbiterController { stop: None, rx });
73
74 arb
75 }
76
77 pub fn current() -> Arbiter {
80 ADDR.with(|cell| match *cell.borrow() {
81 Some(ref addr) => addr.clone(),
82 None => panic!("Arbiter is not running"),
83 })
84 }
85
86 pub fn stop(&self) {
88 let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
89 }
90
91 pub fn new() -> Arbiter {
94 let id = COUNT.fetch_add(1, Ordering::Relaxed);
95 let name = format!("actix-rt:worker:{}", id);
96 let sys = System::current();
97 let (arb_tx, arb_rx) = unbounded();
98 let arb_tx2 = arb_tx.clone();
99
100 let handle = thread::Builder::new()
101 .name(name.clone())
102 .spawn(move || {
103 let mut rt = Runtime::new().expect("Can not create Runtime");
104 let arb = Arbiter::with_sender(arb_tx);
105
106 let (stop, stop_rx) = channel();
107 RUNNING.with(|cell| cell.set(true));
108 STORAGE.with(|cell| cell.borrow_mut().clear());
109
110 System::set_current(sys);
111
112 rt.spawn(ArbiterController {
114 stop: Some(stop),
115 rx: arb_rx,
116 });
117 ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
118
119 let _ = System::current()
121 .sys()
122 .unbounded_send(SystemCommand::RegisterArbiter(id, arb));
123
124 let _ = match rt.block_on(stop_rx) {
126 Ok(code) => code,
127 Err(_) => 1,
128 };
129
130 let _ = System::current()
132 .sys()
133 .unbounded_send(SystemCommand::UnregisterArbiter(id));
134 })
135 .unwrap_or_else(|err| {
136 panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
137 });
138
139 Arbiter {
140 sender: arb_tx2,
141 thread_handle: Some(handle),
142 }
143 }
144
145 pub(crate) fn run_system(rt: Option<&Runtime>) {
146 RUNNING.with(|cell| cell.set(true));
147 Q.with(|cell| {
148 let mut v = cell.borrow_mut();
149 for fut in v.drain(..) {
150 if let Some(rt) = rt {
151 rt.spawn(fut);
152 } else {
153 tokio::task::spawn_local(fut);
154 }
155 }
156 });
157 }
158
159 pub(crate) fn stop_system() {
160 RUNNING.with(|cell| cell.set(false));
161 }
162
163 pub fn spawn<F>(future: F)
167 where
168 F: Future<Output = ()> + 'static,
169 {
170 RUNNING.with(move |cell| {
171 if cell.get() {
172 tokio::task::spawn_local(future);
174 } else {
175 Q.with(move |cell| {
178 cell.borrow_mut()
179 .push(unsafe { Pin::new_unchecked(Box::alloc().init(future)) })
180 });
181 }
182 });
183 }
184
185 pub fn spawn_fn<F, R>(f: F)
189 where
190 F: FnOnce() -> R + 'static,
191 R: Future<Output = ()> + 'static,
192 {
193 Arbiter::spawn(future::lazy(|_| f()).flatten())
194 }
195
196 pub fn send<F>(&self, future: F)
198 where
199 F: Future<Output = ()> + Send + Unpin + 'static,
200 {
201 let _ = self
202 .sender
203 .unbounded_send(ArbiterCommand::Execute(Box::new(future)));
204 }
205
206 pub fn exec_fn<F>(&self, f: F)
209 where
210 F: FnOnce() + Send + 'static,
211 {
212 let _ = self
213 .sender
214 .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
215 f();
216 })));
217 }
218
219 pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, Canceled>>
223 where
224 F: FnOnce() -> R + Send + 'static,
225 R: Send + 'static,
226 {
227 let (tx, rx) = channel();
228 let _ = self
229 .sender
230 .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
231 if !tx.is_canceled() {
232 let _ = tx.send(f());
233 }
234 })));
235 rx
236 }
237
238 pub fn set_item<T: 'static>(item: T) {
240 STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
241 }
242
243 pub fn contains_item<T: 'static>() -> bool {
245 STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
246 }
247
248 pub fn get_item<T: 'static, F, R>(mut f: F) -> R
252 where
253 F: FnMut(&T) -> R,
254 {
255 STORAGE.with(move |cell| {
256 let st = cell.borrow();
257 let item = st
258 .get(&TypeId::of::<T>())
259 .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
260 .unwrap();
261 f(item)
262 })
263 }
264
265 pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
269 where
270 F: FnMut(&mut T) -> R,
271 {
272 STORAGE.with(move |cell| {
273 let mut st = cell.borrow_mut();
274 let item = st
275 .get_mut(&TypeId::of::<T>())
276 .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
277 .unwrap();
278 f(item)
279 })
280 }
281
282 fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
283 Self {
284 sender,
285 thread_handle: None,
286 }
287 }
288
289 pub fn join(&mut self) -> thread::Result<()> {
291 if let Some(thread_handle) = self.thread_handle.take() {
292 thread_handle.join()
293 } else {
294 Ok(())
295 }
296 }
297}
298
299struct ArbiterController {
300 stop: Option<Sender<i32>>,
301 rx: UnboundedReceiver<ArbiterCommand>,
302}
303
304impl Drop for ArbiterController {
305 fn drop(&mut self) {
306 if thread::panicking() {
307 if System::current().stop_on_panic() {
308 eprintln!("Panic in Arbiter thread, shutting down system.");
309 System::current().stop_with_code(1)
310 } else {
311 eprintln!("Panic in Arbiter thread.");
312 }
313 }
314 }
315}
316
317impl Future for ArbiterController {
318 type Output = ();
319
320 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
321 loop {
322 match Pin::new(&mut self.rx).poll_next(cx) {
323 Poll::Ready(None) => return Poll::Ready(()),
324 Poll::Ready(Some(item)) => match item {
325 ArbiterCommand::Stop => {
326 if let Some(stop) = self.stop.take() {
327 let _ = stop.send(0);
328 };
329 return Poll::Ready(());
330 }
331 ArbiterCommand::Execute(fut) => {
332 tokio::task::spawn_local(fut);
333 }
334 ArbiterCommand::ExecuteFn(f) => {
335 f.call_box();
336 }
337 },
338 Poll::Pending => return Poll::Pending,
339 }
340 }
341 }
342}
343
344#[derive(Debug)]
345pub(crate) enum SystemCommand {
346 Exit(i32),
347 RegisterArbiter(usize, Arbiter),
348 UnregisterArbiter(usize),
349}
350
351#[derive(Debug)]
352pub(crate) struct SystemArbiter {
353 stop: Option<Sender<i32>>,
354 commands: UnboundedReceiver<SystemCommand>,
355 arbiters: HashMap<usize, Arbiter>,
356}
357
358impl SystemArbiter {
359 pub(crate) fn new(stop: Sender<i32>, commands: UnboundedReceiver<SystemCommand>) -> Self {
360 SystemArbiter {
361 commands,
362 stop: Some(stop),
363 arbiters: HashMap::new(),
364 }
365 }
366}
367
368impl Future for SystemArbiter {
369 type Output = ();
370
371 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372 loop {
373 match Pin::new(&mut self.commands).poll_next(cx) {
374 Poll::Ready(None) => return Poll::Ready(()),
375 Poll::Ready(Some(cmd)) => match cmd {
376 SystemCommand::Exit(code) => {
377 for arb in self.arbiters.values() {
379 arb.stop();
380 }
381 if let Some(stop) = self.stop.take() {
383 let _ = stop.send(code);
384 }
385 }
386 SystemCommand::RegisterArbiter(name, hnd) => {
387 self.arbiters.insert(name, hnd);
388 }
389 SystemCommand::UnregisterArbiter(name) => {
390 self.arbiters.remove(&name);
391 }
392 },
393 Poll::Pending => return Poll::Pending,
394 }
395 }
396 }
397}
398
399pub trait FnExec: Send + 'static {
400 fn call_box(self: Box<Self>);
401}
402
403impl<F> FnExec for F
404where
405 F: FnOnce() + Send + 'static,
406{
407 #[allow(clippy::boxed_local)]
408 fn call_box(self: Box<Self>) {
409 (*self)()
410 }
411}