Skip to main content

reifydb_runtime/actor/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	sync::{
6		Arc,
7		atomic::{AtomicBool, Ordering},
8	},
9	time::Duration,
10};
11
12#[cfg(reifydb_target = "dst")]
13use crate::actor::timers::dst as dst_timers;
14#[cfg(reifydb_target = "wasi")]
15use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
16#[cfg(reifydb_target = "wasm")]
17use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
18use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
19
20/// A cancellation token for signaling shutdown.
21///
22/// This is a simple atomic boolean that can be shared across actors.
23/// Supports hierarchical cancellation: a child token is considered
24/// cancelled when its parent is cancelled.
25#[derive(Clone)]
26pub struct CancellationToken {
27	cancelled: Arc<AtomicBool>,
28	parent: Option<Arc<AtomicBool>>,
29}
30
31impl CancellationToken {
32	/// Create a new cancellation token.
33	pub fn new() -> Self {
34		Self {
35			cancelled: Arc::new(AtomicBool::new(false)),
36			parent: None,
37		}
38	}
39
40	/// Create a child token that is cancelled when this token is cancelled.
41	///
42	/// Cancelling the child does NOT cancel the parent.
43	pub fn child_token(&self) -> Self {
44		Self {
45			cancelled: Arc::new(AtomicBool::new(false)),
46			parent: Some(Arc::clone(&self.cancelled)),
47		}
48	}
49
50	/// Signal cancellation.
51	pub fn cancel(&self) {
52		self.cancelled.store(true, Ordering::SeqCst);
53	}
54
55	/// Check if cancellation was requested (on this token or its parent).
56	pub fn is_cancelled(&self) -> bool {
57		self.cancelled.load(Ordering::SeqCst) || self.parent.as_ref().is_some_and(|p| p.load(Ordering::SeqCst))
58	}
59}
60
61impl Default for CancellationToken {
62	fn default() -> Self {
63		Self::new()
64	}
65}
66
67/// Context provided to actors during execution.
68///
69/// Provides access to:
70/// - Self reference (to give to other actors)
71/// - Actor system (to spawn child actors and run compute)
72/// - Cancellation (for graceful shutdown)
73pub struct Context<M> {
74	self_ref: ActorRef<M>,
75	system: ActorSystem,
76	cancel: CancellationToken,
77}
78
79impl<M: Send + 'static> Context<M> {
80	/// Create a new context.
81	pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
82		Self {
83			self_ref,
84			system,
85			cancel,
86		}
87	}
88
89	/// Get a reference to send messages to self.
90	pub fn self_ref(&self) -> ActorRef<M> {
91		self.self_ref.clone()
92	}
93
94	/// Get the actor system (for spawning child actors).
95	pub fn system(&self) -> &ActorSystem {
96		&self.system
97	}
98
99	/// Check if shutdown was requested.
100	pub fn is_cancelled(&self) -> bool {
101		self.cancel.is_cancelled()
102	}
103
104	/// Get the cancellation token.
105	pub fn cancellation_token(&self) -> CancellationToken {
106		self.cancel.clone()
107	}
108}
109
110impl<M: Send + 'static> Context<M> {
111	/// Schedule a message to be sent to this actor after a delay.
112	///
113	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
114	/// Returns a handle that can be used to cancel the timer.
115	#[cfg(not(reifydb_single_threaded))]
116	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
117		let actor_ref = self.self_ref.clone();
118		self.system.scheduler().schedule_once(delay, move || {
119			let _ = actor_ref.send(factory());
120		})
121	}
122
123	/// Schedule a message to be sent to this actor after a delay.
124	///
125	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
126	/// Returns a handle that can be used to cancel the timer.
127	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
128	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
129		schedule_once_fn(self.self_ref.clone(), delay, factory)
130	}
131
132	/// Schedule a message to be sent to this actor after a delay (DST).
133	#[cfg(reifydb_target = "dst")]
134	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
135		dst_timers::schedule_once_fn(
136			self.system.timer_heap(),
137			self.system.mock_clock(),
138			self.self_ref.clone(),
139			delay,
140			factory,
141		)
142	}
143}
144
145impl<M: Send + Sync + Clone + 'static> Context<M> {
146	/// Schedule a message to be sent to this actor repeatedly at an interval.
147	///
148	/// The timer continues until cancelled or the actor is dropped.
149	/// Returns a handle that can be used to cancel the timer.
150	#[cfg(not(reifydb_single_threaded))]
151	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
152		let actor_ref = self.self_ref.clone();
153		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
154	}
155
156	/// Schedule a message to be sent to this actor repeatedly at an interval.
157	///
158	/// The timer continues until cancelled or the actor is dropped.
159	/// Returns a handle that can be used to cancel the timer.
160	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
161	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
162		schedule_repeat(self.self_ref.clone(), interval, msg)
163	}
164
165	/// Schedule a message to be sent to this actor repeatedly at an interval (DST).
166	#[cfg(reifydb_target = "dst")]
167	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
168		dst_timers::schedule_repeat(
169			self.system.timer_heap(),
170			self.system.mock_clock(),
171			self.self_ref.clone(),
172			interval,
173			msg,
174		)
175	}
176
177	/// Schedule a message to be sent to this actor repeatedly at an interval.
178	///
179	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
180	/// The timer continues until cancelled or the actor is dropped.
181	/// Returns a handle that can be used to cancel the timer.
182	#[cfg(not(reifydb_single_threaded))]
183	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
184		&self,
185		interval: Duration,
186		factory: F,
187	) -> TimerHandle {
188		let actor_ref = self.self_ref.clone();
189		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
190	}
191
192	/// Schedule a message to be sent to this actor repeatedly at an interval.
193	///
194	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
195	/// The timer continues until cancelled or the actor is dropped.
196	/// Returns a handle that can be used to cancel the timer.
197	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
198	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
199		&self,
200		interval: Duration,
201		factory: F,
202	) -> TimerHandle {
203		schedule_repeat_fn(self.self_ref.clone(), interval, factory)
204	}
205
206	/// Schedule a message to be sent to this actor repeatedly at an interval (DST).
207	#[cfg(reifydb_target = "dst")]
208	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
209		&self,
210		interval: Duration,
211		factory: F,
212	) -> TimerHandle {
213		dst_timers::schedule_repeat_fn(
214			self.system.timer_heap(),
215			self.system.mock_clock(),
216			self.self_ref.clone(),
217			interval,
218			factory,
219		)
220	}
221
222	/// Schedule a periodic tick message that includes the current system time.
223	///
224	/// Uses the system clock to populate a timestamp (nanoseconds since epoch)
225	/// which is passed to the factory function on each tick.
226	pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
227		&self,
228		interval: Duration,
229		factory: F,
230	) -> TimerHandle {
231		let actor_ref = self.self_ref.clone();
232		let clock = self.system.clock().clone();
233
234		#[cfg(not(reifydb_single_threaded))]
235		{
236			self.system.scheduler().schedule_repeat(interval, move || {
237				let now = clock.now_nanos();
238				actor_ref.send(factory(now)).is_ok()
239			})
240		}
241
242		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
243		{
244			schedule_repeat_fn(actor_ref, interval, move || {
245				let now = clock.now_nanos();
246				factory(now)
247			})
248		}
249
250		#[cfg(reifydb_target = "dst")]
251		{
252			dst_timers::schedule_repeat_fn(
253				self.system.timer_heap(),
254				self.system.mock_clock(),
255				actor_ref,
256				interval,
257				move || {
258					let now = clock.now_nanos();
259					factory(now)
260				},
261			)
262		}
263	}
264}
265
266impl<M> Clone for Context<M> {
267	fn clone(&self) -> Self {
268		Self {
269			self_ref: self.self_ref.clone(),
270			system: self.system.clone(),
271			cancel: self.cancel.clone(),
272		}
273	}
274}