reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 error, fmt,
13 fmt::{Debug, Formatter},
14 mem,
15 sync::{Arc, Mutex},
16 time,
17 time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::{ThreadPool, ThreadPoolBuilder};
22use tokio::{sync::Semaphore, task};
23
24use crate::actor::{
25 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
26 traits::Actor,
27};
28
29#[derive(Debug, Clone)]
31pub struct ActorSystemConfig {
32 pub pool_threads: usize,
34 pub max_in_flight: usize,
36}
37
38struct ActorSystemInner {
40 pool: Arc<ThreadPool>,
41 permits: Arc<Semaphore>,
42 cancel: CancellationToken,
43 scheduler: SchedulerHandle,
44 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
45 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
46 done_rxs: Mutex<Vec<Receiver<()>>>,
47}
48
49#[derive(Clone)]
56pub struct ActorSystem {
57 inner: Arc<ActorSystemInner>,
58}
59
60impl ActorSystem {
61 pub fn new(config: ActorSystemConfig) -> Self {
63 let pool = Arc::new(
64 ThreadPoolBuilder::new()
65 .num_threads(config.pool_threads)
66 .thread_name(|i| format!("actor-pool-{i}"))
67 .build()
68 .expect("failed to build rayon pool"),
69 );
70
71 let scheduler = SchedulerHandle::new(pool.clone());
72
73 Self {
74 inner: Arc::new(ActorSystemInner {
75 pool,
76 permits: Arc::new(Semaphore::new(config.max_in_flight)),
77 cancel: CancellationToken::new(),
78 scheduler,
79 wakers: Mutex::new(Vec::new()),
80 keepalive: Mutex::new(Vec::new()),
81 done_rxs: Mutex::new(Vec::new()),
82 }),
83 }
84 }
85
86 pub fn scope(&self) -> Self {
87 Self {
88 inner: Arc::new(ActorSystemInner {
89 pool: self.inner.pool.clone(),
90 permits: self.inner.permits.clone(),
91 cancel: CancellationToken::new(),
92 scheduler: self.inner.scheduler.shared(),
93 wakers: Mutex::new(Vec::new()),
94 keepalive: Mutex::new(Vec::new()),
95 done_rxs: Mutex::new(Vec::new()),
96 }),
97 }
98 }
99
100 pub fn cancellation_token(&self) -> CancellationToken {
102 self.inner.cancel.clone()
103 }
104
105 pub fn is_cancelled(&self) -> bool {
107 self.inner.cancel.is_cancelled()
108 }
109
110 pub fn shutdown(&self) {
115 self.inner.cancel.cancel();
116
117 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
119 for waker in &wakers {
120 waker();
121 }
122 drop(wakers);
123
124 self.inner.keepalive.lock().unwrap().clear();
126 }
127
128 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
130 self.inner.wakers.lock().unwrap().push(f);
131 }
132
133 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
137 self.inner.keepalive.lock().unwrap().push(cell);
138 }
139
140 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
142 self.inner.done_rxs.lock().unwrap().push(rx);
143 }
144
145 pub fn join(&self) -> Result<(), JoinError> {
147 self.join_timeout(Duration::from_secs(5))
148 }
149
150 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
152 let deadline = time::Instant::now() + timeout;
153 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
154 for rx in rxs {
155 let remaining = deadline.saturating_duration_since(time::Instant::now());
156 match rx.recv_timeout(remaining) {
157 Ok(()) => {}
158 Err(CcRecvTimeoutError::Disconnected) => {
159 }
161 Err(CcRecvTimeoutError::Timeout) => {
162 return Err(JoinError::new("timed out waiting for actors to stop"));
163 }
164 }
165 }
166 Ok(())
167 }
168
169 pub fn scheduler(&self) -> &SchedulerHandle {
171 &self.inner.scheduler
172 }
173
174 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
178 where
179 A::State: Send,
180 {
181 pool::spawn_on_pool(self, name, actor)
182 }
183
184 pub fn install<R, F>(&self, f: F) -> R
189 where
190 R: Send,
191 F: FnOnce() -> R + Send,
192 {
193 self.inner.pool.install(f)
194 }
195
196 pub async fn compute<R, F>(&self, f: F) -> Result<R, task::JoinError>
202 where
203 R: Send + 'static,
204 F: FnOnce() -> R + Send + 'static,
205 {
206 let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
207 let inner = self.inner.clone();
208
209 let handle = task::spawn_blocking(move || {
210 let _permit = permit; inner.pool.install(f)
212 });
213
214 handle.await
215 }
216
217 pub(crate) fn pool(&self) -> &Arc<ThreadPool> {
219 &self.inner.pool
220 }
221}
222
223impl Debug for ActorSystem {
224 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
225 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
226 }
227}
228
229pub type ActorHandle<M> = PoolActorHandle<M>;
231
232#[derive(Debug)]
234pub struct JoinError {
235 message: String,
236}
237
238impl JoinError {
239 pub fn new(message: impl Into<String>) -> Self {
241 Self {
242 message: message.into(),
243 }
244 }
245}
246
247impl fmt::Display for JoinError {
248 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
249 write!(f, "actor join failed: {}", self.message)
250 }
251}
252
253impl error::Error for JoinError {}
254
255#[cfg(test)]
256mod tests {
257 use std::sync;
258
259 use super::*;
260 use crate::{
261 SharedRuntimeConfig,
262 actor::{context::Context, traits::Directive},
263 };
264
265 struct CounterActor;
266
267 #[derive(Debug)]
268 enum CounterMsg {
269 Inc,
270 Get(sync::mpsc::Sender<i64>),
271 Stop,
272 }
273
274 impl Actor for CounterActor {
275 type State = i64;
276 type Message = CounterMsg;
277
278 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
279 0
280 }
281
282 fn handle(
283 &self,
284 state: &mut Self::State,
285 msg: Self::Message,
286 _ctx: &Context<Self::Message>,
287 ) -> Directive {
288 match msg {
289 CounterMsg::Inc => *state += 1,
290 CounterMsg::Get(tx) => {
291 let _ = tx.send(*state);
292 }
293 CounterMsg::Stop => return Directive::Stop,
294 }
295 Directive::Continue
296 }
297 }
298
299 #[test]
300 fn test_spawn_and_send() {
301 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
302 let handle = system.spawn("counter", CounterActor);
303
304 let actor_ref = handle.actor_ref().clone();
305 actor_ref.send(CounterMsg::Inc).unwrap();
306 actor_ref.send(CounterMsg::Inc).unwrap();
307 actor_ref.send(CounterMsg::Inc).unwrap();
308
309 let (tx, rx) = sync::mpsc::channel();
310 actor_ref.send(CounterMsg::Get(tx)).unwrap();
311
312 let value = rx.recv().unwrap();
313 assert_eq!(value, 3);
314
315 actor_ref.send(CounterMsg::Stop).unwrap();
316 handle.join().unwrap();
317 }
318
319 #[test]
320 fn test_install() {
321 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
322 let result = system.install(|| 42);
323 assert_eq!(result, 42);
324 }
325
326 #[tokio::test]
327 async fn test_compute() {
328 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
329 let result = system.compute(|| 42).await.unwrap();
330 assert_eq!(result, 42);
331 }
332
333 #[test]
334 fn test_shutdown_join() {
335 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
336
337 for i in 0..5 {
339 system.spawn(&format!("counter-{i}"), CounterActor);
340 }
341
342 system.shutdown();
344 system.join().unwrap();
345 }
346}