// Source: reifydb_runtime/actor/system/native/mod.rs

// SPDX-License-Identifier: MIT
// Copyright (c) 2026 ReifyDB

4mod pool;
5
6use std::{
7	any::Any,
8	error, fmt,
9	fmt::{Debug, Formatter},
10	mem,
11	sync::Arc,
12	time,
13	time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19	actor::{
20		context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21		traits::Actor,
22	},
23	context::clock::Clock,
24	pool::Pools,
25	sync::mutex::Mutex,
26};
27
28struct ActorSystemInner {
29	cancel: CancellationToken,
30	scheduler: SchedulerHandle,
31	clock: Clock,
32	pools: Pools,
33	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
34	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
35	done_rxs: Mutex<Vec<Receiver<()>>>,
36	children: Mutex<Vec<ActorSystem>>,
37}
38
39#[derive(Clone)]
40pub struct ActorSystem {
41	inner: Arc<ActorSystemInner>,
42}
43
44impl ActorSystem {
45	pub fn new(pools: Pools, clock: Clock) -> Self {
46		let scheduler = SchedulerHandle::new(pools.system_pool().clone());
47
48		Self {
49			inner: Arc::new(ActorSystemInner {
50				cancel: CancellationToken::new(),
51				scheduler,
52				clock,
53				pools,
54				wakers: Mutex::new(Vec::new()),
55				keepalive: Mutex::new(Vec::new()),
56				done_rxs: Mutex::new(Vec::new()),
57				children: Mutex::new(Vec::new()),
58			}),
59		}
60	}
61
62	pub fn scope(&self) -> Self {
63		let child = Self {
64			inner: Arc::new(ActorSystemInner {
65				cancel: self.inner.cancel.child_token(),
66				scheduler: self.inner.scheduler.shared(),
67				clock: self.inner.clock.clone(),
68				pools: self.inner.pools.clone(),
69				wakers: Mutex::new(Vec::new()),
70				keepalive: Mutex::new(Vec::new()),
71				done_rxs: Mutex::new(Vec::new()),
72				children: Mutex::new(Vec::new()),
73			}),
74		};
75		self.inner.children.lock().push(child.clone());
76		child
77	}
78
79	pub fn pools(&self) -> Pools {
80		self.inner.pools.clone()
81	}
82
83	pub fn cancellation_token(&self) -> CancellationToken {
84		self.inner.cancel.clone()
85	}
86
87	pub fn is_cancelled(&self) -> bool {
88		self.inner.cancel.is_cancelled()
89	}
90
91	pub fn shutdown(&self) {
92		self.inner.cancel.cancel();
93
94		for child in self.inner.children.lock().iter() {
95			child.shutdown();
96		}
97
98		let wakers = mem::take(&mut *self.inner.wakers.lock());
99		for waker in &wakers {
100			waker();
101		}
102		drop(wakers);
103
104		self.inner.keepalive.lock().clear();
105	}
106
107	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
108		self.inner.wakers.lock().push(f);
109	}
110
111	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
112		self.inner.keepalive.lock().push(cell);
113	}
114
115	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
116		self.inner.done_rxs.lock().push(rx);
117	}
118
119	pub fn join(&self) -> Result<(), JoinError> {
120		self.join_timeout(Duration::from_secs(5))
121	}
122
123	#[allow(clippy::disallowed_methods)]
124	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
125		let deadline = time::Instant::now() + timeout;
126		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock());
127		for rx in rxs {
128			let remaining = deadline.saturating_duration_since(time::Instant::now());
129			match rx.recv_timeout(remaining) {
130				Ok(()) => {}
131				Err(CcRecvTimeoutError::Disconnected) => {}
132				Err(CcRecvTimeoutError::Timeout) => {
133					return Err(JoinError::new("timed out waiting for actors to stop"));
134				}
135			}
136		}
137		Ok(())
138	}
139
140	pub fn scheduler(&self) -> &SchedulerHandle {
141		&self.inner.scheduler
142	}
143
144	pub fn clock(&self) -> &Clock {
145		&self.inner.clock
146	}
147
148	pub fn spawn_system<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
149	where
150		A::State: Send,
151	{
152		pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
153	}
154
155	pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
156	where
157		A::State: Send,
158	{
159		pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
160	}
161
162	pub fn spawn_commit<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
163	where
164		A::State: Send,
165	{
166		pool::spawn_on_pool(self, name, actor, self.inner.pools.commit_pool())
167	}
168
169	pub fn spawn_background<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
170	where
171		A::State: Send,
172	{
173		pool::spawn_on_pool(self, name, actor, self.inner.pools.background_pool())
174	}
175}
176
177impl Debug for ActorSystem {
178	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
179		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
180	}
181}
182
183pub type ActorHandle<M> = PoolActorHandle<M>;
184
/// Error returned when waiting for actors to stop does not succeed.
#[derive(Debug)]
pub struct JoinError {
	/// Human-readable reason, rendered after the `actor join failed: ` prefix.
	message: String,
}

impl JoinError {
	/// Builds an error carrying `message` as its reason.
	pub fn new(message: impl Into<String>) -> Self {
		Self {
			message: message.into(),
		}
	}
}

impl fmt::Display for JoinError {
	/// Writes `actor join failed: <message>`.
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		write!(f, "actor join failed: {}", self.message)
	}
}

// No underlying source error is tracked, so the default `Error` methods apply.
impl error::Error for JoinError {}

#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::{
		actor::{context::Context, traits::Directive},
		pool::{PoolConfig, Pools},
	};

	/// Builds an actor system on default pools with the real clock.
	fn test_system() -> ActorSystem {
		let pools = Pools::new(PoolConfig::default());
		ActorSystem::new(pools, Clock::Real)
	}

	/// Minimal actor whose state is a single counter.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMessage {
		/// Increment the counter by one.
		Inc,
		/// Send the current counter value back on the given channel.
		Get(sync::mpsc::Sender<i64>),
		/// Ask the actor to stop.
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMessage;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMessage::Inc => *state += 1,
				CounterMessage::Get(tx) => {
					// The requester may already be gone; a failed reply is not an error.
					let _ = tx.send(*state);
				}
				CounterMessage::Stop => return Directive::Stop,
			}
			Directive::Continue
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = test_system();
		let handle = system.spawn_system("counter", CounterActor);

		let actor_ref = handle.actor_ref().clone();
		actor_ref.send(CounterMessage::Inc).unwrap();
		actor_ref.send(CounterMessage::Inc).unwrap();
		actor_ref.send(CounterMessage::Inc).unwrap();

		let (tx, rx) = sync::mpsc::channel();
		actor_ref.send(CounterMessage::Get(tx)).unwrap();

		let value = rx.recv().unwrap();
		assert_eq!(value, 3);

		actor_ref.send(CounterMessage::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_shutdown_join() {
		let system = test_system();

		// Spawn several actors
		for i in 0..5 {
			system.spawn_system(&format!("counter-{i}"), CounterActor);
		}

		// Shutdown cancels all actors; join waits for them to finish
		system.shutdown();
		system.join().unwrap();
	}
}