reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 error, fmt,
13 fmt::{Debug, Formatter},
14 mem,
15 sync::{Arc, Mutex, Once},
16 time,
17 time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::ThreadPoolBuilder;
22
23use crate::actor::{
24 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
25 traits::Actor,
26};
27
28struct ActorSystemInner {
30 cancel: CancellationToken,
31 scheduler: SchedulerHandle,
32 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
33 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
34 done_rxs: Mutex<Vec<Receiver<()>>>,
35}
36
37#[derive(Clone)]
44pub struct ActorSystem {
45 inner: Arc<ActorSystemInner>,
46}
47
/// Ensures the process-wide rayon thread pool is configured at most once, no matter how many
/// times `ActorSystem::new` is called.
static POOL_INIT: Once = Once::new();
49
50impl ActorSystem {
51 pub fn new(pool_threads: usize) -> Self {
53 POOL_INIT.call_once(|| {
54 ThreadPoolBuilder::new()
55 .num_threads(pool_threads)
56 .thread_name(|i| format!("actor-pool-{i}"))
57 .build_global()
58 .expect("failed to configure global rayon thread pool");
59 });
60
61 let scheduler = SchedulerHandle::new();
62
63 Self {
64 inner: Arc::new(ActorSystemInner {
65 cancel: CancellationToken::new(),
66 scheduler,
67 wakers: Mutex::new(Vec::new()),
68 keepalive: Mutex::new(Vec::new()),
69 done_rxs: Mutex::new(Vec::new()),
70 }),
71 }
72 }
73
74 pub fn scope(&self) -> Self {
75 Self {
76 inner: Arc::new(ActorSystemInner {
77 cancel: CancellationToken::new(),
78 scheduler: self.inner.scheduler.shared(),
79 wakers: Mutex::new(Vec::new()),
80 keepalive: Mutex::new(Vec::new()),
81 done_rxs: Mutex::new(Vec::new()),
82 }),
83 }
84 }
85
86 pub fn cancellation_token(&self) -> CancellationToken {
88 self.inner.cancel.clone()
89 }
90
91 pub fn is_cancelled(&self) -> bool {
93 self.inner.cancel.is_cancelled()
94 }
95
96 pub fn shutdown(&self) {
101 self.inner.cancel.cancel();
102
103 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
105 for waker in &wakers {
106 waker();
107 }
108 drop(wakers);
109
110 self.inner.keepalive.lock().unwrap().clear();
112 }
113
114 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
116 self.inner.wakers.lock().unwrap().push(f);
117 }
118
119 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
123 self.inner.keepalive.lock().unwrap().push(cell);
124 }
125
126 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
128 self.inner.done_rxs.lock().unwrap().push(rx);
129 }
130
131 pub fn join(&self) -> Result<(), JoinError> {
133 self.join_timeout(Duration::from_secs(5))
134 }
135
136 #[allow(clippy::disallowed_methods)]
138 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
139 let deadline = time::Instant::now() + timeout;
140 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
141 for rx in rxs {
142 let remaining = deadline.saturating_duration_since(time::Instant::now());
143 match rx.recv_timeout(remaining) {
144 Ok(()) => {}
145 Err(CcRecvTimeoutError::Disconnected) => {
146 }
148 Err(CcRecvTimeoutError::Timeout) => {
149 return Err(JoinError::new("timed out waiting for actors to stop"));
150 }
151 }
152 }
153 Ok(())
154 }
155
156 pub fn scheduler(&self) -> &SchedulerHandle {
158 &self.inner.scheduler
159 }
160
161 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
165 where
166 A::State: Send,
167 {
168 pool::spawn_on_pool(self, name, actor)
169 }
170}
171
172impl Debug for ActorSystem {
173 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
174 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
175 }
176}
177
178pub type ActorHandle<M> = PoolActorHandle<M>;
180
/// Error produced when waiting for actors to stop does not succeed.
#[derive(Debug)]
pub struct JoinError {
    /// Human-readable reason, appended to the `Display` prefix.
    message: String,
}

impl JoinError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl fmt::Display for JoinError {
    /// Renders as `actor join failed: <message>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "actor join failed: {message}")
    }
}

impl error::Error for JoinError {}
203
#[cfg(test)]
mod tests {
    use std::sync;

    use super::*;
    use crate::actor::{context::Context, traits::Directive};

    /// Stateless actor definition whose per-instance state is a signed counter.
    struct CounterActor;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMsg {
        /// Add one to the counter.
        Inc,
        /// Report the current counter value through the enclosed sender.
        Get(sync::mpsc::Sender<i64>),
        /// Ask the actor to stop.
        Stop,
    }

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMsg;

        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            match msg {
                CounterMsg::Stop => Directive::Stop,
                CounterMsg::Inc => {
                    *state += 1;
                    Directive::Continue
                }
                CounterMsg::Get(reply) => {
                    // The requester may already be gone; a failed reply is not an error here.
                    let _ = reply.send(*state);
                    Directive::Continue
                }
            }
        }
    }

    /// Three increments followed by a `Get` must observe `3`; `Stop` then lets the handle join.
    #[test]
    fn test_spawn_and_send() {
        let system = ActorSystem::new(1);
        let handle = system.spawn("counter", CounterActor);

        let actor_ref = handle.actor_ref().clone();
        for _ in 0..3 {
            actor_ref.send(CounterMsg::Inc).unwrap();
        }

        let (reply_tx, reply_rx) = sync::mpsc::channel();
        actor_ref.send(CounterMsg::Get(reply_tx)).unwrap();
        assert_eq!(reply_rx.recv().unwrap(), 3);

        actor_ref.send(CounterMsg::Stop).unwrap();
        handle.join().unwrap();
    }

    /// Shutting the system down must let `join` succeed for several spawned actors.
    #[test]
    fn test_shutdown_join() {
        let system = ActorSystem::new(1);

        for idx in 0..5 {
            let name = format!("counter-{idx}");
            system.spawn(&name, CounterActor);
        }

        system.shutdown();
        system.join().unwrap();
    }
}