// reifydb_runtime/actor/system/native/mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Native actor system implementation.
5//!
6//! Uses rayon for all actors on a shared work-stealing pool.
7
8mod pool;
9
10use std::{
11	any::Any,
12	error, fmt,
13	fmt::{Debug, Formatter},
14	mem,
15	sync::{Arc, Mutex},
16	time,
17	time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::{ThreadPool, ThreadPoolBuilder};
22use tokio::{sync::Semaphore, task};
23
24use crate::actor::{
25	context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
26	traits::Actor,
27};
28
/// Configuration for the actor system.
#[derive(Debug, Clone)]
pub struct ActorSystemConfig {
	/// Number of worker threads in the shared rayon pool.
	///
	/// Forwarded to rayon's `ThreadPoolBuilder::num_threads`; per rayon's
	/// documentation, a value of 0 lets rayon pick its default (typically
	/// the number of logical CPUs).
	pub pool_threads: usize,
	/// Maximum concurrent compute tasks (admission control).
	///
	/// Sizes the semaphore that gates `ActorSystem::compute`.
	pub max_in_flight: usize,
}
37
/// Inner shared state for the actor system.
///
/// Held behind an `Arc` so cloned `ActorSystem` handles share it; child
/// scopes reuse the pool/permits/scheduler but get fresh registries.
struct ActorSystemInner {
	/// Shared rayon work-stealing pool on which all actors and compute run.
	pool: Arc<ThreadPool>,
	/// Admission-control semaphore limiting concurrent `compute` tasks.
	permits: Arc<Semaphore>,
	/// Cancellation token signalled by `shutdown`.
	cancel: CancellationToken,
	/// Timer scheduler handle for delayed/periodic callbacks.
	scheduler: SchedulerHandle,
	/// Shutdown wakers: each closure wakes one parked actor; drained and
	/// invoked (then dropped) by `shutdown`.
	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
	/// Type-erased actor cells kept alive until `shutdown` clears them.
	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
	/// Per-actor completion receivers consumed by `join`/`join_timeout`.
	done_rxs: Mutex<Vec<Receiver<()>>>,
}
48
/// Unified system for all concurrent work.
///
/// Provides:
/// - Actor spawning on a shared work-stealing pool
/// - CPU-bound compute with admission control
/// - Graceful shutdown via cancellation token
///
/// Cloning is cheap: the handle is an `Arc` around the shared inner state.
#[derive(Clone)]
pub struct ActorSystem {
	inner: Arc<ActorSystemInner>,
}
59
60impl ActorSystem {
61	/// Create a new actor system with the given configuration.
62	pub fn new(config: ActorSystemConfig) -> Self {
63		let pool = Arc::new(
64			ThreadPoolBuilder::new()
65				.num_threads(config.pool_threads)
66				.thread_name(|i| format!("actor-pool-{i}"))
67				.build()
68				.expect("failed to build rayon pool"),
69		);
70
71		let scheduler = SchedulerHandle::new(pool.clone());
72
73		Self {
74			inner: Arc::new(ActorSystemInner {
75				pool,
76				permits: Arc::new(Semaphore::new(config.max_in_flight)),
77				cancel: CancellationToken::new(),
78				scheduler,
79				wakers: Mutex::new(Vec::new()),
80				keepalive: Mutex::new(Vec::new()),
81				done_rxs: Mutex::new(Vec::new()),
82			}),
83		}
84	}
85
86	pub fn scope(&self) -> Self {
87		Self {
88			inner: Arc::new(ActorSystemInner {
89				pool: self.inner.pool.clone(),
90				permits: self.inner.permits.clone(),
91				cancel: CancellationToken::new(),
92				scheduler: self.inner.scheduler.shared(),
93				wakers: Mutex::new(Vec::new()),
94				keepalive: Mutex::new(Vec::new()),
95				done_rxs: Mutex::new(Vec::new()),
96			}),
97		}
98	}
99
100	/// Get the cancellation token for this system.
101	pub fn cancellation_token(&self) -> CancellationToken {
102		self.inner.cancel.clone()
103	}
104
105	/// Check if the system has been cancelled.
106	pub fn is_cancelled(&self) -> bool {
107		self.inner.cancel.is_cancelled()
108	}
109
110	/// Signal shutdown to all actors and the timer scheduler.
111	///
112	/// Cancels all actors, wakes any that are parked, then drops the waker
113	/// and keepalive references so actor cells can be freed.
114	pub fn shutdown(&self) {
115		self.inner.cancel.cancel();
116
117		// Drain wakers: wake all parked actors and release the closures in one step.
118		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
119		for waker in &wakers {
120			waker();
121		}
122		drop(wakers);
123
124		// Release keepalive references so actor cells can be freed.
125		self.inner.keepalive.lock().unwrap().clear();
126	}
127
128	/// Register a waker to be called on shutdown.
129	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
130		self.inner.wakers.lock().unwrap().push(f);
131	}
132
133	/// Register an actor cell to be kept alive while the system is running.
134	///
135	/// Cleared on shutdown so actor cells can be freed.
136	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
137		self.inner.keepalive.lock().unwrap().push(cell);
138	}
139
140	/// Register a done receiver for an actor, used by `join()` to wait for all actors.
141	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
142		self.inner.done_rxs.lock().unwrap().push(rx);
143	}
144
145	/// Wait for all actors to finish after shutdown, with a default 5-second timeout.
146	pub fn join(&self) -> Result<(), JoinError> {
147		self.join_timeout(Duration::from_secs(5))
148	}
149
150	/// Wait for all actors to finish after shutdown, with a custom timeout.
151	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
152		let deadline = time::Instant::now() + timeout;
153		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
154		for rx in rxs {
155			let remaining = deadline.saturating_duration_since(time::Instant::now());
156			match rx.recv_timeout(remaining) {
157				Ok(()) => {}
158				Err(CcRecvTimeoutError::Disconnected) => {
159					// Cell dropped without sending — actor already cleaned up
160				}
161				Err(CcRecvTimeoutError::Timeout) => {
162					return Err(JoinError::new("timed out waiting for actors to stop"));
163				}
164			}
165		}
166		Ok(())
167	}
168
169	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
170	pub fn scheduler(&self) -> &SchedulerHandle {
171		&self.inner.scheduler
172	}
173
174	/// Spawn an actor on the shared work-stealing pool.
175	///
176	/// Returns a handle to the spawned actor.
177	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
178	where
179		A::State: Send,
180	{
181		pool::spawn_on_pool(self, name, actor)
182	}
183
184	/// Executes a closure on the rayon thread pool directly.
185	///
186	/// Synchronous and bypasses admission control.
187	/// Use this when you're already in a synchronous context and need parallel execution.
188	pub fn install<R, F>(&self, f: F) -> R
189	where
190		R: Send,
191		F: FnOnce() -> R + Send,
192	{
193		self.inner.pool.install(f)
194	}
195
196	/// Runs a CPU-bound function on the compute pool.
197	///
198	/// The task is scheduled via `spawn_blocking` and executed on the
199	/// dedicated rayon pool using `install`. Admission control ensures
200	/// no more than `max_in_flight` tasks run concurrently.
201	pub async fn compute<R, F>(&self, f: F) -> Result<R, task::JoinError>
202	where
203		R: Send + 'static,
204		F: FnOnce() -> R + Send + 'static,
205	{
206		let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
207		let inner = self.inner.clone();
208
209		let handle = task::spawn_blocking(move || {
210			let _permit = permit; // released when closure returns
211			inner.pool.install(f)
212		});
213
214		handle.await
215	}
216
217	/// Get direct access to the rayon pool (for advanced use cases).
218	pub(crate) fn pool(&self) -> &Arc<ThreadPool> {
219		&self.inner.pool
220	}
221}
222
223impl Debug for ActorSystem {
224	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
225		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
226	}
227}
228
/// Handle to a spawned actor.
///
/// Alias for the pool-backed handle returned by `ActorSystem::spawn`.
pub type ActorHandle<M> = PoolActorHandle<M>;
231
/// Error returned when joining an actor fails.
#[derive(Debug)]
pub struct JoinError {
	message: String,
}

impl JoinError {
	/// Build a `JoinError` from anything convertible to a `String`.
	pub fn new(message: impl Into<String>) -> Self {
		let message = message.into();
		Self { message }
	}
}

impl fmt::Display for JoinError {
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		f.write_str("actor join failed: ")?;
		f.write_str(&self.message)
	}
}

impl error::Error for JoinError {}
254
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::{
		SharedRuntimeConfig,
		actor::{context::Context, traits::Directive},
	};

	/// Tiny actor whose whole state is an `i64` counter.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMsg {
		Inc,
		Get(sync::mpsc::Sender<i64>),
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMsg;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMsg::Stop => Directive::Stop,
				CounterMsg::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMsg::Get(reply) => {
					let _ = reply.send(*state);
					Directive::Continue
				}
			}
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		let handle = system.spawn("counter", CounterActor);
		let counter = handle.actor_ref().clone();

		// Three increments, then read the value back over a reply channel.
		for _ in 0..3 {
			counter.send(CounterMsg::Inc).unwrap();
		}
		let (reply_tx, reply_rx) = sync::mpsc::channel();
		counter.send(CounterMsg::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		counter.send(CounterMsg::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_install() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		assert_eq!(system.install(|| 42), 42);
	}

	#[tokio::test]
	async fn test_compute() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		assert_eq!(system.compute(|| 42).await.unwrap(), 42);
	}

	#[test]
	fn test_shutdown_join() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

		// Spawn several actors, cancel them all, then wait for completion.
		for i in 0..5 {
			system.spawn(&format!("counter-{i}"), CounterActor);
		}
		system.shutdown();
		system.join().unwrap();
	}
}