use std::sync::{Arc, RwLock, Weak};
use meerkat_core::handles::{
DslTransitionError, PeerInteractionCleanupObserver, PeerInteractionHandle,
PeerTerminalDisposition as CorePeerDisposition,
};
use meerkat_core::peer_correlation::{
InboundPeerRequestState as CoreInboundState, OutboundPeerRequestState as CoreOutboundState,
PeerCorrelationId,
};
use super::HandleDslAuthority;
use crate::meerkat_machine::dsl as mm_dsl;
pub struct RuntimePeerInteractionHandle {
dsl: Arc<HandleDslAuthority>,
cleanup_observer: RwLock<Option<Weak<dyn PeerInteractionCleanupObserver>>>,
}
impl std::fmt::Debug for RuntimePeerInteractionHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let observer_tag = self
.cleanup_observer
.read()
.ok()
.as_deref()
.and_then(|o| o.as_ref().map(|_| "<observer>"));
f.debug_struct("RuntimePeerInteractionHandle")
.field("dsl", &self.dsl)
.field("cleanup_observer", &observer_tag)
.finish()
}
}
impl RuntimePeerInteractionHandle {
pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
Self {
dsl,
cleanup_observer: RwLock::new(None),
}
}
pub fn ephemeral() -> Self {
Self::new(Arc::new(HandleDslAuthority::ephemeral()))
}
fn apply_input_and_dispatch_cleanup(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<(), DslTransitionError> {
type CleanupTarget = Result<PeerCorrelationId, String>;
let dispatch: Option<(Arc<dyn PeerInteractionCleanupObserver>, Vec<CleanupTarget>)> = self
.dsl
.apply_input_with_effects_and_sample(input, context, |effects| {
let observer_opt = self
.cleanup_observer
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.and_then(Weak::upgrade);
let observer = observer_opt?;
let targets: Vec<CleanupTarget> = effects
.iter()
.filter_map(|effect| match effect {
mm_dsl::MeerkatMachineEffect::PeerInteractionCleanup { corr_id } => {
Some(match dsl_corr_id_to_core(corr_id.clone()) {
Some(core_id) => Ok(core_id),
None => Err(corr_id.0.clone()),
})
}
_ => None,
})
.collect();
Some((observer, targets))
})?;
if let Some((observer, targets)) = dispatch {
for target in targets {
match target {
Ok(core_id) => observer.on_peer_interaction_cleanup(core_id),
Err(raw) => tracing::error!(
raw = %raw,
context = context,
"PeerInteractionCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
),
}
}
}
Ok(())
}
}
fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
uuid::Uuid::parse_str(&dsl_id.0)
.ok()
.map(PeerCorrelationId::from_uuid)
}
impl PeerInteractionHandle for RuntimePeerInteractionHandle {
fn request_sent(
&self,
corr_id: PeerCorrelationId,
to: String,
) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerRequestSent {
corr_id: corr_id.into(),
to,
},
"PeerInteractionHandle::request_sent",
)
}
fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerResponseProgressArrived {
corr_id: corr_id.into(),
},
"PeerInteractionHandle::response_progress",
)
}
fn response_terminal(
&self,
corr_id: PeerCorrelationId,
disposition: CorePeerDisposition,
) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerResponseTerminalArrived {
corr_id: corr_id.into(),
disposition: disposition.into(),
},
"PeerInteractionHandle::response_terminal",
)
}
fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerRequestTimedOut {
corr_id: corr_id.into(),
},
"PeerInteractionHandle::request_timed_out",
)
}
fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerRequestReceived {
corr_id: corr_id.into(),
},
"PeerInteractionHandle::request_received",
)
}
fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
self.apply_input_and_dispatch_cleanup(
mm_dsl::MeerkatMachineInput::PeerResponseReplied {
corr_id: corr_id.into(),
},
"PeerInteractionHandle::response_replied",
)
}
fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreOutboundState> {
let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
self.dsl
.snapshot_state()
.pending_peer_requests
.get(&dsl_key)
.copied()
.map(Into::into)
}
fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreInboundState> {
let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
self.dsl
.snapshot_state()
.inbound_peer_requests
.get(&dsl_key)
.copied()
.map(Into::into)
}
fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>) {
*self
.cleanup_observer
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
}
}