// reifydb_runtime/actor/system/native/mod.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2025 ReifyDB

//! Native actor system implementation.
//!
//! Uses rayon for all actors on a shared work-stealing pool.
mod pool;

use std::{
	any::Any,
	fmt::{Debug, Formatter},
	sync::{Arc, Mutex},
};

use rayon::{ThreadPool, ThreadPoolBuilder};
use tokio::{sync::Semaphore, task};

use crate::actor::{
	context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
	traits::Actor,
};

/// Configuration for the actor system.
///
/// Plain data; `PartialEq`/`Eq` are derived so configurations can be
/// compared directly in assertions and round-trip checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActorSystemConfig {
	/// Number of worker threads in the shared rayon pool.
	pub pool_threads: usize,
	/// Maximum concurrent compute tasks (admission control).
	pub max_in_flight: usize,
}

/// Inner shared state for the actor system.
///
/// Held behind an `Arc` by [`ActorSystem`], so every clone of the handle
/// shares one pool, one cancellation token, and one scheduler.
struct ActorSystemInner {
	/// Shared work-stealing rayon pool that runs actors and compute tasks.
	pool: Arc<ThreadPool>,
	/// Admission-control permits bounding concurrent `compute` calls.
	permits: Arc<Semaphore>,
	/// System-wide shutdown signal; cancelled once in `shutdown()`.
	cancel: CancellationToken,
	/// Timer scheduler for delayed/periodic callbacks.
	scheduler: SchedulerHandle,
	/// Shutdown wakers: each is invoked once during `shutdown()` so parked
	/// actors observe cancellation; drained (emptied) at that point.
	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
	/// Type-erased actor cells kept alive while the system runs; cleared
	/// on shutdown so the cells can be freed.
	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
}

/// Unified system for all concurrent work.
///
/// Provides:
/// - Actor spawning on a shared work-stealing pool
/// - CPU-bound compute with admission control
/// - Graceful shutdown via cancellation token
///
/// `Clone` is cheap: it only bumps the `Arc` on the shared inner state,
/// so clones all refer to the same system.
#[derive(Clone)]
pub struct ActorSystem {
	// Single shared state; see [`ActorSystemInner`] for field details.
	inner: Arc<ActorSystemInner>,
}

impl ActorSystem {
	/// Create a new actor system with the given configuration.
	///
	/// Builds a dedicated rayon pool with `config.pool_threads` workers
	/// (threads named `actor-pool-N`), a semaphore granting
	/// `config.max_in_flight` compute permits, and a timer scheduler that
	/// shares the same pool.
	///
	/// # Panics
	/// Panics if the rayon pool cannot be built.
	pub fn new(config: ActorSystemConfig) -> Self {
		let pool = Arc::new(
			ThreadPoolBuilder::new()
				.num_threads(config.pool_threads)
				.thread_name(|i| format!("actor-pool-{i}"))
				.build()
				.expect("failed to build rayon pool"),
		);

		// The scheduler runs its callbacks on the same shared pool.
		let scheduler = SchedulerHandle::new(pool.clone());

		Self {
			inner: Arc::new(ActorSystemInner {
				pool,
				permits: Arc::new(Semaphore::new(config.max_in_flight)),
				cancel: CancellationToken::new(),
				scheduler,
				wakers: Mutex::new(Vec::new()),
				keepalive: Mutex::new(Vec::new()),
			}),
		}
	}

	/// Get the cancellation token for this system.
	///
	/// The returned token is a clone sharing the same underlying flag, so
	/// it observes `shutdown()` calls made through any handle.
	pub fn cancellation_token(&self) -> CancellationToken {
		self.inner.cancel.clone()
	}

	/// Check if the system has been cancelled.
	pub fn is_cancelled(&self) -> bool {
		self.inner.cancel.is_cancelled()
	}

	/// Signal shutdown to all actors and the timer scheduler.
	///
	/// Cancels all actors, wakes any that are parked, then drops the waker
	/// and keepalive references so actor cells can be freed.
	///
	/// Idempotent: a second call finds the waker/keepalive lists already
	/// drained and does nothing further.
	pub fn shutdown(&self) {
		// Flip the flag first so woken actors observe cancellation.
		self.inner.cancel.cancel();

		// Drain wakers: wake all parked actors and release the closures in one step.
		// `mem::take` swaps in an empty Vec while the lock is held, so the
		// callbacks themselves run without holding the mutex.
		let wakers = std::mem::take(&mut *self.inner.wakers.lock().unwrap());
		for waker in &wakers {
			waker();
		}
		drop(wakers);

		// Release keepalive references so actor cells can be freed.
		self.inner.keepalive.lock().unwrap().clear();
	}

	/// Register a waker to be called on shutdown.
	///
	/// NOTE(review): a waker registered after `shutdown()` has already
	/// drained the list is never invoked — confirm callers only register
	/// while the system is running.
	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
		self.inner.wakers.lock().unwrap().push(f);
	}

	/// Register an actor cell to be kept alive while the system is running.
	///
	/// Cleared on shutdown so actor cells can be freed. The same
	/// post-shutdown caveat as `register_waker` applies: cells registered
	/// afterwards are retained until the next `shutdown()` call.
	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
		self.inner.keepalive.lock().unwrap().push(cell);
	}

	/// Get the timer scheduler for scheduling delayed/periodic callbacks.
	pub fn scheduler(&self) -> &SchedulerHandle {
		&self.inner.scheduler
	}

	/// Spawn an actor on the shared work-stealing pool.
	///
	/// Returns a handle to the spawned actor.
	pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
	where
		A::State: Send,
	{
		pool::spawn_on_pool(self, name, actor)
	}

	/// Executes a closure on the rayon thread pool directly.
	///
	/// Synchronous and bypasses admission control.
	/// Use this when you're already in a synchronous context and need parallel execution.
	pub fn install<R, F>(&self, f: F) -> R
	where
		R: Send,
		F: FnOnce() -> R + Send,
	{
		self.inner.pool.install(f)
	}

	/// Runs a CPU-bound function on the compute pool.
	///
	/// The task is scheduled via `spawn_blocking` and executed on the
	/// dedicated rayon pool using `install`. Admission control ensures
	/// no more than `max_in_flight` tasks run concurrently.
	///
	/// # Panics
	/// Panics if the semaphore is closed; this system never closes it, so
	/// the `expect` encodes an invariant rather than a reachable error.
	pub async fn compute<R, F>(&self, f: F) -> Result<R, task::JoinError>
	where
		R: Send + 'static,
		F: FnOnce() -> R + Send + 'static,
	{
		// Acquire before spawning: back-pressure happens here, not on the
		// blocking-thread pool.
		let permit = self.inner.permits.clone().acquire_owned().await.expect("semaphore closed");
		let inner = self.inner.clone();

		let handle = task::spawn_blocking(move || {
			let _permit = permit; // released when closure returns
			inner.pool.install(f)
		});

		handle.await
	}

	/// Get direct access to the rayon pool (for advanced use cases).
	pub(crate) fn pool(&self) -> &Arc<ThreadPool> {
		&self.inner.pool
	}
}

173impl Debug for ActorSystem {
174	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
175		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
176	}
177}
178
/// Handle to a spawned actor.
///
/// Alias for [`PoolActorHandle`] so callers depend on this name rather
/// than on the pool-backed implementation type directly.
pub type ActorHandle<M> = PoolActorHandle<M>;

/// Error returned when joining an actor fails.
#[derive(Debug)]
pub struct JoinError {
	// Human-readable description of why the join failed.
	message: String,
}

impl JoinError {
	/// Create a new JoinError with a message.
	pub fn new(message: impl Into<String>) -> Self {
		Self {
			message: message.into(),
		}
	}

	/// The reason the join failed, without the `Display` prefix.
	///
	/// Added so callers can inspect/log the raw message instead of
	/// parsing the formatted `Display` output.
	pub fn message(&self) -> &str {
		&self.message
	}
}

impl std::fmt::Display for JoinError {
	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
		write!(f, "actor join failed: {}", self.message)
	}
}

impl std::error::Error for JoinError {}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		SharedRuntimeConfig,
		actor::{context::Context, traits::Directive},
	};

	/// Minimal stateful actor used to exercise spawn/send/join.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMsg {
		Inc,
		Get(std::sync::mpsc::Sender<i64>),
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMsg;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			// Each arm yields its directive directly; only Stop ends the actor.
			match msg {
				CounterMsg::Stop => Directive::Stop,
				CounterMsg::Inc => {
					*state += 1;
					Directive::Continue
				}
				CounterMsg::Get(reply) => {
					let _ = reply.send(*state);
					Directive::Continue
				}
			}
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		let handle = system.spawn("counter", CounterActor);
		let counter = handle.actor_ref().clone();

		// Three increments, then read the state back over a channel.
		for _ in 0..3 {
			counter.send(CounterMsg::Inc).unwrap();
		}

		let (reply_tx, reply_rx) = std::sync::mpsc::channel();
		counter.send(CounterMsg::Get(reply_tx)).unwrap();
		assert_eq!(reply_rx.recv().unwrap(), 3);

		counter.send(CounterMsg::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_install() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		assert_eq!(system.install(|| 42), 42);
	}

	#[tokio::test]
	async fn test_compute() {
		let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
		let answer = system.compute(|| 42).await.unwrap();
		assert_eq!(answer, 42);
	}
}