Skip to main content

harn_vm/value/
handles.rs

1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
8/// The raw join handle type for spawned tasks.
9pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
11/// A spawned async task handle with cancellation support.
12pub struct VmTaskHandle {
13    pub handle: VmJoinHandle,
14    /// Cooperative cancellation token. Set to true to request graceful shutdown.
15    pub cancel_token: Arc<AtomicBool>,
16}
17
18/// A channel handle for the VM (uses tokio mpsc).
19#[derive(Debug, Clone)]
20pub struct VmChannelHandle {
21    pub name: Arc<str>,
22    pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
23    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
24    pub closed: Arc<AtomicBool>,
25}
26
27/// An atomic integer handle for the VM.
28#[derive(Debug, Clone)]
29pub struct VmAtomicHandle {
30    pub value: Arc<AtomicI64>,
31}
32
33/// A reproducible random number generator handle.
34#[derive(Clone)]
35pub struct VmRngHandle {
36    pub rng: Arc<Mutex<rand::rngs::StdRng>>,
37}
38
39impl std::fmt::Debug for VmRngHandle {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        f.write_str("VmRngHandle { .. }")
42    }
43}
44
45/// A held synchronization permit for mutex/semaphore/gate primitives.
46#[derive(Debug, Clone)]
47pub struct VmSyncPermitHandle {
48    pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
49}
50
51impl VmSyncPermitHandle {
52    pub(crate) fn release(&self) -> bool {
53        self.lease.release()
54    }
55
56    pub(crate) fn kind(&self) -> &str {
57        self.lease.kind()
58    }
59
60    pub(crate) fn key(&self) -> &str {
61        self.lease.key()
62    }
63}
64
65/// A lazy integer range — Python-style. Stores only `(start, end, inclusive)`
66/// so the in-memory footprint is O(1) regardless of the range's length.
67/// `len()`, indexing (`r[k]`), `.contains(x)`, `.first()`, `.last()` are all
68/// O(1); direct iteration walks step-by-step without materializing a list.
69///
70/// Empty-range convention (Python-consistent):
71/// - Inclusive empty when `start > end`.
72/// - Exclusive empty when `start >= end`.
73///
74/// Negative / reversed ranges are NOT supported in v1: `5 to 1` is simply
75/// empty. Authors who want reverse iteration should call `.to_list().reverse()`.
76#[derive(Debug, Clone, Copy)]
77pub struct VmRange {
78    pub start: i64,
79    pub end: i64,
80    pub inclusive: bool,
81}
82
83impl VmRange {
84    /// Number of elements this range yields.
85    ///
86    /// Uses saturating arithmetic so that pathological ranges near
87    /// `i64::MAX`/`i64::MIN` do not panic on overflow. Because a range's
88    /// element count must fit in `i64` the returned length saturates at
89    /// `i64::MAX` for ranges whose width exceeds that (e.g. `i64::MIN to
90    /// i64::MAX` inclusive). Callers that later narrow to `usize` for
91    /// allocation should still guard against huge lengths — see
92    /// `to_vec` / `get` for the indexable-range invariants.
93    pub fn len(&self) -> i64 {
94        if self.inclusive {
95            if self.start > self.end {
96                0
97            } else {
98                self.end.saturating_sub(self.start).saturating_add(1)
99            }
100        } else if self.start >= self.end {
101            0
102        } else {
103            self.end.saturating_sub(self.start)
104        }
105    }
106
107    pub fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    /// Element at the given 0-based index, bounds-checked.
112    /// Returns `None` when out of bounds or when `start + idx` would
113    /// overflow (which can only happen when `len()` saturated).
114    pub fn get(&self, idx: i64) -> Option<i64> {
115        if idx < 0 || idx >= self.len() {
116            None
117        } else {
118            self.start.checked_add(idx)
119        }
120    }
121
122    /// First element or `None` when empty.
123    pub fn first(&self) -> Option<i64> {
124        if self.is_empty() {
125            None
126        } else {
127            Some(self.start)
128        }
129    }
130
131    /// Last element or `None` when empty.
132    pub fn last(&self) -> Option<i64> {
133        if self.is_empty() {
134            None
135        } else if self.inclusive {
136            Some(self.end)
137        } else {
138            Some(self.end - 1)
139        }
140    }
141
142    /// Whether `v` falls inside the range (O(1)).
143    pub fn contains(&self, v: i64) -> bool {
144        if self.is_empty() {
145            return false;
146        }
147        if self.inclusive {
148            v >= self.start && v <= self.end
149        } else {
150            v >= self.start && v < self.end
151        }
152    }
153
154    /// Materialize to a `Vec<VmValue>` — the explicit escape hatch.
155    ///
156    /// Uses `checked_add` on the per-element index so a range near
157    /// `i64::MAX` stops at the representable bound instead of panicking.
158    /// Callers should still treat a very long range as unwise to
159    /// materialize (the whole point of `VmRange` is to avoid this).
160    pub fn to_vec(&self) -> Vec<VmValue> {
161        let len = self.len();
162        if len <= 0 {
163            return Vec::new();
164        }
165        let cap = len as usize;
166        let mut out = Vec::with_capacity(cap);
167        for i in 0..len {
168            match self.start.checked_add(i) {
169                Some(v) => out.push(VmValue::Int(v)),
170                None => break,
171            }
172        }
173        out
174    }
175}
176
177/// A generator object: lazily produces values via yield.
178/// The generator body runs as a spawned task that sends values through a channel.
179#[derive(Debug, Clone)]
180pub struct VmGenerator {
181    /// Whether the generator has finished (returned or exhausted).
182    pub done: Arc<AtomicBool>,
183    /// Receiver end of the yield channel (generator sends values here).
184    /// Wrapped in a shared async mutex so recv() can be called without holding
185    /// a synchronous iterator-state lock across await points.
186    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
187}
188
189impl VmGenerator {
190    pub(crate) fn is_done(&self) -> bool {
191        self.done.load(Ordering::Relaxed)
192    }
193
194    pub(crate) fn mark_done(&self) {
195        self.done.store(true, Ordering::Relaxed);
196    }
197}
198
199/// A stream object: lazily produces values from a `gen fn`.
200#[derive(Debug, Clone)]
201pub struct VmStream {
202    /// Whether the stream has finished (returned, thrown, or exhausted).
203    pub done: Arc<AtomicBool>,
204    /// Receiver end of the stream channel.
205    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
206    /// Optional cancellation hook for host-backed streams.
207    pub cancel: Option<VmStreamCancel>,
208}
209
210impl VmStream {
211    pub(crate) fn is_done(&self) -> bool {
212        self.done.load(Ordering::Relaxed)
213    }
214
215    pub(crate) fn mark_done(&self) {
216        self.done.store(true, Ordering::Relaxed);
217    }
218}
219
220#[derive(Clone)]
221pub struct VmStreamCancel {
222    sender: Arc<tokio::sync::watch::Sender<bool>>,
223}
224
225impl VmStreamCancel {
226    pub fn new() -> Self {
227        let (sender, _receiver) = tokio::sync::watch::channel(false);
228        Self {
229            sender: Arc::new(sender),
230        }
231    }
232
233    pub fn cancel(&self) {
234        let _ = self.sender.send(true);
235    }
236
237    pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
238        self.sender.subscribe()
239    }
240}
241
242impl Default for VmStreamCancel {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248impl std::fmt::Debug for VmStreamCancel {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        f.debug_struct("VmStreamCancel")
251            .field("cancelled", &*self.sender.borrow())
252            .finish()
253    }
254}
255
256impl VmStream {
257    pub(crate) fn cancel(&self) {
258        if let Some(cancel) = &self.cancel {
259            cancel.cancel();
260        }
261    }
262}