meerkat_runtime/handles/
interaction_stream.rs1use std::sync::{Arc, RwLock, Weak};
12
13use meerkat_core::handles::{
14 DslTransitionError, InteractionStreamCleanupObserver, InteractionStreamHandle,
15};
16use meerkat_core::peer_correlation::{
17 InteractionStreamState as CoreInteractionStreamState, PeerCorrelationId,
18};
19
20use super::HandleDslAuthority;
21use crate::meerkat_machine::dsl as mm_dsl;
22
23pub struct RuntimeInteractionStreamHandle {
38 dsl: Arc<HandleDslAuthority>,
39 cleanup_observer: RwLock<Option<Weak<dyn InteractionStreamCleanupObserver>>>,
40}
41
42impl std::fmt::Debug for RuntimeInteractionStreamHandle {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 let observer_tag = self
45 .cleanup_observer
46 .read()
47 .ok()
48 .as_deref()
49 .and_then(|o| o.as_ref().map(|_| "<observer>"));
50 f.debug_struct("RuntimeInteractionStreamHandle")
51 .field("dsl", &self.dsl)
52 .field("cleanup_observer", &observer_tag)
53 .finish()
54 }
55}
56
57impl RuntimeInteractionStreamHandle {
58 pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
60 Self {
61 dsl,
62 cleanup_observer: RwLock::new(None),
63 }
64 }
65
66 pub fn ephemeral() -> Self {
69 Self::new(Arc::new(HandleDslAuthority::ephemeral()))
70 }
71
72 fn apply_input_and_dispatch_cleanup(
73 &self,
74 input: mm_dsl::MeerkatMachineInput,
75 context: &'static str,
76 ) -> Result<(), DslTransitionError> {
77 type CleanupTarget = Result<PeerCorrelationId, String>;
80 let dispatch: Option<(
81 Arc<dyn InteractionStreamCleanupObserver>,
82 Vec<CleanupTarget>,
83 )> = self
84 .dsl
85 .apply_input_with_effects_and_sample(input, context, |effects| {
86 let observer_opt = self
87 .cleanup_observer
88 .read()
89 .unwrap_or_else(std::sync::PoisonError::into_inner)
90 .as_ref()
91 .and_then(Weak::upgrade);
92 let observer = observer_opt?;
93 let targets: Vec<CleanupTarget> = effects
94 .iter()
95 .filter_map(|effect| match effect {
96 mm_dsl::MeerkatMachineEffect::InteractionStreamCleanup { corr_id } => {
97 Some(match dsl_corr_id_to_core(corr_id.clone()) {
98 Some(core_id) => Ok(core_id),
99 None => Err(corr_id.0.clone()),
100 })
101 }
102 _ => None,
103 })
104 .collect();
105 Some((observer, targets))
106 })?;
107 if let Some((observer, targets)) = dispatch {
108 for target in targets {
109 match target {
110 Ok(core_id) => observer.on_interaction_stream_cleanup(core_id),
111 Err(raw) => tracing::error!(
112 raw = %raw,
113 context = context,
114 "InteractionStreamCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
115 ),
116 }
117 }
118 }
119 Ok(())
120 }
121}
122
123fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
124 uuid::Uuid::parse_str(&dsl_id.0)
125 .ok()
126 .map(PeerCorrelationId::from_uuid)
127}
128
129impl InteractionStreamHandle for RuntimeInteractionStreamHandle {
130 fn reserved(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
131 self.apply_input_and_dispatch_cleanup(
132 mm_dsl::MeerkatMachineInput::InteractionStreamReserved {
133 corr_id: corr_id.into(),
134 },
135 "InteractionStreamHandle::reserved",
136 )
137 }
138
139 fn attached(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
140 self.apply_input_and_dispatch_cleanup(
141 mm_dsl::MeerkatMachineInput::InteractionStreamAttached {
142 corr_id: corr_id.into(),
143 },
144 "InteractionStreamHandle::attached",
145 )
146 }
147
148 fn completed(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
149 self.apply_input_and_dispatch_cleanup(
150 mm_dsl::MeerkatMachineInput::InteractionStreamCompleted {
151 corr_id: corr_id.into(),
152 },
153 "InteractionStreamHandle::completed",
154 )
155 }
156
157 fn expired(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
158 self.apply_input_and_dispatch_cleanup(
159 mm_dsl::MeerkatMachineInput::InteractionStreamExpired {
160 corr_id: corr_id.into(),
161 },
162 "InteractionStreamHandle::expired",
163 )
164 }
165
166 fn closed_early(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
167 self.apply_input_and_dispatch_cleanup(
168 mm_dsl::MeerkatMachineInput::InteractionStreamClosedEarly {
169 corr_id: corr_id.into(),
170 },
171 "InteractionStreamHandle::closed_early",
172 )
173 }
174
175 fn state(&self, corr_id: PeerCorrelationId) -> Option<CoreInteractionStreamState> {
176 let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
177 let snapshot = self.dsl.snapshot_state();
178 if snapshot.attached_interaction_streams.contains(&dsl_key) {
185 Some(CoreInteractionStreamState::Attached)
186 } else if snapshot.reserved_interaction_streams.contains(&dsl_key) {
187 Some(CoreInteractionStreamState::Reserved)
188 } else {
189 None
190 }
191 }
192
193 fn install_cleanup_observer(&self, observer: Arc<dyn InteractionStreamCleanupObserver>) {
194 *self
195 .cleanup_observer
196 .write()
197 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
198 }
199}