Skip to main content

meerkat_runtime/handles/
session_context.rs

1//! Runtime impl of [`meerkat_core::handles::SessionContextHandle`] (W2-E).
2//!
3//! Routes every canonical session-truth mutation into the session's
4//! MeerkatMachine DSL via the `AdvanceSessionContext` input. The
5//! transition is monotonically guarded at the DSL layer, so callers fire
6//! unconditionally post-mutation and the DSL drops duplicate or
7//! out-of-order ticks.
8//!
9//! Every successful transition emits `SessionContextAdvanced`; the
10//! handle scans effects and dispatches to the installed
11//! [`meerkat_core::handles::SessionContextAdvancedObserver`]. The
12//! realtime projection consumer uses this observer to drive its typed
13//! `ProjectionFreshness` state — replacing the hand-wired
14//! `projection_refresh_rx` polling channel + `projection_refresh_dirty`
15//! flag.
16
17use std::sync::{Arc, RwLock, Weak};
18
19use meerkat_core::handles::{
20    DslTransitionError, SessionContextAdvancedObserver, SessionContextHandle,
21};
22
23use super::HandleDslAuthority;
24use crate::meerkat_machine::dsl as mm_dsl;
25
26/// Runtime-backed [`SessionContextHandle`] impl.
27///
28/// Mirrors the pattern used by [`super::RuntimePeerInteractionHandle`]:
29/// the observer is held as a `Weak` so this handle does not keep the
30/// realtime projection consumer alive past its socket's lifetime.
31pub struct RuntimeSessionContextHandle {
32    dsl: Arc<HandleDslAuthority>,
33    observer: RwLock<Option<Weak<dyn SessionContextAdvancedObserver>>>,
34}
35
36impl std::fmt::Debug for RuntimeSessionContextHandle {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        let observer_tag = self
39            .observer
40            .read()
41            .ok()
42            .as_deref()
43            .and_then(|o| o.as_ref().map(|_| "<observer>"));
44        f.debug_struct("RuntimeSessionContextHandle")
45            .field("dsl", &self.dsl)
46            .field("observer", &observer_tag)
47            .finish()
48    }
49}
50
51impl RuntimeSessionContextHandle {
52    /// Construct a handle backed by the session's shared DSL authority.
53    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
54        Self {
55            dsl,
56            observer: RwLock::new(None),
57        }
58    }
59
60    /// Construct a handle backed by an ephemeral DSL authority (tests /
61    /// legacy recovery paths).
62    pub fn ephemeral() -> Self {
63        Self::new(Arc::new(HandleDslAuthority::ephemeral()))
64    }
65}
66
67impl SessionContextHandle for RuntimeSessionContextHandle {
68    fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError> {
69        // Sample the observer slot and collect emissions UNDER the DSL
70        // authority lock, then dispatch the observer callback OUTSIDE
71        // the lock.
72        //
73        // The observer must fire post-lock because
74        // `BridgeProjectionToProductTurn::on_session_context_advanced`
75        // re-enters the same authority via `projection_advance_observed`
76        // and the mutex is non-reentrant.
77        //
78        // The atomicity that closes the race PR #286 tried to close
79        // sits at the sample step, not the dispatch step: sampling the
80        // observer slot inside the same DSL critical section that
81        // committed the transition means a concurrent
82        // `install_observer_with_baseline` (which writes the slot under
83        // the same DSL lock via `with_state_lock`) is strictly ordered
84        // relative to this sample. Either the installer ran before the
85        // sample (the sample returns the new observer — correct, the
86        // transition happened inside the observer's lifetime), or it
87        // ran after the sample (the installer's baseline read reflects
88        // this transition's committed state, so the new observer sees
89        // no fire it is owed). The previous implementation released
90        // the lock before reading the slot, allowing an interleaved
91        // install to see the fire of a transition whose effect its
92        // baseline had already captured.
93        let sampled = self.dsl.apply_input_with_effects_and_sample(
94            mm_dsl::MeerkatMachineInput::AdvanceSessionContext { updated_at_ms },
95            "SessionContextHandle::context_advanced",
96            |effects| {
97                let observer_opt = self
98                    .observer
99                    .read()
100                    .unwrap_or_else(std::sync::PoisonError::into_inner)
101                    .as_ref()
102                    .and_then(Weak::upgrade);
103                let emissions: Vec<u64> = effects
104                    .iter()
105                    .filter_map(|effect| match effect {
106                        mm_dsl::MeerkatMachineEffect::SessionContextAdvanced {
107                            updated_at_ms: m,
108                        } => Some(*m),
109                        _ => None,
110                    })
111                    .collect();
112                (observer_opt, emissions)
113            },
114        );
115        let (observer_opt, emissions) = match sampled {
116            Ok(pair) => pair,
117            // The monotonic guard surfaces as a typed `GuardRejected` —
118            // treat as `Ok(false)` so callers can fire unconditionally
119            // without tracking their own watermark. Any other rejection
120            // (e.g., `NoMatchingTransition` from a mis-phased call) is a
121            // real error and propagates.
122            Err(err) if err.is_guard_rejected() => return Ok(false),
123            Err(err) => return Err(err),
124        };
125        if let Some(observer) = observer_opt {
126            for emitted in emissions {
127                observer.on_session_context_advanced(emitted);
128            }
129        }
130        Ok(true)
131    }
132
133    fn current_watermark_ms(&self) -> u64 {
134        self.dsl.snapshot_state().last_session_context_updated_at_ms
135    }
136
137    fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>) {
138        *self
139            .observer
140            .write()
141            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
142    }
143
144    fn install_observer_with_baseline(
145        &self,
146        observer: Arc<dyn SessionContextAdvancedObserver>,
147    ) -> u64 {
148        // Atomic critical section: hold the DSL authority lock while
149        // installing the observer. Any `context_advanced` call that runs
150        // concurrently either (a) completes before this function acquires
151        // the DSL lock — its advance is recorded in the returned baseline
152        // and its observer-notify is dropped because no observer is
153        // installed yet, or (b) blocks until we release the DSL lock —
154        // by then the observer is installed, so the next `context_advanced`
155        // notify lands. In both cases the consumer's baseline and the
156        // observer's effect stream agree on the frontier.
157        //
158        // Locking order matches `apply_input_with_effects` (DSL first,
159        // observer second) so this critical section cannot deadlock with
160        // a concurrent transition.
161        self.dsl.with_state_lock(|state| {
162            *self
163                .observer
164                .write()
165                .unwrap_or_else(std::sync::PoisonError::into_inner) =
166                Some(Arc::downgrade(&observer));
167            state.last_session_context_updated_at_ms
168        })
169    }
170}