reifydb_runtime/actor/system/native/
mod.rs1mod pool;
5
6use std::{
7 any::Any,
8 error, fmt,
9 fmt::{Debug, Formatter},
10 mem,
11 sync::{Arc, Mutex},
12 time,
13 time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19 actor::{
20 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21 traits::Actor,
22 },
23 context::clock::Clock,
24 pool::Pools,
25};
26
27struct ActorSystemInner {
29 cancel: CancellationToken,
30 scheduler: SchedulerHandle,
31 clock: Clock,
32 pools: Pools,
33 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
34 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
35 done_rxs: Mutex<Vec<Receiver<()>>>,
36 children: Mutex<Vec<ActorSystem>>,
37}
38
39#[derive(Clone)]
46pub struct ActorSystem {
47 inner: Arc<ActorSystemInner>,
48}
49
50impl ActorSystem {
51 pub fn new(pools: Pools, clock: Clock) -> Self {
53 let scheduler = SchedulerHandle::new(pools.system_pool().clone());
54
55 Self {
56 inner: Arc::new(ActorSystemInner {
57 cancel: CancellationToken::new(),
58 scheduler,
59 clock,
60 pools,
61 wakers: Mutex::new(Vec::new()),
62 keepalive: Mutex::new(Vec::new()),
63 done_rxs: Mutex::new(Vec::new()),
64 children: Mutex::new(Vec::new()),
65 }),
66 }
67 }
68
69 pub fn scope(&self) -> Self {
70 let child = Self {
71 inner: Arc::new(ActorSystemInner {
72 cancel: self.inner.cancel.child_token(),
73 scheduler: self.inner.scheduler.shared(),
74 clock: self.inner.clock.clone(),
75 pools: self.inner.pools.clone(),
76 wakers: Mutex::new(Vec::new()),
77 keepalive: Mutex::new(Vec::new()),
78 done_rxs: Mutex::new(Vec::new()),
79 children: Mutex::new(Vec::new()),
80 }),
81 };
82 self.inner.children.lock().unwrap().push(child.clone());
83 child
84 }
85
86 pub fn pools(&self) -> Pools {
88 self.inner.pools.clone()
89 }
90
91 pub fn cancellation_token(&self) -> CancellationToken {
93 self.inner.cancel.clone()
94 }
95
96 pub fn is_cancelled(&self) -> bool {
98 self.inner.cancel.is_cancelled()
99 }
100
101 pub fn shutdown(&self) {
106 self.inner.cancel.cancel();
107
108 for child in self.inner.children.lock().unwrap().iter() {
110 child.shutdown();
111 }
112
113 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
115 for waker in &wakers {
116 waker();
117 }
118 drop(wakers);
119
120 self.inner.keepalive.lock().unwrap().clear();
122 }
123
124 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
126 self.inner.wakers.lock().unwrap().push(f);
127 }
128
129 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
133 self.inner.keepalive.lock().unwrap().push(cell);
134 }
135
136 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
138 self.inner.done_rxs.lock().unwrap().push(rx);
139 }
140
141 pub fn join(&self) -> Result<(), JoinError> {
143 self.join_timeout(Duration::from_secs(5))
144 }
145
146 #[allow(clippy::disallowed_methods)]
148 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
149 let deadline = time::Instant::now() + timeout;
150 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
151 for rx in rxs {
152 let remaining = deadline.saturating_duration_since(time::Instant::now());
153 match rx.recv_timeout(remaining) {
154 Ok(()) => {}
155 Err(CcRecvTimeoutError::Disconnected) => {
156 }
158 Err(CcRecvTimeoutError::Timeout) => {
159 return Err(JoinError::new("timed out waiting for actors to stop"));
160 }
161 }
162 }
163 Ok(())
164 }
165
166 pub fn scheduler(&self) -> &SchedulerHandle {
168 &self.inner.scheduler
169 }
170
171 pub fn clock(&self) -> &Clock {
173 &self.inner.clock
174 }
175
176 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
181 where
182 A::State: Send,
183 {
184 pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
185 }
186
187 pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
192 where
193 A::State: Send,
194 {
195 pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
196 }
197}
198
199impl Debug for ActorSystem {
200 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
201 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
202 }
203}
204
205pub type ActorHandle<M> = PoolActorHandle<M>;
207
208#[derive(Debug)]
210pub struct JoinError {
211 message: String,
212}
213
214impl JoinError {
215 pub fn new(message: impl Into<String>) -> Self {
217 Self {
218 message: message.into(),
219 }
220 }
221}
222
223impl fmt::Display for JoinError {
224 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
225 write!(f, "actor join failed: {}", self.message)
226 }
227}
228
229impl error::Error for JoinError {}
230
231#[cfg(test)]
232mod tests {
233 use std::sync;
234
235 use super::*;
236 use crate::{
237 actor::{context::Context, traits::Directive},
238 pool::{PoolConfig, Pools},
239 };
240
241 fn test_system() -> ActorSystem {
242 let pools = Pools::new(PoolConfig::default());
243 ActorSystem::new(pools, Clock::Real)
244 }
245
246 struct CounterActor;
247
248 #[derive(Debug)]
249 enum CounterMessage {
250 Inc,
251 Get(sync::mpsc::Sender<i64>),
252 Stop,
253 }
254
255 impl Actor for CounterActor {
256 type State = i64;
257 type Message = CounterMessage;
258
259 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
260 0
261 }
262
263 fn handle(
264 &self,
265 state: &mut Self::State,
266 msg: Self::Message,
267 _ctx: &Context<Self::Message>,
268 ) -> Directive {
269 match msg {
270 CounterMessage::Inc => *state += 1,
271 CounterMessage::Get(tx) => {
272 let _ = tx.send(*state);
273 }
274 CounterMessage::Stop => return Directive::Stop,
275 }
276 Directive::Continue
277 }
278 }
279
280 #[test]
281 fn test_spawn_and_send() {
282 let system = test_system();
283 let handle = system.spawn("counter", CounterActor);
284
285 let actor_ref = handle.actor_ref().clone();
286 actor_ref.send(CounterMessage::Inc).unwrap();
287 actor_ref.send(CounterMessage::Inc).unwrap();
288 actor_ref.send(CounterMessage::Inc).unwrap();
289
290 let (tx, rx) = sync::mpsc::channel();
291 actor_ref.send(CounterMessage::Get(tx)).unwrap();
292
293 let value = rx.recv().unwrap();
294 assert_eq!(value, 3);
295
296 actor_ref.send(CounterMessage::Stop).unwrap();
297 handle.join().unwrap();
298 }
299
300 #[test]
301 fn test_shutdown_join() {
302 let system = test_system();
303
304 for i in 0..5 {
306 system.spawn(&format!("counter-{i}"), CounterActor);
307 }
308
309 system.shutdown();
311 system.join().unwrap();
312 }
313}