1use std::sync::Arc;
4
5use meerkat_core::handles::{DslTransitionError, PeerCommsHandle};
6use meerkat_core::interaction::{
7 PeerIngressAdmission, PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts,
8};
9
10use super::HandleDslAuthority;
11use crate::meerkat_machine::dsl as mm_dsl;
12
13#[derive(Debug)]
18pub struct RuntimePeerCommsHandle {
19 dsl: Arc<HandleDslAuthority>,
20}
21
22impl RuntimePeerCommsHandle {
23 pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
25 Self { dsl }
26 }
27
28 pub fn ephemeral() -> Self {
32 Self::new(Arc::new(HandleDslAuthority::ephemeral()))
33 }
34}
35
36fn lifecycle_to_dsl(
37 kind: meerkat_core::comms::PeerLifecycleKind,
38) -> mm_dsl::PeerIngressLifecycleClass {
39 match kind {
40 meerkat_core::comms::PeerLifecycleKind::PeerAdded => {
41 mm_dsl::PeerIngressLifecycleClass::PeerAdded
42 }
43 meerkat_core::comms::PeerLifecycleKind::PeerRetired => {
44 mm_dsl::PeerIngressLifecycleClass::PeerRetired
45 }
46 meerkat_core::comms::PeerLifecycleKind::PeerUnwired => {
47 mm_dsl::PeerIngressLifecycleClass::PeerUnwired
48 }
49 }
50}
51
52fn lifecycle_from_dsl(
53 kind: mm_dsl::PeerIngressLifecycleClass,
54) -> meerkat_core::comms::PeerLifecycleKind {
55 match kind {
56 mm_dsl::PeerIngressLifecycleClass::PeerAdded => {
57 meerkat_core::comms::PeerLifecycleKind::PeerAdded
58 }
59 mm_dsl::PeerIngressLifecycleClass::PeerRetired => {
60 meerkat_core::comms::PeerLifecycleKind::PeerRetired
61 }
62 mm_dsl::PeerIngressLifecycleClass::PeerUnwired => {
63 meerkat_core::comms::PeerLifecycleKind::PeerUnwired
64 }
65 }
66}
67
68fn response_status_to_dsl(
69 status: meerkat_core::ResponseStatus,
70) -> mm_dsl::PeerIngressResponseStatus {
71 match status {
72 meerkat_core::ResponseStatus::Accepted => mm_dsl::PeerIngressResponseStatus::Accepted,
73 meerkat_core::ResponseStatus::Completed => mm_dsl::PeerIngressResponseStatus::Completed,
74 meerkat_core::ResponseStatus::Failed => mm_dsl::PeerIngressResponseStatus::Failed,
75 }
76}
77
78fn lifecycle_peer_param(params: &serde_json::Value) -> Option<String> {
79 params
80 .get("peer")
81 .and_then(serde_json::Value::as_str)
82 .map(ToOwned::to_owned)
83}
84
85fn terminality_from_dsl(
86 terminality: mm_dsl::PeerIngressResponseTerminality,
87) -> meerkat_core::TerminalityClass {
88 match terminality {
89 mm_dsl::PeerIngressResponseTerminality::Progress => {
90 meerkat_core::TerminalityClass::Progress
91 }
92 mm_dsl::PeerIngressResponseTerminality::TerminalCompleted => {
93 meerkat_core::TerminalityClass::Terminal {
94 disposition: meerkat_core::TerminalDisposition::Completed,
95 }
96 }
97 mm_dsl::PeerIngressResponseTerminality::TerminalFailed => {
98 meerkat_core::TerminalityClass::Terminal {
99 disposition: meerkat_core::TerminalDisposition::Failed,
100 }
101 }
102 }
103}
104
105fn external_envelope_signal(facts: &PeerIngressEnvelopeFacts) -> mm_dsl::MeerkatMachineSignal {
106 let (
107 envelope_kind,
108 request_intent,
109 lifecycle_kind,
110 lifecycle_peer_param,
111 response_status,
112 in_reply_to,
113 ) = match &facts.kind {
114 meerkat_core::PeerIngressEnvelopeKind::Message { .. } => (
115 mm_dsl::PeerIngressEnvelopeClass::Message,
116 String::new(),
117 mm_dsl::PeerIngressLifecycleClass::PeerAdded,
118 None,
119 mm_dsl::PeerIngressResponseStatus::Accepted,
120 String::new(),
121 ),
122 meerkat_core::PeerIngressEnvelopeKind::Request { intent, params } => (
123 mm_dsl::PeerIngressEnvelopeClass::Request,
124 intent.clone(),
125 mm_dsl::PeerIngressLifecycleClass::PeerAdded,
126 lifecycle_peer_param(params),
127 mm_dsl::PeerIngressResponseStatus::Accepted,
128 String::new(),
129 ),
130 meerkat_core::PeerIngressEnvelopeKind::Lifecycle { kind, params } => (
131 mm_dsl::PeerIngressEnvelopeClass::Lifecycle,
132 String::new(),
133 lifecycle_to_dsl(*kind),
134 lifecycle_peer_param(params),
135 mm_dsl::PeerIngressResponseStatus::Accepted,
136 String::new(),
137 ),
138 meerkat_core::PeerIngressEnvelopeKind::Response {
139 in_reply_to: reply_to,
140 status,
141 ..
142 } => (
143 mm_dsl::PeerIngressEnvelopeClass::Response,
144 String::new(),
145 mm_dsl::PeerIngressLifecycleClass::PeerAdded,
146 None,
147 response_status_to_dsl(*status),
148 reply_to.clone(),
149 ),
150 meerkat_core::PeerIngressEnvelopeKind::Ack {
151 in_reply_to: reply_to,
152 } => (
153 mm_dsl::PeerIngressEnvelopeClass::Ack,
154 String::new(),
155 mm_dsl::PeerIngressLifecycleClass::PeerAdded,
156 None,
157 mm_dsl::PeerIngressResponseStatus::Accepted,
158 reply_to.clone(),
159 ),
160 };
161
162 mm_dsl::MeerkatMachineSignal::ClassifyExternalEnvelope {
163 item_id: facts.item_id.clone(),
164 from_peer: facts.from_peer.clone(),
165 envelope_kind,
166 request_intent,
167 lifecycle_kind,
168 lifecycle_peer_param,
169 response_status,
170 in_reply_to,
171 }
172}
173
174struct PeerIngressClassifiedEffect {
175 class: mm_dsl::PeerIngressInputClass,
176 kind: mm_dsl::PeerIngressAdmittedKind,
177 auth: mm_dsl::PeerIngressAuthClass,
178 lifecycle_kind: Option<mm_dsl::PeerIngressLifecycleClass>,
179 lifecycle_peer: Option<String>,
180 request_id: Option<String>,
181 response_terminality: Option<mm_dsl::PeerIngressResponseTerminality>,
182}
183
184fn classified_effect(
185 effects: Vec<mm_dsl::MeerkatMachineEffect>,
186 context: &'static str,
187) -> Result<PeerIngressClassifiedEffect, DslTransitionError> {
188 effects
189 .into_iter()
190 .find_map(|effect| match effect {
191 mm_dsl::MeerkatMachineEffect::PeerIngressClassified {
192 class,
193 kind,
194 auth,
195 lifecycle_kind,
196 lifecycle_peer,
197 request_id,
198 response_terminality,
199 } => Some(PeerIngressClassifiedEffect {
200 class,
201 kind,
202 auth,
203 lifecycle_kind,
204 lifecycle_peer,
205 request_id,
206 response_terminality,
207 }),
208 _ => None,
209 })
210 .ok_or_else(|| {
211 DslTransitionError::guard_rejected(
212 context,
213 "machine transition did not emit PeerIngressClassified",
214 )
215 })
216}
217
218fn classification_from_effect(
219 effect: &PeerIngressClassifiedEffect,
220) -> meerkat_core::PeerIngressClassification {
221 let class = match effect.class {
222 mm_dsl::PeerIngressInputClass::ActionableMessage => {
223 meerkat_core::PeerInputClass::ActionableMessage
224 }
225 mm_dsl::PeerIngressInputClass::ActionableRequest => {
226 meerkat_core::PeerInputClass::ActionableRequest
227 }
228 mm_dsl::PeerIngressInputClass::ResponseProgress => {
229 meerkat_core::PeerInputClass::ResponseProgress
230 }
231 mm_dsl::PeerIngressInputClass::ResponseTerminal => {
232 meerkat_core::PeerInputClass::ResponseTerminal
233 }
234 mm_dsl::PeerIngressInputClass::PeerLifecycleAdded => {
235 meerkat_core::PeerInputClass::PeerLifecycleAdded
236 }
237 mm_dsl::PeerIngressInputClass::PeerLifecycleRetired => {
238 meerkat_core::PeerInputClass::PeerLifecycleRetired
239 }
240 mm_dsl::PeerIngressInputClass::PeerLifecycleUnwired => {
241 meerkat_core::PeerInputClass::PeerLifecycleUnwired
242 }
243 mm_dsl::PeerIngressInputClass::SilentRequest => meerkat_core::PeerInputClass::SilentRequest,
244 mm_dsl::PeerIngressInputClass::Ack => meerkat_core::PeerInputClass::Ack,
245 mm_dsl::PeerIngressInputClass::PlainEvent => meerkat_core::PeerInputClass::PlainEvent,
246 };
247 let kind = match effect.kind {
248 mm_dsl::PeerIngressAdmittedKind::Message => meerkat_core::PeerIngressKind::Message,
249 mm_dsl::PeerIngressAdmittedKind::Request => meerkat_core::PeerIngressKind::Request,
250 mm_dsl::PeerIngressAdmittedKind::Response => meerkat_core::PeerIngressKind::Response,
251 mm_dsl::PeerIngressAdmittedKind::Ack => meerkat_core::PeerIngressKind::Ack,
252 mm_dsl::PeerIngressAdmittedKind::PlainEvent => meerkat_core::PeerIngressKind::PlainEvent,
253 };
254 let auth = match effect.auth {
255 mm_dsl::PeerIngressAuthClass::Required => meerkat_core::PeerIngressAuthDecision::Required,
256 mm_dsl::PeerIngressAuthClass::SupervisorBridgeExempt => {
257 meerkat_core::PeerIngressAuthDecision::Exempt(
258 meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
259 )
260 }
261 };
262
263 meerkat_core::PeerIngressClassification {
264 class,
265 kind,
266 auth,
267 lifecycle_kind: effect.lifecycle_kind.map(lifecycle_from_dsl),
268 response_terminality: effect.response_terminality.map(terminality_from_dsl),
269 }
270}
271
272impl PeerCommsHandle for RuntimePeerCommsHandle {
273 fn classify_external_envelope(
274 &self,
275 facts: PeerIngressEnvelopeFacts,
276 ) -> Result<PeerIngressAdmission, DslTransitionError> {
277 let context = "PeerCommsHandle::classify_external_envelope";
278 let effects = self
279 .dsl
280 .apply_signal_with_effects(external_envelope_signal(&facts), context)?;
281 let effect = classified_effect(effects, context)?;
282 let classification = classification_from_effect(&effect);
283 Ok(PeerIngressAdmission {
284 rendered_text: meerkat_core::render_peer_ingress_admitted_text(&facts, &classification),
285 classification,
286 lifecycle_peer: effect.lifecycle_peer,
287 request_id: effect.request_id,
288 })
289 }
290
291 fn classify_plain_event(
292 &self,
293 facts: PeerIngressPlainEventFacts,
294 ) -> Result<PeerIngressAdmission, DslTransitionError> {
295 let context = "PeerCommsHandle::classify_plain_event";
296 let effects = self.dsl.apply_signal_with_effects(
297 mm_dsl::MeerkatMachineSignal::ClassifyPlainEvent {
298 source_name: facts.source_name.clone(),
299 },
300 context,
301 )?;
302 let effect = classified_effect(effects, context)?;
303 Ok(PeerIngressAdmission {
304 classification: classification_from_effect(&effect),
305 lifecycle_peer: effect.lifecycle_peer,
306 request_id: effect.request_id,
307 rendered_text: meerkat_core::interaction::format_external_event_projection(
308 &facts.source_name,
309 Some(&facts.body),
310 ),
311 })
312 }
313
314 fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError> {
315 self.dsl.apply_input(
317 mm_dsl::MeerkatMachineInput::SetPeerIngressContext { keep_alive },
318 "PeerCommsHandle::set_peer_ingress_context",
319 )
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326 use std::collections::BTreeSet;
327 use std::sync::Mutex;
328
329 fn handle_for_phase(phase: mm_dsl::MeerkatPhase) -> RuntimePeerCommsHandle {
330 let state = mm_dsl::MeerkatMachineState {
331 lifecycle_phase: phase,
332 session_id: Some(mm_dsl::SessionId("session-1".to_string())),
333 ..Default::default()
334 };
335 let authority = Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::from_state(
336 state,
337 )));
338 RuntimePeerCommsHandle::new(Arc::new(HandleDslAuthority::from_shared(authority)))
339 }
340
341 #[test]
342 fn runtime_peer_comms_handle_classifies_from_dsl_silent_intents() {
343 let state = mm_dsl::MeerkatMachineState {
344 lifecycle_phase: mm_dsl::MeerkatPhase::Attached,
345 session_id: Some(mm_dsl::SessionId("session-1".to_string())),
346 silent_intent_overrides: BTreeSet::from(["probe.silent".to_string()]),
347 ..Default::default()
348 };
349 let authority = Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::from_state(
350 state,
351 )));
352 let handle =
353 RuntimePeerCommsHandle::new(Arc::new(HandleDslAuthority::from_shared(authority)));
354
355 let admission = handle
356 .classify_external_envelope(PeerIngressEnvelopeFacts {
357 item_id: "request-1".to_string(),
358 from_peer: "peer-1".to_string(),
359 from_peer_id: meerkat_core::comms::PeerId::new(),
360 kind: meerkat_core::PeerIngressEnvelopeKind::Request {
361 intent: "probe.silent".to_string(),
362 params: serde_json::json!({}),
363 },
364 })
365 .expect("attached session should classify peer ingress");
366
367 assert_eq!(
368 admission.classification.class,
369 meerkat_core::PeerInputClass::SilentRequest
370 );
371 assert_eq!(
372 admission.classification.auth,
373 meerkat_core::PeerIngressAuthDecision::Required
374 );
375 assert_eq!(admission.request_id.as_deref(), Some("request-1"));
376 }
377
378 #[test]
379 fn runtime_peer_comms_handle_lifecycle_subject_is_selected_by_machine() {
380 let state = mm_dsl::MeerkatMachineState {
381 lifecycle_phase: mm_dsl::MeerkatPhase::Attached,
382 session_id: Some(mm_dsl::SessionId("session-1".to_string())),
383 ..Default::default()
384 };
385 let authority = Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::from_state(
386 state,
387 )));
388 let handle =
389 RuntimePeerCommsHandle::new(Arc::new(HandleDslAuthority::from_shared(authority)));
390
391 let with_param = handle
392 .classify_external_envelope(PeerIngressEnvelopeFacts {
393 item_id: "request-param".to_string(),
394 from_peer: "orchestrator".to_string(),
395 from_peer_id: meerkat_core::comms::PeerId::new(),
396 kind: meerkat_core::PeerIngressEnvelopeKind::Request {
397 intent: "mob.peer_added".to_string(),
398 params: serde_json::json!({ "peer": "worker-1" }),
399 },
400 })
401 .expect("machine should classify lifecycle request");
402 assert_eq!(with_param.lifecycle_peer.as_deref(), Some("worker-1"));
403
404 let without_param = handle
405 .classify_external_envelope(PeerIngressEnvelopeFacts {
406 item_id: "request-fallback".to_string(),
407 from_peer: "orchestrator".to_string(),
408 from_peer_id: meerkat_core::comms::PeerId::new(),
409 kind: meerkat_core::PeerIngressEnvelopeKind::Request {
410 intent: "mob.peer_retired".to_string(),
411 params: serde_json::json!({}),
412 },
413 })
414 .expect("machine should classify lifecycle request fallback");
415 assert_eq!(
416 without_param.lifecycle_peer.as_deref(),
417 Some("orchestrator")
418 );
419
420 let empty_param = handle
421 .classify_external_envelope(PeerIngressEnvelopeFacts {
422 item_id: "lifecycle-empty".to_string(),
423 from_peer: "orchestrator".to_string(),
424 from_peer_id: meerkat_core::comms::PeerId::new(),
425 kind: meerkat_core::PeerIngressEnvelopeKind::Lifecycle {
426 kind: meerkat_core::comms::PeerLifecycleKind::PeerUnwired,
427 params: serde_json::json!({ "peer": "" }),
428 },
429 })
430 .expect("machine should classify lifecycle event fallback");
431 assert_eq!(empty_param.lifecycle_peer.as_deref(), Some("orchestrator"));
432 }
433
434 #[test]
435 fn runtime_peer_comms_handle_classifies_idle_lifecycle_without_opening_peer_work() {
436 let handle = handle_for_phase(mm_dsl::MeerkatPhase::Idle);
437
438 let retired_notice = handle
439 .classify_external_envelope(PeerIngressEnvelopeFacts {
440 item_id: "lifecycle-retired".to_string(),
441 from_peer: "orchestrator".to_string(),
442 from_peer_id: meerkat_core::comms::PeerId::new(),
443 kind: meerkat_core::PeerIngressEnvelopeKind::Lifecycle {
444 kind: meerkat_core::comms::PeerLifecycleKind::PeerRetired,
445 params: serde_json::json!({ "peer": "worker-1" }),
446 },
447 })
448 .expect("idle live session should classify mob lifecycle notices");
449 assert_eq!(
450 retired_notice.classification.class,
451 meerkat_core::PeerInputClass::PeerLifecycleRetired
452 );
453 assert_eq!(
454 retired_notice.classification.lifecycle_kind,
455 Some(meerkat_core::comms::PeerLifecycleKind::PeerRetired)
456 );
457 assert_eq!(retired_notice.lifecycle_peer.as_deref(), Some("worker-1"));
458 assert_eq!(retired_notice.request_id, None);
459
460 let added_request = handle
461 .classify_external_envelope(PeerIngressEnvelopeFacts {
462 item_id: "request-added".to_string(),
463 from_peer: "orchestrator".to_string(),
464 from_peer_id: meerkat_core::comms::PeerId::new(),
465 kind: meerkat_core::PeerIngressEnvelopeKind::Request {
466 intent: "mob.peer_added".to_string(),
467 params: serde_json::json!({ "peer": "worker-2" }),
468 },
469 })
470 .expect("idle live session should classify lifecycle requests");
471 assert_eq!(
472 added_request.classification.class,
473 meerkat_core::PeerInputClass::PeerLifecycleAdded
474 );
475 assert_eq!(added_request.lifecycle_peer.as_deref(), Some("worker-2"));
476 assert_eq!(added_request.request_id.as_deref(), Some("request-added"));
477
478 let work_admission = handle.classify_external_envelope(PeerIngressEnvelopeFacts {
479 item_id: "message-1".to_string(),
480 from_peer: "peer-1".to_string(),
481 from_peer_id: meerkat_core::comms::PeerId::new(),
482 kind: meerkat_core::PeerIngressEnvelopeKind::Message {
483 body: "wake up".to_string(),
484 },
485 });
486 assert!(
487 work_admission.is_err(),
488 "idle lifecycle admission must not reopen normal peer work ingress"
489 );
490 }
491
492 #[test]
493 fn runtime_peer_comms_handle_drains_terminal_cleanup_without_reopening_topology_adds() {
494 for phase in [mm_dsl::MeerkatPhase::Retired, mm_dsl::MeerkatPhase::Stopped] {
495 let handle = handle_for_phase(phase);
496
497 let retired_notice = handle
498 .classify_external_envelope(PeerIngressEnvelopeFacts {
499 item_id: "lifecycle-retired".to_string(),
500 from_peer: "orchestrator".to_string(),
501 from_peer_id: meerkat_core::comms::PeerId::new(),
502 kind: meerkat_core::PeerIngressEnvelopeKind::Lifecycle {
503 kind: meerkat_core::comms::PeerLifecycleKind::PeerRetired,
504 params: serde_json::json!({ "peer": "worker-1" }),
505 },
506 })
507 .expect("terminal sessions should drain peer-retired cleanup notices");
508 assert_eq!(
509 retired_notice.classification.class,
510 meerkat_core::PeerInputClass::PeerLifecycleRetired
511 );
512
513 let unwired_request = handle
514 .classify_external_envelope(PeerIngressEnvelopeFacts {
515 item_id: "request-unwired".to_string(),
516 from_peer: "orchestrator".to_string(),
517 from_peer_id: meerkat_core::comms::PeerId::new(),
518 kind: meerkat_core::PeerIngressEnvelopeKind::Request {
519 intent: "mob.peer_unwired".to_string(),
520 params: serde_json::json!({ "peer": "worker-2" }),
521 },
522 })
523 .expect("terminal sessions should drain peer-unwired cleanup requests");
524 assert_eq!(
525 unwired_request.classification.class,
526 meerkat_core::PeerInputClass::PeerLifecycleUnwired
527 );
528
529 let added_notice = handle.classify_external_envelope(PeerIngressEnvelopeFacts {
530 item_id: "lifecycle-added".to_string(),
531 from_peer: "orchestrator".to_string(),
532 from_peer_id: meerkat_core::comms::PeerId::new(),
533 kind: meerkat_core::PeerIngressEnvelopeKind::Lifecycle {
534 kind: meerkat_core::comms::PeerLifecycleKind::PeerAdded,
535 params: serde_json::json!({ "peer": "worker-3" }),
536 },
537 });
538 assert!(
539 added_notice.is_err(),
540 "terminal cleanup admission must not accept new peer topology"
541 );
542 }
543 }
544
545 #[test]
546 fn runtime_signal_builder_does_not_preselect_lifecycle_subject() {
547 let source = include_str!("peer_comms.rs");
548 let signal_builder = source
549 .split("fn external_envelope_signal")
550 .nth(1)
551 .expect("signal builder should exist")
552 .split("struct PeerIngressClassifiedEffect")
553 .next()
554 .expect("classified effect should follow signal builder");
555 let forbidden_helper = ["peer", "lifecycle", "subject"].join("_");
556
557 assert!(
558 signal_builder.contains("lifecycle_peer_param"),
559 "runtime should pass the parsed lifecycle peer candidate"
560 );
561 assert!(
562 !signal_builder.contains(&forbidden_helper),
563 "runtime must not call the lifecycle subject selector before the machine"
564 );
565 assert!(
566 !signal_builder.contains("facts.from_peer.as_str()"),
567 "fallback peer must remain a machine input fact, not a preselected subject"
568 );
569 assert!(
570 !signal_builder.contains("unwrap_or"),
571 "runtime must not choose a lifecycle subject fallback before the machine"
572 );
573 }
574}