Skip to main content

meerkat_runtime/handles/
peer_interaction.rs

//! Runtime impl of [`meerkat_core::handles::PeerInteractionHandle`] (W1-A).
//!
//! Routes peer request/response lifecycle events into the session's
//! MeerkatMachine DSL (`pending_peer_requests` / `inbound_peer_requests`
//! substate maps) and fans the emitted `PeerInteractionCleanup` effects
//! out to the installed shell-side observer, so the subscriber / stream
//! registries update causally — the map is a strict projection of DSL
//! truth, not shadow state that happens to be updated lexically near each
//! terminal transition.
10
11use std::sync::{Arc, RwLock, Weak};
12
13use meerkat_core::handles::{
14    DslTransitionError, PeerInteractionCleanupObserver, PeerInteractionHandle,
15    PeerTerminalDisposition as CorePeerDisposition,
16};
17use meerkat_core::peer_correlation::{
18    InboundPeerRequestState as CoreInboundState, OutboundPeerRequestState as CoreOutboundState,
19    PeerCorrelationId,
20};
21
22use super::HandleDslAuthority;
23use crate::meerkat_machine::dsl as mm_dsl;
24
25/// Runtime-backed [`PeerInteractionHandle`] impl.
26///
27/// Every trait method routes to the corresponding DSL input on the session's
28/// shared MeerkatMachine authority. After the transition lands, emitted
29/// effects are scanned for `PeerInteractionCleanup` and dispatched to the
30/// installed [`PeerInteractionCleanupObserver`] (if any) — closing the
31/// "terminal transition → effect → shell projection cleanup" loop.
32///
33/// The cleanup observer is held as a `Weak` reference. In production the
34/// observer is the session's `CommsRuntime`, which in turn holds a strong
35/// `Arc<dyn PeerInteractionHandle>` to this struct; storing the observer
36/// strongly would create a cycle that prevents `CommsRuntime::drop` from
37/// firing on session teardown (dropped listeners, leaked session-identity
38/// claims, zombie `InprocRegistry` entries). `Weak` breaks the cycle —
39/// once the runtime drops, `upgrade()` returns `None` and subsequent
40/// effect dispatches become no-ops, which is the desired semantics
41/// post-teardown.
42pub struct RuntimePeerInteractionHandle {
43    dsl: Arc<HandleDslAuthority>,
44    cleanup_observer: RwLock<Option<Weak<dyn PeerInteractionCleanupObserver>>>,
45}
46
47impl std::fmt::Debug for RuntimePeerInteractionHandle {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        let observer_tag = self
50            .cleanup_observer
51            .read()
52            .ok()
53            .as_deref()
54            .and_then(|o| o.as_ref().map(|_| "<observer>"));
55        f.debug_struct("RuntimePeerInteractionHandle")
56            .field("dsl", &self.dsl)
57            .field("cleanup_observer", &observer_tag)
58            .finish()
59    }
60}
61
62impl RuntimePeerInteractionHandle {
63    /// Construct a handle backed by the session's shared DSL authority.
64    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
65        Self {
66            dsl,
67            cleanup_observer: RwLock::new(None),
68        }
69    }
70
71    /// Construct a handle backed by an ephemeral DSL authority (tests /
72    /// legacy recovery paths).
73    pub fn ephemeral() -> Self {
74        Self::new(Arc::new(HandleDslAuthority::ephemeral()))
75    }
76
77    fn apply_input_and_dispatch_cleanup(
78        &self,
79        input: mm_dsl::MeerkatMachineInput,
80        context: &'static str,
81    ) -> Result<(), DslTransitionError> {
82        // Sample the cleanup observer slot UNDER the DSL lock so any
83        // concurrent `install_cleanup_observer` is totally ordered vs
84        // this transition (same pattern as `session_context.rs` closes
85        // for PR #286's race). The observer callback runs OUTSIDE the
86        // lock to avoid reentrancy with any shell-side state that calls
87        // back into the handle. Each cleanup target is carried as an
88        // `Ok(core_id)` / `Err(raw)` pair so the invalid-UUID diagnostic
89        // still fires with the raw DSL string when dispatch happens
90        // post-lock.
91        type CleanupTarget = Result<PeerCorrelationId, String>;
92        let dispatch: Option<(Arc<dyn PeerInteractionCleanupObserver>, Vec<CleanupTarget>)> = self
93            .dsl
94            .apply_input_with_effects_and_sample(input, context, |effects| {
95                let observer_opt = self
96                    .cleanup_observer
97                    .read()
98                    .unwrap_or_else(std::sync::PoisonError::into_inner)
99                    .as_ref()
100                    .and_then(Weak::upgrade);
101                let observer = observer_opt?;
102                let targets: Vec<CleanupTarget> = effects
103                    .iter()
104                    .filter_map(|effect| match effect {
105                        mm_dsl::MeerkatMachineEffect::PeerInteractionCleanup { corr_id } => {
106                            Some(match dsl_corr_id_to_core(corr_id.clone()) {
107                                Some(core_id) => Ok(core_id),
108                                None => Err(corr_id.0.clone()),
109                            })
110                        }
111                        _ => None,
112                    })
113                    .collect();
114                Some((observer, targets))
115            })?;
116        if let Some((observer, targets)) = dispatch {
117            for target in targets {
118                match target {
119                    Ok(core_id) => observer.on_peer_interaction_cleanup(core_id),
120                    Err(raw) => tracing::error!(
121                        raw = %raw,
122                        context = context,
123                        "PeerInteractionCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
124                    ),
125                }
126            }
127        }
128        Ok(())
129    }
130}
131
132fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
133    // The DSL key is always produced by `From<PeerCorrelationId> for
134    // mm_dsl::PeerCorrelationId`, which stringifies a UUID. Parse must
135    // succeed on every canonical path; a parse failure here is a broken
136    // invariant, not a recoverable condition. Return `None` so the caller
137    // skips observer dispatch and logs — silently substituting nil would
138    // cross-contaminate any real `corr_id 0` event.
139    uuid::Uuid::parse_str(&dsl_id.0)
140        .ok()
141        .map(PeerCorrelationId::from_uuid)
142}
143
144impl PeerInteractionHandle for RuntimePeerInteractionHandle {
145    fn request_sent(
146        &self,
147        corr_id: PeerCorrelationId,
148        to: String,
149    ) -> Result<(), DslTransitionError> {
150        self.apply_input_and_dispatch_cleanup(
151            mm_dsl::MeerkatMachineInput::PeerRequestSent {
152                corr_id: corr_id.into(),
153                to,
154            },
155            "PeerInteractionHandle::request_sent",
156        )
157    }
158
159    fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
160        self.apply_input_and_dispatch_cleanup(
161            mm_dsl::MeerkatMachineInput::PeerResponseProgressArrived {
162                corr_id: corr_id.into(),
163            },
164            "PeerInteractionHandle::response_progress",
165        )
166    }
167
168    fn response_terminal(
169        &self,
170        corr_id: PeerCorrelationId,
171        disposition: CorePeerDisposition,
172    ) -> Result<(), DslTransitionError> {
173        self.apply_input_and_dispatch_cleanup(
174            mm_dsl::MeerkatMachineInput::PeerResponseTerminalArrived {
175                corr_id: corr_id.into(),
176                disposition: disposition.into(),
177            },
178            "PeerInteractionHandle::response_terminal",
179        )
180    }
181
182    fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
183        self.apply_input_and_dispatch_cleanup(
184            mm_dsl::MeerkatMachineInput::PeerRequestTimedOut {
185                corr_id: corr_id.into(),
186            },
187            "PeerInteractionHandle::request_timed_out",
188        )
189    }
190
191    fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
192        self.apply_input_and_dispatch_cleanup(
193            mm_dsl::MeerkatMachineInput::PeerRequestReceived {
194                corr_id: corr_id.into(),
195            },
196            "PeerInteractionHandle::request_received",
197        )
198    }
199
200    fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
201        self.apply_input_and_dispatch_cleanup(
202            mm_dsl::MeerkatMachineInput::PeerResponseReplied {
203                corr_id: corr_id.into(),
204            },
205            "PeerInteractionHandle::response_replied",
206        )
207    }
208
209    fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreOutboundState> {
210        let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
211        self.dsl
212            .snapshot_state()
213            .pending_peer_requests
214            .get(&dsl_key)
215            .copied()
216            .map(Into::into)
217    }
218
219    fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreInboundState> {
220        let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
221        self.dsl
222            .snapshot_state()
223            .inbound_peer_requests
224            .get(&dsl_key)
225            .copied()
226            .map(Into::into)
227    }
228
229    fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>) {
230        // Downgrade to a `Weak` so this handle does not keep the observer
231        // (typically the session's `CommsRuntime`) alive. The caller retains
232        // the canonical strong `Arc` via its own field; when the runtime is
233        // dropped, the weak here fails to upgrade and cleanup dispatch
234        // becomes a no-op — matching the "post-teardown, no more work"
235        // semantics the shell-side projection expects.
236        *self
237            .cleanup_observer
238            .write()
239            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
240    }
241}