1use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8 },
9 time::Duration,
10};
11
12#[cfg(reifydb_target = "dst")]
13use crate::actor::timers::dst as dst_timers;
14#[cfg(reifydb_target = "wasi")]
15use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
16#[cfg(reifydb_target = "wasm")]
17use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
18use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
19
/// A cloneable flag used to request cooperative cancellation.
///
/// Clones share the same underlying flag, so cancelling any clone is observed by all of
/// them. A token created with [`CancellationToken::child_token`] additionally reports
/// cancellation as soon as *any* of its ancestors has been cancelled; cancelling a child
/// never affects its ancestors or siblings.
#[derive(Clone)]
pub struct CancellationToken {
    /// This token's own flag, shared by all of its clones.
    cancelled: Arc<AtomicBool>,
    /// The token this one was derived from, if any. The whole parent token is kept
    /// (not just its flag) so that the complete ancestor chain stays reachable.
    parent: Option<Arc<CancellationToken>>,
}

impl CancellationToken {
    /// Creates a fresh, un-cancelled root token with no parent.
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            parent: None,
        }
    }

    /// Creates a new token that is considered cancelled whenever this token, or any of
    /// this token's own ancestors, is cancelled.
    ///
    /// The child has its own flag, so cancelling the child leaves `self` untouched.
    pub fn child_token(&self) -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            // Cloning `self` only bumps reference counts; the child observes the very
            // same flags as the parent and its ancestors.
            parent: Some(Arc::new(self.clone())),
        }
    }

    /// Marks this token (and every clone of it) as cancelled.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if this token or any ancestor in its parent chain was cancelled.
    pub fn is_cancelled(&self) -> bool {
        // Walk the chain iteratively: checking only the direct parent would miss a
        // cancellation issued on a grandparent or an earlier ancestor.
        let mut current = Some(self);
        while let Some(token) = current {
            if token.cancelled.load(Ordering::SeqCst) {
                return true;
            }
            current = token.parent.as_deref();
        }
        false
    }
}

impl Default for CancellationToken {
    /// Equivalent to [`CancellationToken::new`].
    fn default() -> Self {
        Self::new()
    }
}
55
56pub struct Context<M> {
57 self_ref: ActorRef<M>,
58 system: ActorSystem,
59 cancel: CancellationToken,
60}
61
62impl<M: Send + 'static> Context<M> {
63 pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
64 Self {
65 self_ref,
66 system,
67 cancel,
68 }
69 }
70
71 pub fn self_ref(&self) -> ActorRef<M> {
72 self.self_ref.clone()
73 }
74
75 pub fn system(&self) -> &ActorSystem {
76 &self.system
77 }
78
79 pub fn is_cancelled(&self) -> bool {
80 self.cancel.is_cancelled()
81 }
82
83 pub fn cancellation_token(&self) -> CancellationToken {
84 self.cancel.clone()
85 }
86}
87
88impl<M: Send + 'static> Context<M> {
89 #[cfg(not(reifydb_single_threaded))]
90 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
91 let actor_ref = self.self_ref.clone();
92 self.system.scheduler().schedule_once(delay, move || {
93 let _ = actor_ref.send(factory());
94 })
95 }
96
97 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
98 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
99 schedule_once_fn(self.self_ref.clone(), delay, factory)
100 }
101
102 #[cfg(reifydb_target = "dst")]
103 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
104 dst_timers::schedule_once_fn(
105 self.system.timer_heap(),
106 self.system.mock_clock(),
107 self.self_ref.clone(),
108 delay,
109 factory,
110 )
111 }
112}
113
114impl<M: Send + Sync + Clone + 'static> Context<M> {
115 #[cfg(not(reifydb_single_threaded))]
116 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
117 let actor_ref = self.self_ref.clone();
118 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
119 }
120
121 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
122 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
123 schedule_repeat(self.self_ref.clone(), interval, msg)
124 }
125
126 #[cfg(reifydb_target = "dst")]
127 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
128 dst_timers::schedule_repeat(
129 self.system.timer_heap(),
130 self.system.mock_clock(),
131 self.self_ref.clone(),
132 interval,
133 msg,
134 )
135 }
136
137 #[cfg(not(reifydb_single_threaded))]
138 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
139 &self,
140 interval: Duration,
141 factory: F,
142 ) -> TimerHandle {
143 let actor_ref = self.self_ref.clone();
144 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
145 }
146
147 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
148 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
149 &self,
150 interval: Duration,
151 factory: F,
152 ) -> TimerHandle {
153 schedule_repeat_fn(self.self_ref.clone(), interval, factory)
154 }
155
156 #[cfg(reifydb_target = "dst")]
157 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
158 &self,
159 interval: Duration,
160 factory: F,
161 ) -> TimerHandle {
162 dst_timers::schedule_repeat_fn(
163 self.system.timer_heap(),
164 self.system.mock_clock(),
165 self.self_ref.clone(),
166 interval,
167 factory,
168 )
169 }
170
171 pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
172 &self,
173 interval: Duration,
174 factory: F,
175 ) -> TimerHandle {
176 let actor_ref = self.self_ref.clone();
177 let clock = self.system.clock().clone();
178
179 #[cfg(not(reifydb_single_threaded))]
180 {
181 self.system.scheduler().schedule_repeat(interval, move || {
182 let now = clock.now_nanos();
183 actor_ref.send(factory(now)).is_ok()
184 })
185 }
186
187 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
188 {
189 schedule_repeat_fn(actor_ref, interval, move || {
190 let now = clock.now_nanos();
191 factory(now)
192 })
193 }
194
195 #[cfg(reifydb_target = "dst")]
196 {
197 dst_timers::schedule_repeat_fn(
198 self.system.timer_heap(),
199 self.system.mock_clock(),
200 actor_ref,
201 interval,
202 move || {
203 let now = clock.now_nanos();
204 factory(now)
205 },
206 )
207 }
208 }
209}
210
211impl<M> Clone for Context<M> {
212 fn clone(&self) -> Self {
213 Self {
214 self_ref: self.self_ref.clone(),
215 system: self.system.clone(),
216 cancel: self.cancel.clone(),
217 }
218 }
219}