reifydb_runtime/actor/system/native/
mod.rs1mod pool;
9
10use std::{
11 any::Any,
12 fmt::{Debug, Formatter},
13 sync::{Arc, Mutex},
14};
15
16use rayon::{ThreadPool, ThreadPoolBuilder};
17use tokio::{sync::Semaphore, task};
18
19use crate::actor::{
20 context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21 traits::Actor,
22};
23
24#[derive(Debug, Clone)]
26pub struct ActorSystemConfig {
27 pub pool_threads: usize,
29 pub max_in_flight: usize,
31}
32
33struct ActorSystemInner {
35 pool: Arc<ThreadPool>,
36 permits: Arc<Semaphore>,
37 cancel: CancellationToken,
38 scheduler: SchedulerHandle,
39 wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
40 keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
41}
42
43#[derive(Clone)]
50pub struct ActorSystem {
51 inner: Arc<ActorSystemInner>,
52}
53
54impl ActorSystem {
55 pub fn new(config: ActorSystemConfig) -> Self {
57 let pool = Arc::new(
58 ThreadPoolBuilder::new()
59 .num_threads(config.pool_threads)
60 .thread_name(|i| format!("actor-pool-{i}"))
61 .build()
62 .expect("failed to build rayon pool"),
63 );
64
65 let scheduler = SchedulerHandle::new(pool.clone());
66
67 Self {
68 inner: Arc::new(ActorSystemInner {
69 pool,
70 permits: Arc::new(Semaphore::new(config.max_in_flight)),
71 cancel: CancellationToken::new(),
72 scheduler,
73 wakers: Mutex::new(Vec::new()),
74 keepalive: Mutex::new(Vec::new()),
75 }),
76 }
77 }
78
79 pub fn cancellation_token(&self) -> CancellationToken {
81 self.inner.cancel.clone()
82 }
83
84 pub fn is_cancelled(&self) -> bool {
86 self.inner.cancel.is_cancelled()
87 }
88
89 pub fn shutdown(&self) {
94 self.inner.cancel.cancel();
95
96 let wakers = std::mem::take(&mut *self.inner.wakers.lock().unwrap());
98 for waker in &wakers {
99 waker();
100 }
101 drop(wakers);
102
103 self.inner.keepalive.lock().unwrap().clear();
105 }
106
107 pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
109 self.inner.wakers.lock().unwrap().push(f);
110 }
111
112 pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
116 self.inner.keepalive.lock().unwrap().push(cell);
117 }
118
119 pub fn scheduler(&self) -> &SchedulerHandle {
121 &self.inner.scheduler
122 }
123
124 pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
128 where
129 A::State: Send,
130 {
131 pool::spawn_on_pool(self, name, actor)
132 }
133
134 pub fn install<R, F>(&self, f: F) -> R
139 where
140 R: Send,
141 F: FnOnce() -> R + Send,
142 {
143 self.inner.pool.install(f)
144 }
145
146 pub async fn compute<R, F>(&self, f: F) -> Result<R, task::JoinError>
152 where
153 R: Send + 'static,
154 F: FnOnce() -> R + Send + 'static,
155 {
156 let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
157 let inner = self.inner.clone();
158
159 let handle = task::spawn_blocking(move || {
160 let _permit = permit; inner.pool.install(f)
162 });
163
164 handle.await
165 }
166
167 pub(crate) fn pool(&self) -> &Arc<ThreadPool> {
169 &self.inner.pool
170 }
171}
172
173impl Debug for ActorSystem {
174 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
176 }
177}
178
179pub type ActorHandle<M> = PoolActorHandle<M>;
181
182#[derive(Debug)]
184pub struct JoinError {
185 message: String,
186}
187
188impl JoinError {
189 pub fn new(message: impl Into<String>) -> Self {
191 Self {
192 message: message.into(),
193 }
194 }
195}
196
197impl std::fmt::Display for JoinError {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 write!(f, "actor join failed: {}", self.message)
200 }
201}
202
203impl std::error::Error for JoinError {}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208 use crate::{
209 SharedRuntimeConfig,
210 actor::{context::Context, traits::Directive},
211 };
212
213 struct CounterActor;
214
215 #[derive(Debug)]
216 enum CounterMsg {
217 Inc,
218 Get(std::sync::mpsc::Sender<i64>),
219 Stop,
220 }
221
222 impl Actor for CounterActor {
223 type State = i64;
224 type Message = CounterMsg;
225
226 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
227 0
228 }
229
230 fn handle(
231 &self,
232 state: &mut Self::State,
233 msg: Self::Message,
234 _ctx: &Context<Self::Message>,
235 ) -> Directive {
236 match msg {
237 CounterMsg::Inc => *state += 1,
238 CounterMsg::Get(tx) => {
239 let _ = tx.send(*state);
240 }
241 CounterMsg::Stop => return Directive::Stop,
242 }
243 Directive::Continue
244 }
245 }
246
247 #[test]
248 fn test_spawn_and_send() {
249 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
250 let handle = system.spawn("counter", CounterActor);
251
252 let actor_ref = handle.actor_ref().clone();
253 actor_ref.send(CounterMsg::Inc).unwrap();
254 actor_ref.send(CounterMsg::Inc).unwrap();
255 actor_ref.send(CounterMsg::Inc).unwrap();
256
257 let (tx, rx) = std::sync::mpsc::channel();
258 actor_ref.send(CounterMsg::Get(tx)).unwrap();
259
260 let value = rx.recv().unwrap();
261 assert_eq!(value, 3);
262
263 actor_ref.send(CounterMsg::Stop).unwrap();
264 handle.join().unwrap();
265 }
266
267 #[test]
268 fn test_install() {
269 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
270 let result = system.install(|| 42);
271 assert_eq!(result, 42);
272 }
273
274 #[tokio::test]
275 async fn test_compute() {
276 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
277 let result = system.compute(|| 42).await.unwrap();
278 assert_eq!(result, 42);
279 }
280}