1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
8pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
11pub struct VmTaskHandle {
13 pub handle: VmJoinHandle,
14 pub cancel_token: Arc<AtomicBool>,
16}
17
18#[derive(Debug, Clone)]
20pub struct VmChannelHandle {
21 pub name: Arc<str>,
22 pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
23 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
24 pub closed: Arc<AtomicBool>,
25}
26
27#[derive(Debug, Clone)]
29pub struct VmAtomicHandle {
30 pub value: Arc<AtomicI64>,
31}
32
33#[derive(Clone)]
35pub struct VmRngHandle {
36 pub rng: Arc<Mutex<rand::rngs::StdRng>>,
37}
38
39impl std::fmt::Debug for VmRngHandle {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 f.write_str("VmRngHandle { .. }")
42 }
43}
44
45#[derive(Debug, Clone)]
47pub struct VmSyncPermitHandle {
48 pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
49}
50
51impl VmSyncPermitHandle {
52 pub(crate) fn release(&self) -> bool {
53 self.lease.release()
54 }
55
56 pub(crate) fn kind(&self) -> &str {
57 self.lease.kind()
58 }
59
60 pub(crate) fn key(&self) -> &str {
61 self.lease.key()
62 }
63}
64
65#[derive(Debug, Clone, Copy)]
77pub struct VmRange {
78 pub start: i64,
79 pub end: i64,
80 pub inclusive: bool,
81}
82
83impl VmRange {
84 pub fn len(&self) -> i64 {
94 if self.inclusive {
95 if self.start > self.end {
96 0
97 } else {
98 self.end.saturating_sub(self.start).saturating_add(1)
99 }
100 } else if self.start >= self.end {
101 0
102 } else {
103 self.end.saturating_sub(self.start)
104 }
105 }
106
107 pub fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 pub fn get(&self, idx: i64) -> Option<i64> {
115 if idx < 0 || idx >= self.len() {
116 None
117 } else {
118 self.start.checked_add(idx)
119 }
120 }
121
122 pub fn first(&self) -> Option<i64> {
124 if self.is_empty() {
125 None
126 } else {
127 Some(self.start)
128 }
129 }
130
131 pub fn last(&self) -> Option<i64> {
133 if self.is_empty() {
134 None
135 } else if self.inclusive {
136 Some(self.end)
137 } else {
138 Some(self.end - 1)
139 }
140 }
141
142 pub fn contains(&self, v: i64) -> bool {
144 if self.is_empty() {
145 return false;
146 }
147 if self.inclusive {
148 v >= self.start && v <= self.end
149 } else {
150 v >= self.start && v < self.end
151 }
152 }
153
154 pub fn to_vec(&self) -> Vec<VmValue> {
161 let len = self.len();
162 if len <= 0 {
163 return Vec::new();
164 }
165 let cap = len as usize;
166 let mut out = Vec::with_capacity(cap);
167 for i in 0..len {
168 match self.start.checked_add(i) {
169 Some(v) => out.push(VmValue::Int(v)),
170 None => break,
171 }
172 }
173 out
174 }
175}
176
177#[derive(Debug, Clone)]
180pub struct VmGenerator {
181 pub done: Arc<AtomicBool>,
183 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
187}
188
189impl VmGenerator {
190 pub(crate) fn is_done(&self) -> bool {
191 self.done.load(Ordering::Relaxed)
192 }
193
194 pub(crate) fn mark_done(&self) {
195 self.done.store(true, Ordering::Relaxed);
196 }
197}
198
199#[derive(Debug, Clone)]
201pub struct VmStream {
202 pub done: Arc<AtomicBool>,
204 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
206 pub cancel: Option<VmStreamCancel>,
208}
209
210impl VmStream {
211 pub(crate) fn is_done(&self) -> bool {
212 self.done.load(Ordering::Relaxed)
213 }
214
215 pub(crate) fn mark_done(&self) {
216 self.done.store(true, Ordering::Relaxed);
217 }
218}
219
220#[derive(Clone)]
221pub struct VmStreamCancel {
222 sender: Arc<tokio::sync::watch::Sender<bool>>,
223}
224
225impl VmStreamCancel {
226 pub fn new() -> Self {
227 let (sender, _receiver) = tokio::sync::watch::channel(false);
228 Self {
229 sender: Arc::new(sender),
230 }
231 }
232
233 pub fn cancel(&self) {
234 let _ = self.sender.send(true);
235 }
236
237 pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
238 self.sender.subscribe()
239 }
240}
241
242impl Default for VmStreamCancel {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
248impl std::fmt::Debug for VmStreamCancel {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 f.debug_struct("VmStreamCancel")
251 .field("cancelled", &*self.sender.borrow())
252 .finish()
253 }
254}
255
256impl VmStream {
257 pub(crate) fn cancel(&self) {
258 if let Some(cancel) = &self.cancel {
259 cancel.cancel();
260 }
261 }
262}