Skip to main content

reifydb_runtime/actor/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Actor execution context.
5//!
6//! The context provides actors with access to:
7//! - Self reference for receiving messages
8//! - Actor system for spawning child actors
9//! - Cancellation status for graceful shutdown
10//! - Timer scheduling (when enabled)
11
12use std::{
13	sync::{
14		Arc,
15		atomic::{AtomicBool, Ordering},
16	},
17	time::Duration,
18};
19
20#[cfg(reifydb_target = "dst")]
21use crate::actor::timers::dst as dst_timers;
22#[cfg(reifydb_target = "wasi")]
23use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
24#[cfg(reifydb_target = "wasm")]
25use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
26use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
27
28/// A cancellation token for signaling shutdown.
29///
30/// This is a simple atomic boolean that can be shared across actors.
31/// Supports hierarchical cancellation: a child token is considered
32/// cancelled when its parent is cancelled.
33#[derive(Clone)]
34pub struct CancellationToken {
35	cancelled: Arc<AtomicBool>,
36	parent: Option<Arc<AtomicBool>>,
37}
38
39impl CancellationToken {
40	/// Create a new cancellation token.
41	pub fn new() -> Self {
42		Self {
43			cancelled: Arc::new(AtomicBool::new(false)),
44			parent: None,
45		}
46	}
47
48	/// Create a child token that is cancelled when this token is cancelled.
49	///
50	/// Cancelling the child does NOT cancel the parent.
51	pub fn child_token(&self) -> Self {
52		Self {
53			cancelled: Arc::new(AtomicBool::new(false)),
54			parent: Some(Arc::clone(&self.cancelled)),
55		}
56	}
57
58	/// Signal cancellation.
59	pub fn cancel(&self) {
60		self.cancelled.store(true, Ordering::SeqCst);
61	}
62
63	/// Check if cancellation was requested (on this token or its parent).
64	pub fn is_cancelled(&self) -> bool {
65		self.cancelled.load(Ordering::SeqCst) || self.parent.as_ref().is_some_and(|p| p.load(Ordering::SeqCst))
66	}
67}
68
69impl Default for CancellationToken {
70	fn default() -> Self {
71		Self::new()
72	}
73}
74
75/// Context provided to actors during execution.
76///
77/// Provides access to:
78/// - Self reference (to give to other actors)
79/// - Actor system (to spawn child actors and run compute)
80/// - Cancellation (for graceful shutdown)
81pub struct Context<M> {
82	self_ref: ActorRef<M>,
83	system: ActorSystem,
84	cancel: CancellationToken,
85}
86
87impl<M: Send + 'static> Context<M> {
88	/// Create a new context.
89	pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
90		Self {
91			self_ref,
92			system,
93			cancel,
94		}
95	}
96
97	/// Get a reference to send messages to self.
98	pub fn self_ref(&self) -> ActorRef<M> {
99		self.self_ref.clone()
100	}
101
102	/// Get the actor system (for spawning child actors).
103	pub fn system(&self) -> &ActorSystem {
104		&self.system
105	}
106
107	/// Check if shutdown was requested.
108	pub fn is_cancelled(&self) -> bool {
109		self.cancel.is_cancelled()
110	}
111
112	/// Get the cancellation token.
113	pub fn cancellation_token(&self) -> CancellationToken {
114		self.cancel.clone()
115	}
116}
117
118impl<M: Send + 'static> Context<M> {
119	/// Schedule a message to be sent to this actor after a delay.
120	///
121	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
122	/// Returns a handle that can be used to cancel the timer.
123	#[cfg(not(reifydb_single_threaded))]
124	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
125		let actor_ref = self.self_ref.clone();
126		self.system.scheduler().schedule_once(delay, move || {
127			let _ = actor_ref.send(factory());
128		})
129	}
130
131	/// Schedule a message to be sent to this actor after a delay.
132	///
133	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
134	/// Returns a handle that can be used to cancel the timer.
135	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
136	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
137		schedule_once_fn(self.self_ref.clone(), delay, factory)
138	}
139
140	/// Schedule a message to be sent to this actor after a delay (DST).
141	#[cfg(reifydb_target = "dst")]
142	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
143		dst_timers::schedule_once_fn(
144			self.system.timer_heap(),
145			self.system.mock_clock(),
146			self.self_ref.clone(),
147			delay,
148			factory,
149		)
150	}
151}
152
153impl<M: Send + Sync + Clone + 'static> Context<M> {
154	/// Schedule a message to be sent to this actor repeatedly at an interval.
155	///
156	/// The timer continues until cancelled or the actor is dropped.
157	/// Returns a handle that can be used to cancel the timer.
158	#[cfg(not(reifydb_single_threaded))]
159	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
160		let actor_ref = self.self_ref.clone();
161		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
162	}
163
164	/// Schedule a message to be sent to this actor repeatedly at an interval.
165	///
166	/// The timer continues until cancelled or the actor is dropped.
167	/// Returns a handle that can be used to cancel the timer.
168	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
169	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
170		schedule_repeat(self.self_ref.clone(), interval, msg)
171	}
172
173	/// Schedule a message to be sent to this actor repeatedly at an interval (DST).
174	#[cfg(reifydb_target = "dst")]
175	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
176		dst_timers::schedule_repeat(
177			self.system.timer_heap(),
178			self.system.mock_clock(),
179			self.self_ref.clone(),
180			interval,
181			msg,
182		)
183	}
184
185	/// Schedule a message to be sent to this actor repeatedly at an interval.
186	///
187	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
188	/// The timer continues until cancelled or the actor is dropped.
189	/// Returns a handle that can be used to cancel the timer.
190	#[cfg(not(reifydb_single_threaded))]
191	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
192		&self,
193		interval: Duration,
194		factory: F,
195	) -> TimerHandle {
196		let actor_ref = self.self_ref.clone();
197		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
198	}
199
200	/// Schedule a message to be sent to this actor repeatedly at an interval.
201	///
202	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
203	/// The timer continues until cancelled or the actor is dropped.
204	/// Returns a handle that can be used to cancel the timer.
205	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
206	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
207		&self,
208		interval: Duration,
209		factory: F,
210	) -> TimerHandle {
211		schedule_repeat_fn(self.self_ref.clone(), interval, factory)
212	}
213
214	/// Schedule a message to be sent to this actor repeatedly at an interval (DST).
215	#[cfg(reifydb_target = "dst")]
216	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
217		&self,
218		interval: Duration,
219		factory: F,
220	) -> TimerHandle {
221		dst_timers::schedule_repeat_fn(
222			self.system.timer_heap(),
223			self.system.mock_clock(),
224			self.self_ref.clone(),
225			interval,
226			factory,
227		)
228	}
229
230	/// Schedule a periodic tick message that includes the current system time.
231	///
232	/// Uses the system clock to populate a timestamp (nanoseconds since epoch)
233	/// which is passed to the factory function on each tick.
234	pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
235		&self,
236		interval: Duration,
237		factory: F,
238	) -> TimerHandle {
239		let actor_ref = self.self_ref.clone();
240		let clock = self.system.clock().clone();
241
242		#[cfg(not(reifydb_single_threaded))]
243		{
244			self.system.scheduler().schedule_repeat(interval, move || {
245				let now = clock.now_nanos();
246				actor_ref.send(factory(now)).is_ok()
247			})
248		}
249
250		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
251		{
252			schedule_repeat_fn(actor_ref, interval, move || {
253				let now = clock.now_nanos();
254				factory(now)
255			})
256		}
257
258		#[cfg(reifydb_target = "dst")]
259		{
260			dst_timers::schedule_repeat_fn(
261				self.system.timer_heap(),
262				self.system.mock_clock(),
263				actor_ref,
264				interval,
265				move || {
266					let now = clock.now_nanos();
267					factory(now)
268				},
269			)
270		}
271	}
272}
273
274impl<M> Clone for Context<M> {
275	fn clone(&self) -> Self {
276		Self {
277			self_ref: self.self_ref.clone(),
278			system: self.system.clone(),
279			cancel: self.cancel.clone(),
280		}
281	}
282}