Skip to main content

reifydb_runtime/actor/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	sync::{
6		Arc,
7		atomic::{AtomicBool, Ordering},
8	},
9	time::Duration,
10};
11
12#[cfg(reifydb_target = "dst")]
13use crate::actor::timers::dst as dst_timers;
14#[cfg(reifydb_target = "wasi")]
15use crate::actor::timers::wasi::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
16#[cfg(reifydb_target = "wasm")]
17use crate::actor::timers::wasm::{schedule_once_fn, schedule_repeat, schedule_repeat_fn};
18use crate::actor::{mailbox::ActorRef, system::ActorSystem, timers::TimerHandle};
19
/// Cooperative cancellation flag that can be shared across threads.
///
/// Cloning a token yields a handle to the *same* flag. A token created with
/// [`CancellationToken::child_token`] has its own flag but also reports as
/// cancelled when any of its ancestors has been cancelled. Cancelling a child
/// never affects its ancestors.
#[derive(Clone)]
pub struct CancellationToken {
	/// This token's own flag, shared by all of its clones.
	cancelled: Arc<AtomicBool>,
	/// The token this one was derived from (`None` for a root token).
	///
	/// The whole parent token is kept, not just its flag, so that the link to
	/// every further ancestor survives across multiple `child_token` levels.
	parent: Option<Arc<CancellationToken>>,
}

impl CancellationToken {
	/// Creates a fresh, uncancelled root token with no parent.
	pub fn new() -> Self {
		Self {
			cancelled: Arc::new(AtomicBool::new(false)),
			parent: None,
		}
	}

	/// Creates a token with its own flag that is additionally considered
	/// cancelled whenever `self` or any ancestor of `self` is cancelled.
	pub fn child_token(&self) -> Self {
		Self {
			cancelled: Arc::new(AtomicBool::new(false)),
			// Cloning `self` shares its flag and its parent link, so the
			// child observes the complete ancestor chain.
			parent: Some(Arc::new(self.clone())),
		}
	}

	/// Marks this token (and all of its clones) as cancelled.
	pub fn cancel(&self) {
		self.cancelled.store(true, Ordering::SeqCst);
	}

	/// Returns `true` if this token or any of its ancestors has been cancelled.
	pub fn is_cancelled(&self) -> bool {
		// Walk from this token up to the root; iterative so that long chains
		// do not grow the call stack.
		let mut current = Some(self);
		while let Some(token) = current {
			if token.cancelled.load(Ordering::SeqCst) {
				return true;
			}
			current = token.parent.as_deref();
		}
		false
	}
}

impl Default for CancellationToken {
	/// Equivalent to [`CancellationToken::new`].
	fn default() -> Self {
		Self::new()
	}
}
55
56pub struct Context<M> {
57	self_ref: ActorRef<M>,
58	system: ActorSystem,
59	cancel: CancellationToken,
60}
61
62impl<M: Send + 'static> Context<M> {
63	pub fn new(self_ref: ActorRef<M>, system: ActorSystem, cancel: CancellationToken) -> Self {
64		Self {
65			self_ref,
66			system,
67			cancel,
68		}
69	}
70
71	pub fn self_ref(&self) -> ActorRef<M> {
72		self.self_ref.clone()
73	}
74
75	pub fn system(&self) -> &ActorSystem {
76		&self.system
77	}
78
79	pub fn is_cancelled(&self) -> bool {
80		self.cancel.is_cancelled()
81	}
82
83	pub fn cancellation_token(&self) -> CancellationToken {
84		self.cancel.clone()
85	}
86}
87
88impl<M: Send + 'static> Context<M> {
89	#[cfg(not(reifydb_single_threaded))]
90	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
91		let actor_ref = self.self_ref.clone();
92		self.system.scheduler().schedule_once(delay, move || {
93			let _ = actor_ref.send(factory());
94		})
95	}
96
97	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
98	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
99		schedule_once_fn(self.self_ref.clone(), delay, factory)
100	}
101
102	#[cfg(reifydb_target = "dst")]
103	pub fn schedule_once<F: FnOnce() -> M + Send + 'static>(&self, delay: Duration, factory: F) -> TimerHandle {
104		dst_timers::schedule_once_fn(
105			self.system.timer_heap(),
106			self.system.mock_clock(),
107			self.self_ref.clone(),
108			delay,
109			factory,
110		)
111	}
112}
113
114impl<M: Send + Sync + Clone + 'static> Context<M> {
115	#[cfg(not(reifydb_single_threaded))]
116	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
117		let actor_ref = self.self_ref.clone();
118		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(msg.clone()).is_ok())
119	}
120
121	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
122	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
123		schedule_repeat(self.self_ref.clone(), interval, msg)
124	}
125
126	#[cfg(reifydb_target = "dst")]
127	pub fn schedule_repeat(&self, interval: Duration, msg: M) -> TimerHandle {
128		dst_timers::schedule_repeat(
129			self.system.timer_heap(),
130			self.system.mock_clock(),
131			self.self_ref.clone(),
132			interval,
133			msg,
134		)
135	}
136
137	#[cfg(not(reifydb_single_threaded))]
138	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
139		&self,
140		interval: Duration,
141		factory: F,
142	) -> TimerHandle {
143		let actor_ref = self.self_ref.clone();
144		self.system.scheduler().schedule_repeat(interval, move || actor_ref.send(factory()).is_ok())
145	}
146
147	#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
148	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
149		&self,
150		interval: Duration,
151		factory: F,
152	) -> TimerHandle {
153		schedule_repeat_fn(self.self_ref.clone(), interval, factory)
154	}
155
156	#[cfg(reifydb_target = "dst")]
157	pub fn schedule_repeat_fn<F: Fn() -> M + Send + Sync + 'static>(
158		&self,
159		interval: Duration,
160		factory: F,
161	) -> TimerHandle {
162		dst_timers::schedule_repeat_fn(
163			self.system.timer_heap(),
164			self.system.mock_clock(),
165			self.self_ref.clone(),
166			interval,
167			factory,
168		)
169	}
170
171	pub fn schedule_tick<F: Fn(u64) -> M + Send + Sync + 'static>(
172		&self,
173		interval: Duration,
174		factory: F,
175	) -> TimerHandle {
176		let actor_ref = self.self_ref.clone();
177		let clock = self.system.clock().clone();
178
179		#[cfg(not(reifydb_single_threaded))]
180		{
181			self.system.scheduler().schedule_repeat(interval, move || {
182				let now = clock.now_nanos();
183				actor_ref.send(factory(now)).is_ok()
184			})
185		}
186
187		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
188		{
189			schedule_repeat_fn(actor_ref, interval, move || {
190				let now = clock.now_nanos();
191				factory(now)
192			})
193		}
194
195		#[cfg(reifydb_target = "dst")]
196		{
197			dst_timers::schedule_repeat_fn(
198				self.system.timer_heap(),
199				self.system.mock_clock(),
200				actor_ref,
201				interval,
202				move || {
203					let now = clock.now_nanos();
204					factory(now)
205				},
206			)
207		}
208	}
209}
210
211impl<M> Clone for Context<M> {
212	fn clone(&self) -> Self {
213		Self {
214			self_ref: self.self_ref.clone(),
215			system: self.system.clone(),
216			cancel: self.cancel.clone(),
217		}
218	}
219}