lightproc/proc_stack.rs
1//! Stack abstraction for lightweight processes
2//!
3//! This abstraction allows us to execute lifecycle callbacks when
4//! a process transites from one state to another.
5//!
6//! If we want to make an analogy, stack abstraction is similar to actor lifecycle abstractions
7//! in frameworks like Akka, but tailored version for Rust environment.
8use super::proc_state::*;
9
10use std::fmt::{self, Debug, Formatter};
11
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15/// Stack abstraction for lightweight processes
16///
17/// # Example
18///
19/// ```rust
20/// use lightproc::proc_stack::ProcStack;
21/// use lightproc::proc_state::EmptyProcState;
22///
23/// ProcStack::default()
24/// .with_before_start(|s: &mut EmptyProcState| { println!("Before start"); })
25/// .with_after_complete(|s: &mut EmptyProcState| { println!("After complete"); })
26/// .with_after_panic(|s: &mut EmptyProcState| { println!("After panic"); });
27/// ```
28pub struct ProcStack {
29 /// Process ID for the Lightweight Process
30 ///
31 /// Can be used to identify specific processes during any executor, reactor implementations.
32 pub pid: AtomicUsize,
33
34 pub(crate) state: ProcState,
35
36 /// Before start callback
37 ///
38 /// This callback is called before we start to inner future of the process
39 pub(crate) before_start: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
40
41 /// After complete callback
42 ///
43 /// This callback is called after future resolved to it's output.
44 /// Mind that, even panic occurs this callback will get executed.
45 ///
46 /// Eventually all panics are coming from an Error output.
47 pub(crate) after_complete: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
48
49 /// After panic callback
50 ///
51 /// This callback is only called when a panic has been occurred.
52 /// Mind that [ProcHandle](proc_handle/struct.ProcHandle.html) is not using this
53 pub(crate) after_panic: Option<Arc<dyn Fn(ProcState) + Send + Sync>>,
54}
55
56impl ProcStack {
57 /// Adds pid for the process which is going to take this stack
58 ///
59 /// # Example
60 ///
61 /// ```rust
62 /// use lightproc::proc_stack::ProcStack;
63 ///
64 /// ProcStack::default()
65 /// .with_pid(1);
66 /// ```
67 pub fn with_pid(mut self, pid: usize) -> Self {
68 self.pid = AtomicUsize::new(pid);
69 self
70 }
71
72 /// Adds state for the process which is going to be embedded into this stack.
73 ///
74 /// # Example
75 ///
76 /// ```rust
77 /// use lightproc::proc_stack::ProcStack;
78 ///
79 /// pub struct GlobalState {
80 /// pub amount: usize
81 /// }
82 ///
83 /// ProcStack::default()
84 /// .with_pid(1)
85 /// .with_state(GlobalState { amount: 1 });
86 /// ```
87 pub fn with_state<S>(mut self, state: S) -> Self
88 where
89 S: State + 'static,
90 {
91 self.state = Arc::new(Mutex::new(state));
92 self
93 }
94
95 /// Adds a callback that will be executed before polling inner future to the stack
96 ///
97 /// ```rust
98 /// use lightproc::proc_stack::{ProcStack};
99 /// use lightproc::proc_state::EmptyProcState;
100 ///
101 /// ProcStack::default()
102 /// .with_before_start(|s: &mut EmptyProcState| { println!("Before start"); });
103 /// ```
104 pub fn with_before_start<C, S>(mut self, callback: C) -> Self
105 where
106 S: State,
107 C: Fn(&mut S) + Send + Sync + 'static,
108 {
109 self.before_start = Some(self.wrap_callback(callback));
110 self
111 }
112
113 /// Adds a callback that will be executed after inner future resolves to an output to the stack
114 ///
115 /// ```rust
116 /// use lightproc::proc_stack::ProcStack;
117 /// use lightproc::proc_state::EmptyProcState;
118 ///
119 /// ProcStack::default()
120 /// .with_after_complete(|s: &mut EmptyProcState| { println!("After complete"); });
121 /// ```
122 pub fn with_after_complete<C, S>(mut self, callback: C) -> Self
123 where
124 S: State,
125 C: Fn(&mut S) + Send + Sync + 'static,
126 {
127 self.after_complete = Some(self.wrap_callback(callback));
128 self
129 }
130
131 /// Adds a callback that will be executed after inner future panics to the stack
132 ///
133 /// ```rust
134 /// use lightproc::proc_stack::ProcStack;
135 /// use lightproc::proc_state::EmptyProcState;
136 ///
137 /// ProcStack::default()
138 /// .with_after_panic(|s: &mut EmptyProcState| { println!("After panic"); });
139 /// ```
140 pub fn with_after_panic<C, S>(mut self, callback: C) -> Self
141 where
142 S: State,
143 C: Fn(&mut S) + Send + Sync + 'static,
144 {
145 self.after_panic = Some(self.wrap_callback(callback));
146 self
147 }
148
149 /// Utility function to get_pid for the implementation of executors.
150 ///
151 /// ```rust
152 /// use lightproc::proc_stack::ProcStack;
153 ///
154 /// let proc = ProcStack::default().with_pid(123);
155 ///
156 /// assert_eq!(proc.get_pid(), 123);
157 /// ```
158 pub fn get_pid(&self) -> usize {
159 self.pid.load(Ordering::Acquire)
160 }
161
162 /// Get the state which is embedded into this [ProcStack].
163 ///
164 /// ```rust
165 /// use lightproc::proc_stack::ProcStack;
166 ///
167 /// #[derive(Copy, Clone)]
168 /// pub struct GlobalState {
169 /// pub amount: usize
170 /// }
171 ///
172 /// let mut proc = ProcStack::default().with_pid(123)
173 /// .with_state(GlobalState { amount: 0} );
174 ///
175 /// let state = proc.get_state::<GlobalState>();
176 /// ```
177 pub fn get_state<S>(&self) -> S
178 where
179 S: State + Copy + 'static,
180 {
181 let state = self.state.clone();
182 let s = unsafe { &*(&state as *const ProcState as *const Arc<Mutex<S>>) };
183 *s.lock().unwrap()
184 }
185
186 /// Wraps the callback to the with given trait boundaries of the state.
187 ///
188 /// Why there is unsafe?
189 /// * Given state is taken as dyn State we don't know the size. But rest assured it is sized.
190 /// * Executor of the callback should know what exactly coming up from state. That said it can even pass dynamically sized trait too, we can't constrain that.
191 /// * Cast it to the correct type and trust the boundaries which was already ensured at the method signature.
192 /// * Synchronization can't revolve around the unsafe. because we lock the dynamic state right after the cast.
193 fn wrap_callback<C, S>(&self, callback: C) -> Arc<dyn Fn(ProcState) + Send + Sync>
194 where
195 S: State + 'static,
196 C: Fn(&mut S) + Send + Sync + 'static,
197 {
198 let wrapped = move |s: ProcState| {
199 let x = unsafe { &*(&s as *const ProcState as *const Arc<Mutex<S>>) };
200 let mut mg = x.lock().unwrap();
201 callback(&mut *mg);
202 };
203 Arc::new(wrapped)
204 }
205}
206
207///
208/// Default implementation for the ProcStack
209impl Default for ProcStack {
210 fn default() -> Self {
211 ProcStack {
212 pid: AtomicUsize::new(0xDEAD_BEEF),
213 state: Arc::new(Mutex::new(EmptyState)),
214 before_start: None,
215 after_complete: None,
216 after_panic: None,
217 }
218 }
219}
220
221impl Debug for ProcStack {
222 fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
223 fmt.debug_struct("ProcStack")
224 .field("pid", &self.pid.load(Ordering::SeqCst))
225 .field("state", &self.state)
226 .field("before_start", &self.before_start.is_some())
227 .field("after_complete", &self.after_complete.is_some())
228 .field("after_panic", &self.after_panic.is_some())
229 .finish()
230 }
231}
232
233impl Clone for ProcStack {
234 fn clone(&self) -> Self {
235 ProcStack {
236 pid: AtomicUsize::new(self.pid.load(Ordering::Acquire)),
237 state: self.state.clone(),
238 before_start: self.before_start.clone(),
239 after_complete: self.after_complete.clone(),
240 after_panic: self.after_panic.clone(),
241 }
242 }
243}