Skip to main content

lelet/executor/
machine.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::rc::Rc;
4
5#[cfg(feature = "tracing")]
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8#[cfg(feature = "tracing")]
9use log::trace;
10
11use lelet_utils::abort_on_panic;
12
13use crate::thread_pool;
14
15use super::processor::Processor;
16use super::Task;
17
18/// Machine is the one who have OS thread
19pub struct Machine {
20    #[cfg(feature = "tracing")]
21    pub id: usize,
22
23    processor: &'static Processor,
24
25    // !Send + !Sync
26    _marker: PhantomData<*mut ()>,
27}
28
29thread_local! {
30    static CURRENT: RefCell<Option<Rc<Machine>>> = RefCell::new(None);
31}
32
33impl Machine {
34    #[inline(always)]
35    fn new(processor: &'static Processor) -> Rc<Machine> {
36        #[cfg(feature = "tracing")]
37        static MACHINE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
38
39        let machine = Machine {
40            #[cfg(feature = "tracing")]
41            id: MACHINE_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
42
43            processor,
44
45            _marker: PhantomData,
46        };
47
48        #[cfg(feature = "tracing")]
49        trace!("{:?} is created", machine);
50
51        Rc::new(machine)
52    }
53
54    #[inline(always)]
55    fn run(self: &Rc<Machine>) {
56        CURRENT.with(|current| {
57            let old = current.borrow_mut().replace(self.clone());
58
59            // just to make sure that the machine is not cached in thread pool
60            assert!(old.is_none());
61        });
62
63        #[cfg(feature = "tracing")]
64        crate::thread_pool::THREAD_ID.with(|tid| {
65            trace!("{:?} is running on {:?}", self, tid);
66        });
67
68        self.processor.run_on(self);
69
70        CURRENT.with(|current| current.borrow_mut().take());
71    }
72}
73
74#[cfg(feature = "tracing")]
75impl std::fmt::Debug for Machine {
76    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
77        f.write_str(&format!("Machine({})", self.id))
78    }
79}
80
81#[cfg(feature = "tracing")]
82impl Drop for Machine {
83    fn drop(&mut self) {
84        trace!("{:?} is destroyed", self);
85    }
86}
87
88#[inline(always)]
89pub fn spawn(processor: &'static Processor) {
90    thread_pool::spawn_box(Box::new(move || {
91        abort_on_panic(move || {
92            Machine::new(processor).run();
93        })
94    }));
95}
96
97#[inline(always)]
98pub fn direct_push(task: Task) -> Result<(), Task> {
99    CURRENT.with(|current| {
100        let mut current = current.borrow_mut();
101        match current.as_ref() {
102            None => Err(task),
103            Some(m) => match m.processor.push_local(m, task) {
104                Ok(()) => Ok(()),
105                Err(err) => {
106                    current.take();
107                    Err(err)
108                }
109            },
110        }
111    })
112}
113
114#[inline(always)]
115pub fn respawn() {
116    CURRENT.with(|current| {
117        if let Some(m) = current.borrow_mut().take() {
118            #[cfg(feature = "tracing")]
119            trace!(
120                "{:?} is giving up on {:?}, spawn new machine",
121                m,
122                m.processor
123            );
124
125            spawn(m.processor)
126        }
127    })
128}