1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
/// Tokio join handle for a spawned VM task.
///
/// Awaiting it yields the task's `Result`: a `(VmValue, String)` pair on
/// success or a [`VmError`] on failure. The meaning of the `String` component
/// is decided by the code that spawns the task and is not visible here.
pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
/// Bookkeeping kept for one spawned VM task.
pub struct VmTaskHandle {
    /// Join handle used to await the task's result.
    pub handle: VmJoinHandle,
    /// Shared boolean flag. Judging by the name it is set to request
    /// cancellation of the task — confirm against the code that polls it.
    pub cancel_token: Arc<AtomicBool>,
    /// Identifier string associated with waiting on this task.
    /// NOTE(review): the exact format and consumer are not visible in this file.
    pub wait_task_id: String,
}
19
/// Handle to a named tokio mpsc channel that carries [`VmValue`]s.
///
/// Every field sits behind an `Arc`, so cloning the handle produces another
/// reference to the same sender, receiver and close state rather than a new
/// channel.
#[derive(Debug, Clone)]
pub struct VmChannelHandle {
    /// Name of the channel.
    pub name: Arc<str>,
    /// Sending half of the channel.
    pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
    /// Receiving half, wrapped in an async mutex so clones can share it.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
    /// Closed flag plus close notification, shared by all clones.
    pub close: Arc<VmChannelCloseState>,
}
28
/// Tracks whether a channel has been closed and lets observers wait for it.
#[derive(Debug)]
pub struct VmChannelCloseState {
    /// `true` once `close()` has been called; read by `is_closed()`.
    closed: AtomicBool,
    /// Watch channel that starts at `false` and is set to `true` by the first
    /// `close()` call; `subscribe()` hands out receivers for it.
    signal: tokio::sync::watch::Sender<bool>,
}
34
35impl VmChannelCloseState {
36 pub(crate) fn open() -> Self {
37 let (signal, _) = tokio::sync::watch::channel(false);
38 Self {
39 closed: AtomicBool::new(false),
40 signal,
41 }
42 }
43
44 pub(crate) fn close(&self) -> bool {
45 if self.closed.swap(true, Ordering::SeqCst) {
46 return false;
47 }
48 self.signal.send_replace(true);
49 true
50 }
51
52 pub(crate) fn is_closed(&self) -> bool {
53 self.closed.load(Ordering::SeqCst)
54 }
55
56 pub(crate) fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
57 self.signal.subscribe()
58 }
59}
60
impl VmChannelHandle {
    /// Closes the channel through the shared [`VmChannelCloseState`].
    ///
    /// Returns `true` only for the call that actually moved the state from
    /// open to closed; repeated calls (from any clone) return `false`.
    pub(crate) fn close(&self) -> bool {
        self.close.close()
    }

    /// Reports whether the shared close state has been marked closed.
    pub(crate) fn is_closed(&self) -> bool {
        self.close.is_closed()
    }

    /// Returns a watch receiver for the close signal of this channel.
    pub(crate) fn subscribe_closed(&self) -> tokio::sync::watch::Receiver<bool> {
        self.close.subscribe()
    }
}
74
/// Handle to a shared atomic 64-bit signed integer.
///
/// Cloning the handle clones the `Arc`, so all clones refer to the same
/// underlying atomic.
#[derive(Debug, Clone)]
pub struct VmAtomicHandle {
    /// The shared atomic value.
    pub value: Arc<AtomicI64>,
}
80
/// Handle to a shared random number generator.
///
/// Cloning the handle clones the `Arc`, so all clones draw from the same
/// mutex-guarded generator. `Debug` is implemented by hand for this type
/// instead of being derived.
#[derive(Clone)]
pub struct VmRngHandle {
    /// The shared generator, guarded by a `parking_lot` mutex.
    pub rng: Arc<Mutex<rand::rngs::StdRng>>,
}
86
87impl std::fmt::Debug for VmRngHandle {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.write_str("VmRngHandle { .. }")
90 }
91}
92
/// Handle to a synchronization lease.
///
/// Cloning the handle clones the `Arc`, so all clones refer to the same
/// underlying `VmSyncLease`.
#[derive(Debug, Clone)]
pub struct VmSyncPermitHandle {
    /// The shared lease this handle wraps.
    pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
}
98
impl VmSyncPermitHandle {
    /// Forwards to the lease's `release()` and returns its result.
    ///
    /// NOTE(review): presumably `true` means this call performed the release
    /// and `false` means it was already released — confirm in `VmSyncLease`.
    pub(crate) fn release(&self) -> bool {
        self.lease.release()
    }

    /// Returns the lease's kind string, as reported by the lease.
    pub(crate) fn kind(&self) -> &str {
        self.lease.kind()
    }

    /// Returns the lease's key string, as reported by the lease.
    pub(crate) fn key(&self) -> &str {
        self.lease.key()
    }

    /// Returns the permit count reported by the lease.
    pub(crate) fn permits(&self) -> u32 {
        self.lease.permits()
    }

    /// Reports whether the lease considers itself released.
    pub(crate) fn is_released(&self) -> bool {
        self.lease.is_released()
    }

    /// Returns `true` when both handles point at the very same lease
    /// allocation (pointer identity, not structural equality).
    pub(crate) fn same_lease(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.lease, &other.lease)
    }
}
124
/// Integer range with a step of one, described by its bounds rather than by
/// materialized elements.
#[derive(Debug, Clone, Copy)]
pub struct VmRange {
    /// First value of the range (always included when the range is non-empty).
    pub start: i64,
    /// Upper bound; part of the range only when `inclusive` is `true`.
    pub end: i64,
    /// Whether `end` itself belongs to the range.
    pub inclusive: bool,
}
142
143impl VmRange {
144 pub fn len(&self) -> i64 {
154 if self.inclusive {
155 if self.start > self.end {
156 0
157 } else {
158 self.end.saturating_sub(self.start).saturating_add(1)
159 }
160 } else if self.start >= self.end {
161 0
162 } else {
163 self.end.saturating_sub(self.start)
164 }
165 }
166
167 pub fn is_empty(&self) -> bool {
168 self.len() == 0
169 }
170
171 pub fn get(&self, idx: i64) -> Option<i64> {
175 if idx < 0 || idx >= self.len() {
176 None
177 } else {
178 self.start.checked_add(idx)
179 }
180 }
181
182 pub fn first(&self) -> Option<i64> {
184 if self.is_empty() {
185 None
186 } else {
187 Some(self.start)
188 }
189 }
190
191 pub fn last(&self) -> Option<i64> {
193 if self.is_empty() {
194 None
195 } else if self.inclusive {
196 Some(self.end)
197 } else {
198 Some(self.end - 1)
199 }
200 }
201
202 pub fn contains(&self, v: i64) -> bool {
204 if self.is_empty() {
205 return false;
206 }
207 if self.inclusive {
208 v >= self.start && v <= self.end
209 } else {
210 v >= self.start && v < self.end
211 }
212 }
213
214 pub fn to_vec(&self) -> Vec<VmValue> {
221 let len = self.len();
222 if len <= 0 {
223 return Vec::new();
224 }
225 let cap = len as usize;
226 let mut out = Vec::with_capacity(cap);
227 for i in 0..len {
228 match self.start.checked_add(i) {
229 Some(v) => out.push(VmValue::Int(v)),
230 None => break,
231 }
232 }
233 out
234 }
235}
236
/// Consumer-side handle to a generator whose items arrive over a tokio mpsc
/// channel.
///
/// Both fields are behind an `Arc`, so clones share the same flag and the
/// same receiver.
#[derive(Debug, Clone)]
pub struct VmGenerator {
    /// Completion flag, read by `is_done()` and set by `mark_done()`.
    pub done: Arc<AtomicBool>,
    /// Receiving end of the item channel; each item is either a value or a
    /// [`VmError`]. Wrapped in an async mutex so clones can share it.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
}
248
impl VmGenerator {
    /// Reads the shared `done` flag (relaxed ordering).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the shared `done` flag to `true` (relaxed ordering); visible to
    /// every clone of this handle.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
258
/// Consumer-side handle to a stream whose items arrive over a tokio mpsc
/// channel, with an optional cancellation token.
///
/// `done` and `receiver` are behind an `Arc`, so clones share them.
#[derive(Debug, Clone)]
pub struct VmStream {
    /// Completion flag, read by `is_done()` and set by `mark_done()`.
    pub done: Arc<AtomicBool>,
    /// Receiving end of the item channel; each item is either a value or a
    /// [`VmError`]. Wrapped in an async mutex so clones can share it.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
    /// Cancellation token triggered by `cancel()`; when `None`, `cancel()`
    /// does nothing.
    pub cancel: Option<VmStreamCancel>,
}
269
impl VmStream {
    /// Reads the shared `done` flag (relaxed ordering).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the shared `done` flag to `true` (relaxed ordering); visible to
    /// every clone of this handle.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
279
/// Cancellation token backed by a tokio watch channel holding a `bool`.
///
/// The sender is behind an `Arc`, so clones of the token share one channel.
#[derive(Clone)]
pub struct VmStreamCancel {
    /// Sending side of the watch channel; the stored value starts as `false`
    /// and `cancel()` sends `true`.
    sender: Arc<tokio::sync::watch::Sender<bool>>,
}
284
285impl VmStreamCancel {
286 pub fn new() -> Self {
287 let (sender, _receiver) = tokio::sync::watch::channel(false);
288 Self {
289 sender: Arc::new(sender),
290 }
291 }
292
293 pub fn cancel(&self) {
294 let _ = self.sender.send(true);
295 }
296
297 pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
298 self.sender.subscribe()
299 }
300}
301
impl Default for VmStreamCancel {
    /// Same as [`VmStreamCancel::new`]: a token that is not cancelled.
    fn default() -> Self {
        Self::new()
    }
}
307
308impl std::fmt::Debug for VmStreamCancel {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 f.debug_struct("VmStreamCancel")
311 .field("cancelled", &*self.sender.borrow())
312 .finish()
313 }
314}
315
316impl VmStream {
317 pub(crate) fn cancel(&self) {
318 if let Some(cancel) = &self.cancel {
319 cancel.cancel();
320 }
321 }
322}