Skip to main content

reifydb_runtime/actor/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Actor execution context.
5//!
6//! The context provides actors with access to:
7//! - Self reference for receiving messages
8//! - Actor system for spawning child actors
9//! - Cancellation status for graceful shutdown
10//! - Timer scheduling (when enabled)
11
12use std::{
13	sync::{
14		Arc,
15		atomic::{AtomicBool, Ordering},
16	},
17	time::Duration,
18};
19
20#[cfg(reifydb_target = "wasi")]
21use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
22#[cfg(reifydb_target = "wasm")]
23use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
24use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
25
26/// A cancellation token for signaling shutdown.
27///
28/// This is a simple atomic boolean that can be shared across actors.
29#[derive(Clone)]
30pub struct CancellationToken {
31	cancelled: Arc<AtomicBool>,
32}
33
34impl CancellationToken {
35	/// Create a new cancellation token.
36	pub fn new() -> Self {
37		Self {
38			cancelled: Arc::new(AtomicBool::new(false)),
39		}
40	}
41
42	/// Signal cancellation.
43	pub fn cancel(&self) {
44		self.cancelled.store(true, Ordering::SeqCst);
45	}
46
47	/// Check if cancellation was requested.
48	pub fn is_cancelled(&self) -> bool {
49		self.cancelled.load(Ordering::SeqCst)
50	}
51}
52
53impl Default for CancellationToken {
54	fn default() -> Self {
55		Self::new()
56	}
57}
58
59/// Context provided to actors during execution.
60///
61/// Provides access to:
62/// - Self reference (to give to other actors)
63/// - Actor system (to spawn child actors and run compute)
64/// - Cancellation (for graceful shutdown)
65pub struct Context<M> {
66	self_ref: ActorRef<M>,
67	system: ActorSystem,
68	cancel: CancellationToken,
69}
70
71impl<M: Send + 'static> Context<M> {
72	/// Create a new context.
73	pub(crate) fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
74		Self {
75			self_ref,
76			system,
77			cancel,
78		}
79	}
80
81	/// Get a reference to send messages to self.
82	pub fn self_ref(&self) -> ActorRef<M> {
83		self.self_ref.clone()
84	}
85
86	/// Get the actor system (for spawning child actors).
87	pub fn system(&self) -> &ActorSystem {
88		&self.system
89	}
90
91	/// Check if shutdown was requested.
92	pub fn is_cancelled(&self) -> bool {
93		self.cancel.is_cancelled()
94	}
95
96	/// Get the cancellation token.
97	pub fn cancellation_token(&self) -> CancellationToken {
98		self.cancel.clone()
99	}
100}
101
102impl<M: Send + 'static> Context<M> {
103	/// Schedule a message to be sent to this actor after a delay.
104	///
105	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
106	/// Returns a handle that can be used to cancel the timer.
107	#[cfg(not(reifydb_single_threaded))]
108	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
109		let actor_ref = self.self_ref.clone();
110		self.system.scheduler().schedule_once(delay, move || {
111			let _ = actor_ref.send(factory());
112		})
113	}
114
115	/// Schedule a message to be sent to this actor after a delay.
116	///
117	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
118	/// Returns a handle that can be used to cancel the timer.
119	#[cfg(reifydb_single_threaded)]
120	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
121		schedule_once_fn(self.self_ref.clone(), delay, factory)
122	}
123}
124
125impl<M: Send + Sync + Clone + 'static> Context<M> {
126	/// Schedule a message to be sent to this actor repeatedly at an interval.
127	///
128	/// The timer continues until cancelled or the actor is dropped.
129	/// Returns a handle that can be used to cancel the timer.
130	#[cfg(not(reifydb_single_threaded))]
131	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
132		let actor_ref = self.self_ref.clone();
133		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
134	}
135
136	/// Schedule a message to be sent to this actor repeatedly at an interval.
137	///
138	/// The timer continues until cancelled or the actor is dropped.
139	/// Returns a handle that can be used to cancel the timer.
140	#[cfg(reifydb_single_threaded)]
141	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
142		schedule_repeat(self.self_ref.clone(), interval, msg)
143	}
144
145	/// Schedule a message to be sent to this actor repeatedly at an interval.
146	///
147	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
148	/// The timer continues until cancelled or the actor is dropped.
149	/// Returns a handle that can be used to cancel the timer.
150	#[cfg(not(reifydb_single_threaded))]
151	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
152		&self,
153		interval: Duration,
154		factory: F,
155	) -> TimerHandle {
156		let actor_ref = self.self_ref.clone();
157		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
158	}
159
160	/// Schedule a message to be sent to this actor repeatedly at an interval.
161	///
162	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
163	/// The timer continues until cancelled or the actor is dropped.
164	/// Returns a handle that can be used to cancel the timer.
165	#[cfg(reifydb_single_threaded)]
166	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
167		&self,
168		interval: Duration,
169		factory: F,
170	) -> TimerHandle {
171		schedule_repeat_fn(self.self_ref.clone(), interval, factory)
172	}
173
174	/// Schedule a periodic tick message that includes the current system time.
175	///
176	/// Uses the system clock to populate a timestamp (nanoseconds since epoch)
177	/// which is passed to the factory function on each tick.
178	pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
179		&self,
180		interval: Duration,
181		factory: F,
182	) -> TimerHandle {
183		let actor_ref = self.self_ref.clone();
184		let clock = self.system.clock().clone();
185
186		#[cfg(not(reifydb_single_threaded))]
187		{
188			self.system.scheduler().schedule_repeat(interval, move || {
189				let now = clock.now_nanos();
190				actor_ref.send(factory(now)).is_ok()
191			})
192		}
193
194		#[cfg(reifydb_single_threaded)]
195		{
196			schedule_repeat_fn(actor_ref, interval, move || {
197				let now = clock.now_nanos();
198				factory(now)
199			})
200		}
201	}
202}
203
204impl<M> Clone for Context<M> {
205	fn clone(&self) -> Self {
206		Self {
207			self_ref: self.self_ref.clone(),
208			system: self.system.clone(),
209			cancel: self.cancel.clone(),
210		}
211	}
212}