Skip to main content

reifydb_runtime/actor/system/native/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Native actor system implementation.
5//!
6//! Uses rayon for all actors on a shared work-stealing pool.
7
8mod pool;
9
10use std::{
11	any::Any,
12	error, fmt,
13	fmt::{Debug, Formatter},
14	mem,
15	sync::{Arc, Mutex},
16	time,
17	time::Duration,
18};
19
20use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
21
22use crate::{
23	actor::{
24		context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
25		traits::Actor,
26	},
27	context::clock::Clock,
28	pool::Pools,
29};
30
31/// Inner shared state for the actor system.
32struct ActorSystemInner {
33	cancel: CancellationToken,
34	scheduler: SchedulerHandle,
35	clock: Clock,
36	pools: Pools,
37	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
38	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
39	done_rxs: Mutex<Vec<Receiver<()>>>,
40	children: Mutex<Vec<ActorSystem>>,
41}
42
43/// Unified system for all concurrent work.
44///
45/// Provides:
46/// - Actor spawning on a shared work-stealing pool
47/// - CPU-bound compute with admission control
48/// - Graceful shutdown via cancellation token
49#[derive(Clone)]
50pub struct ActorSystem {
51	inner: Arc<ActorSystemInner>,
52}
53
54impl ActorSystem {
55	/// Create a new actor system with the given pools and clock.
56	pub fn new(pools: Pools, clock: Clock) -> Self {
57		let scheduler = SchedulerHandle::new(pools.system_pool().clone());
58
59		Self {
60			inner: Arc::new(ActorSystemInner {
61				cancel: CancellationToken::new(),
62				scheduler,
63				clock,
64				pools,
65				wakers: Mutex::new(Vec::new()),
66				keepalive: Mutex::new(Vec::new()),
67				done_rxs: Mutex::new(Vec::new()),
68				children: Mutex::new(Vec::new()),
69			}),
70		}
71	}
72
73	pub fn scope(&self) -> Self {
74		let child = Self {
75			inner: Arc::new(ActorSystemInner {
76				cancel: self.inner.cancel.child_token(),
77				scheduler: self.inner.scheduler.shared(),
78				clock: self.inner.clock.clone(),
79				pools: self.inner.pools.clone(),
80				wakers: Mutex::new(Vec::new()),
81				keepalive: Mutex::new(Vec::new()),
82				done_rxs: Mutex::new(Vec::new()),
83				children: Mutex::new(Vec::new()),
84			}),
85		};
86		self.inner.children.lock().unwrap().push(child.clone());
87		child
88	}
89
90	/// Get the pools for this system.
91	pub fn pools(&self) -> Pools {
92		self.inner.pools.clone()
93	}
94
95	/// Get the cancellation token for this system.
96	pub fn cancellation_token(&self) -> CancellationToken {
97		self.inner.cancel.clone()
98	}
99
100	/// Check if the system has been cancelled.
101	pub fn is_cancelled(&self) -> bool {
102		self.inner.cancel.is_cancelled()
103	}
104
105	/// Signal shutdown to all actors and the timer scheduler.
106	///
107	/// Cancels all actors, wakes any that are parked, then drops the waker
108	/// and keepalive references so actor cells can be freed.
109	pub fn shutdown(&self) {
110		self.inner.cancel.cancel();
111
112		// Propagate shutdown to child scopes.
113		for child in self.inner.children.lock().unwrap().iter() {
114			child.shutdown();
115		}
116
117		// Drain wakers: wake all parked actors and release the closures in one step.
118		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
119		for waker in &wakers {
120			waker();
121		}
122		drop(wakers);
123
124		// Release keepalive references so actor cells can be freed.
125		self.inner.keepalive.lock().unwrap().clear();
126	}
127
128	/// Register a waker to be called on shutdown.
129	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
130		self.inner.wakers.lock().unwrap().push(f);
131	}
132
133	/// Register an actor cell to be kept alive while the system is running.
134	///
135	/// Cleared on shutdown so actor cells can be freed.
136	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
137		self.inner.keepalive.lock().unwrap().push(cell);
138	}
139
140	/// Register a done receiver for an actor, used by `join()` to wait for all actors.
141	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
142		self.inner.done_rxs.lock().unwrap().push(rx);
143	}
144
145	/// Wait for all actors to finish after shutdown, with a default 5-second timeout.
146	pub fn join(&self) -> Result<(), JoinError> {
147		self.join_timeout(Duration::from_secs(5))
148	}
149
150	/// Wait for all actors to finish after shutdown, with a custom timeout.
151	#[allow(clippy::disallowed_methods)]
152	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
153		let deadline = time::Instant::now() + timeout;
154		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
155		for rx in rxs {
156			let remaining = deadline.saturating_duration_since(time::Instant::now());
157			match rx.recv_timeout(remaining) {
158				Ok(()) => {}
159				Err(CcRecvTimeoutError::Disconnected) => {
160					// Cell dropped without sending — actor already cleaned up
161				}
162				Err(CcRecvTimeoutError::Timeout) => {
163					return Err(JoinError::new("timed out waiting for actors to stop"));
164				}
165			}
166		}
167		Ok(())
168	}
169
170	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
171	pub fn scheduler(&self) -> &SchedulerHandle {
172		&self.inner.scheduler
173	}
174
175	/// Get the clock for this system.
176	pub fn clock(&self) -> &Clock {
177		&self.inner.clock
178	}
179
180	/// Spawn an actor on the system pool.
181	///
182	/// Use this for lightweight actors that must never stall
183	/// (flow, CDC, watermark, metrics, etc.).
184	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
185	where
186		A::State: Send,
187	{
188		pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
189	}
190
191	/// Spawn an actor on the query pool.
192	///
193	/// Use this for execution-heavy actors that may block on engine calls
194	/// (WS, gRPC, HTTP server actors).
195	pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
196	where
197		A::State: Send,
198	{
199		pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
200	}
201}
202
203impl Debug for ActorSystem {
204	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
205		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
206	}
207}
208
209/// Handle to a spawned actor.
210pub type ActorHandle<M> = PoolActorHandle<M>;
211
/// Error returned when joining an actor fails.
///
/// Carries a human-readable description of the failure. Its `Display` output
/// is that description prefixed with `actor join failed: `.
///
/// Implements `Clone`, `PartialEq` and `Eq` so callers and tests can compare
/// and duplicate errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinError {
	message: String,
}

impl JoinError {
	/// Create a new JoinError with a message.
	///
	/// Accepts anything convertible into a `String` (`&str`, `String`, ...).
	pub fn new(message: impl Into<String>) -> Self {
		Self {
			message: message.into(),
		}
	}
}

impl fmt::Display for JoinError {
	/// Writes `actor join failed: <message>`.
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		write!(f, "actor join failed: {}", self.message)
	}
}

impl error::Error for JoinError {}
234
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::{
		actor::{context::Context, traits::Directive},
		pool::{PoolConfig, Pools},
	};

	/// Build an actor system over default pools and the real clock.
	fn test_system() -> ActorSystem {
		ActorSystem::new(Pools::new(PoolConfig::default()), Clock::Real)
	}

	/// Minimal actor whose state is a counter starting at zero.
	struct CounterActor;

	/// Messages understood by [`CounterActor`].
	#[derive(Debug)]
	enum CounterMessage {
		/// Add one to the counter.
		Inc,
		/// Report the current counter value on the given channel.
		Get(sync::mpsc::Sender<i64>),
		/// Ask the actor to stop.
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMessage;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMessage::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMessage::Get(reply) => {
					// The requester may already be gone; a failed reply is not an error.
					let _ = reply.send(*state);
					Directive::Continue
				}
				CounterMessage::Stop => Directive::Stop,
			}
		}
	}

	/// Three increments followed by a `Get` must report 3, and `Stop` must
	/// let the handle be joined.
	#[test]
	fn test_spawn_and_send() {
		let system = test_system();
		let handle = system.spawn("counter", CounterActor);
		let actor_ref = handle.actor_ref().clone();

		for _ in 0..3 {
			actor_ref.send(CounterMessage::Inc).unwrap();
		}

		let (reply_tx, reply_rx) = sync::mpsc::channel();
		actor_ref.send(CounterMessage::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		actor_ref.send(CounterMessage::Stop).unwrap();
		handle.join().unwrap();
	}

	/// Shutting the system down must stop every spawned actor so that
	/// `join` returns successfully.
	#[test]
	fn test_shutdown_join() {
		let system = test_system();

		// Spawn several actors; their handles are dropped right away.
		for idx in 0..5 {
			let name = format!("counter-{idx}");
			drop(system.spawn(&name, CounterActor));
		}

		// Shutdown cancels all actors; join waits for them to finish
		system.shutdown();
		system.join().unwrap();
	}
}