meerkat_runtime/handles/session_context.rs
1//! Runtime impl of [`meerkat_core::handles::SessionContextHandle`] (W2-E).
2//!
3//! Routes every canonical session-truth mutation into the session's
4//! MeerkatMachine DSL via the `AdvanceSessionContext` input. The
5//! transition is monotonically guarded at the DSL layer, so callers fire
6//! unconditionally post-mutation and the DSL drops duplicate or
7//! out-of-order ticks.
8//!
9//! Every successful transition emits `SessionContextAdvanced`; the
10//! handle scans effects and dispatches to the installed
11//! [`meerkat_core::handles::SessionContextAdvancedObserver`]. The
12//! realtime projection consumer uses this observer to drive its typed
13//! `ProjectionFreshness` state — replacing the hand-wired
14//! `projection_refresh_rx` polling channel + `projection_refresh_dirty`
15//! flag.
16
17use std::sync::{Arc, RwLock, Weak};
18
19use meerkat_core::handles::{
20 DslTransitionError, SessionContextAdvancedObserver, SessionContextHandle,
21};
22
23use super::HandleDslAuthority;
24use crate::meerkat_machine::dsl as mm_dsl;
25
26/// Runtime-backed [`SessionContextHandle`] impl.
27///
28/// Mirrors the pattern used by [`super::RuntimePeerInteractionHandle`]:
29/// the observer is held as a `Weak` so this handle does not keep the
30/// realtime projection consumer alive past its socket's lifetime.
31pub struct RuntimeSessionContextHandle {
32 dsl: Arc<HandleDslAuthority>,
33 observer: RwLock<Option<Weak<dyn SessionContextAdvancedObserver>>>,
34}
35
36impl std::fmt::Debug for RuntimeSessionContextHandle {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 let observer_tag = self
39 .observer
40 .read()
41 .ok()
42 .as_deref()
43 .and_then(|o| o.as_ref().map(|_| "<observer>"));
44 f.debug_struct("RuntimeSessionContextHandle")
45 .field("dsl", &self.dsl)
46 .field("observer", &observer_tag)
47 .finish()
48 }
49}
50
51impl RuntimeSessionContextHandle {
52 /// Construct a handle backed by the session's shared DSL authority.
53 pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
54 Self {
55 dsl,
56 observer: RwLock::new(None),
57 }
58 }
59
60 /// Construct a handle backed by an ephemeral DSL authority (tests /
61 /// legacy recovery paths).
62 pub fn ephemeral() -> Self {
63 Self::new(Arc::new(HandleDslAuthority::ephemeral()))
64 }
65}
66
67impl SessionContextHandle for RuntimeSessionContextHandle {
68 fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError> {
69 // Sample the observer slot and collect emissions UNDER the DSL
70 // authority lock, then dispatch the observer callback OUTSIDE
71 // the lock.
72 //
73 // The observer must fire post-lock because
74 // `BridgeProjectionToProductTurn::on_session_context_advanced`
75 // re-enters the same authority via `projection_advance_observed`
76 // and the mutex is non-reentrant.
77 //
78 // The atomicity that closes the race PR #286 tried to close
79 // sits at the sample step, not the dispatch step: sampling the
80 // observer slot inside the same DSL critical section that
81 // committed the transition means a concurrent
82 // `install_observer_with_baseline` (which writes the slot under
83 // the same DSL lock via `with_state_lock`) is strictly ordered
84 // relative to this sample. Either the installer ran before the
85 // sample (the sample returns the new observer — correct, the
86 // transition happened inside the observer's lifetime), or it
87 // ran after the sample (the installer's baseline read reflects
88 // this transition's committed state, so the new observer sees
89 // no fire it is owed). The previous implementation released
90 // the lock before reading the slot, allowing an interleaved
91 // install to see the fire of a transition whose effect its
92 // baseline had already captured.
93 let sampled = self.dsl.apply_input_with_effects_and_sample(
94 mm_dsl::MeerkatMachineInput::AdvanceSessionContext { updated_at_ms },
95 "SessionContextHandle::context_advanced",
96 |effects| {
97 let observer_opt = self
98 .observer
99 .read()
100 .unwrap_or_else(std::sync::PoisonError::into_inner)
101 .as_ref()
102 .and_then(Weak::upgrade);
103 let emissions: Vec<u64> = effects
104 .iter()
105 .filter_map(|effect| match effect {
106 mm_dsl::MeerkatMachineEffect::SessionContextAdvanced {
107 updated_at_ms: m,
108 } => Some(*m),
109 _ => None,
110 })
111 .collect();
112 (observer_opt, emissions)
113 },
114 );
115 let (observer_opt, emissions) = match sampled {
116 Ok(pair) => pair,
117 // The monotonic guard surfaces as a typed `GuardRejected` —
118 // treat as `Ok(false)` so callers can fire unconditionally
119 // without tracking their own watermark. Any other rejection
120 // (e.g., `NoMatchingTransition` from a mis-phased call) is a
121 // real error and propagates.
122 Err(err) if err.is_guard_rejected() => return Ok(false),
123 Err(err) => return Err(err),
124 };
125 if let Some(observer) = observer_opt {
126 for emitted in emissions {
127 observer.on_session_context_advanced(emitted);
128 }
129 }
130 Ok(true)
131 }
132
133 fn current_watermark_ms(&self) -> u64 {
134 self.dsl.snapshot_state().last_session_context_updated_at_ms
135 }
136
137 fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>) {
138 *self
139 .observer
140 .write()
141 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
142 }
143
144 fn install_observer_with_baseline(
145 &self,
146 observer: Arc<dyn SessionContextAdvancedObserver>,
147 ) -> u64 {
148 // Atomic critical section: hold the DSL authority lock while
149 // installing the observer. Any `context_advanced` call that runs
150 // concurrently either (a) completes before this function acquires
151 // the DSL lock — its advance is recorded in the returned baseline
152 // and its observer-notify is dropped because no observer is
153 // installed yet, or (b) blocks until we release the DSL lock —
154 // by then the observer is installed, so the next `context_advanced`
155 // notify lands. In both cases the consumer's baseline and the
156 // observer's effect stream agree on the frontier.
157 //
158 // Locking order matches `apply_input_with_effects` (DSL first,
159 // observer second) so this critical section cannot deadlock with
160 // a concurrent transition.
161 self.dsl.with_state_lock(|state| {
162 *self
163 .observer
164 .write()
165 .unwrap_or_else(std::sync::PoisonError::into_inner) =
166 Some(Arc::downgrade(&observer));
167 state.last_session_context_updated_at_ms
168 })
169 }
170}