1use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8 },
9 time::Duration,
10};
11
12#[cfg(reifydb_target = "dst")]
13use crate::actor::timers::dst as dst_timers;
14#[cfg(reifydb_target = "wasi")]
15use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
16#[cfg(reifydb_target = "wasm")]
17use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
18use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
19
/// A cloneable cancellation flag that can be arranged into a parent/child hierarchy.
///
/// Clones of a token share the same underlying flag. A token created with
/// [`CancellationToken::child_token`] reports itself as cancelled when its own
/// flag or the flag of *any* ancestor (parent, grandparent, ...) has been set.
/// Cancelling a child never affects its ancestors.
#[derive(Clone)]
pub struct CancellationToken {
    /// This token's own flag, shared by all of its clones.
    cancelled: Arc<AtomicBool>,
    /// The token this one was derived from, if any.
    ///
    /// The whole parent token is kept (not only its flag) so that the complete
    /// ancestor chain stays reachable from every descendant.
    parent: Option<Arc<CancellationToken>>,
}

impl CancellationToken {
    /// Creates a root token that is not cancelled and has no parent.
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            parent: None,
        }
    }

    /// Creates a token with a fresh flag whose parent is `self`.
    ///
    /// The child observes cancellation of `self` and of every ancestor of
    /// `self`; cancelling the child leaves `self` untouched.
    pub fn child_token(&self) -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            // Cloning shares `self`'s flag and its own parent link, so the
            // child sees the same cancellation state as `self` does.
            parent: Some(Arc::new(self.clone())),
        }
    }

    /// Sets this token's flag. Clones and all descendants observe the change.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if this token or any of its ancestors has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        // Walk the ancestor chain iteratively so that deep hierarchies do not
        // consume stack proportional to their depth.
        let mut current = Some(self);
        while let Some(token) = current {
            if token.cancelled.load(Ordering::SeqCst) {
                return true;
            }
            current = token.parent.as_deref();
        }
        false
    }
}
60
61impl Default for CancellationToken {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67pub struct Context<M> {
74 self_ref: ActorRef<M>,
75 system: ActorSystem,
76 cancel: CancellationToken,
77}
78
79impl<M: Send + 'static> Context<M> {
80 pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
82 Self {
83 self_ref,
84 system,
85 cancel,
86 }
87 }
88
89 pub fn self_ref(&self) -> ActorRef<M> {
91 self.self_ref.clone()
92 }
93
94 pub fn system(&self) -> &ActorSystem {
96 &self.system
97 }
98
99 pub fn is_cancelled(&self) -> bool {
101 self.cancel.is_cancelled()
102 }
103
104 pub fn cancellation_token(&self) -> CancellationToken {
106 self.cancel.clone()
107 }
108}
109
110impl<M: Send + 'static> Context<M> {
111 #[cfg(not(reifydb_single_threaded))]
116 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
117 let actor_ref = self.self_ref.clone();
118 self.system.scheduler().schedule_once(delay, move || {
119 let _ = actor_ref.send(factory());
120 })
121 }
122
123 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
128 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
129 schedule_once_fn(self.self_ref.clone(), delay, factory)
130 }
131
132 #[cfg(reifydb_target = "dst")]
134 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
135 dst_timers::schedule_once_fn(
136 self.system.timer_heap(),
137 self.system.mock_clock(),
138 self.self_ref.clone(),
139 delay,
140 factory,
141 )
142 }
143}
144
145impl<M: Send + Sync + Clone + 'static> Context<M> {
146 #[cfg(not(reifydb_single_threaded))]
151 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
152 let actor_ref = self.self_ref.clone();
153 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
154 }
155
156 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
161 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
162 schedule_repeat(self.self_ref.clone(), interval, msg)
163 }
164
165 #[cfg(reifydb_target = "dst")]
167 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
168 dst_timers::schedule_repeat(
169 self.system.timer_heap(),
170 self.system.mock_clock(),
171 self.self_ref.clone(),
172 interval,
173 msg,
174 )
175 }
176
177 #[cfg(not(reifydb_single_threaded))]
183 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
184 &self,
185 interval: Duration,
186 factory: F,
187 ) -> TimerHandle {
188 let actor_ref = self.self_ref.clone();
189 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
190 }
191
192 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
198 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
199 &self,
200 interval: Duration,
201 factory: F,
202 ) -> TimerHandle {
203 schedule_repeat_fn(self.self_ref.clone(), interval, factory)
204 }
205
206 #[cfg(reifydb_target = "dst")]
208 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
209 &self,
210 interval: Duration,
211 factory: F,
212 ) -> TimerHandle {
213 dst_timers::schedule_repeat_fn(
214 self.system.timer_heap(),
215 self.system.mock_clock(),
216 self.self_ref.clone(),
217 interval,
218 factory,
219 )
220 }
221
222 pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
227 &self,
228 interval: Duration,
229 factory: F,
230 ) -> TimerHandle {
231 let actor_ref = self.self_ref.clone();
232 let clock = self.system.clock().clone();
233
234 #[cfg(not(reifydb_single_threaded))]
235 {
236 self.system.scheduler().schedule_repeat(interval, move || {
237 let now = clock.now_nanos();
238 actor_ref.send(factory(now)).is_ok()
239 })
240 }
241
242 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
243 {
244 schedule_repeat_fn(actor_ref, interval, move || {
245 let now = clock.now_nanos();
246 factory(now)
247 })
248 }
249
250 #[cfg(reifydb_target = "dst")]
251 {
252 dst_timers::schedule_repeat_fn(
253 self.system.timer_heap(),
254 self.system.mock_clock(),
255 actor_ref,
256 interval,
257 move || {
258 let now = clock.now_nanos();
259 factory(now)
260 },
261 )
262 }
263 }
264}
265
266impl<M> Clone for Context<M> {
267 fn clone(&self) -> Self {
268 Self {
269 self_ref: self.self_ref.clone(),
270 system: self.system.clone(),
271 cancel: self.cancel.clone(),
272 }
273 }
274}