Skip to main content

reifydb_runtime/actor/system/native/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Native actor system implementation.
5//!
6//! Uses rayon for all actors on a shared work-stealing pool.
7
8mod pool;
9
10use std::{
11	any::Any,
12	error, fmt,
13	fmt::{Debug, Formatter},
14	mem,
15	sync::{Arc, Mutex, Once},
16	time,
17	time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21use rayon::ThreadPoolBuilder;
22
23use crate::actor::{
24	context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
25	traits::Actor,
26};
27
/// Inner shared state for the actor system.
///
/// Held behind an `Arc` so every cloned `ActorSystem` handle observes the
/// same cancellation token, scheduler, and registration lists.
struct ActorSystemInner {
	/// Token cancelled by `shutdown()` to signal every actor to stop.
	cancel: CancellationToken,
	/// Timer scheduler for delayed/periodic callbacks.
	scheduler: SchedulerHandle,
	/// Shutdown wakers: one closure per parked actor, drained and invoked
	/// by `shutdown()` so parked actors notice cancellation.
	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
	/// Type-erased actor cells kept alive while the system runs; cleared
	/// on shutdown so the cells can be freed.
	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
	/// Per-actor completion receivers, consumed by `join()`/`join_timeout()`.
	done_rxs: Mutex<Vec<Receiver<()>>>,
}
36
/// Unified system for all concurrent work.
///
/// Provides:
/// - Actor spawning on a shared work-stealing pool
/// - CPU-bound compute with admission control
/// - Graceful shutdown via cancellation token
///
/// Cloning is cheap: clones share the same inner state via `Arc`, so a
/// `shutdown()` on any clone affects all of them.
#[derive(Clone)]
pub struct ActorSystem {
	// Shared state; see `ActorSystemInner` for what clones have in common.
	inner: Arc<ActorSystemInner>,
}
47
/// Ensures the global rayon thread pool is configured at most once per
/// process, regardless of how many `ActorSystem`s are created.
static POOL_INIT: Once = Once::new();

impl ActorSystem {
	/// Create a new actor system with the given number of pool threads.
	///
	/// The global rayon pool is built only on the first call in the
	/// process; on subsequent calls `pool_threads` is silently ignored and
	/// the already-configured pool is reused.
	///
	/// # Panics
	/// Panics (on the first call only) if the global rayon pool cannot be
	/// built — e.g. if other code in the process already installed one.
	pub fn new(pool_threads: usize) -> Self {
		POOL_INIT.call_once(|| {
			ThreadPoolBuilder::new()
				.num_threads(pool_threads)
				.thread_name(|i| format!("actor-pool-{i}"))
				.build_global()
				.expect("failed to configure global rayon thread pool");
		});

		let scheduler = SchedulerHandle::new();

		Self {
			inner: Arc::new(ActorSystemInner {
				cancel: CancellationToken::new(),
				scheduler,
				wakers: Mutex::new(Vec::new()),
				keepalive: Mutex::new(Vec::new()),
				done_rxs: Mutex::new(Vec::new()),
			}),
		}
	}

	/// Create a child system with a fresh cancellation token and fresh
	/// actor bookkeeping, so it can be shut down and joined independently
	/// of `self`.
	///
	/// NOTE(review): `SchedulerHandle::shared()` presumably hands back a
	/// handle to the same underlying scheduler — confirm in `timers`.
	pub fn scope(&self) -> Self {
		Self {
			inner: Arc::new(ActorSystemInner {
				cancel: CancellationToken::new(),
				scheduler: self.inner.scheduler.shared(),
				wakers: Mutex::new(Vec::new()),
				keepalive: Mutex::new(Vec::new()),
				done_rxs: Mutex::new(Vec::new()),
			}),
		}
	}

	/// Get the cancellation token for this system.
	pub fn cancellation_token(&self) -> CancellationToken {
		self.inner.cancel.clone()
	}

	/// Check if the system has been cancelled.
	pub fn is_cancelled(&self) -> bool {
		self.inner.cancel.is_cancelled()
	}

	/// Signal shutdown to all actors and the timer scheduler.
	///
	/// Cancels all actors, wakes any that are parked, then drops the waker
	/// and keepalive references so actor cells can be freed.
	///
	/// Ordering matters: the token is cancelled *before* waking, so a
	/// woken actor observes `is_cancelled() == true`. Actors registered
	/// after this call returns are not woken by it.
	pub fn shutdown(&self) {
		self.inner.cancel.cancel();

		// Drain wakers: wake all parked actors and release the closures in one step.
		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
		for waker in &wakers {
			waker();
		}
		drop(wakers);

		// Release keepalive references so actor cells can be freed.
		self.inner.keepalive.lock().unwrap().clear();
	}

	/// Register a waker to be called on shutdown.
	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
		self.inner.wakers.lock().unwrap().push(f);
	}

	/// Register an actor cell to be kept alive while the system is running.
	///
	/// Cleared on shutdown so actor cells can be freed.
	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
		self.inner.keepalive.lock().unwrap().push(cell);
	}

	/// Register a done receiver for an actor, used by `join()` to wait for all actors.
	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
		self.inner.done_rxs.lock().unwrap().push(rx);
	}

	/// Wait for all actors to finish after shutdown, with a default 5-second timeout.
	pub fn join(&self) -> Result<(), JoinError> {
		self.join_timeout(Duration::from_secs(5))
	}

	/// Wait for all actors to finish after shutdown, with a custom timeout.
	///
	/// Consumes the registered done receivers: a second call with no
	/// intervening spawns returns `Ok(())` immediately. One deadline is
	/// shared across all actors; if it expires, the receivers not yet
	/// waited on are dropped and an error is returned.
	// NOTE(review): `Instant::now` appears to be lint-disallowed in this
	// crate; it is needed here for the deadline — confirm clippy config.
	#[allow(clippy::disallowed_methods)]
	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
		let deadline = time::Instant::now() + timeout;
		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
		for rx in rxs {
			let remaining = deadline.saturating_duration_since(time::Instant::now());
			match rx.recv_timeout(remaining) {
				Ok(()) => {}
				Err(CcRecvTimeoutError::Disconnected) => {
					// Cell dropped without sending — actor already cleaned up
				}
				Err(CcRecvTimeoutError::Timeout) => {
					return Err(JoinError::new("timed out waiting for actors to stop"));
				}
			}
		}
		Ok(())
	}

	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
	pub fn scheduler(&self) -> &SchedulerHandle {
		&self.inner.scheduler
	}

	/// Spawn an actor on the shared work-stealing pool.
	///
	/// Returns a handle to the spawned actor.
	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
	where
		A::State: Send,
	{
		pool::spawn_on_pool(self, name, actor)
	}
}
171
172impl Debug for ActorSystem {
173	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
174		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
175	}
176}
177
/// Handle to a spawned actor.
///
/// Alias for the pool-backed handle, since all actors run on the shared
/// work-stealing pool.
pub type ActorHandle<M> = PoolActorHandle<M>;
180
/// Error returned when joining an actor fails.
#[derive(Debug)]
pub struct JoinError {
	// Human-readable description of why the join failed.
	message: String,
}

impl JoinError {
	/// Create a new JoinError from anything convertible into a `String`.
	pub fn new(message: impl Into<String>) -> Self {
		let message = message.into();
		Self {
			message,
		}
	}
}

impl fmt::Display for JoinError {
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		f.write_str("actor join failed: ")?;
		f.write_str(&self.message)
	}
}

impl error::Error for JoinError {}
203
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::actor::{context::Context, traits::Directive};

	/// Minimal actor whose state is a running count.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMsg {
		Inc,
		Get(sync::mpsc::Sender<i64>),
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMsg;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			// Each arm yields its directive directly.
			match msg {
				CounterMsg::Stop => Directive::Stop,
				CounterMsg::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMsg::Get(reply) => {
					let _ = reply.send(*state);
					Directive::Continue
				}
			}
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = ActorSystem::new(1);
		let handle = system.spawn("counter", CounterActor);
		let counter = handle.actor_ref().clone();

		// Three increments, then read the state back over a reply channel.
		for _ in 0..3 {
			counter.send(CounterMsg::Inc).unwrap();
		}

		let (reply_tx, reply_rx) = sync::mpsc::channel();
		counter.send(CounterMsg::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		counter.send(CounterMsg::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_shutdown_join() {
		let system = ActorSystem::new(1);

		// Spawn several actors without ever stopping them individually.
		for i in 0..5 {
			system.spawn(&format!("counter-{i}"), CounterActor);
		}

		// Shutdown cancels all actors; join waits for them to finish.
		system.shutdown();
		system.join().unwrap();
	}
}