Skip to main content

meerkat_runtime/handles/
realtime_product_turn.rs

1//! Runtime impl of [`meerkat_core::handles::RealtimeProductTurnHandle`]
2//! (U9 / dogma #4, dogma round 2 U-C / dogma #1, #3, #13, #18, #20).
3//!
4//! Routes every realtime product-turn lifecycle observation into the
5//! session's MeerkatMachine DSL. Replaces the shell-local boolean triple
6//! (`product_turn_in_flight`, `product_turn_committed`,
7//! `product_output_started`) + helper-local event matching that used to
8//! live in `meerkat-rpc::realtime_ws`.
9//!
10//! Dogma round 2 (U-C) additionally routes the realtime projection
11//! freshness (`RealtimeProjectionFreshness` state machine) and the clean-
12//! close reconnect policy (`RealtimeReconnectPolicy`) through this same
13//! handle — both previously lived as shell-local typed state on the
14//! websocket task. The handle holds a `Weak` ref to the freshness
15//! observer so it does not keep the socket's dispatcher alive past its
16//! natural lifetime.
17
18use std::sync::{Arc, RwLock, Weak};
19
20use meerkat_core::handles::{
21    DslTransitionError, RealtimeProductTurnHandle, RealtimeProductTurnPhase,
22    RealtimeProjectionFreshness, RealtimeProjectionFreshnessObserver, RealtimeReconnectPolicy,
23};
24
25use super::HandleDslAuthority;
26use crate::meerkat_machine::dsl as mm_dsl;
27
28/// Runtime-backed [`RealtimeProductTurnHandle`] impl.
29pub struct RuntimeRealtimeProductTurnHandle {
30    dsl: Arc<HandleDslAuthority>,
31    freshness_observer: RwLock<Option<Weak<dyn RealtimeProjectionFreshnessObserver>>>,
32}
33
34impl std::fmt::Debug for RuntimeRealtimeProductTurnHandle {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        let observer_tag = self
37            .freshness_observer
38            .read()
39            .ok()
40            .as_deref()
41            .and_then(|o| o.as_ref().map(|_| "<observer>"));
42        f.debug_struct("RuntimeRealtimeProductTurnHandle")
43            .field("dsl", &self.dsl)
44            .field("freshness_observer", &observer_tag)
45            .finish()
46    }
47}
48
49impl RuntimeRealtimeProductTurnHandle {
50    /// Construct a handle backed by the session's shared DSL authority.
51    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
52        Self {
53            dsl,
54            freshness_observer: RwLock::new(None),
55        }
56    }
57
58    /// Construct a handle backed by an ephemeral DSL authority (tests /
59    /// legacy recovery paths).
60    pub fn ephemeral() -> Self {
61        Self::new(Arc::new(HandleDslAuthority::ephemeral()))
62    }
63
64    /// Apply the input; map typed guard rejection (all five product-turn
65    /// transitions are idempotent via guards) to `Ok(false)` while
66    /// propagating any other transition error. Classification routes
67    /// through the typed [`meerkat_core::handles::DslRejectionKind`] so
68    /// no substring matching on rendered messages is required.
69    fn apply_idempotent(
70        &self,
71        input: mm_dsl::MeerkatMachineInput,
72        context: &'static str,
73    ) -> Result<bool, DslTransitionError> {
74        // intra-machine: no route; dispatcher not applicable
75        // (handle targets the meerkat DSL directly, not a CompositionDispatcher seam)
76        match self.dsl.apply_input(input, context) {
77            Ok(()) => Ok(true),
78            Err(err) if err.is_guard_rejected() => Ok(false),
79            Err(err) => Err(err),
80        }
81    }
82
83    /// Apply an input that may emit `RealtimeProjectionFreshnessChanged`
84    /// effects, sampling the installed freshness observer under the
85    /// same DSL lock that committed the transition and dispatching the
86    /// observer callback post-lock.
87    ///
88    /// Same race-closing pattern as `session_context.rs`: the sample
89    /// inside the DSL critical section ensures a concurrent
90    /// `install_projection_freshness_observer` is totally ordered vs
91    /// this transition, so a just-installed observer cannot receive a
92    /// fire whose state it already reflects in its install snapshot.
93    /// The dispatch runs post-lock because the wake observer
94    /// (`RealtimeSocketFreshnessWake`) uses `try_send` on an mpsc that
95    /// doesn't re-enter the authority, but keeping dispatch outside
96    /// the lock is uniform with the session-context seam and guards
97    /// against future observers that might.
98    fn apply_idempotent_with_freshness_effects(
99        &self,
100        input: mm_dsl::MeerkatMachineInput,
101        context: &'static str,
102    ) -> Result<bool, DslTransitionError> {
103        type FreshnessEmission = (mm_dsl::RealtimeProjectionFreshness, u64);
104        let sampled: Option<(
105            Arc<dyn RealtimeProjectionFreshnessObserver>,
106            Vec<FreshnessEmission>,
107        )> = match self
108            .dsl
109            .apply_input_with_effects_and_sample(input, context, |effects| {
110                let observer_opt = self
111                    .freshness_observer
112                    .read()
113                    .unwrap_or_else(std::sync::PoisonError::into_inner)
114                    .as_ref()
115                    .and_then(Weak::upgrade);
116                let observer = observer_opt?;
117                let emissions: Vec<FreshnessEmission> = effects
118                    .iter()
119                    .filter_map(|effect| match effect {
120                        mm_dsl::MeerkatMachineEffect::RealtimeProjectionFreshnessChanged {
121                            new_freshness,
122                            frontier_ms,
123                        } => Some((*new_freshness, *frontier_ms)),
124                        _ => None,
125                    })
126                    .collect();
127                Some((observer, emissions))
128            }) {
129            Ok(sampled) => sampled,
130            Err(err) if err.is_guard_rejected() => return Ok(false),
131            Err(err) => return Err(err),
132        };
133        if let Some((observer, emissions)) = sampled {
134            for (new_freshness, frontier_ms) in emissions {
135                observer.on_realtime_projection_freshness_changed(
136                    map_freshness(new_freshness),
137                    frontier_ms,
138                );
139            }
140        }
141        Ok(true)
142    }
143}
144
145fn map_phase(raw: mm_dsl::RealtimeProductTurnPhase) -> RealtimeProductTurnPhase {
146    match raw {
147        mm_dsl::RealtimeProductTurnPhase::Idle => RealtimeProductTurnPhase::Idle,
148        mm_dsl::RealtimeProductTurnPhase::AwaitingProgress => {
149            RealtimeProductTurnPhase::AwaitingProgress
150        }
151        mm_dsl::RealtimeProductTurnPhase::Committed => RealtimeProductTurnPhase::Committed,
152        mm_dsl::RealtimeProductTurnPhase::OutputStarted => RealtimeProductTurnPhase::OutputStarted,
153        mm_dsl::RealtimeProductTurnPhase::Preemptible => RealtimeProductTurnPhase::Preemptible,
154    }
155}
156
157fn map_freshness(raw: mm_dsl::RealtimeProjectionFreshness) -> RealtimeProjectionFreshness {
158    match raw {
159        mm_dsl::RealtimeProjectionFreshness::Clean => RealtimeProjectionFreshness::Clean,
160        mm_dsl::RealtimeProjectionFreshness::StaleDeferred => {
161            RealtimeProjectionFreshness::StaleDeferred
162        }
163        mm_dsl::RealtimeProjectionFreshness::StaleImmediate => {
164            RealtimeProjectionFreshness::StaleImmediate
165        }
166    }
167}
168
169fn map_policy(raw: mm_dsl::RealtimeReconnectPolicy) -> RealtimeReconnectPolicy {
170    match raw {
171        mm_dsl::RealtimeReconnectPolicy::CleanExit => RealtimeReconnectPolicy::CleanExit,
172        mm_dsl::RealtimeReconnectPolicy::ReattachAndRecover => {
173            RealtimeReconnectPolicy::ReattachAndRecover
174        }
175    }
176}
177
178impl RealtimeProductTurnHandle for RuntimeRealtimeProductTurnHandle {
179    fn turn_in_flight(&self) -> Result<bool, DslTransitionError> {
180        self.apply_idempotent(
181            mm_dsl::MeerkatMachineInput::ProductTurnInFlight,
182            "RealtimeProductTurnHandle::turn_in_flight",
183        )
184    }
185
186    fn turn_committed(&self) -> Result<bool, DslTransitionError> {
187        self.apply_idempotent(
188            mm_dsl::MeerkatMachineInput::ProductTurnCommitted,
189            "RealtimeProductTurnHandle::turn_committed",
190        )
191    }
192
193    fn output_started(&self) -> Result<bool, DslTransitionError> {
194        self.apply_idempotent(
195            mm_dsl::MeerkatMachineInput::ProductOutputStarted,
196            "RealtimeProductTurnHandle::output_started",
197        )
198    }
199
200    fn turn_interrupted(&self) -> Result<bool, DslTransitionError> {
201        self.apply_idempotent(
202            mm_dsl::MeerkatMachineInput::ProductTurnInterrupted,
203            "RealtimeProductTurnHandle::turn_interrupted",
204        )
205    }
206
207    fn turn_terminal(&self) -> Result<bool, DslTransitionError> {
208        self.apply_idempotent(
209            mm_dsl::MeerkatMachineInput::ProductTurnTerminal,
210            "RealtimeProductTurnHandle::turn_terminal",
211        )
212    }
213
214    fn current_phase(&self) -> RealtimeProductTurnPhase {
215        map_phase(self.dsl.snapshot_state().realtime_product_turn_phase)
216    }
217
218    // ---- Projection freshness (dogma round 2, U-C) ----
219
220    fn projection_advance_observed(&self, advanced_at_ms: u64) -> Result<bool, DslTransitionError> {
221        self.apply_idempotent_with_freshness_effects(
222            mm_dsl::MeerkatMachineInput::RealtimeProjectionAdvanceObserved { advanced_at_ms },
223            "RealtimeProductTurnHandle::projection_advance_observed",
224        )
225    }
226
227    fn projection_refreshed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
228        self.apply_idempotent_with_freshness_effects(
229            mm_dsl::MeerkatMachineInput::RealtimeProjectionRefreshed { observed_ms },
230            "RealtimeProductTurnHandle::projection_refreshed",
231        )
232    }
233
234    fn projection_baseline_observed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
235        self.apply_idempotent_with_freshness_effects(
236            mm_dsl::MeerkatMachineInput::RealtimeProjectionBaselineObserved { observed_ms },
237            "RealtimeProductTurnHandle::projection_baseline_observed",
238        )
239    }
240
241    fn projection_reset(&self, baseline_ms: u64) -> Result<bool, DslTransitionError> {
242        self.apply_idempotent_with_freshness_effects(
243            mm_dsl::MeerkatMachineInput::RealtimeProjectionReset { baseline_ms },
244            "RealtimeProductTurnHandle::projection_reset",
245        )
246    }
247
248    fn projection_freshness(&self) -> RealtimeProjectionFreshness {
249        map_freshness(self.dsl.snapshot_state().realtime_projection_freshness)
250    }
251
252    fn projection_frontier_ms(&self) -> u64 {
253        self.dsl.snapshot_state().realtime_projection_frontier_ms
254    }
255
256    fn install_projection_freshness_observer(
257        &self,
258        observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
259    ) {
260        *self
261            .freshness_observer
262            .write()
263            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
264    }
265
266    fn install_projection_freshness_observer_with_snapshot(
267        &self,
268        observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
269    ) -> (RealtimeProjectionFreshness, u64) {
270        self.dsl.with_state_lock(|state| {
271            *self
272                .freshness_observer
273                .write()
274                .unwrap_or_else(std::sync::PoisonError::into_inner) =
275                Some(Arc::downgrade(&observer));
276            (
277                map_freshness(state.realtime_projection_freshness),
278                state.realtime_projection_frontier_ms,
279            )
280        })
281    }
282
283    // ---- Reconnect policy (dogma round 2, U-C) ----
284
285    fn classify_client_input_submitted(&self) -> Result<bool, DslTransitionError> {
286        self.apply_idempotent(
287            mm_dsl::MeerkatMachineInput::ClassifyRealtimeClientInputSubmitted,
288            "RealtimeProductTurnHandle::classify_client_input_submitted",
289        )
290    }
291
292    fn classify_mid_turn_activity(&self) -> Result<bool, DslTransitionError> {
293        self.apply_idempotent(
294            mm_dsl::MeerkatMachineInput::ClassifyRealtimeMidTurnActivity,
295            "RealtimeProductTurnHandle::classify_mid_turn_activity",
296        )
297    }
298
299    fn classify_turn_terminated(&self) -> Result<bool, DslTransitionError> {
300        // Fold in the freshness-promotion observer dispatch so the turn-
301        // terminated classification + any `StaleDeferred → StaleImmediate`
302        // promotion arrive on the freshness observer under the same lock.
303        self.apply_idempotent_with_freshness_effects(
304            mm_dsl::MeerkatMachineInput::ClassifyRealtimeTurnTerminated,
305            "RealtimeProductTurnHandle::classify_turn_terminated",
306        )
307    }
308
309    fn reconnect_policy_on_clean_close(&self) -> RealtimeReconnectPolicy {
310        map_policy(self.dsl.snapshot_state().realtime_reconnect_policy)
311    }
312}