lightproc/
proc_stack.rs

//! Stack abstraction for lightweight processes
//!
//! This abstraction allows us to execute lifecycle callbacks when
//! a process transitions from one state to another.
//!
//! To make an analogy, the stack abstraction is similar to the actor lifecycle abstractions
//! in frameworks like Akka, but tailored to the Rust environment.
8use super::proc_state::*;
9
10use std::fmt::{self, Debug, Formatter};
11
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15/// Stack abstraction for lightweight processes
16///
17/// # Example
18///
19/// ```rust
20/// use lightproc::proc_stack::ProcStack;
21/// use lightproc::proc_state::EmptyProcState;
22///
23/// ProcStack::default()
24///     .with_before_start(|s: &mut EmptyProcState| { println!("Before start"); })
25///     .with_after_complete(|s: &mut EmptyProcState| { println!("After complete"); })
26///     .with_after_panic(|s: &mut EmptyProcState| { println!("After panic"); });
27/// ```
28pub struct ProcStack {
29    /// Process ID for the Lightweight Process
30    ///
31    /// Can be used to identify specific processes during any executor, reactor implementations.
32    pub pid: AtomicUsize,
33
34    pub(crate) state: ProcState,
35
36    /// Before start callback
37    ///
38    /// This callback is called before we start to inner future of the process
39    pub(crate) before_start: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
40
41    /// After complete callback
42    ///
43    /// This callback is called after future resolved to it's output.
44    /// Mind that, even panic occurs this callback will get executed.
45    ///
46    /// Eventually all panics are coming from an Error output.
47    pub(crate) after_complete: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
48
49    /// After panic callback
50    ///
51    /// This callback is only called when a panic has been occurred.
52    /// Mind that [ProcHandle](proc_handle/struct.ProcHandle.html) is not using this
53    pub(crate) after_panic: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
54}
55
56impl ProcStack {
57    /// Adds pid for the process which is going to take this stack
58    ///
59    /// # Example
60    ///
61    /// ```rust
62    /// use lightproc::proc_stack::ProcStack;
63    ///
64    /// ProcStack::default()
65    ///     .with_pid(1);
66    /// ```
67    pub fn with_pid(mut self, pid: usize) -> Self {
68        self.pid = AtomicUsize::new(pid);
69        self
70    }
71
72    /// Adds state for the process which is going to be embedded into this stack.
73    ///
74    /// # Example
75    ///
76    /// ```rust
77    /// use lightproc::proc_stack::ProcStack;
78    ///
79    /// pub struct GlobalState {
80    ///    pub amount: usize
81    /// }
82    ///
83    /// ProcStack::default()
84    ///     .with_pid(1)
85    ///     .with_state(GlobalState { amount: 1 });
86    /// ```
87    pub fn with_state<S>(mut self, state: S) -> Self
88    where
89        S: State + 'static,
90    {
91        self.state = Arc::new(Mutex::new(state));
92        self
93    }
94
95    /// Adds a callback that will be executed before polling inner future to the stack
96    ///
97    /// ```rust
98    /// use lightproc::proc_stack::{ProcStack};
99    /// use lightproc::proc_state::EmptyProcState;
100    ///
101    /// ProcStack::default()
102    ///     .with_before_start(|s: &mut EmptyProcState| { println!("Before start"); });
103    /// ```
104    pub fn with_before_start<C, S>(mut self, callback: C) -> Self
105    where
106        S: State,
107        C: Fn(&mut S) + Send + Sync + 'static,
108    {
109        self.before_start = Some(self.wrap_callback(callback));
110        self
111    }
112
113    /// Adds a callback that will be executed after inner future resolves to an output to the stack
114    ///
115    /// ```rust
116    /// use lightproc::proc_stack::ProcStack;
117    /// use lightproc::proc_state::EmptyProcState;
118    ///
119    /// ProcStack::default()
120    ///     .with_after_complete(|s: &mut EmptyProcState| { println!("After complete"); });
121    /// ```
122    pub fn with_after_complete<C, S>(mut self, callback: C) -> Self
123    where
124        S: State,
125        C: Fn(&mut S) + Send + Sync + 'static,
126    {
127        self.after_complete = Some(self.wrap_callback(callback));
128        self
129    }
130
131    /// Adds a callback that will be executed after inner future panics to the stack
132    ///
133    /// ```rust
134    /// use lightproc::proc_stack::ProcStack;
135    /// use lightproc::proc_state::EmptyProcState;
136    ///
137    /// ProcStack::default()
138    ///     .with_after_panic(|s: &mut EmptyProcState| { println!("After panic"); });
139    /// ```
140    pub fn with_after_panic<C, S>(mut self, callback: C) -> Self
141    where
142        S: State,
143        C: Fn(&mut S) + Send + Sync + 'static,
144    {
145        self.after_panic = Some(self.wrap_callback(callback));
146        self
147    }
148
149    /// Utility function to get_pid for the implementation of executors.
150    ///
151    /// ```rust
152    /// use lightproc::proc_stack::ProcStack;
153    ///
154    /// let proc = ProcStack::default().with_pid(123);
155    ///
156    /// assert_eq!(proc.get_pid(), 123);
157    /// ```
158    pub fn get_pid(&self) -> usize {
159        self.pid.load(Ordering::Acquire)
160    }
161
162    /// Get the state which is embedded into this [ProcStack].
163    ///
164    /// ```rust
165    /// use lightproc::proc_stack::ProcStack;
166    ///
167    /// #[derive(Copy, Clone)]
168    /// pub struct GlobalState {
169    ///    pub amount: usize
170    /// }
171    ///
172    /// let mut proc = ProcStack::default().with_pid(123)
173    ///             .with_state(GlobalState { amount: 0} );
174    ///
175    /// let state = proc.get_state::<GlobalState>();
176    /// ```
177    pub fn get_state<S>(&self) -> S
178    where
179        S: State + Copy + 'static,
180    {
181        let state = self.state.clone();
182        let s = unsafe { &*(&state as *const ProcState as *const Arc<Mutex<S>>) };
183        *s.lock().unwrap()
184    }
185
186    /// Wraps the callback to the with given trait boundaries of the state.
187    ///
188    /// Why there is unsafe?
189    /// * Given state is taken as dyn State we don't know the size. But rest assured it is sized.
190    /// * Executor of the callback should know what exactly coming up from state. That said it can even pass dynamically sized trait too, we can't constrain that.
191    /// * Cast it to the correct type and trust the boundaries which was already ensured at the method signature.
192    /// * Synchronization can't revolve around the unsafe. because we lock the dynamic state right after the cast.
193    fn wrap_callback<C, S>(&self, callback: C) -> Arc<dyn Fn(ProcState) + Send + Sync>
194    where
195        S: State + 'static,
196        C: Fn(&mut S) + Send + Sync + 'static,
197    {
198        let wrapped = move |s: ProcState| {
199            let x = unsafe { &*(&s as *const ProcState as *const Arc<Mutex<S>>) };
200            let mut mg = x.lock().unwrap();
201            callback(&mut *mg);
202        };
203        Arc::new(wrapped)
204    }
205}
206
207///
208/// Default implementation for the ProcStack
209impl Default for ProcStack {
210    fn default() -> Self {
211        ProcStack {
212            pid: AtomicUsize::new(0xDEAD_BEEF),
213            state: Arc::new(Mutex::new(EmptyState)),
214            before_start: None,
215            after_complete: None,
216            after_panic: None,
217        }
218    }
219}
220
221impl Debug for ProcStack {
222    fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
223        fmt.debug_struct("ProcStack")
224            .field("pid", &self.pid.load(Ordering::SeqCst))
225            .field("state", &self.state)
226            .field("before_start", &self.before_start.is_some())
227            .field("after_complete", &self.after_complete.is_some())
228            .field("after_panic", &self.after_panic.is_some())
229            .finish()
230    }
231}
232
233impl Clone for ProcStack {
234    fn clone(&self) -> Self {
235        ProcStack {
236            pid: AtomicUsize::new(self.pid.load(Ordering::Acquire)),
237            state: self.state.clone(),
238            before_start: self.before_start.clone(),
239            after_complete: self.after_complete.clone(),
240            after_panic: self.after_panic.clone(),
241        }
242    }
243}