1use std::sync::{Arc, RwLock, Weak};
19
20use meerkat_core::handles::{
21 DslTransitionError, RealtimeProductTurnHandle, RealtimeProductTurnPhase,
22 RealtimeProjectionFreshness, RealtimeProjectionFreshnessObserver, RealtimeReconnectPolicy,
23};
24
25use super::HandleDslAuthority;
26use crate::meerkat_machine::dsl as mm_dsl;
27
28pub struct RuntimeRealtimeProductTurnHandle {
30 dsl: Arc<HandleDslAuthority>,
31 freshness_observer: RwLock<Option<Weak<dyn RealtimeProjectionFreshnessObserver>>>,
32}
33
34impl std::fmt::Debug for RuntimeRealtimeProductTurnHandle {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 let observer_tag = self
37 .freshness_observer
38 .read()
39 .ok()
40 .as_deref()
41 .and_then(|o| o.as_ref().map(|_| "<observer>"));
42 f.debug_struct("RuntimeRealtimeProductTurnHandle")
43 .field("dsl", &self.dsl)
44 .field("freshness_observer", &observer_tag)
45 .finish()
46 }
47}
48
49impl RuntimeRealtimeProductTurnHandle {
50 pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
52 Self {
53 dsl,
54 freshness_observer: RwLock::new(None),
55 }
56 }
57
58 pub fn ephemeral() -> Self {
61 Self::new(Arc::new(HandleDslAuthority::ephemeral()))
62 }
63
64 fn apply_idempotent(
70 &self,
71 input: mm_dsl::MeerkatMachineInput,
72 context: &'static str,
73 ) -> Result<bool, DslTransitionError> {
74 match self.dsl.apply_input(input, context) {
77 Ok(()) => Ok(true),
78 Err(err) if err.is_guard_rejected() => Ok(false),
79 Err(err) => Err(err),
80 }
81 }
82
83 fn apply_idempotent_with_freshness_effects(
99 &self,
100 input: mm_dsl::MeerkatMachineInput,
101 context: &'static str,
102 ) -> Result<bool, DslTransitionError> {
103 type FreshnessEmission = (mm_dsl::RealtimeProjectionFreshness, u64);
104 let sampled: Option<(
105 Arc<dyn RealtimeProjectionFreshnessObserver>,
106 Vec<FreshnessEmission>,
107 )> = match self
108 .dsl
109 .apply_input_with_effects_and_sample(input, context, |effects| {
110 let observer_opt = self
111 .freshness_observer
112 .read()
113 .unwrap_or_else(std::sync::PoisonError::into_inner)
114 .as_ref()
115 .and_then(Weak::upgrade);
116 let observer = observer_opt?;
117 let emissions: Vec<FreshnessEmission> = effects
118 .iter()
119 .filter_map(|effect| match effect {
120 mm_dsl::MeerkatMachineEffect::RealtimeProjectionFreshnessChanged {
121 new_freshness,
122 frontier_ms,
123 } => Some((*new_freshness, *frontier_ms)),
124 _ => None,
125 })
126 .collect();
127 Some((observer, emissions))
128 }) {
129 Ok(sampled) => sampled,
130 Err(err) if err.is_guard_rejected() => return Ok(false),
131 Err(err) => return Err(err),
132 };
133 if let Some((observer, emissions)) = sampled {
134 for (new_freshness, frontier_ms) in emissions {
135 observer.on_realtime_projection_freshness_changed(
136 map_freshness(new_freshness),
137 frontier_ms,
138 );
139 }
140 }
141 Ok(true)
142 }
143}
144
145fn map_phase(raw: mm_dsl::RealtimeProductTurnPhase) -> RealtimeProductTurnPhase {
146 match raw {
147 mm_dsl::RealtimeProductTurnPhase::Idle => RealtimeProductTurnPhase::Idle,
148 mm_dsl::RealtimeProductTurnPhase::AwaitingProgress => {
149 RealtimeProductTurnPhase::AwaitingProgress
150 }
151 mm_dsl::RealtimeProductTurnPhase::Committed => RealtimeProductTurnPhase::Committed,
152 mm_dsl::RealtimeProductTurnPhase::OutputStarted => RealtimeProductTurnPhase::OutputStarted,
153 mm_dsl::RealtimeProductTurnPhase::Preemptible => RealtimeProductTurnPhase::Preemptible,
154 }
155}
156
157fn map_freshness(raw: mm_dsl::RealtimeProjectionFreshness) -> RealtimeProjectionFreshness {
158 match raw {
159 mm_dsl::RealtimeProjectionFreshness::Clean => RealtimeProjectionFreshness::Clean,
160 mm_dsl::RealtimeProjectionFreshness::StaleDeferred => {
161 RealtimeProjectionFreshness::StaleDeferred
162 }
163 mm_dsl::RealtimeProjectionFreshness::StaleImmediate => {
164 RealtimeProjectionFreshness::StaleImmediate
165 }
166 }
167}
168
169fn map_policy(raw: mm_dsl::RealtimeReconnectPolicy) -> RealtimeReconnectPolicy {
170 match raw {
171 mm_dsl::RealtimeReconnectPolicy::CleanExit => RealtimeReconnectPolicy::CleanExit,
172 mm_dsl::RealtimeReconnectPolicy::ReattachAndRecover => {
173 RealtimeReconnectPolicy::ReattachAndRecover
174 }
175 }
176}
177
178impl RealtimeProductTurnHandle for RuntimeRealtimeProductTurnHandle {
179 fn turn_in_flight(&self) -> Result<bool, DslTransitionError> {
180 self.apply_idempotent(
181 mm_dsl::MeerkatMachineInput::ProductTurnInFlight,
182 "RealtimeProductTurnHandle::turn_in_flight",
183 )
184 }
185
186 fn turn_committed(&self) -> Result<bool, DslTransitionError> {
187 self.apply_idempotent(
188 mm_dsl::MeerkatMachineInput::ProductTurnCommitted,
189 "RealtimeProductTurnHandle::turn_committed",
190 )
191 }
192
193 fn output_started(&self) -> Result<bool, DslTransitionError> {
194 self.apply_idempotent(
195 mm_dsl::MeerkatMachineInput::ProductOutputStarted,
196 "RealtimeProductTurnHandle::output_started",
197 )
198 }
199
200 fn turn_interrupted(&self) -> Result<bool, DslTransitionError> {
201 self.apply_idempotent(
202 mm_dsl::MeerkatMachineInput::ProductTurnInterrupted,
203 "RealtimeProductTurnHandle::turn_interrupted",
204 )
205 }
206
207 fn turn_terminal(&self) -> Result<bool, DslTransitionError> {
208 self.apply_idempotent(
209 mm_dsl::MeerkatMachineInput::ProductTurnTerminal,
210 "RealtimeProductTurnHandle::turn_terminal",
211 )
212 }
213
214 fn current_phase(&self) -> RealtimeProductTurnPhase {
215 map_phase(self.dsl.snapshot_state().realtime_product_turn_phase)
216 }
217
218 fn projection_advance_observed(&self, advanced_at_ms: u64) -> Result<bool, DslTransitionError> {
221 self.apply_idempotent_with_freshness_effects(
222 mm_dsl::MeerkatMachineInput::RealtimeProjectionAdvanceObserved { advanced_at_ms },
223 "RealtimeProductTurnHandle::projection_advance_observed",
224 )
225 }
226
227 fn projection_refreshed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
228 self.apply_idempotent_with_freshness_effects(
229 mm_dsl::MeerkatMachineInput::RealtimeProjectionRefreshed { observed_ms },
230 "RealtimeProductTurnHandle::projection_refreshed",
231 )
232 }
233
234 fn projection_baseline_observed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
235 self.apply_idempotent_with_freshness_effects(
236 mm_dsl::MeerkatMachineInput::RealtimeProjectionBaselineObserved { observed_ms },
237 "RealtimeProductTurnHandle::projection_baseline_observed",
238 )
239 }
240
241 fn projection_reset(&self, baseline_ms: u64) -> Result<bool, DslTransitionError> {
242 self.apply_idempotent_with_freshness_effects(
243 mm_dsl::MeerkatMachineInput::RealtimeProjectionReset { baseline_ms },
244 "RealtimeProductTurnHandle::projection_reset",
245 )
246 }
247
248 fn projection_freshness(&self) -> RealtimeProjectionFreshness {
249 map_freshness(self.dsl.snapshot_state().realtime_projection_freshness)
250 }
251
252 fn projection_frontier_ms(&self) -> u64 {
253 self.dsl.snapshot_state().realtime_projection_frontier_ms
254 }
255
256 fn install_projection_freshness_observer(
257 &self,
258 observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
259 ) {
260 *self
261 .freshness_observer
262 .write()
263 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
264 }
265
266 fn install_projection_freshness_observer_with_snapshot(
267 &self,
268 observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
269 ) -> (RealtimeProjectionFreshness, u64) {
270 self.dsl.with_state_lock(|state| {
271 *self
272 .freshness_observer
273 .write()
274 .unwrap_or_else(std::sync::PoisonError::into_inner) =
275 Some(Arc::downgrade(&observer));
276 (
277 map_freshness(state.realtime_projection_freshness),
278 state.realtime_projection_frontier_ms,
279 )
280 })
281 }
282
283 fn classify_client_input_submitted(&self) -> Result<bool, DslTransitionError> {
286 self.apply_idempotent(
287 mm_dsl::MeerkatMachineInput::ClassifyRealtimeClientInputSubmitted,
288 "RealtimeProductTurnHandle::classify_client_input_submitted",
289 )
290 }
291
292 fn classify_mid_turn_activity(&self) -> Result<bool, DslTransitionError> {
293 self.apply_idempotent(
294 mm_dsl::MeerkatMachineInput::ClassifyRealtimeMidTurnActivity,
295 "RealtimeProductTurnHandle::classify_mid_turn_activity",
296 )
297 }
298
299 fn classify_turn_terminated(&self) -> Result<bool, DslTransitionError> {
300 self.apply_idempotent_with_freshness_effects(
304 mm_dsl::MeerkatMachineInput::ClassifyRealtimeTurnTerminated,
305 "RealtimeProductTurnHandle::classify_turn_terminated",
306 )
307 }
308
309 fn reconnect_policy_on_clean_close(&self) -> RealtimeReconnectPolicy {
310 map_policy(self.dsl.snapshot_state().realtime_reconnect_policy)
311 }
312}