lelet/executor/
machine.rs1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::rc::Rc;
4
5#[cfg(feature = "tracing")]
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8#[cfg(feature = "tracing")]
9use log::trace;
10
11use lelet_utils::abort_on_panic;
12
13use crate::thread_pool;
14
15use super::processor::Processor;
16use super::Task;
17
18pub struct Machine {
20 #[cfg(feature = "tracing")]
21 pub id: usize,
22
23 processor: &'static Processor,
24
25 _marker: PhantomData<*mut ()>,
27}
28
29thread_local! {
30 static CURRENT: RefCell<Option<Rc<Machine>>> = RefCell::new(None);
31}
32
33impl Machine {
34 #[inline(always)]
35 fn new(processor: &'static Processor) -> Rc<Machine> {
36 #[cfg(feature = "tracing")]
37 static MACHINE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
38
39 let machine = Machine {
40 #[cfg(feature = "tracing")]
41 id: MACHINE_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
42
43 processor,
44
45 _marker: PhantomData,
46 };
47
48 #[cfg(feature = "tracing")]
49 trace!("{:?} is created", machine);
50
51 Rc::new(machine)
52 }
53
54 #[inline(always)]
55 fn run(self: &Rc<Machine>) {
56 CURRENT.with(|current| {
57 let old = current.borrow_mut().replace(self.clone());
58
59 assert!(old.is_none());
61 });
62
63 #[cfg(feature = "tracing")]
64 crate::thread_pool::THREAD_ID.with(|tid| {
65 trace!("{:?} is running on {:?}", self, tid);
66 });
67
68 self.processor.run_on(self);
69
70 CURRENT.with(|current| current.borrow_mut().take());
71 }
72}
73
74#[cfg(feature = "tracing")]
75impl std::fmt::Debug for Machine {
76 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
77 f.write_str(&format!("Machine({})", self.id))
78 }
79}
80
81#[cfg(feature = "tracing")]
82impl Drop for Machine {
83 fn drop(&mut self) {
84 trace!("{:?} is destroyed", self);
85 }
86}
87
88#[inline(always)]
89pub fn spawn(processor: &'static Processor) {
90 thread_pool::spawn_box(Box::new(move || {
91 abort_on_panic(move || {
92 Machine::new(processor).run();
93 })
94 }));
95}
96
97#[inline(always)]
98pub fn direct_push(task: Task) -> Result<(), Task> {
99 CURRENT.with(|current| {
100 let mut current = current.borrow_mut();
101 match current.as_ref() {
102 None => Err(task),
103 Some(m) => match m.processor.push_local(m, task) {
104 Ok(()) => Ok(()),
105 Err(err) => {
106 current.take();
107 Err(err)
108 }
109 },
110 }
111 })
112}
113
114#[inline(always)]
115pub fn respawn() {
116 CURRENT.with(|current| {
117 if let Some(m) = current.borrow_mut().take() {
118 #[cfg(feature = "tracing")]
119 trace!(
120 "{:?} is giving up on {:?}, spawn new machine",
121 m,
122 m.processor
123 );
124
125 spawn(m.processor)
126 }
127 })
128}