Skip to main content

harn_vm/value/
handles.rs

1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
/// The raw join handle type for spawned tasks.
///
/// The task itself resolves to `Result<(VmValue, String), VmError>`.
/// NOTE(review): the meaning of the `String` half of the success tuple is not
/// visible in this file — confirm against the spawn site.
pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
/// A spawned async task handle with cancellation support.
///
/// Bundles the tokio join handle with the shared flag used to request
/// cancellation and the id the task is registered under.
pub struct VmTaskHandle {
    /// Join handle of the spawned task.
    pub handle: VmJoinHandle,
    /// Cooperative cancellation token. Set to true to request graceful shutdown.
    pub cancel_token: Arc<AtomicBool>,
    /// Runtime-context task id used by the VM scheduler and wait-for graph.
    pub wait_task_id: String,
}
19
/// A channel handle for the VM (uses tokio mpsc).
///
/// Every field sits behind an `Arc`, so cloning the handle shares the same
/// sender, receiver and close state instead of creating a new channel.
#[derive(Debug, Clone)]
pub struct VmChannelHandle {
    /// Channel name.
    pub name: Arc<str>,
    /// Sending half of the underlying mpsc channel.
    pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
    /// Receiving half, wrapped in an async mutex so clones can share it.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
    /// Shared closed flag plus close-notification signal.
    pub close: Arc<VmChannelCloseState>,
}
28
/// Close state of a VM channel: an atomic flag for synchronous polling plus a
/// watch channel that lets async observers wait for the close.
#[derive(Debug)]
pub struct VmChannelCloseState {
    /// `true` once the channel has been closed.
    closed: AtomicBool,
    /// Watch sender that publishes the closed flag to subscribers.
    signal: tokio::sync::watch::Sender<bool>,
}
34
35impl VmChannelCloseState {
36    pub(crate) fn open() -> Self {
37        let (signal, _) = tokio::sync::watch::channel(false);
38        Self {
39            closed: AtomicBool::new(false),
40            signal,
41        }
42    }
43
44    pub(crate) fn close(&self) -> bool {
45        if self.closed.swap(true, Ordering::SeqCst) {
46            return false;
47        }
48        self.signal.send_replace(true);
49        true
50    }
51
52    pub(crate) fn is_closed(&self) -> bool {
53        self.closed.load(Ordering::SeqCst)
54    }
55
56    pub(crate) fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
57        self.signal.subscribe()
58    }
59}
60
impl VmChannelHandle {
    /// Closes the channel through its shared close state.
    ///
    /// Returns `true` if this call closed the channel and `false` if it was
    /// already closed.
    pub(crate) fn close(&self) -> bool {
        self.close.close()
    }

    /// Whether the channel has been closed.
    pub(crate) fn is_closed(&self) -> bool {
        self.close.is_closed()
    }

    /// Returns a watch receiver that observes the channel's closed flag.
    pub(crate) fn subscribe_closed(&self) -> tokio::sync::watch::Receiver<bool> {
        self.close.subscribe()
    }
}
74
/// An atomic integer handle for the VM.
///
/// Clones share the same underlying `AtomicI64` through the `Arc`.
#[derive(Debug, Clone)]
pub struct VmAtomicHandle {
    /// The shared atomic counter.
    pub value: Arc<AtomicI64>,
}
80
/// A reproducible random number generator handle.
///
/// Clones share one generator: the `StdRng` lives behind an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct VmRngHandle {
    /// The shared, mutex-guarded generator.
    pub rng: Arc<Mutex<rand::rngs::StdRng>>,
}
86
/// Manual `Debug` that prints a fixed placeholder instead of the generator's
/// internal state.
impl std::fmt::Debug for VmRngHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The mutex is never locked here; the output is a constant string.
        f.write_str("VmRngHandle { .. }")
    }
}
92
/// A held synchronization permit for mutex/semaphore/gate primitives.
///
/// Clones share the same lease through the `Arc`.
#[derive(Debug, Clone)]
pub struct VmSyncPermitHandle {
    /// The lease this permit refers to.
    pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
}
98
impl VmSyncPermitHandle {
    /// Releases the underlying lease and forwards the lease's `bool` result.
    /// NOTE(review): presumably `true` means this call performed the release —
    /// confirm in `VmSyncLease::release`.
    pub(crate) fn release(&self) -> bool {
        self.lease.release()
    }

    /// The lease's kind string, as reported by the lease.
    /// NOTE(review): presumably names the primitive (mutex/semaphore/gate) — confirm.
    pub(crate) fn kind(&self) -> &str {
        self.lease.kind()
    }

    /// The lease's key string, as reported by the lease.
    pub(crate) fn key(&self) -> &str {
        self.lease.key()
    }

    /// The permit count reported by the lease.
    pub(crate) fn permits(&self) -> u32 {
        self.lease.permits()
    }

    /// Whether the lease reports itself as released.
    pub(crate) fn is_released(&self) -> bool {
        self.lease.is_released()
    }

    /// `true` when both handles point at the very same lease allocation
    /// (pointer identity, not structural equality).
    pub(crate) fn same_lease(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.lease, &other.lease)
    }
}
124
125/// A lazy integer range — Python-style. Stores only `(start, end, inclusive)`
126/// so the in-memory footprint is O(1) regardless of the range's length.
127/// `len()`, indexing (`r[k]`), `.contains(x)`, `.first()`, `.last()` are all
128/// O(1); direct iteration walks step-by-step without materializing a list.
129///
130/// Empty-range convention (Python-consistent):
131/// - Inclusive empty when `start > end`.
132/// - Exclusive empty when `start >= end`.
133///
134/// Negative / reversed ranges are NOT supported in v1: `5 to 1` is simply
135/// empty. Authors who want reverse iteration should call `.to_list().reverse()`.
136#[derive(Debug, Clone, Copy)]
137pub struct VmRange {
138    pub start: i64,
139    pub end: i64,
140    pub inclusive: bool,
141}
142
143impl VmRange {
144    /// Number of elements this range yields.
145    ///
146    /// Uses saturating arithmetic so that pathological ranges near
147    /// `i64::MAX`/`i64::MIN` do not panic on overflow. Because a range's
148    /// element count must fit in `i64` the returned length saturates at
149    /// `i64::MAX` for ranges whose width exceeds that (e.g. `i64::MIN to
150    /// i64::MAX` inclusive). Callers that later narrow to `usize` for
151    /// allocation should still guard against huge lengths — see
152    /// `to_vec` / `get` for the indexable-range invariants.
153    pub fn len(&self) -> i64 {
154        if self.inclusive {
155            if self.start > self.end {
156                0
157            } else {
158                self.end.saturating_sub(self.start).saturating_add(1)
159            }
160        } else if self.start >= self.end {
161            0
162        } else {
163            self.end.saturating_sub(self.start)
164        }
165    }
166
167    pub fn is_empty(&self) -> bool {
168        self.len() == 0
169    }
170
171    /// Element at the given 0-based index, bounds-checked.
172    /// Returns `None` when out of bounds or when `start + idx` would
173    /// overflow (which can only happen when `len()` saturated).
174    pub fn get(&self, idx: i64) -> Option<i64> {
175        if idx < 0 || idx >= self.len() {
176            None
177        } else {
178            self.start.checked_add(idx)
179        }
180    }
181
182    /// First element or `None` when empty.
183    pub fn first(&self) -> Option<i64> {
184        if self.is_empty() {
185            None
186        } else {
187            Some(self.start)
188        }
189    }
190
191    /// Last element or `None` when empty.
192    pub fn last(&self) -> Option<i64> {
193        if self.is_empty() {
194            None
195        } else if self.inclusive {
196            Some(self.end)
197        } else {
198            Some(self.end - 1)
199        }
200    }
201
202    /// Whether `v` falls inside the range (O(1)).
203    pub fn contains(&self, v: i64) -> bool {
204        if self.is_empty() {
205            return false;
206        }
207        if self.inclusive {
208            v >= self.start && v <= self.end
209        } else {
210            v >= self.start && v < self.end
211        }
212    }
213
214    /// Materialize to a `Vec<VmValue>` — the explicit escape hatch.
215    ///
216    /// Uses `checked_add` on the per-element index so a range near
217    /// `i64::MAX` stops at the representable bound instead of panicking.
218    /// Callers should still treat a very long range as unwise to
219    /// materialize (the whole point of `VmRange` is to avoid this).
220    pub fn to_vec(&self) -> Vec<VmValue> {
221        let len = self.len();
222        if len <= 0 {
223            return Vec::new();
224        }
225        let cap = len as usize;
226        let mut out = Vec::with_capacity(cap);
227        for i in 0..len {
228            match self.start.checked_add(i) {
229                Some(v) => out.push(VmValue::Int(v)),
230                None => break,
231            }
232        }
233        out
234    }
235}
236
/// A generator object: lazily produces values via yield.
/// The generator body runs as a spawned task that sends values through a channel.
///
/// Both fields are `Arc`-wrapped, so clones observe the same `done` flag and
/// pull from the same receiver.
#[derive(Debug, Clone)]
pub struct VmGenerator {
    /// Whether the generator has finished (returned or exhausted).
    pub done: Arc<AtomicBool>,
    /// Receiver end of the yield channel (generator sends values here).
    /// Wrapped in a shared async mutex so recv() can be called without holding
    /// a synchronous iterator-state lock across await points.
    /// Each item is either a yielded value or the error raised by the body.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
}
248
impl VmGenerator {
    /// Reads the `done` flag (relaxed ordering).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the `done` flag (relaxed ordering). The flag is never reset here.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
258
/// A stream object: lazily produces values from a `gen fn`.
///
/// Clones share the `done` flag, the receiver and (when present) the
/// cancellation hook.
#[derive(Debug, Clone)]
pub struct VmStream {
    /// Whether the stream has finished (returned, thrown, or exhausted).
    pub done: Arc<AtomicBool>,
    /// Receiver end of the stream channel.
    /// Each item is either a produced value or an error.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
    /// Optional cancellation hook for host-backed streams.
    /// `None` means the stream has no cancellation hook.
    pub cancel: Option<VmStreamCancel>,
}
269
impl VmStream {
    /// Reads the `done` flag (relaxed ordering).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the `done` flag (relaxed ordering). The flag is never reset here.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
279
/// Cancellation hook for a stream, backed by a tokio watch channel carrying a
/// `bool` ("cancelled"). Clones share the same sender through the `Arc`.
#[derive(Clone)]
pub struct VmStreamCancel {
    /// Watch sender that publishes the cancelled flag to subscribers.
    sender: Arc<tokio::sync::watch::Sender<bool>>,
}
284
285impl VmStreamCancel {
286    pub fn new() -> Self {
287        let (sender, _receiver) = tokio::sync::watch::channel(false);
288        Self {
289            sender: Arc::new(sender),
290        }
291    }
292
293    pub fn cancel(&self) {
294        let _ = self.sender.send(true);
295    }
296
297    pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
298        self.sender.subscribe()
299    }
300}
301
/// `Default` is the same as [`VmStreamCancel::new`]: a hook that has not been
/// cancelled.
impl Default for VmStreamCancel {
    fn default() -> Self {
        Self::new()
    }
}
307
308impl std::fmt::Debug for VmStreamCancel {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        f.debug_struct("VmStreamCancel")
311            .field("cancelled", &*self.sender.borrow())
312            .finish()
313    }
314}
315
316impl VmStream {
317    pub(crate) fn cancel(&self) {
318        if let Some(cancel) = &self.cancel {
319            cancel.cancel();
320        }
321    }
322}