Skip to main content

algocline_engine/execution/
record.rs

1//! `SessionRecord` — per-session ownership bundle for the v2 execution path.
2//!
3//! Holds the shared state, the progress broadcast sender, the cancellation token,
4//! the background task handle, and the per-query oneshot senders used to resume
5//! a paused Lua coroutine.
6//!
7//! `SessionRecord` is intentionally free of any MCP / rmcp concepts (Crux R1).
8//! All cancellation uses [`tokio_util::sync::CancellationToken`]; no
9//! `JoinHandle::abort()` path exists (Crux R2).
10
11use std::collections::HashMap;
12use std::sync::atomic::AtomicI64;
13use std::sync::Arc;
14
15use algocline_core::execution::{CancelInfo, ExecutionState};
16use algocline_core::{ExecutionMetrics, QueryId};
17use tokio::sync::broadcast;
18use tokio::sync::Mutex;
19use tokio::task::JoinHandle;
20use tokio_util::sync::CancellationToken;
21
22use algocline_core::execution::ProgressEvent;
23
24// ---------------------------------------------------------------------------
25// SessionRecord
26// ---------------------------------------------------------------------------
27
28/// Type alias for the shared resp_txs map.
29///
30/// `Arc<Mutex<...>>` so the driver_loop and the registry can share the same map
31/// without a circular ownership relationship.
32pub(crate) type RespTxsMap =
33    Arc<Mutex<HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>>>;
34
35/// Ownership bundle for a single v2 execution session.
36///
37/// Created by [`super::registry::SessionRegistryV2::spawn_v2`] and kept alive
38/// inside the registry's `HashMap` until the session reaches a terminal state
39/// (or is explicitly removed by GC).
40///
41/// Drop order:
42/// 1. `join_handle` — task has already completed (normal path) or panicked.
43/// 2. `bus_tx` — triggers `RecvError::Closed` on every open receiver.
44/// 3. `cancel_token` — releasing any child tokens.
45pub struct SessionRecord {
46    /// Shared execution state (v2), protected by a per-session `Mutex`.
47    ///
48    /// The driver loop is the only writer; readers use `clone-then-release`
49    /// to avoid holding the lock across `.await` points (Crux K-4).
50    pub(crate) state: Arc<Mutex<ExecutionState>>,
51
52    /// Broadcast sender for [`ProgressEvent`]s.
53    ///
54    /// Capacity 256 (design-v1.md §5.1).  When this field is dropped, every
55    /// open `broadcast::Receiver` observes `RecvError::Closed` — signalling
56    /// session termination (Crux R3 / design-v1.md §5.6).
57    ///
58    /// **Crux R3 (sink-free)**: `send()` is called with `let _ = ...` at every
59    /// site in `driver_loop` (`driver.rs`); when 0 receivers are subscribed,
60    /// `send` returns `Err(SendError)` and the event is silently dropped
61    /// without affecting the caller's control flow.  No sentinel receiver is
62    /// held — the contract is "caller is not crashed by 0 observers", not
63    /// "`send` always returns `Ok`".  See `bus_tx_does_not_crash_caller_with_zero_observers`.
64    pub(crate) bus_tx: broadcast::Sender<ProgressEvent>,
65
66    /// Wall-clock timestamp (Unix ms) of the last driver activity.
67    ///
68    /// Updated by `driver_loop` at entry (session-start wall-clock),
69    /// using `Ordering::Relaxed` (single writer = driver_loop, single reader = GC).
70    /// Matches the `Session.last_activity_ms: Arc<AtomicI64>` pattern in the
71    /// legacy `session.rs` (L297, L419) — legacy parity (Crux #3).
72    ///
73    /// The GC task reads this atomically while holding the `sessions` write lock to
74    /// determine whether a session has been idle long enough to be evicted.
75    pub(crate) last_active: Arc<AtomicI64>,
76
77    /// Cooperative cancellation token.
78    ///
79    /// Calling `.cancel()` sets the internal flag; the driver loop observes it
80    /// at exactly four checkpoints (A/B/C/D) and transitions to `Cancelled`.
81    /// `.abort()` on the join handle is never called (Crux R2).
82    pub(crate) cancel_token: CancellationToken,
83
84    /// Handle to the background driver task.
85    ///
86    /// Wrapped in `Mutex<Option<...>>` so `await_terminal` can take ownership
87    /// and `.await` on the handle directly (single-awaiter semantics) without
88    /// busy-polling the `state` mutex.  Subsequent awaiters observe `None` and
89    /// fall through to a direct state read (the driver_loop has either already
90    /// completed or is about to complete — `await_terminal` returns the
91    /// resulting terminal state, or `AwaitError::Joined` in the rare concurrent
92    /// race case).
93    ///
94    /// Never `.abort()`-ed — cancellation uses `cancel_token` only (Crux R2).
95    pub(crate) join_handle: Mutex<Option<JoinHandle<()>>>,
96
97    /// Per-query oneshot senders to wake the paused Lua coroutine.
98    ///
99    /// Shared via `Arc<Mutex<...>>` between this record and the driver_loop task
100    /// so that `resume()` can deliver responses into the same map the driver reads.
101    ///
102    /// Populated when the driver publishes `PauseRequested`; consumed by
103    /// `resume()`.
104    pub(crate) resp_txs: RespTxsMap,
105
106    /// Stores the first `CancelInfo` observed for idempotent cancel (Crux R2).
107    ///
108    /// `Some` once `cancel()` has been called; subsequent calls return `Ok(())`
109    /// without overwriting this entry.
110    pub(crate) first_cancel_info: Mutex<Option<CancelInfo>>,
111
112    /// Shared token-usage accumulator for this session.
113    ///
114    /// Cloned from the same `Arc` as `DriverContext.metrics` so that
115    /// `registry.resume` can call `on_response_fed` against the same
116    /// `SessionStatus` that `driver_loop` reads via `usage_aggregate()` at Done.
117    /// `Arc` drop is order-independent.
118    ///
119    /// Read path added in ST2 (`registry.resume` → `on_response_fed`).
120    pub(crate) metrics: Arc<ExecutionMetrics>,
121}
122
123impl SessionRecord {
124    /// Create a new `SessionRecord`.
125    ///
126    /// `bus_capacity` is the broadcast channel buffer size (typically 256).
127    #[cfg(test)]
128    pub(crate) fn new(
129        state: Arc<Mutex<ExecutionState>>,
130        bus_capacity: usize,
131        cancel_token: CancellationToken,
132        join_handle: JoinHandle<()>,
133        resp_txs: RespTxsMap,
134        last_active: Arc<AtomicI64>,
135        metrics: Arc<ExecutionMetrics>,
136    ) -> Self {
137        let (bus_tx, _) = broadcast::channel(bus_capacity);
138        Self {
139            state,
140            bus_tx,
141            last_active,
142            cancel_token,
143            join_handle: Mutex::new(Some(join_handle)),
144            resp_txs,
145            first_cancel_info: Mutex::new(None),
146            metrics,
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use algocline_core::execution::{ExecutionState, ExecutionStateTag, ProgressEvent};
    use std::sync::atomic::AtomicI64;
    use tokio::task;

    /// Builds a record around `state` with the fixture values both tests
    /// share: bus capacity 256, a fresh cancellation token, a no-op spawned
    /// task, an empty `resp_txs` map, `last_active` at 0 and new metrics.
    ///
    /// Must be called from inside a Tokio runtime because it spawns a task.
    fn record_with_state(state: Arc<Mutex<ExecutionState>>) -> SessionRecord {
        SessionRecord::new(
            state,
            256,
            CancellationToken::new(),
            task::spawn(async {}),
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicI64::new(0)),
            Arc::new(ExecutionMetrics::new()),
        )
    }

    #[tokio::test]
    async fn record_created_with_running_state() {
        let record = record_with_state(Arc::new(Mutex::new(ExecutionState::Running)));

        // The state handed to the constructor is the state the record exposes.
        let current = record.state.lock().await;
        assert!(matches!(*current, ExecutionState::Running));
    }

    #[tokio::test]
    async fn bus_tx_does_not_crash_caller_with_zero_observers() {
        // Crux R3 (sink-free): the guarantee under test is "caller is not
        // crashed by 0 observers".  With the sentinel receiver removed,
        // `bus_tx.send` yields `Err(SendError)` whenever nobody is
        // subscribed.  That error has to be ignorable: the event is lost,
        // yet the caller's control flow stays intact.  Every production
        // site in `driver_loop` enacts this via `let _ = bus_tx.send(...)`.
        let record = record_with_state(Arc::new(Mutex::new(ExecutionState::Running)));

        let transition = ProgressEvent::StateTransition {
            from: ExecutionStateTag::Running,
            to: ExecutionStateTag::Done,
            at: 0,
        };

        // 0 receivers → Err(SendError).  Like production code, discard the
        // result with `let _ = ...`; the test passes as long as the call
        // itself does not panic.
        let _ = record.bus_tx.send(transition);
    }
}