Skip to main content

algocline_engine/execution/
record.rs

1//! `SessionRecord` — per-session ownership bundle for the v2 execution path.
2//!
3//! Holds the shared state, the progress broadcast sender, the cancellation token,
4//! the background task handle, and the per-query oneshot senders used to resume
5//! a paused Lua coroutine.
6//!
7//! `SessionRecord` is intentionally free of any MCP / rmcp concepts (Crux R1).
8//! All cancellation uses [`tokio_util::sync::CancellationToken`]; no
9//! `JoinHandle::abort()` path exists (Crux R2).
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use algocline_core::execution::{CancelInfo, ExecutionState};
15use algocline_core::QueryId;
16use tokio::sync::broadcast;
17use tokio::sync::Mutex;
18use tokio::task::JoinHandle;
19use tokio_util::sync::CancellationToken;
20
21use algocline_core::execution::ProgressEvent;
22
23// ---------------------------------------------------------------------------
24// SessionRecord
25// ---------------------------------------------------------------------------
26
27/// Type alias for the shared resp_txs map.
28///
29/// `Arc<Mutex<...>>` so the driver_loop and the registry can share the same map
30/// without a circular ownership relationship.
31pub(crate) type RespTxsMap =
32    Arc<Mutex<HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>>>;
33
34/// Ownership bundle for a single v2 execution session.
35///
36/// Created by [`super::registry::SessionRegistryV2::spawn_v2`] and kept alive
37/// inside the registry's `HashMap` until the session reaches a terminal state
38/// (or is explicitly removed by GC).
39///
40/// Drop order:
41/// 1. `join_handle` — task has already completed (normal path) or panicked.
42/// 2. `bus_tx` — triggers `RecvError::Closed` on every open receiver.
43/// 3. `cancel_token` — releasing any child tokens.
44pub struct SessionRecord {
45    /// Shared execution state (v2), protected by a per-session `Mutex`.
46    ///
47    /// The driver loop is the only writer; readers use `clone-then-release`
48    /// to avoid holding the lock across `.await` points (Crux K-4).
49    pub(crate) state: Arc<Mutex<ExecutionState>>,
50
51    /// Broadcast sender for [`ProgressEvent`]s.
52    ///
53    /// Capacity 256 (design-v1.md §5.1).  When this field is dropped, every
54    /// open `broadcast::Receiver` observes `RecvError::Closed` — signalling
55    /// session termination (Crux R3 / design-v1.md §5.6).
56    ///
57    /// **Crux R3 (sink-free)**: `send()` is called with `let _ = ...` at every
58    /// site in `driver_loop` (`driver.rs`); when 0 receivers are subscribed,
59    /// `send` returns `Err(SendError)` and the event is silently dropped
60    /// without affecting the caller's control flow.  No sentinel receiver is
61    /// held — the contract is "caller is not crashed by 0 observers", not
62    /// "`send` always returns `Ok`".  See `bus_tx_does_not_crash_caller_with_zero_observers`.
63    pub(crate) bus_tx: broadcast::Sender<ProgressEvent>,
64
65    /// Cooperative cancellation token.
66    ///
67    /// Calling `.cancel()` sets the internal flag; the driver loop observes it
68    /// at exactly four checkpoints (A/B/C/D) and transitions to `Cancelled`.
69    /// `.abort()` on the join handle is never called (Crux R2).
70    pub(crate) cancel_token: CancellationToken,
71
72    /// Handle to the background driver task.
73    ///
74    /// Wrapped in `Mutex<Option<...>>` so `await_terminal` can take ownership
75    /// and `.await` on the handle directly (single-awaiter semantics) without
76    /// busy-polling the `state` mutex.  Subsequent awaiters observe `None` and
77    /// fall through to a direct state read (the driver_loop has either already
78    /// completed or is about to complete — `await_terminal` returns the
79    /// resulting terminal state, or `AwaitError::Joined` in the rare concurrent
80    /// race case).
81    ///
82    /// Never `.abort()`-ed — cancellation uses `cancel_token` only (Crux R2).
83    pub(crate) join_handle: Mutex<Option<JoinHandle<()>>>,
84
85    /// Per-query oneshot senders to wake the paused Lua coroutine.
86    ///
87    /// Shared via `Arc<Mutex<...>>` between this record and the driver_loop task
88    /// so that `resume()` can deliver responses into the same map the driver reads.
89    ///
90    /// Populated when the driver publishes `PauseRequested`; consumed by
91    /// `resume()`.
92    pub(crate) resp_txs: RespTxsMap,
93
94    /// Stores the first `CancelInfo` observed for idempotent cancel (Crux R2).
95    ///
96    /// `Some` once `cancel()` has been called; subsequent calls return `Ok(())`
97    /// without overwriting this entry.
98    pub(crate) first_cancel_info: Mutex<Option<CancelInfo>>,
99}
100
101impl SessionRecord {
102    /// Create a new `SessionRecord`.
103    ///
104    /// `bus_capacity` is the broadcast channel buffer size (typically 256).
105    #[cfg(test)]
106    pub(crate) fn new(
107        state: Arc<Mutex<ExecutionState>>,
108        bus_capacity: usize,
109        cancel_token: CancellationToken,
110        join_handle: JoinHandle<()>,
111        resp_txs: RespTxsMap,
112    ) -> Self {
113        let (bus_tx, _) = broadcast::channel(bus_capacity);
114        Self {
115            state,
116            bus_tx,
117            cancel_token,
118            join_handle: Mutex::new(Some(join_handle)),
119            resp_txs,
120            first_cancel_info: Mutex::new(None),
121        }
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use algocline_core::execution::{ExecutionState, ExecutionStateTag};
    use tokio::task;

    /// Build a record in the `Running` state with a no-op driver task, a
    /// 256-slot bus, a fresh cancellation token and an empty `resp_txs` map.
    ///
    /// Must be called from inside a Tokio runtime because it spawns a task.
    fn running_record() -> SessionRecord {
        let shared_state = Arc::new(Mutex::new(ExecutionState::Running));
        let pending: RespTxsMap = Arc::new(Mutex::new(HashMap::new()));
        let noop_task = task::spawn(async {});
        SessionRecord::new(shared_state, 256, CancellationToken::new(), noop_task, pending)
    }

    /// A freshly built record exposes the state it was constructed with.
    #[tokio::test]
    async fn record_created_with_running_state() {
        let record = running_record();

        let current = record.state.lock().await;
        assert!(matches!(*current, ExecutionState::Running));
    }

    /// Crux R3 (sink-free): sending with no subscribed receiver must not
    /// disturb the caller.
    ///
    /// With the sentinel receiver gone, `bus_tx.send` yields `Err(SendError)`
    /// when nobody is listening.  That error has to be ignorable: the event is
    /// lost, but control flow carries on.  Every production site in
    /// `driver_loop` relies on this by writing `let _ = bus_tx.send(...)`.
    #[tokio::test]
    async fn bus_tx_does_not_crash_caller_with_zero_observers() {
        let record = running_record();

        let transition = ProgressEvent::StateTransition {
            from: ExecutionStateTag::Running,
            to: ExecutionStateTag::Done,
            at: 0,
        };

        // Zero receivers, so this returns Err(SendError).  Mirroring production
        // code, the result is discarded; the test passes if nothing panics.
        let _ = record.bus_tx.send(transition);
    }
}
168}