Skip to main content

meerkat_runtime/handles/
interaction_stream.rs

1//! Runtime impl of [`meerkat_core::handles::InteractionStreamHandle`] (U6).
2//!
3//! Routes the interaction-stream lifecycle (`Reserved` / `Attached` /
4//! `Completed` / `Expired` / `ClosedEarly`) into the session's MeerkatMachine
5//! DSL `interaction_streams` substate map and fans the emitted
6//! `InteractionStreamCleanup` effects out to the installed shell-side
7//! observer, so the comms runtime's `interaction_stream_registry` becomes a
8//! pure projection of DSL truth — no shadow `state` field, no shell-side
9//! CAS, no TTL meaning hidden in registry maps.
10
11use std::sync::{Arc, RwLock, Weak};
12
13use meerkat_core::handles::{
14    DslTransitionError, InteractionStreamCleanupObserver, InteractionStreamHandle,
15};
16use meerkat_core::peer_correlation::{
17    InteractionStreamState as CoreInteractionStreamState, PeerCorrelationId,
18};
19
20use super::HandleDslAuthority;
21use crate::meerkat_machine::dsl as mm_dsl;
22
23/// Runtime-backed [`InteractionStreamHandle`] impl.
24///
25/// Every trait method routes to the corresponding DSL input on the session's
26/// shared MeerkatMachine authority. After the transition lands, emitted
27/// effects are scanned for `InteractionStreamCleanup` and dispatched to the
28/// installed [`InteractionStreamCleanupObserver`] (if any), closing the
29/// "terminal transition → effect → shell projection cleanup" loop.
30///
31/// The observer is held as a `Weak` reference because the canonical owner is
32/// the session's `CommsRuntime`, which in turn holds a strong handle pointer;
33/// storing the observer strongly would create a cycle preventing
34/// `CommsRuntime::drop` from firing on session teardown. `Weak::upgrade`
35/// returning `None` after teardown makes cleanup dispatch a no-op, which is
36/// the desired post-shutdown semantics.
37pub struct RuntimeInteractionStreamHandle {
38    dsl: Arc<HandleDslAuthority>,
39    cleanup_observer: RwLock<Option<Weak<dyn InteractionStreamCleanupObserver>>>,
40}
41
42impl std::fmt::Debug for RuntimeInteractionStreamHandle {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        let observer_tag = self
45            .cleanup_observer
46            .read()
47            .ok()
48            .as_deref()
49            .and_then(|o| o.as_ref().map(|_| "<observer>"));
50        f.debug_struct("RuntimeInteractionStreamHandle")
51            .field("dsl", &self.dsl)
52            .field("cleanup_observer", &observer_tag)
53            .finish()
54    }
55}
56
57impl RuntimeInteractionStreamHandle {
58    /// Construct a handle backed by the session's shared DSL authority.
59    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
60        Self {
61            dsl,
62            cleanup_observer: RwLock::new(None),
63        }
64    }
65
66    /// Construct a handle backed by an ephemeral DSL authority for tests and
67    /// minimal hosts that explicitly opt into machine-owned semantics.
68    pub fn ephemeral() -> Self {
69        Self::new(Arc::new(HandleDslAuthority::ephemeral()))
70    }
71
72    fn apply_input_and_dispatch_cleanup(
73        &self,
74        input: mm_dsl::MeerkatMachineInput,
75        context: &'static str,
76    ) -> Result<(), DslTransitionError> {
77        // Sample observer UNDER the DSL lock, dispatch OUTSIDE — same
78        // race-closing pattern as `session_context.rs`.
79        type CleanupTarget = Result<PeerCorrelationId, String>;
80        let dispatch: Option<(
81            Arc<dyn InteractionStreamCleanupObserver>,
82            Vec<CleanupTarget>,
83        )> = self
84            .dsl
85            .apply_input_with_effects_and_sample(input, context, |effects| {
86                let observer_opt = self
87                    .cleanup_observer
88                    .read()
89                    .unwrap_or_else(std::sync::PoisonError::into_inner)
90                    .as_ref()
91                    .and_then(Weak::upgrade);
92                let observer = observer_opt?;
93                let targets: Vec<CleanupTarget> = effects
94                    .iter()
95                    .filter_map(|effect| match effect {
96                        mm_dsl::MeerkatMachineEffect::InteractionStreamCleanup { corr_id } => {
97                            Some(match dsl_corr_id_to_core(corr_id.clone()) {
98                                Some(core_id) => Ok(core_id),
99                                None => Err(corr_id.0.clone()),
100                            })
101                        }
102                        _ => None,
103                    })
104                    .collect();
105                Some((observer, targets))
106            })?;
107        if let Some((observer, targets)) = dispatch {
108            for target in targets {
109                match target {
110                    Ok(core_id) => observer.on_interaction_stream_cleanup(core_id),
111                    Err(raw) => tracing::error!(
112                        raw = %raw,
113                        context = context,
114                        "InteractionStreamCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
115                    ),
116                }
117            }
118        }
119        Ok(())
120    }
121}
122
123fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
124    uuid::Uuid::parse_str(&dsl_id.0)
125        .ok()
126        .map(PeerCorrelationId::from_uuid)
127}
128
129impl InteractionStreamHandle for RuntimeInteractionStreamHandle {
130    fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
131        self.apply_input_and_dispatch_cleanup(
132            mm_dsl::MeerkatMachineInput::InteractionStreamReserved {
133                corr_id: corr_id.into(),
134            },
135            "InteractionStreamHandle::reserved",
136        )
137    }
138
139    fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
140        self.apply_input_and_dispatch_cleanup(
141            mm_dsl::MeerkatMachineInput::InteractionStreamAttached {
142                corr_id: corr_id.into(),
143            },
144            "InteractionStreamHandle::attached",
145        )
146    }
147
148    fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
149        self.apply_input_and_dispatch_cleanup(
150            mm_dsl::MeerkatMachineInput::InteractionStreamCompleted {
151                corr_id: corr_id.into(),
152            },
153            "InteractionStreamHandle::completed",
154        )
155    }
156
157    fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
158        self.apply_input_and_dispatch_cleanup(
159            mm_dsl::MeerkatMachineInput::InteractionStreamExpired {
160                corr_id: corr_id.into(),
161            },
162            "InteractionStreamHandle::expired",
163        )
164    }
165
166    fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
167        self.apply_input_and_dispatch_cleanup(
168            mm_dsl::MeerkatMachineInput::InteractionStreamClosedEarly {
169                corr_id: corr_id.into(),
170            },
171            "InteractionStreamHandle::closed_early",
172        )
173    }
174
175    fn state(&self, corr_id: PeerCorrelationId) -> Option<CoreInteractionStreamState> {
176        let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
177        let snapshot = self.dsl.snapshot_state();
178        // Disjoint-set encoding (matches the DSL `interaction_stream_disjoint`
179        // discipline): a corr_id is in at most one of the two active sets.
180        // Terminal states (`Completed` / `Expired` / `ClosedEarly`) leave both
181        // sets and are never observable here — they surface only via the
182        // `InteractionStreamStateChanged` effect, like the peer-correlation
183        // sibling enums.
184        if snapshot.attached_interaction_streams.contains(&dsl_key) {
185            Some(CoreInteractionStreamState::Attached)
186        } else if snapshot.reserved_interaction_streams.contains(&dsl_key) {
187            Some(CoreInteractionStreamState::Reserved)
188        } else {
189            None
190        }
191    }
192
193    fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>) {
194        *self
195            .cleanup_observer
196            .write()
197            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
198    }
199}