Skip to main content

meerkat_runtime/handles/
peer_comms.rs

1//! Runtime impl of [`meerkat_core::handles::PeerCommsHandle`].
2
3use std::sync::Arc;
4
5use meerkat_core::handles::{DslTransitionError, PeerCommsHandle};
6use meerkat_core::interaction::{
7    PeerIngressAdmission, PeerIngressEnvelopeFacts, PeerIngressPlainEventFacts,
8};
9
10use super::HandleDslAuthority;
11use crate::meerkat_machine::dsl as mm_dsl;
12
13/// Runtime-backed [`PeerCommsHandle`] impl.
14///
15/// Routes every trait method to the corresponding DSL signal / input on a
16/// dedicated per-session MeerkatMachine DSL authority.
17#[derive(Debug)]
18pub struct RuntimePeerCommsHandle {
19    dsl: Arc<HandleDslAuthority>,
20}
21
22impl RuntimePeerCommsHandle {
23    /// Construct a handle backed by the session's shared DSL authority.
24    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
25        Self { dsl }
26    }
27
28    /// Construct a handle backed by an ephemeral DSL authority.
29    ///
30    /// See [`RuntimeTurnStateHandle::ephemeral`].
31    pub fn ephemeral() -> Self {
32        Self::new(Arc::new(HandleDslAuthority::ephemeral()))
33    }
34}
35
36fn lifecycle_to_dsl(
37    kind: meerkat_core::comms::PeerLifecycleKind,
38) -> mm_dsl::PeerIngressLifecycleClass {
39    match kind {
40        meerkat_core::comms::PeerLifecycleKind::PeerAdded => {
41            mm_dsl::PeerIngressLifecycleClass::PeerAdded
42        }
43        meerkat_core::comms::PeerLifecycleKind::PeerRetired => {
44            mm_dsl::PeerIngressLifecycleClass::PeerRetired
45        }
46        meerkat_core::comms::PeerLifecycleKind::PeerUnwired => {
47            mm_dsl::PeerIngressLifecycleClass::PeerUnwired
48        }
49    }
50}
51
52fn lifecycle_from_dsl(
53    kind: mm_dsl::PeerIngressLifecycleClass,
54) -> meerkat_core::comms::PeerLifecycleKind {
55    match kind {
56        mm_dsl::PeerIngressLifecycleClass::PeerAdded => {
57            meerkat_core::comms::PeerLifecycleKind::PeerAdded
58        }
59        mm_dsl::PeerIngressLifecycleClass::PeerRetired => {
60            meerkat_core::comms::PeerLifecycleKind::PeerRetired
61        }
62        mm_dsl::PeerIngressLifecycleClass::PeerUnwired => {
63            meerkat_core::comms::PeerLifecycleKind::PeerUnwired
64        }
65    }
66}
67
68fn response_status_to_dsl(
69    status: meerkat_core::ResponseStatus,
70) -> mm_dsl::PeerIngressResponseStatus {
71    match status {
72        meerkat_core::ResponseStatus::Accepted => mm_dsl::PeerIngressResponseStatus::Accepted,
73        meerkat_core::ResponseStatus::Completed => mm_dsl::PeerIngressResponseStatus::Completed,
74        meerkat_core::ResponseStatus::Failed => mm_dsl::PeerIngressResponseStatus::Failed,
75    }
76}
77
78fn lifecycle_peer_param(params: &serde_json::Value) -> Option<String> {
79    params
80        .get("peer")
81        .and_then(serde_json::Value::as_str)
82        .map(ToOwned::to_owned)
83}
84
85fn terminality_from_dsl(
86    terminality: mm_dsl::PeerIngressResponseTerminality,
87) -> meerkat_core::TerminalityClass {
88    match terminality {
89        mm_dsl::PeerIngressResponseTerminality::Progress => {
90            meerkat_core::TerminalityClass::Progress
91        }
92        mm_dsl::PeerIngressResponseTerminality::TerminalCompleted => {
93            meerkat_core::TerminalityClass::Terminal {
94                disposition: meerkat_core::TerminalDisposition::Completed,
95            }
96        }
97        mm_dsl::PeerIngressResponseTerminality::TerminalFailed => {
98            meerkat_core::TerminalityClass::Terminal {
99                disposition: meerkat_core::TerminalDisposition::Failed,
100            }
101        }
102    }
103}
104
105fn external_envelope_signal(facts: &PeerIngressEnvelopeFacts) -> mm_dsl::MeerkatMachineSignal {
106    let (
107        envelope_kind,
108        request_intent,
109        lifecycle_kind,
110        lifecycle_peer_param,
111        response_status,
112        in_reply_to,
113    ) = match &facts.kind {
114        meerkat_core::PeerIngressEnvelopeKind::Message { .. } => (
115            mm_dsl::PeerIngressEnvelopeClass::Message,
116            String::new(),
117            mm_dsl::PeerIngressLifecycleClass::PeerAdded,
118            None,
119            mm_dsl::PeerIngressResponseStatus::Accepted,
120            String::new(),
121        ),
122        meerkat_core::PeerIngressEnvelopeKind::Request { intent, params } => (
123            mm_dsl::PeerIngressEnvelopeClass::Request,
124            intent.clone(),
125            mm_dsl::PeerIngressLifecycleClass::PeerAdded,
126            lifecycle_peer_param(params),
127            mm_dsl::PeerIngressResponseStatus::Accepted,
128            String::new(),
129        ),
130        meerkat_core::PeerIngressEnvelopeKind::Lifecycle { kind, params } => (
131            mm_dsl::PeerIngressEnvelopeClass::Lifecycle,
132            String::new(),
133            lifecycle_to_dsl(*kind),
134            lifecycle_peer_param(params),
135            mm_dsl::PeerIngressResponseStatus::Accepted,
136            String::new(),
137        ),
138        meerkat_core::PeerIngressEnvelopeKind::Response {
139            in_reply_to: reply_to,
140            status,
141            ..
142        } => (
143            mm_dsl::PeerIngressEnvelopeClass::Response,
144            String::new(),
145            mm_dsl::PeerIngressLifecycleClass::PeerAdded,
146            None,
147            response_status_to_dsl(*status),
148            reply_to.clone(),
149        ),
150        meerkat_core::PeerIngressEnvelopeKind::Ack {
151            in_reply_to: reply_to,
152        } => (
153            mm_dsl::PeerIngressEnvelopeClass::Ack,
154            String::new(),
155            mm_dsl::PeerIngressLifecycleClass::PeerAdded,
156            None,
157            mm_dsl::PeerIngressResponseStatus::Accepted,
158            reply_to.clone(),
159        ),
160    };
161
162    mm_dsl::MeerkatMachineSignal::ClassifyExternalEnvelope {
163        item_id: facts.item_id.clone(),
164        from_peer: facts.from_peer.clone(),
165        envelope_kind,
166        request_intent,
167        lifecycle_kind,
168        lifecycle_peer_param,
169        response_status,
170        in_reply_to,
171    }
172}
173
174struct PeerIngressClassifiedEffect {
175    class: mm_dsl::PeerIngressInputClass,
176    kind: mm_dsl::PeerIngressAdmittedKind,
177    auth: mm_dsl::PeerIngressAuthClass,
178    lifecycle_kind: Option<mm_dsl::PeerIngressLifecycleClass>,
179    lifecycle_peer: Option<String>,
180    request_id: Option<String>,
181    response_terminality: Option<mm_dsl::PeerIngressResponseTerminality>,
182}
183
184fn classified_effect(
185    effects: Vec<mm_dsl::MeerkatMachineEffect>,
186    context: &'static str,
187) -> Result<PeerIngressClassifiedEffect, DslTransitionError> {
188    effects
189        .into_iter()
190        .find_map(|effect| match effect {
191            mm_dsl::MeerkatMachineEffect::PeerIngressClassified {
192                class,
193                kind,
194                auth,
195                lifecycle_kind,
196                lifecycle_peer,
197                request_id,
198                response_terminality,
199            } => Some(PeerIngressClassifiedEffect {
200                class,
201                kind,
202                auth,
203                lifecycle_kind,
204                lifecycle_peer,
205                request_id,
206                response_terminality,
207            }),
208            _ => None,
209        })
210        .ok_or_else(|| {
211            DslTransitionError::guard_rejected(
212                context,
213                "machine transition did not emit PeerIngressClassified",
214            )
215        })
216}
217
218fn classification_from_effect(
219    effect: &PeerIngressClassifiedEffect,
220) -> meerkat_core::PeerIngressClassification {
221    let class = match effect.class {
222        mm_dsl::PeerIngressInputClass::ActionableMessage => {
223            meerkat_core::PeerInputClass::ActionableMessage
224        }
225        mm_dsl::PeerIngressInputClass::ActionableRequest => {
226            meerkat_core::PeerInputClass::ActionableRequest
227        }
228        mm_dsl::PeerIngressInputClass::ResponseProgress => {
229            meerkat_core::PeerInputClass::ResponseProgress
230        }
231        mm_dsl::PeerIngressInputClass::ResponseTerminal => {
232            meerkat_core::PeerInputClass::ResponseTerminal
233        }
234        mm_dsl::PeerIngressInputClass::PeerLifecycleAdded => {
235            meerkat_core::PeerInputClass::PeerLifecycleAdded
236        }
237        mm_dsl::PeerIngressInputClass::PeerLifecycleRetired => {
238            meerkat_core::PeerInputClass::PeerLifecycleRetired
239        }
240        mm_dsl::PeerIngressInputClass::PeerLifecycleUnwired => {
241            meerkat_core::PeerInputClass::PeerLifecycleUnwired
242        }
243        mm_dsl::PeerIngressInputClass::SilentRequest => meerkat_core::PeerInputClass::SilentRequest,
244        mm_dsl::PeerIngressInputClass::Ack => meerkat_core::PeerInputClass::Ack,
245        mm_dsl::PeerIngressInputClass::PlainEvent => meerkat_core::PeerInputClass::PlainEvent,
246    };
247    let kind = match effect.kind {
248        mm_dsl::PeerIngressAdmittedKind::Message => meerkat_core::PeerIngressKind::Message,
249        mm_dsl::PeerIngressAdmittedKind::Request => meerkat_core::PeerIngressKind::Request,
250        mm_dsl::PeerIngressAdmittedKind::Response => meerkat_core::PeerIngressKind::Response,
251        mm_dsl::PeerIngressAdmittedKind::Ack => meerkat_core::PeerIngressKind::Ack,
252        mm_dsl::PeerIngressAdmittedKind::PlainEvent => meerkat_core::PeerIngressKind::PlainEvent,
253    };
254    let auth = match effect.auth {
255        mm_dsl::PeerIngressAuthClass::Required => meerkat_core::PeerIngressAuthDecision::Required,
256        mm_dsl::PeerIngressAuthClass::SupervisorBridgeExempt => {
257            meerkat_core::PeerIngressAuthDecision::Exempt(
258                meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
259            )
260        }
261    };
262
263    meerkat_core::PeerIngressClassification {
264        class,
265        kind,
266        auth,
267        lifecycle_kind: effect.lifecycle_kind.map(lifecycle_from_dsl),
268        response_terminality: effect.response_terminality.map(terminality_from_dsl),
269    }
270}
271
272impl PeerCommsHandle for RuntimePeerCommsHandle {
273    fn classify_external_envelope(
274        &self,
275        facts: PeerIngressEnvelopeFacts,
276    ) -> Result<PeerIngressAdmission, DslTransitionError> {
277        let context = "PeerCommsHandle::classify_external_envelope";
278        let effects = self
279            .dsl
280            .apply_signal_with_effects(external_envelope_signal(&facts), context)?;
281        let effect = classified_effect(effects, context)?;
282        let classification = classification_from_effect(&effect);
283        Ok(PeerIngressAdmission {
284            rendered_text: meerkat_core::render_peer_ingress_admitted_text(&facts, &classification),
285            classification,
286            lifecycle_peer: effect.lifecycle_peer,
287            request_id: effect.request_id,
288        })
289    }
290
291    fn classify_plain_event(
292        &self,
293        facts: PeerIngressPlainEventFacts,
294    ) -> Result<PeerIngressAdmission, DslTransitionError> {
295        let context = "PeerCommsHandle::classify_plain_event";
296        let effects = self.dsl.apply_signal_with_effects(
297            mm_dsl::MeerkatMachineSignal::ClassifyPlainEvent {
298                source_name: facts.source_name.clone(),
299            },
300            context,
301        )?;
302        let effect = classified_effect(effects, context)?;
303        Ok(PeerIngressAdmission {
304            classification: classification_from_effect(&effect),
305            lifecycle_peer: effect.lifecycle_peer,
306            request_id: effect.request_id,
307            rendered_text: meerkat_core::interaction::format_external_event_projection(
308                &facts.source_name,
309                Some(&facts.body),
310            ),
311        })
312    }
313
314    fn set_peer_ingress_context(&self, keep_alive: bool) -> Result<(), DslTransitionError> {
315        // intra-machine: no route; dispatcher not applicable (handle targets the meerkat DSL directly, not a CompositionDispatcher seam)
316        self.dsl.apply_input(
317            mm_dsl::MeerkatMachineInput::SetPeerIngressContext { keep_alive },
318            "PeerCommsHandle::set_peer_ingress_context",
319        )
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;
    use std::sync::Mutex;

    /// Wraps `state` in a shared machine authority and returns a handle over it.
    fn handle_for_state(state: mm_dsl::MeerkatMachineState) -> RuntimePeerCommsHandle {
        let machine = mm_dsl::MeerkatMachineAuthority::from_state(state);
        let shared = Arc::new(Mutex::new(machine));
        RuntimePeerCommsHandle::new(Arc::new(HandleDslAuthority::from_shared(shared)))
    }

    /// Handle over a default machine state for session `session-1` in `phase`.
    fn handle_for_phase(phase: mm_dsl::MeerkatPhase) -> RuntimePeerCommsHandle {
        handle_for_state(mm_dsl::MeerkatMachineState {
            lifecycle_phase: phase,
            session_id: Some(mm_dsl::SessionId("session-1".to_string())),
            ..Default::default()
        })
    }

    /// Envelope facts with a fresh peer id for the given item, sender and kind.
    fn envelope(
        item_id: &str,
        from_peer: &str,
        kind: meerkat_core::PeerIngressEnvelopeKind,
    ) -> PeerIngressEnvelopeFacts {
        PeerIngressEnvelopeFacts {
            item_id: item_id.to_string(),
            from_peer: from_peer.to_string(),
            from_peer_id: meerkat_core::comms::PeerId::new(),
            kind,
        }
    }

    /// `Request` envelope kind with the given intent and params.
    fn request_kind(
        intent: &str,
        params: serde_json::Value,
    ) -> meerkat_core::PeerIngressEnvelopeKind {
        meerkat_core::PeerIngressEnvelopeKind::Request {
            intent: intent.to_string(),
            params,
        }
    }

    /// `Lifecycle` envelope kind with the given lifecycle kind and params.
    fn lifecycle_kind(
        kind: meerkat_core::comms::PeerLifecycleKind,
        params: serde_json::Value,
    ) -> meerkat_core::PeerIngressEnvelopeKind {
        meerkat_core::PeerIngressEnvelopeKind::Lifecycle { kind, params }
    }

    #[test]
    fn runtime_peer_comms_handle_classifies_from_dsl_silent_intents() {
        let handle = handle_for_state(mm_dsl::MeerkatMachineState {
            lifecycle_phase: mm_dsl::MeerkatPhase::Attached,
            session_id: Some(mm_dsl::SessionId("session-1".to_string())),
            silent_intent_overrides: BTreeSet::from(["probe.silent".to_string()]),
            ..Default::default()
        });

        let facts = envelope(
            "request-1",
            "peer-1",
            request_kind("probe.silent", serde_json::json!({})),
        );
        let admission = handle
            .classify_external_envelope(facts)
            .expect("attached session should classify peer ingress");

        assert_eq!(
            admission.classification.class,
            meerkat_core::PeerInputClass::SilentRequest
        );
        assert_eq!(
            admission.classification.auth,
            meerkat_core::PeerIngressAuthDecision::Required
        );
        assert_eq!(admission.request_id.as_deref(), Some("request-1"));
    }

    #[test]
    fn runtime_peer_comms_handle_lifecycle_subject_is_selected_by_machine() {
        let handle = handle_for_phase(mm_dsl::MeerkatPhase::Attached);

        // An explicit peer param names the lifecycle subject.
        let with_param = handle
            .classify_external_envelope(envelope(
                "request-param",
                "orchestrator",
                request_kind("mob.peer_added", serde_json::json!({ "peer": "worker-1" })),
            ))
            .expect("machine should classify lifecycle request");
        assert_eq!(with_param.lifecycle_peer.as_deref(), Some("worker-1"));

        // With no peer param the sending peer is reported instead.
        let without_param = handle
            .classify_external_envelope(envelope(
                "request-fallback",
                "orchestrator",
                request_kind("mob.peer_retired", serde_json::json!({})),
            ))
            .expect("machine should classify lifecycle request fallback");
        assert_eq!(
            without_param.lifecycle_peer.as_deref(),
            Some("orchestrator")
        );

        // An empty peer param is treated like a missing one.
        let empty_param = handle
            .classify_external_envelope(envelope(
                "lifecycle-empty",
                "orchestrator",
                lifecycle_kind(
                    meerkat_core::comms::PeerLifecycleKind::PeerUnwired,
                    serde_json::json!({ "peer": "" }),
                ),
            ))
            .expect("machine should classify lifecycle event fallback");
        assert_eq!(empty_param.lifecycle_peer.as_deref(), Some("orchestrator"));
    }

    #[test]
    fn runtime_peer_comms_handle_classifies_idle_lifecycle_without_opening_peer_work() {
        let handle = handle_for_phase(mm_dsl::MeerkatPhase::Idle);

        let retired_notice = handle
            .classify_external_envelope(envelope(
                "lifecycle-retired",
                "orchestrator",
                lifecycle_kind(
                    meerkat_core::comms::PeerLifecycleKind::PeerRetired,
                    serde_json::json!({ "peer": "worker-1" }),
                ),
            ))
            .expect("idle live session should classify mob lifecycle notices");
        assert_eq!(
            retired_notice.classification.class,
            meerkat_core::PeerInputClass::PeerLifecycleRetired
        );
        assert_eq!(
            retired_notice.classification.lifecycle_kind,
            Some(meerkat_core::comms::PeerLifecycleKind::PeerRetired)
        );
        assert_eq!(retired_notice.lifecycle_peer.as_deref(), Some("worker-1"));
        assert_eq!(retired_notice.request_id, None);

        let added_request = handle
            .classify_external_envelope(envelope(
                "request-added",
                "orchestrator",
                request_kind("mob.peer_added", serde_json::json!({ "peer": "worker-2" })),
            ))
            .expect("idle live session should classify lifecycle requests");
        assert_eq!(
            added_request.classification.class,
            meerkat_core::PeerInputClass::PeerLifecycleAdded
        );
        assert_eq!(added_request.lifecycle_peer.as_deref(), Some("worker-2"));
        assert_eq!(added_request.request_id.as_deref(), Some("request-added"));

        let work_admission = handle.classify_external_envelope(envelope(
            "message-1",
            "peer-1",
            meerkat_core::PeerIngressEnvelopeKind::Message {
                body: "wake up".to_string(),
            },
        ));
        assert!(
            work_admission.is_err(),
            "idle lifecycle admission must not reopen normal peer work ingress"
        );
    }

    #[test]
    fn runtime_peer_comms_handle_drains_terminal_cleanup_without_reopening_topology_adds() {
        for phase in [mm_dsl::MeerkatPhase::Retired, mm_dsl::MeerkatPhase::Stopped] {
            let handle = handle_for_phase(phase);

            let retired_notice = handle
                .classify_external_envelope(envelope(
                    "lifecycle-retired",
                    "orchestrator",
                    lifecycle_kind(
                        meerkat_core::comms::PeerLifecycleKind::PeerRetired,
                        serde_json::json!({ "peer": "worker-1" }),
                    ),
                ))
                .expect("terminal sessions should drain peer-retired cleanup notices");
            assert_eq!(
                retired_notice.classification.class,
                meerkat_core::PeerInputClass::PeerLifecycleRetired
            );

            let unwired_request = handle
                .classify_external_envelope(envelope(
                    "request-unwired",
                    "orchestrator",
                    request_kind(
                        "mob.peer_unwired",
                        serde_json::json!({ "peer": "worker-2" }),
                    ),
                ))
                .expect("terminal sessions should drain peer-unwired cleanup requests");
            assert_eq!(
                unwired_request.classification.class,
                meerkat_core::PeerInputClass::PeerLifecycleUnwired
            );

            let added_notice = handle.classify_external_envelope(envelope(
                "lifecycle-added",
                "orchestrator",
                lifecycle_kind(
                    meerkat_core::comms::PeerLifecycleKind::PeerAdded,
                    serde_json::json!({ "peer": "worker-3" }),
                ),
            ));
            assert!(
                added_notice.is_err(),
                "terminal cleanup admission must not accept new peer topology"
            );
        }
    }

    #[test]
    fn runtime_signal_builder_does_not_preselect_lifecycle_subject() {
        let source = include_str!("peer_comms.rs");
        // Isolate the text of the signal builder: everything after its header
        // up to the classified-effect struct that follows it.
        let after_header = source
            .split("fn external_envelope_signal")
            .nth(1)
            .expect("signal builder should exist");
        let signal_builder = after_header
            .split("struct PeerIngressClassifiedEffect")
            .next()
            .expect("classified effect should follow signal builder");
        // NOTE(review): presumably joined at runtime so the helper's literal
        // name never appears in this file — confirm.
        let forbidden_helper = ["peer", "lifecycle", "subject"].join("_");

        assert!(
            signal_builder.contains("lifecycle_peer_param"),
            "runtime should pass the parsed lifecycle peer candidate"
        );

        let forbidden = [
            (
                forbidden_helper.as_str(),
                "runtime must not call the lifecycle subject selector before the machine",
            ),
            (
                "facts.from_peer.as_str()",
                "fallback peer must remain a machine input fact, not a preselected subject",
            ),
            (
                "unwrap_or",
                "runtime must not choose a lifecycle subject fallback before the machine",
            ),
        ];
        for (needle, message) in forbidden {
            assert!(!signal_builder.contains(needle), "{}", message);
        }
    }
}