reifydb_runtime/actor/
context.rs1use std::{
13 sync::{
14 Arc,
15 atomic::{AtomicBool, Ordering},
16 },
17 time::Duration,
18};
19
20#[cfg(reifydb_target = "wasi")]
21use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
22#[cfg(reifydb_target = "wasm")]
23use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
24use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
25
26#[derive(Clone)]
30pub struct CancellationToken {
31 cancelled: Arc<AtomicBool>,
32}
33
34impl CancellationToken {
35 pub fn new() -> Self {
37 Self {
38 cancelled: Arc::new(AtomicBool::new(false)),
39 }
40 }
41
42 pub fn cancel(&self) {
44 self.cancelled.store(true, Ordering::SeqCst);
45 }
46
47 pub fn is_cancelled(&self) -> bool {
49 self.cancelled.load(Ordering::SeqCst)
50 }
51}
52
53impl Default for CancellationToken {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59pub struct Context<M> {
66 self_ref: ActorRef<M>,
67 system: ActorSystem,
68 cancel: CancellationToken,
69}
70
71impl<M: Send + 'static> Context<M> {
72 pub(crate) fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
74 Self {
75 self_ref,
76 system,
77 cancel,
78 }
79 }
80
81 pub fn self_ref(&self) -> ActorRef<M> {
83 self.self_ref.clone()
84 }
85
86 pub fn system(&self) -> &ActorSystem {
88 &self.system
89 }
90
91 pub fn is_cancelled(&self) -> bool {
93 self.cancel.is_cancelled()
94 }
95
96 pub fn cancellation_token(&self) -> CancellationToken {
98 self.cancel.clone()
99 }
100}
101
102impl<M: Send + 'static> Context<M> {
103 #[cfg(not(reifydb_single_threaded))]
108 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
109 let actor_ref = self.self_ref.clone();
110 self.system.scheduler().schedule_once(delay, move || {
111 let _ = actor_ref.send(factory());
112 })
113 }
114
115 #[cfg(reifydb_single_threaded)]
120 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
121 schedule_once_fn(self.self_ref.clone(), delay, factory)
122 }
123}
124
125impl<M: Send + Sync + Clone + 'static> Context<M> {
126 #[cfg(not(reifydb_single_threaded))]
131 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
132 let actor_ref = self.self_ref.clone();
133 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
134 }
135
136 #[cfg(reifydb_single_threaded)]
141 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
142 schedule_repeat(self.self_ref.clone(), interval, msg)
143 }
144
145 #[cfg(not(reifydb_single_threaded))]
151 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
152 &self,
153 interval: Duration,
154 factory: F,
155 ) -> TimerHandle {
156 let actor_ref = self.self_ref.clone();
157 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
158 }
159
160 #[cfg(reifydb_single_threaded)]
166 pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
167 &self,
168 interval: Duration,
169 factory: F,
170 ) -> TimerHandle {
171 schedule_repeat_fn(self.self_ref.clone(), interval, factory)
172 }
173
174 pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
179 &self,
180 interval: Duration,
181 factory: F,
182 ) -> TimerHandle {
183 let actor_ref = self.self_ref.clone();
184 let clock = self.system.clock().clone();
185
186 #[cfg(not(reifydb_single_threaded))]
187 {
188 self.system.scheduler().schedule_repeat(interval, move || {
189 let now = clock.now_nanos();
190 actor_ref.send(factory(now)).is_ok()
191 })
192 }
193
194 #[cfg(reifydb_single_threaded)]
195 {
196 schedule_repeat_fn(actor_ref, interval, move || {
197 let now = clock.now_nanos();
198 factory(now)
199 })
200 }
201 }
202}
203
204impl<M> Clone for Context<M> {
205 fn clone(&self) -> Self {
206 Self {
207 self_ref: self.self_ref.clone(),
208 system: self.system.clone(),
209 cancel: self.cancel.clone(),
210 }
211 }
212}