reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 error, fmt,
13 fmt::{Debug, Formatter},
14 mem,
15 sync::{Arc, Mutex},
16 time,
17 time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::{ThreadPool, ThreadPoolBuilder};
22use tokio::{sync::Semaphore, task};
23
24use crate::actor::{
25 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
26 traits::Actor,
27};
28
29#[derive(Debug, Clone)]
31pub struct ActorSystemConfig {
32 pub pool_threads: usize,
34 pub max_in_flight: usize,
36}
37
38struct ActorSystemInner {
40 pool: Arc<ThreadPool>,
41 permits: Arc<Semaphore>,
42 cancel: CancellationToken,
43 scheduler: SchedulerHandle,
44 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
45 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
46 done_rxs: Mutex<Vec<Receiver<()>>>,
47}
48
49#[derive(Clone)]
56pub struct ActorSystem {
57 inner: Arc<ActorSystemInner>,
58}
59
60impl ActorSystem {
61 pub fn new(config: ActorSystemConfig) -> Self {
63 let pool = Arc::new(
64 ThreadPoolBuilder::new()
65 .num_threads(config.pool_threads)
66 .thread_name(|i| format!("actor-pool-{i}"))
67 .build()
68 .expect("failed to build rayon pool"),
69 );
70
71 let scheduler = SchedulerHandle::new(pool.clone());
72
73 Self {
74 inner: Arc::new(ActorSystemInner {
75 pool,
76 permits: Arc::new(Semaphore::new(config.max_in_flight)),
77 cancel: CancellationToken::new(),
78 scheduler,
79 wakers: Mutex::new(Vec::new()),
80 keepalive: Mutex::new(Vec::new()),
81 done_rxs: Mutex::new(Vec::new()),
82 }),
83 }
84 }
85
86 pub fn scope(&self) -> Self {
87 Self {
88 inner: Arc::new(ActorSystemInner {
89 pool: self.inner.pool.clone(),
90 permits: self.inner.permits.clone(),
91 cancel: CancellationToken::new(),
92 scheduler: self.inner.scheduler.shared(),
93 wakers: Mutex::new(Vec::new()),
94 keepalive: Mutex::new(Vec::new()),
95 done_rxs: Mutex::new(Vec::new()),
96 }),
97 }
98 }
99
100 pub fn cancellation_token(&self) -> CancellationToken {
102 self.inner.cancel.clone()
103 }
104
105 pub fn is_cancelled(&self) -> bool {
107 self.inner.cancel.is_cancelled()
108 }
109
110 pub fn shutdown(&self) {
115 self.inner.cancel.cancel();
116
117 let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
119 for waker in &wakers {
120 waker();
121 }
122 drop(wakers);
123
124 self.inner.keepalive.lock().unwrap().clear();
126 }
127
128 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
130 self.inner.wakers.lock().unwrap().push(f);
131 }
132
133 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
137 self.inner.keepalive.lock().unwrap().push(cell);
138 }
139
140 pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
142 self.inner.done_rxs.lock().unwrap().push(rx);
143 }
144
145 pub fn join(&self) -> Result<(), JoinError> {
147 self.join_timeout(Duration::from_secs(5))
148 }
149
150 pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
152 let deadline = time::Instant::now() + timeout;
153 let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
154 for rx in rxs {
155 let remaining = deadline.saturating_duration_since(time::Instant::now());
156 match rx.recv_timeout(remaining) {
157 Ok(()) => {}
158 Err(CcRecvTimeoutError::Disconnected) => {
159 }
161 Err(CcRecvTimeoutError::Timeout) => {
162 return Err(JoinError::new("timed out waiting for actors to stop"));
163 }
164 }
165 }
166 Ok(())
167 }
168
169 pub fn scheduler(&self) -> &SchedulerHandle {
171 &self.inner.scheduler
172 }
173
174 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
178 where
179 A::State: Send,
180 {
181 pool::spawn_on_pool(self, name, actor)
182 }
183
184 pub fn install<R, F>(&self, f: F) -> R
189 where
190 R: Send,
191 F: FnOnce() -> R + Send,
192 {
193 self.inner.pool.install(f)
194 }
195
196 pub async fn compute<R, F>(&self, f: F) -> Result<R, task::JoinError>
202 where
203 R: Send + 'static,
204 F: FnOnce() -> R + Send + 'static,
205 {
206 let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
207 let inner = self.inner.clone();
208
209 let handle = task::spawn_blocking(move || {
210 let _permit = permit; inner.pool.install(f)
212 });
213
214 handle.await
215 }
216
217 pub async fn execute<R, F>(&self, f: F) -> Result<R, task::JoinError>
223 where
224 R: Send + 'static,
225 F: FnOnce() -> R + Send + 'static,
226 {
227 let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
228
229 let handle = task::spawn_blocking(move || {
230 let _permit = permit;
231 f()
232 });
233
234 handle.await
235 }
236
237 pub(crate) fn pool(&self) -> &Arc<ThreadPool> {
239 &self.inner.pool
240 }
241}
242
243impl Debug for ActorSystem {
244 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
245 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
246 }
247}
248
249pub type ActorHandle<M> = PoolActorHandle<M>;
251
252#[derive(Debug)]
254pub struct JoinError {
255 message: String,
256}
257
258impl JoinError {
259 pub fn new(message: impl Into<String>) -> Self {
261 Self {
262 message: message.into(),
263 }
264 }
265}
266
267impl fmt::Display for JoinError {
268 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
269 write!(f, "actor join failed: {}", self.message)
270 }
271}
272
273impl error::Error for JoinError {}
274
275#[cfg(test)]
276mod tests {
277 use std::sync;
278
279 use super::*;
280 use crate::{
281 SharedRuntimeConfig,
282 actor::{context::Context, traits::Directive},
283 };
284
285 struct CounterActor;
286
287 #[derive(Debug)]
288 enum CounterMsg {
289 Inc,
290 Get(sync::mpsc::Sender<i64>),
291 Stop,
292 }
293
294 impl Actor for CounterActor {
295 type State = i64;
296 type Message = CounterMsg;
297
298 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
299 0
300 }
301
302 fn handle(
303 &self,
304 state: &mut Self::State,
305 msg: Self::Message,
306 _ctx: &Context<Self::Message>,
307 ) -> Directive {
308 match msg {
309 CounterMsg::Inc => *state += 1,
310 CounterMsg::Get(tx) => {
311 let _ = tx.send(*state);
312 }
313 CounterMsg::Stop => return Directive::Stop,
314 }
315 Directive::Continue
316 }
317 }
318
319 #[test]
320 fn test_spawn_and_send() {
321 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
322 let handle = system.spawn("counter", CounterActor);
323
324 let actor_ref = handle.actor_ref().clone();
325 actor_ref.send(CounterMsg::Inc).unwrap();
326 actor_ref.send(CounterMsg::Inc).unwrap();
327 actor_ref.send(CounterMsg::Inc).unwrap();
328
329 let (tx, rx) = sync::mpsc::channel();
330 actor_ref.send(CounterMsg::Get(tx)).unwrap();
331
332 let value = rx.recv().unwrap();
333 assert_eq!(value, 3);
334
335 actor_ref.send(CounterMsg::Stop).unwrap();
336 handle.join().unwrap();
337 }
338
339 #[test]
340 fn test_install() {
341 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
342 let result = system.install(|| 42);
343 assert_eq!(result, 42);
344 }
345
346 #[tokio::test]
347 async fn test_compute() {
348 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
349 let result = system.compute(|| 42).await.unwrap();
350 assert_eq!(result, 42);
351 }
352
353 #[test]
354 fn test_shutdown_join() {
355 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
356
357 for i in 0..5 {
359 system.spawn(&format!("counter-{i}"), CounterActor);
360 }
361
362 system.shutdown();
364 system.join().unwrap();
365 }
366}