// Source: reifydb_runtime/actor/system/native/mod.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4mod pool;
5
6use std::{
7	any::Any,
8	error, fmt,
9	fmt::{Debug, Formatter},
10	mem,
11	sync::{Arc, Mutex},
12	time,
13	time::Duration,
14};
15
16use crossbeam_channel::{Receiver, RecvTimeoutError as CcRecvTimeoutError};
17
18use crate::{
19	actor::{
20		context::CancellationToken, system::native::pool::PoolActorHandle, timers::scheduler::SchedulerHandle,
21		traits::Actor,
22	},
23	context::clock::Clock,
24	pool::Pools,
25};
26
27struct ActorSystemInner {
28	cancel: CancellationToken,
29	scheduler: SchedulerHandle,
30	clock: Clock,
31	pools: Pools,
32	wakers: Mutex<Vec<Arc<dyn Fn() + Send + Sync>>>,
33	keepalive: Mutex<Vec<Box<dyn Any + Send + Sync>>>,
34	done_rxs: Mutex<Vec<Receiver<()>>>,
35	children: Mutex<Vec<ActorSystem>>,
36}
37
38#[derive(Clone)]
39pub struct ActorSystem {
40	inner: Arc<ActorSystemInner>,
41}
42
43impl ActorSystem {
44	pub fn new(pools: Pools, clock: Clock) -> Self {
45		let scheduler = SchedulerHandle::new(pools.system_pool().clone());
46
47		Self {
48			inner: Arc::new(ActorSystemInner {
49				cancel: CancellationToken::new(),
50				scheduler,
51				clock,
52				pools,
53				wakers: Mutex::new(Vec::new()),
54				keepalive: Mutex::new(Vec::new()),
55				done_rxs: Mutex::new(Vec::new()),
56				children: Mutex::new(Vec::new()),
57			}),
58		}
59	}
60
61	pub fn scope(&self) -> Self {
62		let child = Self {
63			inner: Arc::new(ActorSystemInner {
64				cancel: self.inner.cancel.child_token(),
65				scheduler: self.inner.scheduler.shared(),
66				clock: self.inner.clock.clone(),
67				pools: self.inner.pools.clone(),
68				wakers: Mutex::new(Vec::new()),
69				keepalive: Mutex::new(Vec::new()),
70				done_rxs: Mutex::new(Vec::new()),
71				children: Mutex::new(Vec::new()),
72			}),
73		};
74		self.inner.children.lock().unwrap().push(child.clone());
75		child
76	}
77
78	pub fn pools(&self) -> Pools {
79		self.inner.pools.clone()
80	}
81
82	pub fn cancellation_token(&self) -> CancellationToken {
83		self.inner.cancel.clone()
84	}
85
86	pub fn is_cancelled(&self) -> bool {
87		self.inner.cancel.is_cancelled()
88	}
89
90	pub fn shutdown(&self) {
91		self.inner.cancel.cancel();
92
93		for child in self.inner.children.lock().unwrap().iter() {
94			child.shutdown();
95		}
96
97		let wakers = mem::take(&mut *self.inner.wakers.lock().unwrap());
98		for waker in &wakers {
99			waker();
100		}
101		drop(wakers);
102
103		self.inner.keepalive.lock().unwrap().clear();
104	}
105
106	pub(crate) fn register_waker(&self, f: Arc<dyn Fn() + Send + Sync>) {
107		self.inner.wakers.lock().unwrap().push(f);
108	}
109
110	pub(crate) fn register_keepalive(&self, cell: Box<dyn Any + Send + Sync>) {
111		self.inner.keepalive.lock().unwrap().push(cell);
112	}
113
114	pub(crate) fn register_done_rx(&self, rx: Receiver<()>) {
115		self.inner.done_rxs.lock().unwrap().push(rx);
116	}
117
118	pub fn join(&self) -> Result<(), JoinError> {
119		self.join_timeout(Duration::from_secs(5))
120	}
121
122	#[allow(clippy::disallowed_methods)]
123	pub fn join_timeout(&self, timeout: Duration) -> Result<(), JoinError> {
124		let deadline = time::Instant::now() + timeout;
125		let rxs: Vec<_> = mem::take(&mut *self.inner.done_rxs.lock().unwrap());
126		for rx in rxs {
127			let remaining = deadline.saturating_duration_since(time::Instant::now());
128			match rx.recv_timeout(remaining) {
129				Ok(()) => {}
130				Err(CcRecvTimeoutError::Disconnected) => {}
131				Err(CcRecvTimeoutError::Timeout) => {
132					return Err(JoinError::new("timed out waiting for actors to stop"));
133				}
134			}
135		}
136		Ok(())
137	}
138
139	pub fn scheduler(&self) -> &SchedulerHandle {
140		&self.inner.scheduler
141	}
142
143	pub fn clock(&self) -> &Clock {
144		&self.inner.clock
145	}
146
147	pub fn spawn_system<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
148	where
149		A::State: Send,
150	{
151		pool::spawn_on_pool(self, name, actor, self.inner.pools.system_pool())
152	}
153
154	pub fn spawn_query<A: Actor>(&self, name: &str, actor: A) -> ActorHandle<A::Message>
155	where
156		A::State: Send,
157	{
158		pool::spawn_on_pool(self, name, actor, self.inner.pools.query_pool())
159	}
160}
161
162impl Debug for ActorSystem {
163	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164		f.debug_struct("ActorSystem").field("cancelled", &self.is_cancelled()).finish_non_exhaustive()
165	}
166}
167
168pub type ActorHandle<M> = PoolActorHandle<M>;
169
/// Error returned when waiting for actors to stop does not succeed.
#[derive(Debug)]
pub struct JoinError {
	/// Human-readable reason, rendered after the `actor join failed: ` prefix.
	message: String,
}

impl JoinError {
	/// Builds an error from anything convertible into a `String`.
	pub fn new(message: impl Into<String>) -> Self {
		Self {
			message: message.into(),
		}
	}
}

impl fmt::Display for JoinError {
	/// Renders the error as `actor join failed: <message>`.
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		write!(f, "actor join failed: {}", self.message)
	}
}

// No underlying source error is carried, so the default trait methods apply.
impl error::Error for JoinError {}
190
#[cfg(test)]
mod tests {
	use std::sync;

	use super::*;
	use crate::{
		actor::{context::Context, traits::Directive},
		pool::{PoolConfig, Pools},
	};

	/// Builds an actor system on default pools with the real clock.
	fn test_system() -> ActorSystem {
		let pools = Pools::new(PoolConfig::default());
		ActorSystem::new(pools, Clock::Real)
	}

	/// Minimal actor whose state is a counter starting at zero.
	struct CounterActor;

	#[derive(Debug)]
	enum CounterMessage {
		/// Increment the counter by one.
		Inc,
		/// Send the current counter value to the given channel.
		Get(sync::mpsc::Sender<i64>),
		/// Ask the actor to stop.
		Stop,
	}

	impl Actor for CounterActor {
		type State = i64;
		type Message = CounterMessage;

		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
			0
		}

		fn handle(
			&self,
			state: &mut Self::State,
			msg: Self::Message,
			_ctx: &Context<Self::Message>,
		) -> Directive {
			match msg {
				CounterMessage::Inc => *state += 1,
				CounterMessage::Get(tx) => {
					// The requester may already be gone; that is not an actor error.
					let _ = tx.send(*state);
				}
				CounterMessage::Stop => return Directive::Stop,
			}
			Directive::Continue
		}
	}

	#[test]
	fn test_spawn_and_send() {
		let system = test_system();
		let handle = system.spawn_system("counter", CounterActor);

		let actor_ref = handle.actor_ref().clone();
		actor_ref.send(CounterMessage::Inc).unwrap();
		actor_ref.send(CounterMessage::Inc).unwrap();
		actor_ref.send(CounterMessage::Inc).unwrap();

		let (tx, rx) = sync::mpsc::channel();
		actor_ref.send(CounterMessage::Get(tx)).unwrap();

		// Bounded wait: a missing reply must fail the test, not hang the run.
		let value = rx
			.recv_timeout(Duration::from_secs(5))
			.expect("counter actor did not reply within 5 seconds");
		assert_eq!(value, 3);

		actor_ref.send(CounterMessage::Stop).unwrap();
		handle.join().unwrap();
	}

	#[test]
	fn test_shutdown_join() {
		let system = test_system();

		// Spawn several actors
		for i in 0..5 {
			system.spawn_system(&format!("counter-{i}"), CounterActor);
		}

		// Shutdown cancels all actors; join waits for them to finish
		system.shutdown();
		system.join().unwrap();
	}
}