reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 error, fmt,
13 fmt::{Debug, Formatter},
14 mem,
15 sync::{Arc, Mutex, Once},
16 time,
17 time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::ThreadPoolBuilder;
22
23use crate::{
24 actor::{
25 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
26 traits::Actor,
27 },
28 context::clock::Clock,
29};
30
31struct ActorSystemInner {
33 cancel: CancellationToken,
34 scheduler: SchedulerHandle,
35 clock: Clock,
36 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
37 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
38 done_rxs: Mutex<Vec<Receiver<()>>>,
39}
40
41#[derive(Clone)]
48pub struct ActorSystem {
49 inner: Arc<ActorSystemInner>,
50}
51
/// Process-wide guard ensuring the global rayon pool is configured at most once.
static POOL_INIT: Once = Once::new();
53
54impl ActorSystem {
55 pub fn new(pool_threads: usize) -> Self {
57 Self::with_clock(pool_threads, Clock::Real)
58 }
59
60 pub fn with_clock(pool_threads: usize, clock: Clock) -> Self {
62 POOL_INIT.call_once(|| {
63 ThreadPoolBuilder::new()
64 .num_threads(pool_threads)
65 .thread_name(|i| format!("actor-pool-{i}"))
66 .build_global()
67 .expect("failed to configure global rayon thread pool");
68 });
69
70 let scheduler = SchedulerHandle::new();
71
72 Self {
73 inner: Arc::new(ActorSystemInner {
74 cancel: CancellationToken::new(),
75 scheduler,
76 clock,
77 wakers: Mutex::new(Vec::new()),
78 keepalive: Mutex::new(Vec::new()),
79 done_rxs: Mutex::new(Vec::new()),
80 }),
81 }
82 }
83
84 pub fn scope(&self) -> Self {
85 Self {
86 inner: Arc::new(ActorSystemInner {
87 cancel: CancellationToken::new(),
88 scheduler: self.inner.scheduler.shared(),
89 clock: self.inner.clock.clone(),
90 wakers: Mutex::new(Vec::new()),
91 keepalive: Mutex::new(Vec::new()),
92 done_rxs: Mutex::new(Vec::new()),
93 }),
94 }
95 }
96
97 pub fn cancellation_token(&self) -> CancellationToken {
99 self.inner.cancel.clone()
100 }
101
102 pub fn is_cancelled(&self) -> bool {
104 self.inner.cancel.is_cancelled()
105 }
106
107 pub fn shutdown(&self) {
112 self.inner.cancel.cancel();
113
114 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
116 for waker in &wakers {
117 waker();
118 }
119 drop(wakers);
120
121 self.inner.keepalive.lock().unwrap().clear();
123 }
124
125 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
127 self.inner.wakers.lock().unwrap().push(f);
128 }
129
130 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
134 self.inner.keepalive.lock().unwrap().push(cell);
135 }
136
137 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
139 self.inner.done_rxs.lock().unwrap().push(rx);
140 }
141
142 pub fn join(&self) -> Result<(), JoinError> {
144 self.join_timeout(Duration::from_secs(5))
145 }
146
147 #[allow(clippy::disallowed_methods)]
149 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
150 let deadline = time::Instant::now() + timeout;
151 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
152 for rx in rxs {
153 let remaining = deadline.saturating_duration_since(time::Instant::now());
154 match rx.recv_timeout(remaining) {
155 Ok(()) => {}
156 Err(CcRecvTimeoutError::Disconnected) => {
157 }
159 Err(CcRecvTimeoutError::Timeout) => {
160 return Err(JoinError::new("timed out waiting for actors to stop"));
161 }
162 }
163 }
164 Ok(())
165 }
166
167 pub fn scheduler(&self) -> &SchedulerHandle {
169 &self.inner.scheduler
170 }
171
172 pub fn clock(&self) -> &Clock {
174 &self.inner.clock
175 }
176
177 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
181 where
182 A::State: Send,
183 {
184 pool::spawn_on_pool(self, name, actor)
185 }
186}
187
188impl Debug for ActorSystem {
189 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
190 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
191 }
192}
193
194pub type ActorHandle<M> = PoolActorHandle<M>;
196
/// Error returned when waiting for actors to stop does not succeed.
#[derive(Debug)]
pub struct JoinError {
    /// Human-readable reason for the failure.
    message: String,
}
202
203impl JoinError {
204 pub fn new(message: impl Into<String>) -> Self {
206 Self {
207 message: message.into(),
208 }
209 }
210}
211
212impl fmt::Display for JoinError {
213 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
214 write!(f, "actor join failed: {}", self.message)
215 }
216}
217
218impl error::Error for JoinError {}
219
#[cfg(test)]
mod tests {
    use std::sync;

    use super::*;
    use crate::actor::{context::Context, traits::Directive};

    /// Actor whose state is a running count, starting at zero.
    struct CounterActor;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMsg {
        /// Add one to the count.
        Inc,
        /// Send the current count back on the given channel.
        Get(sync::mpsc::Sender<i64>),
        /// Ask the actor to stop.
        Stop,
    }

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMsg;

        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            match msg {
                CounterMsg::Stop => Directive::Stop,
                CounterMsg::Inc => {
                    *state += 1;
                    Directive::Continue
                }
                CounterMsg::Get(reply) => {
                    // The requester may already be gone; that is not an error here.
                    let _ = reply.send(*state);
                    Directive::Continue
                }
            }
        }
    }

    /// Three increments followed by a query must report a count of three.
    #[test]
    fn test_spawn_and_send() {
        let system = ActorSystem::new(1);
        let handle = system.spawn("counter", CounterActor);

        let actor_ref = handle.actor_ref().clone();
        for _ in 0..3 {
            actor_ref.send(CounterMsg::Inc).unwrap();
        }

        let (reply_tx, reply_rx) = sync::mpsc::channel();
        actor_ref.send(CounterMsg::Get(reply_tx)).unwrap();
        assert_eq!(reply_rx.recv().unwrap(), 3);

        actor_ref.send(CounterMsg::Stop).unwrap();
        handle.join().unwrap();
    }

    /// Shutting down and joining a system with several actors must succeed.
    #[test]
    fn test_shutdown_join() {
        let system = ActorSystem::new(1);

        for i in 0..5 {
            let name = format!("counter-{i}");
            system.spawn(&name, CounterActor);
        }

        system.shutdown();
        system.join().unwrap();
    }
}