Skip to main content

reifydb_runtime/actor/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Actor execution context.
5//!
6//! The context provides actors with access to:
//! - Self reference for sending messages to this actor (or handing to other actors)
8//! - Actor system for spawning child actors
9//! - Cancellation status for graceful shutdown
10//! - Timer scheduling (when enabled)
11
12use std::{
13	sync::{
14		Arc,
15		atomic::{AtomicBool, Ordering},
16	},
17	time::Duration,
18};
19
20#[cfg(reifydb_target = "wasi")]
21use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat};
22#[cfg(reifydb_target = "wasm")]
23use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat};
24use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
25
/// A cancellation token for signaling shutdown.
///
/// Internally this is a single atomic boolean behind an [`Arc`]. Cloning the
/// token produces another handle to the *same* flag, so a cancellation
/// requested through any clone is observed by all of them. The flag only ever
/// moves from "not cancelled" to "cancelled"; this type offers no way to
/// reset it.
#[derive(Clone, Debug)]
pub struct CancellationToken {
	/// Shared flag; `true` once cancellation has been requested.
	cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
	/// Create a new cancellation token in the "not cancelled" state.
	#[must_use]
	pub fn new() -> Self {
		Self {
			cancelled: Arc::new(AtomicBool::new(false)),
		}
	}

	/// Signal cancellation.
	///
	/// Calling this more than once is harmless: the flag is simply stored as
	/// `true` again.
	pub fn cancel(&self) {
		self.cancelled.store(true, Ordering::SeqCst);
	}

	/// Check if cancellation was requested on this token or any of its clones.
	#[must_use]
	pub fn is_cancelled(&self) -> bool {
		self.cancelled.load(Ordering::SeqCst)
	}
}

impl Default for CancellationToken {
	/// Same as [`CancellationToken::new`]: a fresh, not-yet-cancelled token.
	fn default() -> Self {
		Self::new()
	}
}
58
59/// Context provided to actors during execution.
60///
61/// Provides access to:
62/// - Self reference (to give to other actors)
63/// - Actor system (to spawn child actors and run compute)
64/// - Cancellation (for graceful shutdown)
65pub struct Context<M> {
66	self_ref: ActorRef<M>,
67	system: ActorSystem,
68	cancel: CancellationToken,
69}
70
71impl<M: Send + 'static> Context<M> {
72	/// Create a new context.
73	pub(crate) fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
74		Self {
75			self_ref,
76			system,
77			cancel,
78		}
79	}
80
81	/// Get a reference to send messages to self.
82	pub fn self_ref(&self) -> ActorRef<M> {
83		self.self_ref.clone()
84	}
85
86	/// Get the actor system (for spawning child actors).
87	pub fn system(&self) -> &ActorSystem {
88		&self.system
89	}
90
91	/// Check if shutdown was requested.
92	pub fn is_cancelled(&self) -> bool {
93		self.cancel.is_cancelled()
94	}
95
96	/// Get the cancellation token.
97	pub fn cancellation_token(&self) -> CancellationToken {
98		self.cancel.clone()
99	}
100}
101
102impl<M: Send + 'static> Context<M> {
103	/// Schedule a message to be sent to this actor after a delay.
104	///
105	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
106	/// Returns a handle that can be used to cancel the timer.
107	#[cfg(not(reifydb_single_threaded))]
108	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
109		let actor_ref = self.self_ref.clone();
110		self.system.scheduler().schedule_once(delay, move || {
111			let _ = actor_ref.send(factory());
112		})
113	}
114
115	/// Schedule a message to be sent to this actor after a delay.
116	///
117	/// Uses a factory function to create the message, so `M` doesn't need to be `Clone`.
118	/// Returns a handle that can be used to cancel the timer.
119	#[cfg(reifydb_single_threaded)]
120	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
121		schedule_once_fn(self.self_ref.clone(), delay, factory)
122	}
123}
124
125impl<M: Send + Sync + Clone + 'static> Context<M> {
126	/// Schedule a message to be sent to this actor repeatedly at an interval.
127	///
128	/// The timer continues until cancelled or the actor is dropped.
129	/// Returns a handle that can be used to cancel the timer.
130	#[cfg(not(reifydb_single_threaded))]
131	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
132		let actor_ref = self.self_ref.clone();
133		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
134	}
135
136	/// Schedule a message to be sent to this actor repeatedly at an interval.
137	///
138	/// The timer continues until cancelled or the actor is dropped.
139	/// Returns a handle that can be used to cancel the timer.
140	#[cfg(reifydb_single_threaded)]
141	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
142		schedule_repeat(self.self_ref.clone(), interval, msg)
143	}
144}
145
146impl<M> Clone for Context<M> {
147	fn clone(&self) -> Self {
148		Self {
149			self_ref: self.self_ref.clone(),
150			system: self.system.clone(),
151			cancel: self.cancel.clone(),
152		}
153	}
154}