1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
8pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
11pub struct VmTaskHandle {
13 pub handle: VmJoinHandle,
14 pub cancel_token: Arc<AtomicBool>,
16 pub wait_task_id: String,
18}
19
20#[derive(Debug, Clone)]
22pub struct VmChannelHandle {
23 pub name: Arc<str>,
24 pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
25 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
26 pub closed: Arc<AtomicBool>,
27}
28
/// Handle to a shared atomic 64-bit signed integer.
///
/// Cloning the handle clones the `Arc`, so all clones observe and modify the
/// same underlying integer.
#[derive(Clone, Debug)]
pub struct VmAtomicHandle {
    /// The shared atomic cell.
    pub value: Arc<AtomicI64>,
}
34
35#[derive(Clone)]
37pub struct VmRngHandle {
38 pub rng: Arc<Mutex<rand::rngs::StdRng>>,
39}
40
41impl std::fmt::Debug for VmRngHandle {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.write_str("VmRngHandle { .. }")
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct VmSyncPermitHandle {
50 pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
51}
52
53impl VmSyncPermitHandle {
54 pub(crate) fn release(&self) -> bool {
55 self.lease.release()
56 }
57
58 pub(crate) fn kind(&self) -> &str {
59 self.lease.kind()
60 }
61
62 pub(crate) fn key(&self) -> &str {
63 self.lease.key()
64 }
65
66 pub(crate) fn permits(&self) -> u32 {
67 self.lease.permits()
68 }
69}
70
/// Integer range with a step of one, described by its two bounds.
///
/// The value is plain data (`Copy`); no elements are stored.
#[derive(Clone, Copy, Debug)]
pub struct VmRange {
    /// First value of the range.
    pub start: i64,
    /// Upper bound; part of the range only when `inclusive` is `true`.
    pub end: i64,
    /// Whether `end` itself belongs to the range.
    pub inclusive: bool,
}
88
89impl VmRange {
90 pub fn len(&self) -> i64 {
100 if self.inclusive {
101 if self.start > self.end {
102 0
103 } else {
104 self.end.saturating_sub(self.start).saturating_add(1)
105 }
106 } else if self.start >= self.end {
107 0
108 } else {
109 self.end.saturating_sub(self.start)
110 }
111 }
112
113 pub fn is_empty(&self) -> bool {
114 self.len() == 0
115 }
116
117 pub fn get(&self, idx: i64) -> Option<i64> {
121 if idx < 0 || idx >= self.len() {
122 None
123 } else {
124 self.start.checked_add(idx)
125 }
126 }
127
128 pub fn first(&self) -> Option<i64> {
130 if self.is_empty() {
131 None
132 } else {
133 Some(self.start)
134 }
135 }
136
137 pub fn last(&self) -> Option<i64> {
139 if self.is_empty() {
140 None
141 } else if self.inclusive {
142 Some(self.end)
143 } else {
144 Some(self.end - 1)
145 }
146 }
147
148 pub fn contains(&self, v: i64) -> bool {
150 if self.is_empty() {
151 return false;
152 }
153 if self.inclusive {
154 v >= self.start && v <= self.end
155 } else {
156 v >= self.start && v < self.end
157 }
158 }
159
160 pub fn to_vec(&self) -> Vec<VmValue> {
167 let len = self.len();
168 if len <= 0 {
169 return Vec::new();
170 }
171 let cap = len as usize;
172 let mut out = Vec::with_capacity(cap);
173 for i in 0..len {
174 match self.start.checked_add(i) {
175 Some(v) => out.push(VmValue::Int(v)),
176 None => break,
177 }
178 }
179 out
180 }
181}
182
183#[derive(Debug, Clone)]
186pub struct VmGenerator {
187 pub done: Arc<AtomicBool>,
189 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
193}
194
195impl VmGenerator {
196 pub(crate) fn is_done(&self) -> bool {
197 self.done.load(Ordering::Relaxed)
198 }
199
200 pub(crate) fn mark_done(&self) {
201 self.done.store(true, Ordering::Relaxed);
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct VmStream {
208 pub done: Arc<AtomicBool>,
210 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
212 pub cancel: Option<VmStreamCancel>,
214}
215
216impl VmStream {
217 pub(crate) fn is_done(&self) -> bool {
218 self.done.load(Ordering::Relaxed)
219 }
220
221 pub(crate) fn mark_done(&self) {
222 self.done.store(true, Ordering::Relaxed);
223 }
224}
225
226#[derive(Clone)]
227pub struct VmStreamCancel {
228 sender: Arc<tokio::sync::watch::Sender<bool>>,
229}
230
231impl VmStreamCancel {
232 pub fn new() -> Self {
233 let (sender, _receiver) = tokio::sync::watch::channel(false);
234 Self {
235 sender: Arc::new(sender),
236 }
237 }
238
239 pub fn cancel(&self) {
240 let _ = self.sender.send(true);
241 }
242
243 pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
244 self.sender.subscribe()
245 }
246}
247
248impl Default for VmStreamCancel {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
254impl std::fmt::Debug for VmStreamCancel {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("VmStreamCancel")
257 .field("cancelled", &*self.sender.borrow())
258 .finish()
259 }
260}
261
262impl VmStream {
263 pub(crate) fn cancel(&self) {
264 if let Some(cancel) = &self.cancel {
265 cancel.cancel();
266 }
267 }
268}