Skip to main content

reifydb_runtime/actor/system/native/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4mod pool;
5
6use std::{
7	any::Any,
8	error, fmt,
9	fmt::{Debug, Formatter},
10	mem,
11	sync::{Arc, Mutex},
12	time,
13	time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19	actor::{
20		context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21		traits::Actor,
22	},
23	context::clock::Clock,
24	pool::Pools,
25};
26
/// Inner shared state for the actor system.
///
/// Held behind an `Arc` by `ActorSystem`, so every clone of a system handle
/// observes the same state.
struct ActorSystemInner {
	/// Cancellation token for this system; cancelled by `shutdown()`.
	/// Scopes created via `scope()` receive a child token of this one.
	cancel: CancellationToken,
	/// Timer scheduler handle exposed through `scheduler()`.
	scheduler: SchedulerHandle,
	/// Clock exposed through `clock()`.
	clock: Clock,
	/// Pools that `spawn()` (system pool) and `spawn_query()` (query pool) place actors on.
	pools: Pools,
	/// Callbacks registered via `register_waker()`; each is invoked once and then dropped by `shutdown()`.
	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
	/// Type-erased values registered via `register_keepalive()`; cleared by `shutdown()`.
	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
	/// Completion receivers registered via `register_done_rx()`; drained by `join_timeout()`.
	done_rxs: Mutex<Vec<Receiver<()>>>,
	/// Scopes created via `scope()`; `shutdown()` is propagated to each of them.
	children: Mutex<Vec<ActorSystem>>,
}
38
/// Unified system for all concurrent work.
///
/// Provides:
/// - Actor spawning on a shared work-stealing pool
/// - CPU-bound compute with admission control
/// - Graceful shutdown via cancellation token
///
/// Cloning is cheap: a clone only copies the `Arc` and therefore refers to the
/// same underlying system (same cancellation token, pools and registrations).
#[derive(Clone)]
pub struct ActorSystem {
	/// Shared state; all clones of this handle point at the same allocation.
	inner: Arc<ActorSystemInner>,
}
49
50impl ActorSystem {
51	/// Create a new actor system with the given pools and clock.
52	pub fn new(pools: Pools, clock: Clock) -> Self {
53		let scheduler = SchedulerHandle::new(pools.system_pool().clone());
54
55		Self {
56			inner: Arc::new(ActorSystemInner {
57				cancel: CancellationToken::new(),
58				scheduler,
59				clock,
60				pools,
61				wakers: Mutex::new(Vec::new()),
62				keepalive: Mutex::new(Vec::new()),
63				done_rxs: Mutex::new(Vec::new()),
64				children: Mutex::new(Vec::new()),
65			}),
66		}
67	}
68
69	pub fn scope(&self) -> Self {
70		let child = Self {
71			inner: Arc::new(ActorSystemInner {
72				cancel: self.inner.cancel.child_token(),
73				scheduler: self.inner.scheduler.shared(),
74				clock: self.inner.clock.clone(),
75				pools: self.inner.pools.clone(),
76				wakers: Mutex::new(Vec::new()),
77				keepalive: Mutex::new(Vec::new()),
78				done_rxs: Mutex::new(Vec::new()),
79				children: Mutex::new(Vec::new()),
80			}),
81		};
82		self.inner.children.lock().unwrap().push(child.clone());
83		child
84	}
85
86	/// Get the pools for this system.
87	pub fn pools(&self) -> Pools {
88		self.inner.pools.clone()
89	}
90
91	/// Get the cancellation token for this system.
92	pub fn cancellation_token(&self) -> CancellationToken {
93		self.inner.cancel.clone()
94	}
95
96	/// Check if the system has been cancelled.
97	pub fn is_cancelled(&self) -> bool {
98		self.inner.cancel.is_cancelled()
99	}
100
101	/// Signal shutdown to all actors and the timer scheduler.
102	///
103	/// Cancels all actors, wakes any that are parked, then drops the waker
104	/// and keepalive references so actor cells can be freed.
105	pub fn shutdown(&self) {
106		self.inner.cancel.cancel();
107
108		// Propagate shutdown to child scopes.
109		for child in self.inner.children.lock().unwrap().iter() {
110			child.shutdown();
111		}
112
113		// Drain wakers: wake all parked actors and release the closures in one step.
114		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
115		for waker in &wakers {
116			waker();
117		}
118		drop(wakers);
119
120		// Release keepalive references so actor cells can be freed.
121		self.inner.keepalive.lock().unwrap().clear();
122	}
123
124	/// Register a waker to be called on shutdown.
125	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
126		self.inner.wakers.lock().unwrap().push(f);
127	}
128
129	/// Register an actor cell to be kept alive while the system is running.
130	///
131	/// Cleared on shutdown so actor cells can be freed.
132	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
133		self.inner.keepalive.lock().unwrap().push(cell);
134	}
135
136	/// Register a done receiver for an actor, used by `join()` to wait for all actors.
137	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
138		self.inner.done_rxs.lock().unwrap().push(rx);
139	}
140
141	/// Wait for all actors to finish after shutdown, with a default 5-second timeout.
142	pub fn join(&self) -> Result<(), JoinError> {
143		self.join_timeout(Duration::from_secs(5))
144	}
145
146	/// Wait for all actors to finish after shutdown, with a custom timeout.
147	#[allow(clippy::disallowed_methods)]
148	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
149		let deadline = time::Instant::now() + timeout;
150		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
151		for rx in rxs {
152			let remaining = deadline.saturating_duration_since(time::Instant::now());
153			match rx.recv_timeout(remaining) {
154				Ok(()) => {}
155				Err(CcRecvTimeoutError::Disconnected) => {
156					// Cell dropped without sending - actor already cleaned up
157				}
158				Err(CcRecvTimeoutError::Timeout) => {
159					return Err(JoinError::new("timed out waiting for actors to stop"));
160				}
161			}
162		}
163		Ok(())
164	}
165
166	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
167	pub fn scheduler(&self) -> &SchedulerHandle {
168		&self.inner.scheduler
169	}
170
171	/// Get the clock for this system.
172	pub fn clock(&self) -> &Clock {
173		&self.inner.clock
174	}
175
176	/// Spawn an actor on the system pool.
177	///
178	/// Use this for lightweight actors that must never stall
179	/// (flow, CDC, watermark, metrics, etc.).
180	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
181	where
182		A::State: Send,
183	{
184		pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
185	}
186
187	/// Spawn an actor on the query pool.
188	///
189	/// Use this for execution-heavy actors that may block on engine calls
190	/// (WS, gRPC, HTTP server actors).
191	pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
192	where
193		A::State: Send,
194	{
195		pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
196	}
197}
198
199impl Debug for ActorSystem {
200	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
201		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
202	}
203}
204
/// Handle to a spawned actor.
///
/// Alias for `PoolActorHandle`; this is the type returned by
/// `ActorSystem::spawn` and `ActorSystem::spawn_query`, parameterised by the
/// actor's message type `M`.
pub type ActorHandle<M> = PoolActorHandle<M>;
207
/// Error reported when waiting for actors to stop does not succeed.
///
/// Carries a human-readable description; its `Display` output is
/// prefixed with `actor join failed: `.
#[derive(Debug)]
pub struct JoinError {
	message: String,
}

impl JoinError {
	/// Build a `JoinError` from anything convertible into a `String`.
	pub fn new(message: impl Into<String>) -> Self {
		let message: String = message.into();
		JoinError {
			message,
		}
	}
}

impl fmt::Display for JoinError {
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		// Fixed prefix first, then the stored description verbatim.
		f.write_str("actor join failed: ")?;
		f.write_str(&self.message)
	}
}

impl error::Error for JoinError {}
230
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::{
		actor::{context::Context, traits::Directive},
		pool::{PoolConfig, Pools},
	};

	/// Build an actor system on default pools with the real clock.
	fn test_system() -> ActorSystem {
		ActorSystem::new(Pools::new(PoolConfig::default()), Clock::Real)
	}

	/// Minimal actor whose state is a single counter.
	struct CounterActor;

	/// Messages understood by `CounterActor`.
	#[derive(Debug)]
	enum CounterMessage {
		/// Add one to the counter.
		Inc,
		/// Report the current counter value on the given channel.
		Get(sync::mpsc::Sender<i64>),
		/// Ask the actor to stop.
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMessage;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMessage::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMessage::Get(reply) => {
					// The requester may already be gone; that is not an error here.
					let _ = reply.send(*state);
					Directive::Continue
				}
				CounterMessage::Stop => Directive::Stop,
			}
		}
	}

	/// Three increments followed by a `Get` must observe the value 3.
	#[test]
	fn test_spawn_and_send() {
		let system = test_system();
		let handle = system.spawn("counter", CounterActor);
		let counter = handle.actor_ref().clone();

		for _ in 0..3 {
			counter.send(CounterMessage::Inc).unwrap();
		}

		let (reply_tx, reply_rx) = sync::mpsc::channel();
		counter.send(CounterMessage::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		counter.send(CounterMessage::Stop).unwrap();
		handle.join().unwrap();
	}

	/// `shutdown()` followed by `join()` must succeed with several idle actors.
	#[test]
	fn test_shutdown_join() {
		let system = test_system();

		// Spawn several actors; their handles are not needed.
		for idx in 0..5 {
			let name = format!("counter-{idx}");
			system.spawn(&name, CounterActor);
		}

		// Shutdown cancels all actors; join waits for them to finish.
		system.shutdown();
		system.join().unwrap();
	}
}