reifydb_runtime/actor/system/native/
mod.rs1mod pool;
5
6use std::{
7 any::Any,
8 error, fmt,
9 fmt::{Debug, Formatter},
10 mem,
11 sync::{Arc, Mutex},
12 time,
13 time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19 actor::{
20 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21 traits::Actor,
22 },
23 context::clock::Clock,
24 pool::Pools,
25};
26
/// Shared state behind an [`ActorSystem`] handle.
///
/// Every registry is wrapped in its own `Mutex` so it can be mutated through
/// the shared `Arc` held by each handle clone.
struct ActorSystemInner {
    /// Token cancelled by `ActorSystem::shutdown`; scopes receive a child token of it.
    cancel: CancellationToken,
    /// Scheduler handle; built on the system pool for a root system and
    /// obtained via `shared()` for a scope.
    scheduler: SchedulerHandle,
    /// Clock exposed to callers through `ActorSystem::clock`.
    clock: Clock,
    /// Pools that the `spawn_*` methods hand to `pool::spawn_on_pool`.
    pools: Pools,
    /// Callbacks registered via `register_waker`; `shutdown` drains the list and invokes each once.
    wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
    /// Opaque values registered via `register_keepalive`; held until `shutdown` clears the list.
    keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
    /// Receivers registered via `register_done_rx`; `join_timeout` waits on each of them.
    done_rxs: Mutex<Vec<Receiver<()>>>,
    /// Scopes created through `ActorSystem::scope`; `shutdown` recurses into them.
    children: Mutex<Vec<ActorSystem>>,
}
37
/// Handle to an actor system.
///
/// Cloning is cheap: every clone points at the same `ActorSystemInner`
/// through an `Arc`, so all clones observe the same cancellation state and
/// registries.
#[derive(Clone)]
pub struct ActorSystem {
    /// Reference-counted shared state.
    inner: Arc<ActorSystemInner>,
}
42
43impl ActorSystem {
44 pub fn new(pools: Pools, clock: Clock) -> Self {
45 let scheduler = SchedulerHandle::new(pools.system_pool().clone());
46
47 Self {
48 inner: Arc::new(ActorSystemInner {
49 cancel: CancellationToken::new(),
50 scheduler,
51 clock,
52 pools,
53 wakers: Mutex::new(Vec::new()),
54 keepalive: Mutex::new(Vec::new()),
55 done_rxs: Mutex::new(Vec::new()),
56 children: Mutex::new(Vec::new()),
57 }),
58 }
59 }
60
61 pub fn scope(&self) -> Self {
62 let child = Self {
63 inner: Arc::new(ActorSystemInner {
64 cancel: self.inner.cancel.child_token(),
65 scheduler: self.inner.scheduler.shared(),
66 clock: self.inner.clock.clone(),
67 pools: self.inner.pools.clone(),
68 wakers: Mutex::new(Vec::new()),
69 keepalive: Mutex::new(Vec::new()),
70 done_rxs: Mutex::new(Vec::new()),
71 children: Mutex::new(Vec::new()),
72 }),
73 };
74 self.inner.children.lock().unwrap().push(child.clone());
75 child
76 }
77
78 pub fn pools(&self) -> Pools {
79 self.inner.pools.clone()
80 }
81
82 pub fn cancellation_token(&self) -> CancellationToken {
83 self.inner.cancel.clone()
84 }
85
86 pub fn is_cancelled(&self) -> bool {
87 self.inner.cancel.is_cancelled()
88 }
89
90 pub fn shutdown(&self) {
91 self.inner.cancel.cancel();
92
93 for child in self.inner.children.lock().unwrap().iter() {
94 child.shutdown();
95 }
96
97 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
98 for waker in &wakers {
99 waker();
100 }
101 drop(wakers);
102
103 self.inner.keepalive.lock().unwrap().clear();
104 }
105
106 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
107 self.inner.wakers.lock().unwrap().push(f);
108 }
109
110 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
111 self.inner.keepalive.lock().unwrap().push(cell);
112 }
113
114 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
115 self.inner.done_rxs.lock().unwrap().push(rx);
116 }
117
118 pub fn join(&self) -> Result<(), JoinError> {
119 self.join_timeout(Duration::from_secs(5))
120 }
121
122 #[allow(clippy::disallowed_methods)]
123 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
124 let deadline = time::Instant::now() + timeout;
125 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
126 for rx in rxs {
127 let remaining = deadline.saturating_duration_since(time::Instant::now());
128 match rx.recv_timeout(remaining) {
129 Ok(()) => {}
130 Err(CcRecvTimeoutError::Disconnected) => {}
131 Err(CcRecvTimeoutError::Timeout) => {
132 return Err(JoinError::new("timed out waiting for actors to stop"));
133 }
134 }
135 }
136 Ok(())
137 }
138
139 pub fn scheduler(&self) -> &SchedulerHandle {
140 &self.inner.scheduler
141 }
142
143 pub fn clock(&self) -> &Clock {
144 &self.inner.clock
145 }
146
147 pub fn spawn_system<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
148 where
149 A::State: Send,
150 {
151 pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
152 }
153
154 pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
155 where
156 A::State: Send,
157 {
158 pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
159 }
160}
161
162impl Debug for ActorSystem {
163 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
165 }
166}
167
/// Handle type returned by the `spawn_*` methods of [`ActorSystem`]; an alias
/// for [`PoolActorHandle`].
pub type ActorHandle<M> = PoolActorHandle<M>;
169
/// Error reported when waiting for actors to stop does not succeed.
#[derive(Debug)]
pub struct JoinError {
    /// Human-readable reason, shown after the `actor join failed: ` prefix.
    message: String,
}

impl JoinError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl fmt::Display for JoinError {
    /// Writes the stored message prefixed with `actor join failed: `.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "actor join failed: {message}")
    }
}

impl error::Error for JoinError {}
190
#[cfg(test)]
mod tests {
    use std::sync;

    use super::*;
    use crate::{
        actor::{context::Context, traits::Directive},
        pool::{PoolConfig, Pools},
    };

    /// Builds an actor system on default pools with the real clock.
    fn test_system() -> ActorSystem {
        ActorSystem::new(Pools::new(PoolConfig::default()), Clock::Real)
    }

    /// Minimal actor whose state is a running counter.
    struct CounterActor;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMessage {
        /// Adds one to the counter.
        Inc,
        /// Sends the current counter value through the given sender.
        Get(sync::mpsc::Sender<i64>),
        /// Asks the actor to stop.
        Stop,
    }

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMessage;

        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            match msg {
                CounterMessage::Stop => Directive::Stop,
                CounterMessage::Inc => {
                    *state += 1;
                    Directive::Continue
                }
                CounterMessage::Get(reply) => {
                    // A dropped receiver is not an error for the actor.
                    let _ = reply.send(*state);
                    Directive::Continue
                }
            }
        }
    }

    /// Three increments followed by a `Get` must report 3; the actor then
    /// stops on request and its handle joins cleanly.
    #[test]
    fn test_spawn_and_send() {
        let system = test_system();
        let handle = system.spawn_system("counter", CounterActor);
        let actor_ref = handle.actor_ref().clone();

        for _ in 0..3 {
            actor_ref.send(CounterMessage::Inc).unwrap();
        }

        let (reply_tx, reply_rx) = sync::mpsc::channel();
        actor_ref.send(CounterMessage::Get(reply_tx)).unwrap();
        assert_eq!(reply_rx.recv().unwrap(), 3);

        actor_ref.send(CounterMessage::Stop).unwrap();
        handle.join().unwrap();
    }

    /// Shutting the system down must let `join` succeed for all spawned actors.
    #[test]
    fn test_shutdown_join() {
        let system = test_system();

        (0..5).map(|i| format!("counter-{i}")).for_each(|name| {
            system.spawn_system(&name, CounterActor);
        });

        system.shutdown();
        system.join().unwrap();
    }
}