1use std::{
13 sync::{
14 Arc,
15 atomic::{AtomicBool, Ordering},
16 },
17 time::Duration,
18};
19
20#[cfg(reifydb_target = "dst")]
21use crate::actor::timers::dst as dst_timers;
22#[cfg(reifydb_target = "wasi")]
23use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
24#[cfg(reifydb_target = "wasm")]
25use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
26use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
27
/// A cloneable flag used to signal cooperative cancellation.
///
/// Clones share the same underlying flag, so cancelling any clone is observed
/// by all of them. A token created with [`CancellationToken::child_token`]
/// additionally reports cancellation as soon as *any* of its ancestors has
/// been cancelled, while cancelling the child never affects its ancestors.
#[derive(Clone)]
pub struct CancellationToken {
    /// This token's own flag, shared between all of its clones.
    cancelled: Arc<AtomicBool>,
    /// The token this one was derived from, if any.
    ///
    /// The whole parent token is kept (not just its flag) so that
    /// `is_cancelled` can reach every ancestor, not only the direct parent.
    parent: Option<Arc<CancellationToken>>,
}

impl CancellationToken {
    /// Creates a root token that is not cancelled and has no parent.
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            parent: None,
        }
    }

    /// Creates a child token with its own, initially unset, flag.
    ///
    /// The child reports itself as cancelled when it is cancelled directly or
    /// when this token or any of this token's ancestors is cancelled.
    pub fn child_token(&self) -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            // The clone shares this token's flag and its link to its own
            // parent, so the full ancestor chain stays reachable.
            parent: Some(Arc::new(self.clone())),
        }
    }

    /// Marks this token (and all of its clones) as cancelled.
    ///
    /// Ancestors are not modified.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if this token or any ancestor has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        // Walk from this token up to the root; the first set flag wins.
        let mut current = Some(self);
        while let Some(token) = current {
            if token.cancelled.load(Ordering::SeqCst) {
                return true;
            }
            current = token.parent.as_deref();
        }
        false
    }
}

impl Default for CancellationToken {
    /// Equivalent to [`CancellationToken::new`].
    fn default() -> Self {
        Self::new()
    }
}
74
75pub struct Context<M> {
82 self_ref: ActorRef<M>,
83 system: ActorSystem,
84 cancel: CancellationToken,
85}
86
87impl<M: Send + 'static> Context<M> {
88 pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
90 Self {
91 self_ref,
92 system,
93 cancel,
94 }
95 }
96
97 pub fn self_ref(&self) -> ActorRef<M> {
99 self.self_ref.clone()
100 }
101
102 pub fn system(&self) -> &ActorSystem {
104 &self.system
105 }
106
107 pub fn is_cancelled(&self) -> bool {
109 self.cancel.is_cancelled()
110 }
111
112 pub fn cancellation_token(&self) -> CancellationToken {
114 self.cancel.clone()
115 }
116}
117
118impl<M: Send + 'static> Context<M> {
119 #[cfg(not(reifydb_single_threaded))]
124 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
125 let actor_ref = self.self_ref.clone();
126 self.system.scheduler().schedule_once(delay, move || {
127 let _ = actor_ref.send(factory());
128 })
129 }
130
131 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
136 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
137 schedule_once_fn(self.self_ref.clone(), delay, factory)
138 }
139
140 #[cfg(reifydb_target = "dst")]
142 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
143 dst_timers::schedule_once_fn(
144 self.system.timer_heap(),
145 self.system.mock_clock(),
146 self.self_ref.clone(),
147 delay,
148 factory,
149 )
150 }
151}
152
153impl<M: Send + Sync + Clone + 'static> Context<M> {
154 #[cfg(not(reifydb_single_threaded))]
159 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
160 let actor_ref = self.self_ref.clone();
161 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
162 }
163
164 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
169 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
170 schedule_repeat(self.self_ref.clone(), interval, msg)
171 }
172
173 #[cfg(reifydb_target = "dst")]
175 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
176 dst_timers::schedule_repeat(
177 self.system.timer_heap(),
178 self.system.mock_clock(),
179 self.self_ref.clone(),
180 interval,
181 msg,
182 )
183 }
184
185 #[cfg(not(reifydb_single_threaded))]
191 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
192 &self,
193 interval: Duration,
194 factory: F,
195 ) -> TimerHandle {
196 let actor_ref = self.self_ref.clone();
197 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
198 }
199
200 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
206 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
207 &self,
208 interval: Duration,
209 factory: F,
210 ) -> TimerHandle {
211 schedule_repeat_fn(self.self_ref.clone(), interval, factory)
212 }
213
214 #[cfg(reifydb_target = "dst")]
216 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
217 &self,
218 interval: Duration,
219 factory: F,
220 ) -> TimerHandle {
221 dst_timers::schedule_repeat_fn(
222 self.system.timer_heap(),
223 self.system.mock_clock(),
224 self.self_ref.clone(),
225 interval,
226 factory,
227 )
228 }
229
230 pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
235 &self,
236 interval: Duration,
237 factory: F,
238 ) -> TimerHandle {
239 let actor_ref = self.self_ref.clone();
240 let clock = self.system.clock().clone();
241
242 #[cfg(not(reifydb_single_threaded))]
243 {
244 self.system.scheduler().schedule_repeat(interval, move || {
245 let now = clock.now_nanos();
246 actor_ref.send(factory(now)).is_ok()
247 })
248 }
249
250 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
251 {
252 schedule_repeat_fn(actor_ref, interval, move || {
253 let now = clock.now_nanos();
254 factory(now)
255 })
256 }
257
258 #[cfg(reifydb_target = "dst")]
259 {
260 dst_timers::schedule_repeat_fn(
261 self.system.timer_heap(),
262 self.system.mock_clock(),
263 actor_ref,
264 interval,
265 move || {
266 let now = clock.now_nanos();
267 factory(now)
268 },
269 )
270 }
271 }
272}
273
274impl<M> Clone for Context<M> {
275 fn clone(&self) -> Self {
276 Self {
277 self_ref: self.self_ref.clone(),
278 system: self.system.clone(),
279 cancel: self.cancel.clone(),
280 }
281 }
282}