Skip to main content

harn_vm/value/
handles.rs

1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use super::{VmError, VmValue};
7
/// The raw join handle type for spawned tasks.
///
/// Awaiting the handle yields the task's own
/// `Result<(VmValue, String), VmError>` (wrapped by Tokio in its join
/// result).
/// NOTE(review): the meaning of the `String` paired with the value is not
/// visible in this file — confirm against the spawn site.
pub type VmJoinHandle = tokio::task::JoinHandle<Result<(VmValue, String), VmError>>;
10
/// A spawned async task handle with cancellation support.
///
/// Groups the Tokio join handle of a spawned task with the shared
/// cancellation flag and the id the runtime uses to refer to the task.
pub struct VmTaskHandle {
    /// Join handle of the spawned Tokio task.
    pub handle: VmJoinHandle,
    /// Cooperative cancellation token. Set to true to request graceful shutdown.
    pub cancel_token: Arc<AtomicBool>,
    /// Runtime-context task id used by the VM scheduler and wait-for graph.
    pub wait_task_id: String,
}
19
/// A channel handle for the VM (uses tokio mpsc).
///
/// Every field sits behind an `Arc`, so cloning the handle produces another
/// reference to the same underlying channel state rather than a new channel.
#[derive(Debug, Clone)]
pub struct VmChannelHandle {
    /// Name of the channel.
    /// NOTE(review): presumably used for diagnostics/lookup — confirm at the call sites.
    pub name: Arc<str>,
    /// Sending half of the Tokio mpsc channel carrying `VmValue`s.
    pub sender: Arc<tokio::sync::mpsc::Sender<VmValue>>,
    /// Receiving half, guarded by an async mutex so that all clones of the
    /// handle share the single receiver.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<VmValue>>>,
    /// Shared "closed" flag.
    /// NOTE(review): nothing in this section reads or writes it; presumably
    /// maintained by the channel builtins — confirm.
    pub closed: Arc<AtomicBool>,
}
28
/// An atomic integer handle for the VM.
///
/// Cloning the handle clones the `Arc`, so all clones observe and modify the
/// same underlying `AtomicI64`.
#[derive(Debug, Clone)]
pub struct VmAtomicHandle {
    /// The shared 64-bit signed atomic cell.
    pub value: Arc<AtomicI64>,
}
34
/// A reproducible random number generator handle.
///
/// Cloning the handle clones the `Arc`, so every clone draws from the same
/// mutex-protected generator state.
#[derive(Clone)]
pub struct VmRngHandle {
    /// The shared generator, locked for each use.
    pub rng: Arc<Mutex<rand::rngs::StdRng>>,
}

/// Hand-written `Debug` that prints a fixed placeholder and never touches
/// (or locks) the generator itself.
impl std::fmt::Debug for VmRngHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("VmRngHandle { .. }")
    }
}
46
/// A held synchronization permit for mutex/semaphore/gate primitives.
///
/// A thin, cloneable wrapper around a shared `VmSyncLease`; every method
/// below forwards directly to the lease.
#[derive(Debug, Clone)]
pub struct VmSyncPermitHandle {
    /// The shared lease this permit refers to.
    pub(crate) lease: Arc<crate::synchronization::VmSyncLease>,
}

impl VmSyncPermitHandle {
    /// Forwards to `VmSyncLease::release` and returns its result.
    /// NOTE(review): presumably `true` means this call performed the release —
    /// confirm against `VmSyncLease`.
    pub(crate) fn release(&self) -> bool {
        self.lease.release()
    }

    /// Forwards to `VmSyncLease::kind`.
    /// NOTE(review): presumably names the primitive (mutex/semaphore/gate) — confirm.
    pub(crate) fn kind(&self) -> &str {
        self.lease.kind()
    }

    /// Forwards to `VmSyncLease::key`.
    pub(crate) fn key(&self) -> &str {
        self.lease.key()
    }

    /// Forwards to `VmSyncLease::permits`.
    /// NOTE(review): presumably the number of permits held by the lease — confirm.
    pub(crate) fn permits(&self) -> u32 {
        self.lease.permits()
    }
}
70
71/// A lazy integer range — Python-style. Stores only `(start, end, inclusive)`
72/// so the in-memory footprint is O(1) regardless of the range's length.
73/// `len()`, indexing (`r[k]`), `.contains(x)`, `.first()`, `.last()` are all
74/// O(1); direct iteration walks step-by-step without materializing a list.
75///
76/// Empty-range convention (Python-consistent):
77/// - Inclusive empty when `start > end`.
78/// - Exclusive empty when `start >= end`.
79///
80/// Negative / reversed ranges are NOT supported in v1: `5 to 1` is simply
81/// empty. Authors who want reverse iteration should call `.to_list().reverse()`.
82#[derive(Debug, Clone, Copy)]
83pub struct VmRange {
84    pub start: i64,
85    pub end: i64,
86    pub inclusive: bool,
87}
88
89impl VmRange {
90    /// Number of elements this range yields.
91    ///
92    /// Uses saturating arithmetic so that pathological ranges near
93    /// `i64::MAX`/`i64::MIN` do not panic on overflow. Because a range's
94    /// element count must fit in `i64` the returned length saturates at
95    /// `i64::MAX` for ranges whose width exceeds that (e.g. `i64::MIN to
96    /// i64::MAX` inclusive). Callers that later narrow to `usize` for
97    /// allocation should still guard against huge lengths — see
98    /// `to_vec` / `get` for the indexable-range invariants.
99    pub fn len(&self) -> i64 {
100        if self.inclusive {
101            if self.start > self.end {
102                0
103            } else {
104                self.end.saturating_sub(self.start).saturating_add(1)
105            }
106        } else if self.start >= self.end {
107            0
108        } else {
109            self.end.saturating_sub(self.start)
110        }
111    }
112
113    pub fn is_empty(&self) -> bool {
114        self.len() == 0
115    }
116
117    /// Element at the given 0-based index, bounds-checked.
118    /// Returns `None` when out of bounds or when `start + idx` would
119    /// overflow (which can only happen when `len()` saturated).
120    pub fn get(&self, idx: i64) -> Option<i64> {
121        if idx < 0 || idx >= self.len() {
122            None
123        } else {
124            self.start.checked_add(idx)
125        }
126    }
127
128    /// First element or `None` when empty.
129    pub fn first(&self) -> Option<i64> {
130        if self.is_empty() {
131            None
132        } else {
133            Some(self.start)
134        }
135    }
136
137    /// Last element or `None` when empty.
138    pub fn last(&self) -> Option<i64> {
139        if self.is_empty() {
140            None
141        } else if self.inclusive {
142            Some(self.end)
143        } else {
144            Some(self.end - 1)
145        }
146    }
147
148    /// Whether `v` falls inside the range (O(1)).
149    pub fn contains(&self, v: i64) -> bool {
150        if self.is_empty() {
151            return false;
152        }
153        if self.inclusive {
154            v >= self.start && v <= self.end
155        } else {
156            v >= self.start && v < self.end
157        }
158    }
159
160    /// Materialize to a `Vec<VmValue>` — the explicit escape hatch.
161    ///
162    /// Uses `checked_add` on the per-element index so a range near
163    /// `i64::MAX` stops at the representable bound instead of panicking.
164    /// Callers should still treat a very long range as unwise to
165    /// materialize (the whole point of `VmRange` is to avoid this).
166    pub fn to_vec(&self) -> Vec<VmValue> {
167        let len = self.len();
168        if len <= 0 {
169            return Vec::new();
170        }
171        let cap = len as usize;
172        let mut out = Vec::with_capacity(cap);
173        for i in 0..len {
174            match self.start.checked_add(i) {
175                Some(v) => out.push(VmValue::Int(v)),
176                None => break,
177            }
178        }
179        out
180    }
181}
182
/// A generator object: lazily produces values via yield.
/// The generator body runs as a spawned task that sends values through a channel.
///
/// Both fields are `Arc`-shared, so clones of a `VmGenerator` observe the
/// same completion flag and pull from the same channel.
#[derive(Debug, Clone)]
pub struct VmGenerator {
    /// Whether the generator has finished (returned or exhausted).
    pub done: Arc<AtomicBool>,
    /// Receiver end of the yield channel (generator sends values here).
    /// Each item is a `Result`, so an error can travel through the channel
    /// in place of a value.
    /// Wrapped in a shared async mutex so recv() can be called without holding
    /// a synchronous iterator-state lock across await points.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
}

impl VmGenerator {
    /// Reads the shared `done` flag (relaxed load).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the shared `done` flag to `true` (relaxed store); nothing in this
    /// type ever clears it again.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
204
/// A stream object: lazily produces values from a `gen fn`.
///
/// `done` and `receiver` are `Arc`-shared, so clones of a `VmStream` observe
/// the same completion flag and pull from the same channel.
#[derive(Debug, Clone)]
pub struct VmStream {
    /// Whether the stream has finished (returned, thrown, or exhausted).
    pub done: Arc<AtomicBool>,
    /// Receiver end of the stream channel. Each item is a `Result`, so an
    /// error can travel through the channel in place of a value.
    pub receiver: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<VmValue, VmError>>>>,
    /// Optional cancellation hook for host-backed streams.
    /// `None` means the stream has no cancellation hook attached.
    pub cancel: Option<VmStreamCancel>,
}

impl VmStream {
    /// Reads the shared `done` flag (relaxed load).
    pub(crate) fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }

    /// Sets the shared `done` flag to `true` (relaxed store); nothing in this
    /// impl ever clears it again.
    pub(crate) fn mark_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}
225
226#[derive(Clone)]
227pub struct VmStreamCancel {
228    sender: Arc<tokio::sync::watch::Sender<bool>>,
229}
230
231impl VmStreamCancel {
232    pub fn new() -> Self {
233        let (sender, _receiver) = tokio::sync::watch::channel(false);
234        Self {
235            sender: Arc::new(sender),
236        }
237    }
238
239    pub fn cancel(&self) {
240        let _ = self.sender.send(true);
241    }
242
243    pub fn subscribe(&self) -> tokio::sync::watch::Receiver<bool> {
244        self.sender.subscribe()
245    }
246}
247
248impl Default for VmStreamCancel {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl std::fmt::Debug for VmStreamCancel {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("VmStreamCancel")
257            .field("cancelled", &*self.sender.borrow())
258            .finish()
259    }
260}
261
262impl VmStream {
263    pub(crate) fn cancel(&self) {
264        if let Some(cancel) = &self.cancel {
265            cancel.cancel();
266        }
267    }
268}