1use std::rc::Rc;
2use std::sync::atomic::{AtomicBool, AtomicI64};
3use std::sync::{Arc, Mutex};
4
5use super::{VmError, VmValue};
6
7pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
9
10pub struct VmTaskHandle {
12 pub handle: VmJoinHandle,
13 pub cancel_token: Arc<AtomicBool>,
15}
16
17#[derive(Debug, Clone)]
19pub struct VmChannelHandle {
20 pub name: Rc<str>,
21 pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
22 pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
23 pub closed: Arc<AtomicBool>,
24}
25
/// Handle to a shared 64-bit atomic integer.
///
/// Clones point at the same underlying `AtomicI64`.
#[derive(Clone, Debug)]
pub struct VmAtomicHandle {
    /// The shared atomic cell.
    pub value: Arc<AtomicI64>,
}
31
32#[derive(Clone)]
34pub struct VmRngHandle {
35 pub rng: Arc<Mutex<rand::rngs::StdRng>>,
36}
37
38impl std::fmt::Debug for VmRngHandle {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.write_str("VmRngHandle { .. }")
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct VmSyncPermitHandle {
47 pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
48}
49
50impl VmSyncPermitHandle {
51 pub(crate) fn release(&self) -> bool {
52 self.lease.release()
53 }
54
55 pub(crate) fn kind(&self) -> &str {
56 self.lease.kind()
57 }
58
59 pub(crate) fn key(&self) -> &str {
60 self.lease.key()
61 }
62}
63
/// Integer range from `start` to `end`, with the upper bound either included
/// or excluded depending on `inclusive`.
#[derive(Clone, Copy, Debug)]
pub struct VmRange {
    /// Lower bound (always part of a non-empty range).
    pub start: i64,
    /// Upper bound; part of the range only when `inclusive` is `true`.
    pub end: i64,
    /// Whether `end` itself belongs to the range.
    pub inclusive: bool,
}
81
82impl VmRange {
83 pub fn len(&self) -> i64 {
93 if self.inclusive {
94 if self.start > self.end {
95 0
96 } else {
97 self.end.saturating_sub(self.start).saturating_add(1)
98 }
99 } else if self.start >= self.end {
100 0
101 } else {
102 self.end.saturating_sub(self.start)
103 }
104 }
105
106 pub fn is_empty(&self) -> bool {
107 self.len() == 0
108 }
109
110 pub fn get(&self, idx: i64) -> Option<i64> {
114 if idx < 0 || idx >= self.len() {
115 None
116 } else {
117 self.start.checked_add(idx)
118 }
119 }
120
121 pub fn first(&self) -> Option<i64> {
123 if self.is_empty() {
124 None
125 } else {
126 Some(self.start)
127 }
128 }
129
130 pub fn last(&self) -> Option<i64> {
132 if self.is_empty() {
133 None
134 } else if self.inclusive {
135 Some(self.end)
136 } else {
137 Some(self.end - 1)
138 }
139 }
140
141 pub fn contains(&self, v: i64) -> bool {
143 if self.is_empty() {
144 return false;
145 }
146 if self.inclusive {
147 v >= self.start && v <= self.end
148 } else {
149 v >= self.start && v < self.end
150 }
151 }
152
153 pub fn to_vec(&self) -> Vec<VmValue> {
160 let len = self.len();
161 if len <= 0 {
162 return Vec::new();
163 }
164 let cap = len as usize;
165 let mut out = Vec::with_capacity(cap);
166 for i in 0..len {
167 match self.start.checked_add(i) {
168 Some(v) => out.push(VmValue::Int(v)),
169 None => break,
170 }
171 }
172 out
173 }
174}
175
176#[derive(Debug, Clone)]
179pub struct VmGenerator {
180 pub done: Rc<std::cell::Cell<bool>>,
182 pub receiver: Rc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
186}
187
188#[derive(Debug, Clone)]
190pub struct VmStream {
191 pub done: Rc<std::cell::Cell<bool>>,
193 pub receiver: Rc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
195 pub cancel: Option<VmStreamCancel>,
197}
198
199#[derive(Clone)]
200pub struct VmStreamCancel {
201 sender: Arc<tokio::sync::watch::Sender<bool>>,
202}
203
204impl VmStreamCancel {
205 pub fn new() -> Self {
206 let (sender, _receiver) = tokio::sync::watch::channel(false);
207 Self {
208 sender: Arc::new(sender),
209 }
210 }
211
212 pub fn cancel(&self) {
213 let _ = self.sender.send(true);
214 }
215
216 pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
217 self.sender.subscribe()
218 }
219}
220
221impl Default for VmStreamCancel {
222 fn default() -> Self {
223 Self::new()
224 }
225}
226
227impl std::fmt::Debug for VmStreamCancel {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("VmStreamCancel")
230 .field("cancelled", &*self.sender.borrow())
231 .finish()
232 }
233}
234
235impl VmStream {
236 pub(crate) fn cancel(&self) {
237 if let Some(cancel) = &self.cancel {
238 cancel.cancel();
239 }
240 }
241}