reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 error, fmt,
13 fmt::{Debug, Formatter},
14 mem,
15 sync::{Arc, Mutex},
16 time,
17 time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21
22use crate::{
23 actor::{
24 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
25 traits::Actor,
26 },
27 context::clock::Clock,
28 pool::Pools,
29};
30
31struct ActorSystemInner {
33 cancel: CancellationToken,
34 scheduler: SchedulerHandle,
35 clock: Clock,
36 pools: Pools,
37 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
38 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
39 done_rxs: Mutex<Vec<Receiver<()>>>,
40 children: Mutex<Vec<ActorSystem>>,
41}
42
43#[derive(Clone)]
50pub struct ActorSystem {
51 inner: Arc<ActorSystemInner>,
52}
53
54impl ActorSystem {
55 pub fn new(pools: Pools, clock: Clock) -> Self {
57 let scheduler = SchedulerHandle::new(pools.system_pool().clone());
58
59 Self {
60 inner: Arc::new(ActorSystemInner {
61 cancel: CancellationToken::new(),
62 scheduler,
63 clock,
64 pools,
65 wakers: Mutex::new(Vec::new()),
66 keepalive: Mutex::new(Vec::new()),
67 done_rxs: Mutex::new(Vec::new()),
68 children: Mutex::new(Vec::new()),
69 }),
70 }
71 }
72
73 pub fn scope(&self) -> Self {
74 let child = Self {
75 inner: Arc::new(ActorSystemInner {
76 cancel: self.inner.cancel.child_token(),
77 scheduler: self.inner.scheduler.shared(),
78 clock: self.inner.clock.clone(),
79 pools: self.inner.pools.clone(),
80 wakers: Mutex::new(Vec::new()),
81 keepalive: Mutex::new(Vec::new()),
82 done_rxs: Mutex::new(Vec::new()),
83 children: Mutex::new(Vec::new()),
84 }),
85 };
86 self.inner.children.lock().unwrap().push(child.clone());
87 child
88 }
89
90 pub fn pools(&self) -> Pools {
92 self.inner.pools.clone()
93 }
94
95 pub fn cancellation_token(&self) -> CancellationToken {
97 self.inner.cancel.clone()
98 }
99
100 pub fn is_cancelled(&self) -> bool {
102 self.inner.cancel.is_cancelled()
103 }
104
105 pub fn shutdown(&self) {
110 self.inner.cancel.cancel();
111
112 for child in self.inner.children.lock().unwrap().iter() {
114 child.shutdown();
115 }
116
117 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
119 for waker in &wakers {
120 waker();
121 }
122 drop(wakers);
123
124 self.inner.keepalive.lock().unwrap().clear();
126 }
127
128 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
130 self.inner.wakers.lock().unwrap().push(f);
131 }
132
133 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
137 self.inner.keepalive.lock().unwrap().push(cell);
138 }
139
140 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
142 self.inner.done_rxs.lock().unwrap().push(rx);
143 }
144
145 pub fn join(&self) -> Result<(), JoinError> {
147 self.join_timeout(Duration::from_secs(5))
148 }
149
150 #[allow(clippy::disallowed_methods)]
152 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
153 let deadline = time::Instant::now() + timeout;
154 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
155 for rx in rxs {
156 let remaining = deadline.saturating_duration_since(time::Instant::now());
157 match rx.recv_timeout(remaining) {
158 Ok(()) => {}
159 Err(CcRecvTimeoutError::Disconnected) => {
160 }
162 Err(CcRecvTimeoutError::Timeout) => {
163 return Err(JoinError::new("timed out waiting for actors to stop"));
164 }
165 }
166 }
167 Ok(())
168 }
169
170 pub fn scheduler(&self) -> &SchedulerHandle {
172 &self.inner.scheduler
173 }
174
175 pub fn clock(&self) -> &Clock {
177 &self.inner.clock
178 }
179
180 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
185 where
186 A::State: Send,
187 {
188 pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
189 }
190
191 pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
196 where
197 A::State: Send,
198 {
199 pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
200 }
201}
202
203impl Debug for ActorSystem {
204 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
205 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
206 }
207}
208
209pub type ActorHandle<M> = PoolActorHandle<M>;
211
212#[derive(Debug)]
214pub struct JoinError {
215 message: String,
216}
217
218impl JoinError {
219 pub fn new(message: impl Into<String>) -> Self {
221 Self {
222 message: message.into(),
223 }
224 }
225}
226
227impl fmt::Display for JoinError {
228 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
229 write!(f, "actor join failed: {}", self.message)
230 }
231}
232
233impl error::Error for JoinError {}
234
235#[cfg(test)]
236mod tests {
237 use std::sync;
238
239 use super::*;
240 use crate::{
241 actor::{context::Context, traits::Directive},
242 pool::{PoolConfig, Pools},
243 };
244
245 fn test_system() -> ActorSystem {
246 let pools = Pools::new(PoolConfig::default());
247 ActorSystem::new(pools, Clock::Real)
248 }
249
250 struct CounterActor;
251
252 #[derive(Debug)]
253 enum CounterMessage {
254 Inc,
255 Get(sync::mpsc::Sender<i64>),
256 Stop,
257 }
258
259 impl Actor for CounterActor {
260 type State = i64;
261 type Message = CounterMessage;
262
263 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
264 0
265 }
266
267 fn handle(
268 &self,
269 state: &mut Self::State,
270 msg: Self::Message,
271 _ctx: &Context<Self::Message>,
272 ) -> Directive {
273 match msg {
274 CounterMessage::Inc => *state += 1,
275 CounterMessage::Get(tx) => {
276 let _ = tx.send(*state);
277 }
278 CounterMessage::Stop => return Directive::Stop,
279 }
280 Directive::Continue
281 }
282 }
283
284 #[test]
285 fn test_spawn_and_send() {
286 let system = test_system();
287 let handle = system.spawn("counter", CounterActor);
288
289 let actor_ref = handle.actor_ref().clone();
290 actor_ref.send(CounterMessage::Inc).unwrap();
291 actor_ref.send(CounterMessage::Inc).unwrap();
292 actor_ref.send(CounterMessage::Inc).unwrap();
293
294 let (tx, rx) = sync::mpsc::channel();
295 actor_ref.send(CounterMessage::Get(tx)).unwrap();
296
297 let value = rx.recv().unwrap();
298 assert_eq!(value, 3);
299
300 actor_ref.send(CounterMessage::Stop).unwrap();
301 handle.join().unwrap();
302 }
303
304 #[test]
305 fn test_shutdown_join() {
306 let system = test_system();
307
308 for i in 0..5 {
310 system.spawn(&format!("counter-{i}"), CounterActor);
311 }
312
313 system.shutdown();
315 system.join().unwrap();
316 }
317}