reifydb_runtime/actor/system/native/
mod.rs1mod pool;
5
6use std::{
7 any::Any,
8 error, fmt,
9 fmt::{Debug, Formatter},
10 mem,
11 sync::Arc,
12 time,
13 time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19 actor::{
20 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21 traits::Actor,
22 },
23 context::clock::Clock,
24 pool::Pools,
25 sync::mutex::Mutex,
26};
27
28struct ActorSystemInner {
29 cancel: CancellationToken,
30 scheduler: SchedulerHandle,
31 clock: Clock,
32 pools: Pools,
33 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
34 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
35 done_rxs: Mutex<Vec<Receiver<()>>>,
36 children: Mutex<Vec<ActorSystem>>,
37}
38
39#[derive(Clone)]
40pub struct ActorSystem {
41 inner: Arc<ActorSystemInner>,
42}
43
44impl ActorSystem {
45 pub fn new(pools: Pools, clock: Clock) -> Self {
46 let scheduler = SchedulerHandle::new(pools.system_pool().clone());
47
48 Self {
49 inner: Arc::new(ActorSystemInner {
50 cancel: CancellationToken::new(),
51 scheduler,
52 clock,
53 pools,
54 wakers: Mutex::new(Vec::new()),
55 keepalive: Mutex::new(Vec::new()),
56 done_rxs: Mutex::new(Vec::new()),
57 children: Mutex::new(Vec::new()),
58 }),
59 }
60 }
61
62 pub fn scope(&self) -> Self {
63 let child = Self {
64 inner: Arc::new(ActorSystemInner {
65 cancel: self.inner.cancel.child_token(),
66 scheduler: self.inner.scheduler.shared(),
67 clock: self.inner.clock.clone(),
68 pools: self.inner.pools.clone(),
69 wakers: Mutex::new(Vec::new()),
70 keepalive: Mutex::new(Vec::new()),
71 done_rxs: Mutex::new(Vec::new()),
72 children: Mutex::new(Vec::new()),
73 }),
74 };
75 self.inner.children.lock().push(child.clone());
76 child
77 }
78
79 pub fn pools(&self) -> Pools {
80 self.inner.pools.clone()
81 }
82
83 pub fn cancellation_token(&self) -> CancellationToken {
84 self.inner.cancel.clone()
85 }
86
87 pub fn is_cancelled(&self) -> bool {
88 self.inner.cancel.is_cancelled()
89 }
90
91 pub fn shutdown(&self) {
92 self.inner.cancel.cancel();
93
94 for child in self.inner.children.lock().iter() {
95 child.shutdown();
96 }
97
98 let wakers = mem::take(&mut *self.inner.wakers.lock());
99 for waker in &wakers {
100 waker();
101 }
102 drop(wakers);
103
104 self.inner.keepalive.lock().clear();
105 }
106
107 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
108 self.inner.wakers.lock().push(f);
109 }
110
111 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
112 self.inner.keepalive.lock().push(cell);
113 }
114
115 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
116 self.inner.done_rxs.lock().push(rx);
117 }
118
119 pub fn join(&self) -> Result<(), JoinError> {
120 self.join_timeout(Duration::from_secs(5))
121 }
122
123 #[allow(clippy::disallowed_methods)]
124 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
125 let deadline = time::Instant::now() + timeout;
126 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock());
127 for rx in rxs {
128 let remaining = deadline.saturating_duration_since(time::Instant::now());
129 match rx.recv_timeout(remaining) {
130 Ok(()) => {}
131 Err(CcRecvTimeoutError::Disconnected) => {}
132 Err(CcRecvTimeoutError::Timeout) => {
133 return Err(JoinError::new("timed out waiting for actors to stop"));
134 }
135 }
136 }
137 Ok(())
138 }
139
140 pub fn scheduler(&self) -> &SchedulerHandle {
141 &self.inner.scheduler
142 }
143
144 pub fn clock(&self) -> &Clock {
145 &self.inner.clock
146 }
147
148 pub fn spawn_system<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
149 where
150 A::State: Send,
151 {
152 pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
153 }
154
155 pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
156 where
157 A::State: Send,
158 {
159 pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
160 }
161
162 pub fn spawn_commit<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
163 where
164 A::State: Send,
165 {
166 pool::spawn_on_pool(self, name, actor, self.inner.pools.commit_pool())
167 }
168
169 pub fn spawn_background<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
170 where
171 A::State: Send,
172 {
173 pool::spawn_on_pool(self, name, actor, self.inner.pools.background_pool())
174 }
175}
176
177impl Debug for ActorSystem {
178 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
179 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
180 }
181}
182
183pub type ActorHandle<M> = PoolActorHandle<M>;
184
185#[derive(Debug)]
186pub struct JoinError {
187 message: String,
188}
189
190impl JoinError {
191 pub fn new(message: impl Into<String>) -> Self {
192 Self {
193 message: message.into(),
194 }
195 }
196}
197
198impl fmt::Display for JoinError {
199 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
200 write!(f, "actor join failed: {}", self.message)
201 }
202}
203
204impl error::Error for JoinError {}
205
206#[cfg(test)]
207mod tests {
208 use std::sync;
209
210 use super::*;
211 use crate::{
212 actor::{context::Context, traits::Directive},
213 pool::{PoolConfig, Pools},
214 };
215
216 fn test_system() -> ActorSystem {
217 let pools = Pools::new(PoolConfig::default());
218 ActorSystem::new(pools, Clock::Real)
219 }
220
221 struct CounterActor;
222
223 #[derive(Debug)]
224 enum CounterMessage {
225 Inc,
226 Get(sync::mpsc::Sender<i64>),
227 Stop,
228 }
229
230 impl Actor for CounterActor {
231 type State = i64;
232 type Message = CounterMessage;
233
234 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
235 0
236 }
237
238 fn handle(
239 &self,
240 state: &mut Self::State,
241 msg: Self::Message,
242 _ctx: &Context<Self::Message>,
243 ) -> Directive {
244 match msg {
245 CounterMessage::Inc => *state += 1,
246 CounterMessage::Get(tx) => {
247 let _ = tx.send(*state);
248 }
249 CounterMessage::Stop => return Directive::Stop,
250 }
251 Directive::Continue
252 }
253 }
254
255 #[test]
256 fn test_spawn_and_send() {
257 let system = test_system();
258 let handle = system.spawn_system("counter", CounterActor);
259
260 let actor_ref = handle.actor_ref().clone();
261 actor_ref.send(CounterMessage::Inc).unwrap();
262 actor_ref.send(CounterMessage::Inc).unwrap();
263 actor_ref.send(CounterMessage::Inc).unwrap();
264
265 let (tx, rx) = sync::mpsc::channel();
266 actor_ref.send(CounterMessage::Get(tx)).unwrap();
267
268 let value = rx.recv().unwrap();
269 assert_eq!(value, 3);
270
271 actor_ref.send(CounterMessage::Stop).unwrap();
272 handle.join().unwrap();
273 }
274
275 #[test]
276 fn test_shutdown_join() {
277 let system = test_system();
278
279 for i in 0..5 {
281 system.spawn_system(&format!("counter-{i}"), CounterActor);
282 }
283
284 system.shutdown();
286 system.join().unwrap();
287 }
288}