// File: reifydb_runtime/actor/system/native/mod.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Native actor system implementation.
//!
//! Uses rayon for all actors on a shared work-stealing pool.

// Pool backend: spawning machinery and `PoolActorHandle`.
mod pool;

use std::{
	any::Any,
	error, fmt,
	fmt::{Debug, Formatter},
	mem,
	sync::{Arc, Mutex, Once},
	time,
	time::Duration,
};

use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
use rayon::ThreadPoolBuilder;

use crate::{
	actor::{
		context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
		traits::Actor,
	},
	context::clock::Clock,
};
30
31/// Inner shared state for the actor system.
32struct ActorSystemInner {
33	cancel: CancellationToken,
34	scheduler: SchedulerHandle,
35	clock: Clock,
36	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
37	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
38	done_rxs: Mutex<Vec<Receiver<()>>>,
39}
40
/// Unified system for all concurrent work.
///
/// Cheap to clone: every clone shares the same inner state via `Arc`.
///
/// Provides:
/// - Actor spawning on a shared work-stealing pool
/// - CPU-bound compute with admission control
/// - Graceful shutdown via cancellation token
#[derive(Clone)]
pub struct ActorSystem {
	inner: Arc<ActorSystemInner>,
}
51
52static POOL_INIT: Once = Once::new();
53
impl ActorSystem {
	/// Create a new actor system with the given number of pool threads.
	///
	/// Uses the real wall clock; see [`ActorSystem::with_clock`] to
	/// inject a different one.
	pub fn new(pool_threads: usize) -> Self {
		Self::with_clock(pool_threads, Clock::Real)
	}

	/// Create a new actor system with a specific clock.
	///
	/// The first construction configures the *global* rayon pool with
	/// `pool_threads` threads; on later constructions the `Once` has
	/// already fired and `pool_threads` is silently ignored.
	pub fn with_clock(pool_threads: usize, clock: Clock) -> Self {
		POOL_INIT.call_once(|| {
			ThreadPoolBuilder::new()
				.num_threads(pool_threads)
				.thread_name(|i| format!("actor-pool-{i}"))
				.build_global()
				.expect("failed to configure global rayon thread pool");
		});

		let scheduler = SchedulerHandle::new();

		Self {
			inner: Arc::new(ActorSystemInner {
				cancel: CancellationToken::new(),
				scheduler,
				clock,
				wakers: Mutex::new(Vec::new()),
				keepalive: Mutex::new(Vec::new()),
				done_rxs: Mutex::new(Vec::new()),
			}),
		}
	}

	/// Create a child scope that shares this system's scheduler and clock.
	///
	/// The scope gets its own cancellation token and its own waker /
	/// keepalive / done-receiver registries, so it can be shut down and
	/// joined independently of the parent system.
	pub fn scope(&self) -> Self {
		Self {
			inner: Arc::new(ActorSystemInner {
				cancel: CancellationToken::new(),
				scheduler: self.inner.scheduler.shared(),
				clock: self.inner.clock.clone(),
				wakers: Mutex::new(Vec::new()),
				keepalive: Mutex::new(Vec::new()),
				done_rxs: Mutex::new(Vec::new()),
			}),
		}
	}

	/// Get the cancellation token for this system.
	pub fn cancellation_token(&self) -> CancellationToken {
		self.inner.cancel.clone()
	}

	/// Check if the system has been cancelled.
	pub fn is_cancelled(&self) -> bool {
		self.inner.cancel.is_cancelled()
	}

	/// Signal shutdown to all actors and the timer scheduler.
	///
	/// Cancels all actors, wakes any that are parked, then drops the waker
	/// and keepalive references so actor cells can be freed.
	pub fn shutdown(&self) {
		// Cancel first so woken actors observe the cancelled state.
		self.inner.cancel.cancel();

		// Drain wakers: wake all parked actors and release the closures in one step.
		// `mem::take` empties the Vec while the lock is held, so the
		// waker closures themselves run outside the lock.
		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
		for waker in &wakers {
			waker();
		}
		drop(wakers);

		// Release keepalive references so actor cells can be freed.
		self.inner.keepalive.lock().unwrap().clear();
	}

	/// Register a waker to be called on shutdown.
	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
		self.inner.wakers.lock().unwrap().push(f);
	}

	/// Register an actor cell to be kept alive while the system is running.
	///
	/// Cleared on shutdown so actor cells can be freed.
	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
		self.inner.keepalive.lock().unwrap().push(cell);
	}

	/// Register a done receiver for an actor, used by `join()` to wait for all actors.
	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
		self.inner.done_rxs.lock().unwrap().push(rx);
	}

	/// Wait for all actors to finish after shutdown, with a default 5-second timeout.
	pub fn join(&self) -> Result<(), JoinError> {
		self.join_timeout(Duration::from_secs(5))
	}

	/// Wait for all actors to finish after shutdown, with a custom timeout.
	///
	/// All actors share one deadline (not a per-actor timeout). The
	/// registered receivers are drained, so a second call returns
	/// immediately with `Ok(())`.
	#[allow(clippy::disallowed_methods)]
	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
		let deadline = time::Instant::now() + timeout;
		// Take ownership so each receiver is only ever waited on once.
		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
		for rx in rxs {
			// Remaining budget; saturates to zero once past the deadline.
			let remaining = deadline.saturating_duration_since(time::Instant::now());
			match rx.recv_timeout(remaining) {
				Ok(()) => {}
				Err(CcRecvTimeoutError::Disconnected) => {
					// Cell dropped without sending — actor already cleaned up
				}
				Err(CcRecvTimeoutError::Timeout) => {
					return Err(JoinError::new("timed out waiting for actors to stop"));
				}
			}
		}
		Ok(())
	}

	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
	pub fn scheduler(&self) -> &SchedulerHandle {
		&self.inner.scheduler
	}

	/// Get the clock for this system.
	pub fn clock(&self) -> &Clock {
		&self.inner.clock
	}

	/// Spawn an actor on the shared work-stealing pool.
	///
	/// Returns a handle to the spawned actor.
	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
	where
		A::State: Send,
	{
		pool::spawn_on_pool(self, name, actor)
	}
}
187
188impl Debug for ActorSystem {
189	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
190		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
191	}
192}
193
/// Handle to a spawned actor.
///
/// Alias for [`PoolActorHandle`], the handle type produced by the
/// work-stealing pool backend in [`pool`].
pub type ActorHandle<M> = PoolActorHandle<M>;
196
/// Error returned when joining an actor fails.
#[derive(Debug)]
pub struct JoinError {
	// Human-readable description of why the join failed.
	message: String,
}
202
203impl JoinError {
204	/// Create a new JoinError with a message.
205	pub fn new(message: impl Into<String>) -> Self {
206		Self {
207			message: message.into(),
208		}
209	}
210}
211
212impl fmt::Display for JoinError {
213	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
214		write!(f, "actor join failed: {}", self.message)
215	}
216}
217
218impl error::Error for JoinError {}
219
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::actor::{context::Context, traits::Directive};

	/// Stateless actor whose state is a plain `i64` counter.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMsg {
		Inc,
		Get(sync::mpsc::Sender<i64>),
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMsg;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMsg::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMsg::Get(reply) => {
					let _ = reply.send(*state);
					Directive::Continue
				}
				CounterMsg::Stop => Directive::Stop,
			}
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = ActorSystem::new(1);
		let handle = system.spawn("counter", CounterActor);
		let counter = handle.actor_ref().clone();

		// Three increments, then read the counter back over a reply channel.
		for _ in 0..3 {
			counter.send(CounterMsg::Inc).unwrap();
		}
		let (reply_tx, reply_rx) = sync::mpsc::channel();
		counter.send(CounterMsg::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		// Stop the actor and wait for it to wind down.
		counter.send(CounterMsg::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_shutdown_join() {
		let system = ActorSystem::new(1);

		// Spawn several actors
		for i in 0..5 {
			system.spawn(&format!("counter-{i}"), CounterActor);
		}

		// Shutdown cancels all actors; join waits for them to finish
		system.shutdown();
		system.join().unwrap();
	}
}