reifydb_runtime/actor/
context.rs1use std::{
13 sync::{
14 Arc,
15 atomic::{AtomicBool, Ordering},
16 },
17 time::Duration,
18};
19
20#[cfg(reifydb_target = "wasi")]
21use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat};
22#[cfg(reifydb_target = "wasm")]
23use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat};
24use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
25
/// A cloneable flag used to signal that work should stop.
///
/// Every clone shares the same underlying [`AtomicBool`] through an [`Arc`],
/// so a call to [`cancel`](Self::cancel) on any clone is observed by
/// [`is_cancelled`](Self::is_cancelled) on all of them. The flag starts out
/// unset, and nothing in this type ever clears it again once it is set.
///
/// `Default` yields the same state as [`new`](Self::new): a fresh,
/// not-yet-cancelled token that shares its flag with no other token.
#[derive(Clone, Debug, Default)]
pub struct CancellationToken {
    /// Shared flag; `true` once cancellation has been requested.
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a new token whose flag is not set.
    pub fn new() -> Self {
        // `AtomicBool::default()` is `false`, so the derived `Default`
        // produces exactly the "not cancelled" state.
        Self::default()
    }

    /// Sets the shared flag, marking this token and all of its clones as cancelled.
    ///
    /// Calling this more than once has no further effect.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if [`cancel`](Self::cancel) has been called on this token
    /// or on any clone of it.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
58
59pub struct Context<M> {
66 self_ref: ActorRef<M>,
67 system: ActorSystem,
68 cancel: CancellationToken,
69}
70
71impl<M: Send + 'static> Context<M> {
72 pub(crate) fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
74 Self {
75 self_ref,
76 system,
77 cancel,
78 }
79 }
80
81 pub fn self_ref(&self) -> ActorRef<M> {
83 self.self_ref.clone()
84 }
85
86 pub fn system(&self) -> &ActorSystem {
88 &self.system
89 }
90
91 pub fn is_cancelled(&self) -> bool {
93 self.cancel.is_cancelled()
94 }
95
96 pub fn cancellation_token(&self) -> CancellationToken {
98 self.cancel.clone()
99 }
100}
101
102impl<M: Send + 'static> Context<M> {
103 #[cfg(not(reifydb_single_threaded))]
108 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
109 let actor_ref = self.self_ref.clone();
110 self.system.scheduler().schedule_once(delay, move || {
111 let _ = actor_ref.send(factory());
112 })
113 }
114
115 #[cfg(reifydb_single_threaded)]
120 pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
121 schedule_once_fn(self.self_ref.clone(), delay, factory)
122 }
123}
124
125impl<M: Send + Sync + Clone + 'static> Context<M> {
126 #[cfg(not(reifydb_single_threaded))]
131 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
132 let actor_ref = self.self_ref.clone();
133 self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
134 }
135
136 #[cfg(reifydb_single_threaded)]
141 pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
142 schedule_repeat(self.self_ref.clone(), interval, msg)
143 }
144}
145
146impl<M> Clone for Context<M> {
147 fn clone(&self) -> Self {
148 Self {
149 self_ref: self.self_ref.clone(),
150 system: self.system.clone(),
151 cancel: self.cancel.clone(),
152 }
153 }
154}