use std::sync::{Arc, RwLock, Weak};
use meerkat_core::handles::{
DslTransitionError, RealtimeProductTurnHandle, RealtimeProductTurnPhase,
RealtimeProjectionFreshness, RealtimeProjectionFreshnessObserver, RealtimeReconnectPolicy,
};
use super::HandleDslAuthority;
use crate::meerkat_machine::dsl as mm_dsl;
pub struct RuntimeRealtimeProductTurnHandle {
dsl: Arc<HandleDslAuthority>,
freshness_observer: RwLock<Option<Weak<dyn RealtimeProjectionFreshnessObserver>>>,
}
impl std::fmt::Debug for RuntimeRealtimeProductTurnHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let observer_tag = self
.freshness_observer
.read()
.ok()
.as_deref()
.and_then(|o| o.as_ref().map(|_| "<observer>"));
f.debug_struct("RuntimeRealtimeProductTurnHandle")
.field("dsl", &self.dsl)
.field("freshness_observer", &observer_tag)
.finish()
}
}
impl RuntimeRealtimeProductTurnHandle {
pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
Self {
dsl,
freshness_observer: RwLock::new(None),
}
}
pub fn ephemeral() -> Self {
Self::new(Arc::new(HandleDslAuthority::ephemeral()))
}
fn apply_idempotent(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<bool, DslTransitionError> {
match self.dsl.apply_input(input, context) {
Ok(()) => Ok(true),
Err(err) if err.is_guard_rejected() => Ok(false),
Err(err) => Err(err),
}
}
fn apply_idempotent_with_freshness_effects(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<bool, DslTransitionError> {
type FreshnessEmission = (mm_dsl::RealtimeProjectionFreshness, u64);
let sampled: Option<(
Arc<dyn RealtimeProjectionFreshnessObserver>,
Vec<FreshnessEmission>,
)> = match self
.dsl
.apply_input_with_effects_and_sample(input, context, |effects| {
let observer_opt = self
.freshness_observer
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.and_then(Weak::upgrade);
let observer = observer_opt?;
let emissions: Vec<FreshnessEmission> = effects
.iter()
.filter_map(|effect| match effect {
mm_dsl::MeerkatMachineEffect::RealtimeProjectionFreshnessChanged {
new_freshness,
frontier_ms,
} => Some((*new_freshness, *frontier_ms)),
_ => None,
})
.collect();
Some((observer, emissions))
}) {
Ok(sampled) => sampled,
Err(err) if err.is_guard_rejected() => return Ok(false),
Err(err) => return Err(err),
};
if let Some((observer, emissions)) = sampled {
for (new_freshness, frontier_ms) in emissions {
observer.on_realtime_projection_freshness_changed(
map_freshness(new_freshness),
frontier_ms,
);
}
}
Ok(true)
}
}
fn map_phase(raw: mm_dsl::RealtimeProductTurnPhase) -> RealtimeProductTurnPhase {
match raw {
mm_dsl::RealtimeProductTurnPhase::Idle => RealtimeProductTurnPhase::Idle,
mm_dsl::RealtimeProductTurnPhase::AwaitingProgress => {
RealtimeProductTurnPhase::AwaitingProgress
}
mm_dsl::RealtimeProductTurnPhase::Committed => RealtimeProductTurnPhase::Committed,
mm_dsl::RealtimeProductTurnPhase::OutputStarted => RealtimeProductTurnPhase::OutputStarted,
mm_dsl::RealtimeProductTurnPhase::Preemptible => RealtimeProductTurnPhase::Preemptible,
}
}
fn map_freshness(raw: mm_dsl::RealtimeProjectionFreshness) -> RealtimeProjectionFreshness {
match raw {
mm_dsl::RealtimeProjectionFreshness::Clean => RealtimeProjectionFreshness::Clean,
mm_dsl::RealtimeProjectionFreshness::StaleDeferred => {
RealtimeProjectionFreshness::StaleDeferred
}
mm_dsl::RealtimeProjectionFreshness::StaleImmediate => {
RealtimeProjectionFreshness::StaleImmediate
}
}
}
fn map_policy(raw: mm_dsl::RealtimeReconnectPolicy) -> RealtimeReconnectPolicy {
match raw {
mm_dsl::RealtimeReconnectPolicy::CleanExit => RealtimeReconnectPolicy::CleanExit,
mm_dsl::RealtimeReconnectPolicy::ReattachAndRecover => {
RealtimeReconnectPolicy::ReattachAndRecover
}
}
}
impl RealtimeProductTurnHandle for RuntimeRealtimeProductTurnHandle {
fn turn_in_flight(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ProductTurnInFlight,
"RealtimeProductTurnHandle::turn_in_flight",
)
}
fn turn_committed(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ProductTurnCommitted,
"RealtimeProductTurnHandle::turn_committed",
)
}
fn output_started(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ProductOutputStarted,
"RealtimeProductTurnHandle::output_started",
)
}
fn turn_interrupted(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ProductTurnInterrupted,
"RealtimeProductTurnHandle::turn_interrupted",
)
}
fn turn_terminal(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ProductTurnTerminal,
"RealtimeProductTurnHandle::turn_terminal",
)
}
fn current_phase(&self) -> RealtimeProductTurnPhase {
map_phase(self.dsl.snapshot_state().realtime_product_turn_phase)
}
fn projection_advance_observed(&self, advanced_at_ms: u64) -> Result<bool, DslTransitionError> {
self.apply_idempotent_with_freshness_effects(
mm_dsl::MeerkatMachineInput::RealtimeProjectionAdvanceObserved { advanced_at_ms },
"RealtimeProductTurnHandle::projection_advance_observed",
)
}
fn projection_refreshed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
self.apply_idempotent_with_freshness_effects(
mm_dsl::MeerkatMachineInput::RealtimeProjectionRefreshed { observed_ms },
"RealtimeProductTurnHandle::projection_refreshed",
)
}
fn projection_baseline_observed(&self, observed_ms: u64) -> Result<bool, DslTransitionError> {
self.apply_idempotent_with_freshness_effects(
mm_dsl::MeerkatMachineInput::RealtimeProjectionBaselineObserved { observed_ms },
"RealtimeProductTurnHandle::projection_baseline_observed",
)
}
fn projection_reset(&self, baseline_ms: u64) -> Result<bool, DslTransitionError> {
self.apply_idempotent_with_freshness_effects(
mm_dsl::MeerkatMachineInput::RealtimeProjectionReset { baseline_ms },
"RealtimeProductTurnHandle::projection_reset",
)
}
fn projection_freshness(&self) -> RealtimeProjectionFreshness {
map_freshness(self.dsl.snapshot_state().realtime_projection_freshness)
}
fn projection_frontier_ms(&self) -> u64 {
self.dsl.snapshot_state().realtime_projection_frontier_ms
}
fn install_projection_freshness_observer(
&self,
observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
) {
*self
.freshness_observer
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
}
fn install_projection_freshness_observer_with_snapshot(
&self,
observer: Arc<dyn RealtimeProjectionFreshnessObserver>,
) -> (RealtimeProjectionFreshness, u64) {
self.dsl.with_state_lock(|state| {
*self
.freshness_observer
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) =
Some(Arc::downgrade(&observer));
(
map_freshness(state.realtime_projection_freshness),
state.realtime_projection_frontier_ms,
)
})
}
fn classify_client_input_submitted(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ClassifyRealtimeClientInputSubmitted,
"RealtimeProductTurnHandle::classify_client_input_submitted",
)
}
fn classify_mid_turn_activity(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent(
mm_dsl::MeerkatMachineInput::ClassifyRealtimeMidTurnActivity,
"RealtimeProductTurnHandle::classify_mid_turn_activity",
)
}
fn classify_turn_terminated(&self) -> Result<bool, DslTransitionError> {
self.apply_idempotent_with_freshness_effects(
mm_dsl::MeerkatMachineInput::ClassifyRealtimeTurnTerminated,
"RealtimeProductTurnHandle::classify_turn_terminated",
)
}
fn reconnect_policy_on_clean_close(&self) -> RealtimeReconnectPolicy {
map_policy(self.dsl.snapshot_state().realtime_reconnect_policy)
}
}