use std::sync::{Arc, RwLock, Weak};
use meerkat_core::handles::{
DslTransitionError, InteractionStreamCleanupObserver, InteractionStreamHandle,
};
use meerkat_core::peer_correlation::{
InteractionStreamState as CoreInteractionStreamState, PeerCorrelationId,
};
use super::HandleDslAuthority;
use crate::meerkat_machine::dsl as mm_dsl;
pub struct RuntimeInteractionStreamHandle {
dsl: Arc<HandleDslAuthority>,
cleanup_observer: RwLock<Option<Weak<dyn InteractionStreamCleanupObserver>>>,
}
impl std::fmt::Debug for RuntimeInteractionStreamHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let observer_tag = self
.cleanup_observer
.read()
.ok()
.as_deref()
.and_then(|o| o.as_ref().map(|_| "<observer>"));
f.debug_struct("RuntimeInteractionStreamHandle")
.field("dsl", &self.dsl)
.field("cleanup_observer", &observer_tag)
.finish()
}
}
impl RuntimeInteractionStreamHandle {
pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
Self {
dsl,
cleanup_observer: RwLock::new(None),
}
}
pub fn ephemeral() -> Self {
Self::new(Arc::new(HandleDslAuthority::ephemeral()))
}
fn apply_input_and_dispatch_cleanup(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<(), DslTransitionError> {
type CleanupTarget = Result<PeerCorrelationId, String>;
let dispatch: Option<(
Arc<dyn InteractionStreamCleanupObserver>,
Vec<CleanupTarget>,
)> = self
.dsl
.apply_input_with_effects_and_sample(input, context, |effects| {
let observer_opt = self
.cleanup_observer
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.and_then(Weak::upgrade);
let observer = observer_opt?;
let targets: Vec<CleanupTarget> = effects
.iter()
.filter_map(|effect| match effect {
mm_dsl::MeerkatMachineEffect::InteractionStreamCleanup { corr_id } => {
Some(match dsl_corr_id_to_core(corr_id.clone()) {
Some(core_id) => Ok(core_id),
None => Err(corr_id.0.clone()),
})
}
_ => None,
})
.collect();
Some((observer, targets))
})?;
if let Some((observer, targets)) = dispatch {
for target in targets {
match target {
Ok(core_id) => observer.on_interaction_stream_cleanup(core_id),
Err(raw) => tracing::error!(
raw = %raw,
context = context,
"InteractionStreamCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
),
}
}
}
Ok(())
}
}
fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
uuid::Uuid::parse_str(&dsl_id.0)
.ok()
.map(PeerCorrelationId::from_uuid)
}
impl InteractionStreamHandle for RuntimeInteractionStreamHandle {
fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::InteractionStreamReserved {
corr_id: corr_id.into(),
},
"InteractionStreamHandle::reserved",
)
}
fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::InteractionStreamAttached {
corr_id: corr_id.into(),
},
"InteractionStreamHandle::attached",
)
}
fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::InteractionStreamCompleted {
corr_id: corr_id.into(),
},
"InteractionStreamHandle::completed",
)
}
fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::InteractionStreamExpired {
corr_id: corr_id.into(),
},
"InteractionStreamHandle::expired",
)
}
fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::InteractionStreamClosedEarly {
corr_id: corr_id.into(),
},
"InteractionStreamHandle::closed_early",
)
}
fn state(&self, corr_id: PeerCorrelationId) -> Option<CoreInteractionStreamState> {
let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
let snapshot = self.dsl.snapshot_state();
if snapshot.attached_interaction_streams.contains(&dsl_key) {
Some(CoreInteractionStreamState::Attached)
} else if snapshot.reserved_interaction_streams.contains(&dsl_key) {
Some(CoreInteractionStreamState::Reserved)
} else {
None
}
}
fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>) {
*self
.cleanup_observer
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
}
}