meerkat_runtime/handles/
peer_interaction.rs1use std::sync::{Arc, RwLock, Weak};
12
13use meerkat_core::handles::{
14 DslTransitionError, PeerInteractionCleanupObserver, PeerInteractionHandle,
15 PeerTerminalDisposition as CorePeerDisposition,
16};
17use meerkat_core::peer_correlation::{
18 InboundPeerRequestState as CoreInboundState, OutboundPeerRequestState as CoreOutboundState,
19 PeerCorrelationId,
20};
21
22use super::HandleDslAuthority;
23use crate::meerkat_machine::dsl as mm_dsl;
24
25pub struct RuntimePeerInteractionHandle {
43 dsl: Arc<HandleDslAuthority>,
44 cleanup_observer: RwLock<Option<Weak<dyn PeerInteractionCleanupObserver>>>,
45}
46
47impl std::fmt::Debug for RuntimePeerInteractionHandle {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 let observer_tag = self
50 .cleanup_observer
51 .read()
52 .ok()
53 .as_deref()
54 .and_then(|o| o.as_ref().map(|_| "<observer>"));
55 f.debug_struct("RuntimePeerInteractionHandle")
56 .field("dsl", &self.dsl)
57 .field("cleanup_observer", &observer_tag)
58 .finish()
59 }
60}
61
62impl RuntimePeerInteractionHandle {
63 pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
65 Self {
66 dsl,
67 cleanup_observer: RwLock::new(None),
68 }
69 }
70
71 pub fn ephemeral() -> Self {
74 Self::new(Arc::new(HandleDslAuthority::ephemeral()))
75 }
76
77 fn apply_input_and_dispatch_cleanup(
78 &self,
79 input: mm_dsl::MeerkatMachineInput,
80 context: &'static str,
81 ) -> Result<(), DslTransitionError> {
82 type CleanupTarget = Result<PeerCorrelationId, String>;
92 let dispatch: Option<(Arc<dyn PeerInteractionCleanupObserver>, Vec<CleanupTarget>)> = self
93 .dsl
94 .apply_input_with_effects_and_sample(input, context, |effects| {
95 let observer_opt = self
96 .cleanup_observer
97 .read()
98 .unwrap_or_else(std::sync::PoisonError::into_inner)
99 .as_ref()
100 .and_then(Weak::upgrade);
101 let observer = observer_opt?;
102 let targets: Vec<CleanupTarget> = effects
103 .iter()
104 .filter_map(|effect| match effect {
105 mm_dsl::MeerkatMachineEffect::PeerInteractionCleanup { corr_id } => {
106 Some(match dsl_corr_id_to_core(corr_id.clone()) {
107 Some(core_id) => Ok(core_id),
108 None => Err(corr_id.0.clone()),
109 })
110 }
111 _ => None,
112 })
113 .collect();
114 Some((observer, targets))
115 })?;
116 if let Some((observer, targets)) = dispatch {
117 for target in targets {
118 match target {
119 Ok(core_id) => observer.on_peer_interaction_cleanup(core_id),
120 Err(raw) => tracing::error!(
121 raw = %raw,
122 context = context,
123 "PeerInteractionCleanup: DSL emitted a corr_id that is not a valid UUID — broken invariant; skipping observer dispatch"
124 ),
125 }
126 }
127 }
128 Ok(())
129 }
130}
131
132fn dsl_corr_id_to_core(dsl_id: mm_dsl::PeerCorrelationId) -> Option<PeerCorrelationId> {
133 uuid::Uuid::parse_str(&dsl_id.0)
140 .ok()
141 .map(PeerCorrelationId::from_uuid)
142}
143
144impl PeerInteractionHandle for RuntimePeerInteractionHandle {
145 fn request_sent(
146 &self,
147 corr_id: PeerCorrelationId,
148 to: String,
149 ) -> Result<(), DslTransitionError> {
150 self.apply_input_and_dispatch_cleanup(
151 mm_dsl::MeerkatMachineInput::PeerRequestSent {
152 corr_id: corr_id.into(),
153 to,
154 },
155 "PeerInteractionHandle::request_sent",
156 )
157 }
158
159 fn response_progress(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
160 self.apply_input_and_dispatch_cleanup(
161 mm_dsl::MeerkatMachineInput::PeerResponseProgressArrived {
162 corr_id: corr_id.into(),
163 },
164 "PeerInteractionHandle::response_progress",
165 )
166 }
167
168 fn response_terminal(
169 &self,
170 corr_id: PeerCorrelationId,
171 disposition: CorePeerDisposition,
172 ) -> Result<(), DslTransitionError> {
173 self.apply_input_and_dispatch_cleanup(
174 mm_dsl::MeerkatMachineInput::PeerResponseTerminalArrived {
175 corr_id: corr_id.into(),
176 disposition: disposition.into(),
177 },
178 "PeerInteractionHandle::response_terminal",
179 )
180 }
181
182 fn request_timed_out(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
183 self.apply_input_and_dispatch_cleanup(
184 mm_dsl::MeerkatMachineInput::PeerRequestTimedOut {
185 corr_id: corr_id.into(),
186 },
187 "PeerInteractionHandle::request_timed_out",
188 )
189 }
190
191 fn request_received(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
192 self.apply_input_and_dispatch_cleanup(
193 mm_dsl::MeerkatMachineInput::PeerRequestReceived {
194 corr_id: corr_id.into(),
195 },
196 "PeerInteractionHandle::request_received",
197 )
198 }
199
200 fn response_replied(&self, corr_id: PeerCorrelationId) -> Result<(), DslTransitionError> {
201 self.apply_input_and_dispatch_cleanup(
202 mm_dsl::MeerkatMachineInput::PeerResponseReplied {
203 corr_id: corr_id.into(),
204 },
205 "PeerInteractionHandle::response_replied",
206 )
207 }
208
209 fn outbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreOutboundState> {
210 let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
211 self.dsl
212 .snapshot_state()
213 .pending_peer_requests
214 .get(&dsl_key)
215 .copied()
216 .map(Into::into)
217 }
218
219 fn inbound_state(&self, corr_id: PeerCorrelationId) -> Option<CoreInboundState> {
220 let dsl_key: mm_dsl::PeerCorrelationId = corr_id.into();
221 self.dsl
222 .snapshot_state()
223 .inbound_peer_requests
224 .get(&dsl_key)
225 .copied()
226 .map(Into::into)
227 }
228
229 fn install_cleanup_observer(&self, observer: Arc<dyn PeerInteractionCleanupObserver>) {
230 *self
237 .cleanup_observer
238 .write()
239 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::downgrade(&observer));
240 }
241}