Skip to main content

meerkat_runtime/
comms_bridge.rs

//! Runtime comms bridge helpers.
//!
//! These helpers translate drained comms interactions into the runtime-owned
//! input families used by the comms classification bridge.
5
6use chrono::Utc;
7#[cfg(test)]
8use meerkat_core::comms::PeerId;
9use meerkat_core::interaction::{
10    InboxInteraction, InteractionContent, PeerIngressConvention, PeerIngressFact, PeerIngressKind,
11    PeerInputCandidate, PeerInputClass,
12};
13#[cfg(test)]
14use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
15use meerkat_core::lifecycle::InputId;
16
17use crate::identifiers::{CorrelationId, LogicalRuntimeId};
18use crate::input::{
19    ExternalEventInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
20    PeerConvention, PeerInput, ResponseProgressPhase, ResponseTerminalStatus,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
24pub enum PeerIngressProjectionError {
25    #[error(
26        "classified peer ingress {interaction_id} ({kind:?}) cannot project to a runtime PeerInput"
27    )]
28    UnsupportedPeerConvention {
29        interaction_id: meerkat_core::InteractionId,
30        kind: PeerIngressKind,
31    },
32    #[error("classified peer ingress {interaction_id} missing canonical peer id")]
33    MissingCanonicalPeerId {
34        interaction_id: meerkat_core::InteractionId,
35    },
36    #[error("classified peer response {interaction_id} missing machine response terminality")]
37    MissingResponseTerminality {
38        interaction_id: meerkat_core::InteractionId,
39    },
40}
41
42/// Convert a classified comms interaction into the appropriate runtime-owned
43/// input family.
44pub fn classified_interaction_to_runtime_input(
45    classified: &PeerInputCandidate,
46    runtime_id: &LogicalRuntimeId,
47) -> Result<Input, PeerIngressProjectionError> {
48    let interaction = &classified.interaction;
49
50    if classified.class() == PeerInputClass::PlainEvent {
51        let source_name = classified
52            .ingress
53            .plain_event_source_name()
54            .unwrap_or("unknown");
55        let blocks = external_event_blocks(interaction);
56        return Ok(Input::ExternalEvent(ExternalEventInput {
57            header: InputHeader {
58                id: InputId::new(),
59                timestamp: Utc::now(),
60                source: InputOrigin::External {
61                    source_name: source_name.to_string(),
62                },
63                durability: InputDurability::Durable,
64                visibility: InputVisibility {
65                    transcript_eligible: true,
66                    operator_eligible: true,
67                },
68                idempotency_key: None,
69                supersession_key: None,
70                correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
71            },
72            event_type: source_name.to_string(),
73            payload: external_event_payload(interaction),
74            blocks,
75            handling_mode: interaction.handling_mode,
76            render_metadata: interaction.render_metadata.clone(),
77        }));
78    }
79
80    peer_candidate_to_peer_input(classified, runtime_id)
81}
82
83/// Compatibility alias while callers migrate to the classified bridge naming.
84pub fn peer_input_candidate_to_runtime_input(
85    classified: &PeerInputCandidate,
86    runtime_id: &LogicalRuntimeId,
87) -> Result<Input, PeerIngressProjectionError> {
88    classified_interaction_to_runtime_input(classified, runtime_id)
89}
90
91fn peer_candidate_to_peer_input(
92    classified: &PeerInputCandidate,
93    runtime_id: &LogicalRuntimeId,
94) -> Result<Input, PeerIngressProjectionError> {
95    peer_input_from_ingress_fact(
96        &classified.interaction,
97        runtime_id,
98        &classified.ingress,
99        classified.response_terminality,
100    )
101}
102
103fn peer_input_from_ingress_fact(
104    interaction: &InboxInteraction,
105    runtime_id: &LogicalRuntimeId,
106    ingress: &PeerIngressFact,
107    response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
108) -> Result<Input, PeerIngressProjectionError> {
109    let convention = map_ingress_convention(interaction.id, ingress, response_terminality)?;
110    let durability = map_durability(&convention);
111    let handling_mode = match &convention {
112        PeerConvention::ResponseProgress { .. } => None,
113        _ => Some(interaction.handling_mode),
114    };
115    let peer_id = ingress.canonical_peer_id_string().ok_or(
116        PeerIngressProjectionError::MissingCanonicalPeerId {
117            interaction_id: interaction.id,
118        },
119    )?;
120    let display_identity = ingress
121        .route
122        .as_ref()
123        .map(meerkat_core::PeerRoute::label)
124        .or_else(|| ingress.display_label());
125
126    Ok(Input::Peer(PeerInput {
127        header: InputHeader {
128            id: InputId::new(),
129            timestamp: Utc::now(),
130            source: InputOrigin::Peer {
131                peer_id,
132                display_identity,
133                runtime_id: Some(runtime_id.clone()),
134            },
135            durability,
136            visibility: InputVisibility {
137                transcript_eligible: true,
138                operator_eligible: true,
139            },
140            idempotency_key: None,
141            supersession_key: None,
142            correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
143        },
144        convention: Some(convention),
145        body: peer_rendered_body(interaction),
146        payload: peer_payload(interaction),
147        blocks: peer_blocks(interaction),
148        handling_mode,
149    }))
150}
151
152fn map_ingress_convention(
153    interaction_id: meerkat_core::InteractionId,
154    ingress: &PeerIngressFact,
155    response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
156) -> Result<PeerConvention, PeerIngressProjectionError> {
157    match &ingress.convention {
158        PeerIngressConvention::Message => Ok(PeerConvention::Message),
159        PeerIngressConvention::Request { request_id, intent } => Ok(PeerConvention::Request {
160            request_id: request_id.clone(),
161            intent: intent.clone(),
162        }),
163        PeerIngressConvention::Response {
164            in_reply_to,
165            status: _,
166        } => {
167            let terminality = response_terminality
168                .ok_or(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })?;
169            Ok(map_response_convention(*in_reply_to, terminality))
170        }
171        PeerIngressConvention::Lifecycle { kind, .. } => Ok(PeerConvention::Request {
172            request_id: ingress.interaction_id.to_string(),
173            intent: kind.to_string(),
174        }),
175        PeerIngressConvention::Ack { .. } | PeerIngressConvention::PlainEvent { .. } => {
176            Err(PeerIngressProjectionError::UnsupportedPeerConvention {
177                interaction_id,
178                kind: ingress.kind,
179            })
180        }
181    }
182}
183
184fn map_response_convention(
185    in_reply_to: meerkat_core::InteractionId,
186    terminality: meerkat_core::interaction::TerminalityClass,
187) -> PeerConvention {
188    let request_id = in_reply_to.to_string();
189    match terminality {
190        meerkat_core::interaction::TerminalityClass::Progress => PeerConvention::ResponseProgress {
191            request_id,
192            phase: ResponseProgressPhase::Accepted,
193        },
194        meerkat_core::interaction::TerminalityClass::Terminal { disposition } => {
195            let term = match disposition {
196                meerkat_core::interaction::TerminalDisposition::Completed => {
197                    ResponseTerminalStatus::Completed
198                }
199                meerkat_core::interaction::TerminalDisposition::Failed => {
200                    ResponseTerminalStatus::Failed
201                }
202                other => {
203                    tracing::warn!(
204                        disposition = ?other,
205                        "unknown terminal disposition; treating as Failed"
206                    );
207                    ResponseTerminalStatus::Failed
208                }
209            };
210            PeerConvention::ResponseTerminal {
211                request_id,
212                status: term,
213            }
214        }
215        other => {
216            tracing::warn!(
217                class = ?other,
218                "unknown terminality class; routing response as progress (non-terminal)"
219            );
220            PeerConvention::ResponseProgress {
221                request_id,
222                phase: ResponseProgressPhase::Accepted,
223            }
224        }
225    }
226}
227
228fn peer_rendered_body(interaction: &InboxInteraction) -> String {
229    if !interaction.rendered_text.trim().is_empty() {
230        return interaction.rendered_text.clone();
231    }
232    match &interaction.content {
233        InteractionContent::Message { body, .. } => body.clone(),
234        InteractionContent::Request { params, .. } => {
235            serde_json::to_string(params).unwrap_or_default()
236        }
237        InteractionContent::Response { result, .. } => {
238            serde_json::to_string(result).unwrap_or_default()
239        }
240    }
241}
242
243fn peer_blocks(interaction: &InboxInteraction) -> Option<Vec<meerkat_core::types::ContentBlock>> {
244    match &interaction.content {
245        InteractionContent::Message { blocks, .. } => blocks.clone(),
246        InteractionContent::Request { blocks, .. } => blocks.clone(),
247        InteractionContent::Response { blocks, .. } => blocks.clone(),
248    }
249}
250
251fn peer_payload(interaction: &InboxInteraction) -> Option<serde_json::Value> {
252    match &interaction.content {
253        InteractionContent::Message { .. } => None,
254        InteractionContent::Request { params, .. } => Some(params.clone()),
255        InteractionContent::Response { result, .. } => Some(result.clone()),
256    }
257}
258
259fn external_event_payload(interaction: &InboxInteraction) -> serde_json::Value {
260    match &interaction.content {
261        InteractionContent::Message { body, .. } => serde_json::json!({ "body": body }),
262        InteractionContent::Request { intent, params, .. } => {
263            serde_json::json!({ "intent": intent, "params": params })
264        }
265        InteractionContent::Response {
266            in_reply_to,
267            status,
268            result,
269            blocks,
270        } => serde_json::json!({
271            "in_reply_to": in_reply_to,
272            "status": status,
273            "result": result,
274            "blocks": blocks,
275        }),
276    }
277}
278
279fn external_event_blocks(
280    interaction: &InboxInteraction,
281) -> Option<Vec<meerkat_core::types::ContentBlock>> {
282    match &interaction.content {
283        InteractionContent::Message { blocks, .. } => blocks.clone(),
284        InteractionContent::Request { blocks, .. } => blocks.clone(),
285        _ => None,
286    }
287}
288
289fn map_durability(convention: &PeerConvention) -> InputDurability {
290    match convention {
291        PeerConvention::ResponseProgress { .. } => InputDurability::Ephemeral,
292        _ => InputDurability::Durable,
293    }
294}
295
296#[cfg(test)]
297#[allow(clippy::unwrap_used, clippy::panic)]
298mod tests {
299    use super::*;
300    use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
301
302    fn make_interaction_id() -> meerkat_core::interaction::InteractionId {
303        meerkat_core::interaction::InteractionId(meerkat_core::time_compat::new_uuid_v7())
304    }
305
306    fn plain_event_ingress(
307        id: meerkat_core::interaction::InteractionId,
308        source_name: &str,
309    ) -> PeerIngressFact {
310        PeerIngressFact::plain_event(
311            id,
312            source_name,
313            PeerInputClass::PlainEvent,
314            meerkat_core::PeerIngressKind::PlainEvent,
315        )
316    }
317
318    fn test_peer_id() -> PeerId {
319        PeerId::parse("22222222-2222-4222-8222-222222222222").expect("canonical test peer id")
320    }
321
322    fn peer_kind_for_convention(
323        convention: &PeerIngressConvention,
324    ) -> meerkat_core::PeerIngressKind {
325        match convention {
326            PeerIngressConvention::Message => meerkat_core::PeerIngressKind::Message,
327            PeerIngressConvention::Request { .. } | PeerIngressConvention::Lifecycle { .. } => {
328                meerkat_core::PeerIngressKind::Request
329            }
330            PeerIngressConvention::Response { .. } => meerkat_core::PeerIngressKind::Response,
331            PeerIngressConvention::Ack { .. } => meerkat_core::PeerIngressKind::Ack,
332            PeerIngressConvention::PlainEvent { .. } => meerkat_core::PeerIngressKind::PlainEvent,
333        }
334    }
335
336    fn peer_ingress(
337        id: meerkat_core::interaction::InteractionId,
338        peer_id: PeerId,
339        label: &str,
340        class: PeerInputClass,
341        convention: PeerIngressConvention,
342    ) -> PeerIngressFact {
343        let kind = peer_kind_for_convention(&convention);
344        PeerIngressFact::peer(
345            id,
346            class,
347            kind,
348            Some(meerkat_core::PeerIngressAuthDecision::Required),
349            PeerIngressIdentity::new(peer_id, label, convention),
350        )
351    }
352
353    fn candidate_for_interaction(interaction: InboxInteraction) -> PeerInputCandidate {
354        let id = interaction.id;
355        let label = interaction.from.clone();
356        let peer_id = interaction.from_route.unwrap_or_else(test_peer_id);
357        let (class, convention, response_terminality) = match &interaction.content {
358            InteractionContent::Message { .. } => (
359                PeerInputClass::ActionableMessage,
360                PeerIngressConvention::Message,
361                None,
362            ),
363            InteractionContent::Request { intent, .. } => (
364                PeerInputClass::ActionableRequest,
365                PeerIngressConvention::Request {
366                    request_id: id.to_string(),
367                    intent: intent.clone(),
368                },
369                None,
370            ),
371            InteractionContent::Response {
372                in_reply_to,
373                status,
374                ..
375            } => {
376                let terminality = meerkat_core::interaction::classify_response_terminality(*status);
377                let class = match terminality {
378                    meerkat_core::TerminalityClass::Progress => PeerInputClass::ResponseProgress,
379                    meerkat_core::TerminalityClass::Terminal { .. } => {
380                        PeerInputClass::ResponseTerminal
381                    }
382                    _ => PeerInputClass::ResponseTerminal,
383                };
384                (
385                    class,
386                    PeerIngressConvention::Response {
387                        in_reply_to: *in_reply_to,
388                        status: *status,
389                    },
390                    Some(terminality),
391                )
392            }
393        };
394        let ingress = peer_ingress(id, peer_id, &label, class, convention);
395        PeerInputCandidate {
396            interaction,
397            ingress,
398            lifecycle_peer: None,
399            response_terminality,
400        }
401    }
402
403    fn peer_input_for_test(interaction: &InboxInteraction, runtime_id: &LogicalRuntimeId) -> Input {
404        let candidate = candidate_for_interaction(interaction.clone());
405        peer_input_candidate_to_runtime_input(&candidate, runtime_id)
406            .expect("test candidate should project to runtime input")
407    }
408
409    #[test]
410    fn message_to_peer_input() {
411        let interaction = InboxInteraction {
412            from_route: None,
413            from: "peer-1".into(),
414            content: InteractionContent::Message {
415                body: "hello".into(),
416                blocks: None,
417            },
418            id: make_interaction_id(),
419            rendered_text: String::new(),
420            handling_mode: meerkat_core::types::HandlingMode::Queue,
421            render_metadata: None,
422        };
423        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
424        if let Input::Peer(p) = &input {
425            assert!(matches!(p.convention, Some(PeerConvention::Message)));
426            assert_eq!(p.body, "hello");
427            assert_eq!(p.header.durability, InputDurability::Durable);
428            assert_eq!(
429                p.handling_mode,
430                Some(meerkat_core::types::HandlingMode::Queue),
431                "explicit queue must survive comms -> runtime projection so it can suppress running-turn interruption"
432            );
433        } else {
434            panic!("Expected PeerInput");
435        }
436    }
437
438    #[test]
439    fn request_to_peer_input() {
440        let interaction = InboxInteraction {
441            from_route: None,
442            from: "peer-1".into(),
443            content: InteractionContent::Request {
444                intent: "mob.peer_added".into(),
445                params: serde_json::json!({"name": "agent-1"}),
446                blocks: None,
447            },
448            id: make_interaction_id(),
449            rendered_text: String::new(),
450            handling_mode: meerkat_core::types::HandlingMode::Queue,
451            render_metadata: None,
452        };
453        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
454        if let Input::Peer(p) = &input {
455            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
456            match p.convention.as_ref() {
457                Some(PeerConvention::Request { request_id, .. }) => {
458                    assert_eq!(request_id, &interaction.id.0.to_string());
459                }
460                other => panic!("Expected request convention, got {other:?}"),
461            }
462            assert_eq!(p.header.durability, InputDurability::Durable);
463            assert_eq!(
464                p.payload,
465                Some(serde_json::json!({"name": "agent-1"})),
466                "request params must remain structured on PeerInput so runtime prompt projection does not depend on pre-rendered comms prose"
467            );
468            assert_eq!(
469                p.handling_mode,
470                Some(meerkat_core::types::HandlingMode::Queue),
471                "explicit queue request semantics must not collapse to default policy"
472            );
473        } else {
474            panic!("Expected PeerInput");
475        }
476    }
477
478    #[test]
479    fn classified_request_uses_canonical_peer_id_for_runtime_projection() {
480        let source_peer_id =
481            PeerId::parse("11111111-1111-4111-8111-111111111111").expect("canonical peer id");
482        let request_id = make_interaction_id();
483        let classified = PeerInputCandidate {
484            interaction: InboxInteraction {
485                from_route: None,
486                from: "test-mob/lead/l-requester".into(),
487                content: InteractionContent::Request {
488                    intent: "interpret_image".into(),
489                    params: serde_json::json!({"description": "tower with a light"}),
490                    blocks: None,
491                },
492                id: request_id,
493                rendered_text: "stale helper prose".into(),
494                handling_mode: meerkat_core::types::HandlingMode::Steer,
495                render_metadata: None,
496            },
497            ingress: PeerIngressFact::peer(
498                request_id,
499                PeerInputClass::ActionableRequest,
500                meerkat_core::PeerIngressKind::Request,
501                Some(meerkat_core::PeerIngressAuthDecision::Required),
502                PeerIngressIdentity::new(
503                    source_peer_id,
504                    "test-mob/lead/l-requester",
505                    PeerIngressConvention::Request {
506                        request_id: request_id.to_string(),
507                        intent: "interpret_image".to_string(),
508                    },
509                ),
510            ),
511            lifecycle_peer: None,
512            response_terminality: None,
513        };
514
515        let input =
516            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("worker"))
517                .expect("classified request should project to peer input");
518        let Input::Peer(peer) = &input else {
519            panic!("Expected PeerInput");
520        };
521        let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
522            panic!("Expected peer source");
523        };
524        assert_eq!(peer_id, "11111111-1111-4111-8111-111111111111");
525        assert_eq!(peer.body, "stale helper prose");
526
527        let prompt = crate::input::input_prompt_text(&input);
528        assert!(prompt.starts_with(
529            "Peer request from peer_id 11111111-1111-4111-8111-111111111111 (display_name: test-mob/lead/l-requester)."
530        ));
531        assert!(prompt.contains("\"peer_id\":\"11111111-1111-4111-8111-111111111111\""));
532        assert!(prompt.contains("\"display_name\":\"test-mob/lead/l-requester\""));
533        assert!(prompt.contains(&format!("\"in_reply_to\":\"{}\"", request_id.0)));
534        assert!(prompt.contains("\"status\":\"completed\""));
535        assert!(!prompt.contains("to=\""));
536    }
537
538    #[test]
539    fn plain_event_to_external_event_input() {
540        let id = make_interaction_id();
541        let classified = PeerInputCandidate {
542            lifecycle_peer: None,
543            response_terminality: None,
544            ingress: plain_event_ingress(id, "webhook"),
545            interaction: InboxInteraction {
546                from_route: None,
547                from: "event:webhook".into(),
548                content: InteractionContent::Message {
549                    body: "{\"ok\":true}".into(),
550                    blocks: None,
551                },
552                id,
553                rendered_text: String::new(),
554                handling_mode: meerkat_core::types::HandlingMode::Queue,
555                render_metadata: None,
556            },
557        };
558        let input =
559            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
560                .expect("plain event should project to external event input");
561        match input {
562            Input::ExternalEvent(event) => {
563                assert_eq!(event.event_type, "webhook");
564                assert_eq!(event.payload["body"], "{\"ok\":true}");
565                assert_eq!(event.blocks, None);
566                assert_eq!(
567                    event.handling_mode,
568                    meerkat_core::types::HandlingMode::Queue
569                );
570                assert_eq!(event.render_metadata, None);
571            }
572            other => panic!("Expected ExternalEvent input, got {other:?}"),
573        }
574    }
575
576    #[test]
577    fn peer_named_event_prefix_stays_peer_without_plain_event_class() {
578        let id = make_interaction_id();
579        let classified = PeerInputCandidate {
580            lifecycle_peer: None,
581            response_terminality: None,
582            ingress: peer_ingress(
583                id,
584                test_peer_id(),
585                "event:webhook",
586                PeerInputClass::ActionableMessage,
587                PeerIngressConvention::Message,
588            ),
589            interaction: InboxInteraction {
590                from_route: None,
591                from: "event:webhook".into(),
592                content: InteractionContent::Message {
593                    body: "hello".into(),
594                    blocks: None,
595                },
596                id,
597                rendered_text: "stale rendered text".into(),
598                handling_mode: meerkat_core::types::HandlingMode::Queue,
599                render_metadata: None,
600            },
601        };
602        let input =
603            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
604                .expect("classified peer event should project to peer input");
605        match input {
606            Input::Peer(peer) => {
607                assert_eq!(peer.body, "stale rendered text");
608                match peer.header.source {
609                    InputOrigin::Peer { peer_id, .. } => {
610                        assert_eq!(peer_id, test_peer_id().as_str());
611                    }
612                    other => panic!("Expected peer source, got {other:?}"),
613                }
614            }
615            other => panic!("Expected Peer input, got {other:?}"),
616        }
617    }
618
619    #[test]
620    fn classified_peer_projection_uses_ingress_canonical_peer_id_not_display_from() {
621        let id = make_interaction_id();
622        let canonical_peer_id = meerkat_core::comms::PeerId::new();
623        let classified = PeerInputCandidate {
624            lifecycle_peer: None,
625            response_terminality: None,
626            ingress: PeerIngressFact::peer(
627                id,
628                PeerInputClass::ActionableRequest,
629                meerkat_core::PeerIngressKind::Request,
630                Some(meerkat_core::PeerIngressAuthDecision::Required),
631                PeerIngressIdentity::new(
632                    canonical_peer_id,
633                    "display-agent",
634                    PeerIngressConvention::Request {
635                        request_id: id.to_string(),
636                        intent: "review".to_string(),
637                    },
638                ),
639            ),
640            interaction: InboxInteraction {
641                from_route: None,
642                from: "display-agent".into(),
643                content: InteractionContent::Request {
644                    intent: "review".into(),
645                    params: serde_json::json!({"pr": 42}),
646                    blocks: None,
647                },
648                id,
649                rendered_text: "stale rendered text".into(),
650                handling_mode: meerkat_core::types::HandlingMode::Queue,
651                render_metadata: None,
652            },
653        };
654
655        let input =
656            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
657                .expect("classified peer projection should use typed canonical id");
658        let Input::Peer(peer) = input else {
659            panic!("Expected Peer input");
660        };
661        match peer.header.source {
662            InputOrigin::Peer { peer_id, .. } => {
663                assert_eq!(peer_id, canonical_peer_id.as_str());
664                assert_ne!(peer_id, "display-agent");
665            }
666            other => panic!("Expected peer source, got {other:?}"),
667        }
668        assert_eq!(peer.body, "stale rendered text");
669    }
670
671    #[test]
672    fn classified_peer_projection_rejects_display_only_ingress_identity() {
673        let id = make_interaction_id();
674        let classified = PeerInputCandidate {
675            lifecycle_peer: None,
676            response_terminality: None,
677            ingress: PeerIngressFact::legacy_peer_label(
678                id,
679                "display-agent",
680                PeerInputClass::ActionableMessage,
681                meerkat_core::PeerIngressKind::Message,
682                Some(meerkat_core::PeerIngressAuthDecision::Required),
683                PeerIngressConvention::Message,
684            ),
685            interaction: InboxInteraction {
686                from_route: None,
687                from: "display-agent".into(),
688                content: InteractionContent::Message {
689                    body: "hello".into(),
690                    blocks: None,
691                },
692                id,
693                rendered_text: "stale rendered text".into(),
694                handling_mode: meerkat_core::types::HandlingMode::Queue,
695                render_metadata: None,
696            },
697        };
698
699        let result =
700            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
701        assert!(
702            matches!(
703                result,
704                Err(PeerIngressProjectionError::MissingCanonicalPeerId { interaction_id })
705                    if interaction_id == id
706            ),
707            "display-only ingress must fail closed, got {result:?}"
708        );
709    }
710
711    #[test]
712    fn request_body_preserves_rendered_text_and_structured_payload() {
713        let interaction = InboxInteraction {
714            from_route: None,
715            from: "event:webhook".into(),
716            content: InteractionContent::Request {
717                intent: "mob.peer_added".into(),
718                params: serde_json::json!({"peer":"agent-1"}),
719                blocks: None,
720            },
721            id: make_interaction_id(),
722            rendered_text: "stale rendered text".into(),
723            handling_mode: meerkat_core::types::HandlingMode::Queue,
724            render_metadata: None,
725        };
726        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
727        if let Input::Peer(peer) = input {
728            assert_eq!(peer.body, "stale rendered text");
729            assert_eq!(peer.payload, Some(serde_json::json!({"peer":"agent-1"})));
730        } else {
731            panic!("Expected PeerInput");
732        }
733    }
734
735    #[test]
736    fn message_blocks_are_preserved_on_peer_input() {
737        let blocks = vec![
738            meerkat_core::types::ContentBlock::Text {
739                text: "see image".into(),
740            },
741            meerkat_core::types::ContentBlock::Image {
742                media_type: "image/png".into(),
743                data: "abc".into(),
744            },
745        ];
746        let interaction = InboxInteraction {
747            from_route: None,
748            from: "peer-1".into(),
749            content: InteractionContent::Message {
750                body: "see image".into(),
751                blocks: Some(blocks.clone()),
752            },
753            id: make_interaction_id(),
754            rendered_text: "stale rendered text".into(),
755            handling_mode: meerkat_core::types::HandlingMode::Queue,
756            render_metadata: None,
757        };
758        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
759        if let Input::Peer(peer) = input {
760            assert_eq!(peer.body, "stale rendered text");
761            assert_eq!(peer.blocks, Some(blocks));
762        } else {
763            panic!("Expected PeerInput");
764        }
765    }
766
767    #[test]
768    fn request_blocks_are_preserved_on_peer_input() {
769        let blocks = vec![
770            meerkat_core::types::ContentBlock::Text {
771                text: "describe this image".into(),
772            },
773            meerkat_core::types::ContentBlock::Image {
774                media_type: "image/png".into(),
775                data: "abc".into(),
776            },
777        ];
778        let interaction_id = make_interaction_id();
779        let peer_id = PeerId::new();
780        let classified = PeerInputCandidate {
781            interaction: InboxInteraction {
782                from_route: Some(peer_id),
783                from: "vision-peer".into(),
784                content: InteractionContent::Request {
785                    intent: "checksum_token".into(),
786                    params: serde_json::json!({"subject": "describe-image"}),
787                    blocks: Some(blocks.clone()),
788                },
789                id: interaction_id,
790                rendered_text: String::new(),
791                handling_mode: meerkat_core::types::HandlingMode::Steer,
792                render_metadata: None,
793            },
794            ingress: PeerIngressFact::peer(
795                interaction_id,
796                PeerInputClass::ActionableRequest,
797                PeerIngressKind::Request,
798                Some(meerkat_core::interaction::PeerIngressAuthDecision::Required),
799                PeerIngressIdentity::new(
800                    peer_id,
801                    "vision-peer",
802                    meerkat_core::interaction::PeerIngressConvention::Request {
803                        request_id: interaction_id.to_string(),
804                        intent: "checksum_token".to_string(),
805                    },
806                ),
807            ),
808            lifecycle_peer: None,
809            response_terminality: None,
810        };
811
812        let input = classified_interaction_to_runtime_input(
813            &classified,
814            &LogicalRuntimeId::new("runtime-a"),
815        )
816        .expect("classified request should project");
817        if let Input::Peer(peer) = input {
818            assert_eq!(peer.blocks, Some(blocks));
819            assert_eq!(
820                peer.payload,
821                Some(serde_json::json!({"subject": "describe-image"}))
822            );
823        } else {
824            panic!("Expected PeerInput");
825        }
826    }
827
828    #[test]
829    fn multimodal_message_uses_rendered_projection_while_preserving_blocks() {
830        let blocks = vec![
831            meerkat_core::types::ContentBlock::Text {
832                text: "caption text".into(),
833            },
834            meerkat_core::types::ContentBlock::Image {
835                media_type: "image/png".into(),
836                data: "abc".into(),
837            },
838        ];
839        let interaction = InboxInteraction {
840            from_route: None,
841            from: "peer-1".into(),
842            content: InteractionContent::Message {
843                body: "please inspect this image".into(),
844                blocks: Some(blocks),
845            },
846            id: make_interaction_id(),
847            rendered_text: "stale rendered text".into(),
848            handling_mode: meerkat_core::types::HandlingMode::Queue,
849            render_metadata: None,
850        };
851        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
852        if let Input::Peer(peer) = input {
853            assert_eq!(peer.body, "stale rendered text");
854        } else {
855            panic!("Expected PeerInput");
856        }
857    }
858
859    #[test]
860    fn plain_event_blocks_are_preserved_on_external_event_input() {
861        let blocks = vec![
862            meerkat_core::types::ContentBlock::Text {
863                text: "see image".into(),
864            },
865            meerkat_core::types::ContentBlock::Image {
866                media_type: "image/png".into(),
867                data: "abc".into(),
868            },
869        ];
870        let id = make_interaction_id();
871        let classified = PeerInputCandidate {
872            lifecycle_peer: None,
873            response_terminality: None,
874            ingress: plain_event_ingress(id, "webhook"),
875            interaction: InboxInteraction {
876                from_route: None,
877                from: "event:webhook".into(),
878                content: InteractionContent::Message {
879                    body: "see image".into(),
880                    blocks: Some(blocks.clone()),
881                },
882                id,
883                rendered_text: "stale rendered text".into(),
884                handling_mode: meerkat_core::types::HandlingMode::Queue,
885                render_metadata: None,
886            },
887        };
888        let input =
889            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
890                .expect("plain event with blocks should project");
891        match input {
892            Input::ExternalEvent(event) => {
893                assert_eq!(event.payload["body"], "see image");
894                assert!(event.payload.get("blocks").is_none());
895                assert_eq!(event.blocks, Some(blocks));
896                assert_eq!(
897                    event.handling_mode,
898                    meerkat_core::types::HandlingMode::Queue
899                );
900                assert_eq!(event.render_metadata, None);
901            }
902            other => panic!("Expected ExternalEvent input, got {other:?}"),
903        }
904    }
905
906    #[test]
907    fn plain_event_preserves_handling_mode_and_render_metadata() {
908        let render_metadata = meerkat_core::types::RenderMetadata {
909            class: meerkat_core::types::RenderClass::ExternalEvent,
910            salience: meerkat_core::types::RenderSalience::Urgent,
911        };
912        let id = make_interaction_id();
913        let classified = PeerInputCandidate {
914            lifecycle_peer: None,
915            response_terminality: None,
916            ingress: plain_event_ingress(id, "webhook"),
917            interaction: InboxInteraction {
918                from_route: None,
919                from: "event:webhook".into(),
920                content: InteractionContent::Message {
921                    body: "urgent".into(),
922                    blocks: None,
923                },
924                id,
925                rendered_text: "stale rendered text".into(),
926                handling_mode: meerkat_core::types::HandlingMode::Steer,
927                render_metadata: Some(render_metadata.clone()),
928            },
929        };
930
931        match peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
932            .expect("plain event should preserve render metadata")
933        {
934            Input::ExternalEvent(event) => {
935                assert_eq!(
936                    event.handling_mode,
937                    meerkat_core::types::HandlingMode::Steer
938                );
939                assert_eq!(event.render_metadata, Some(render_metadata));
940            }
941            other => panic!("Expected ExternalEvent input, got {other:?}"),
942        }
943    }
944
945    #[test]
946    fn response_completed_to_terminal() {
947        let in_reply_to = make_interaction_id();
948        let route_id = meerkat_core::comms::PeerId::from_uuid(
949            uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f2").unwrap(),
950        );
951        let interaction = InboxInteraction {
952            from_route: Some(route_id),
953            from: "Peer One".into(),
954            content: InteractionContent::Response {
955                status: ResponseStatus::Completed,
956                result: serde_json::json!({"ok": true}),
957                in_reply_to,
958                blocks: None,
959            },
960            id: make_interaction_id(),
961            rendered_text: String::new(),
962            handling_mode: meerkat_core::types::HandlingMode::Queue,
963            render_metadata: None,
964        };
965        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
966        if let Input::Peer(p) = &input {
967            match &p.header.source {
968                InputOrigin::Peer {
969                    peer_id,
970                    display_identity,
971                    ..
972                } => {
973                    assert_eq!(peer_id, &route_id.to_string());
974                    assert_eq!(display_identity.as_deref(), Some("Peer One"));
975                }
976                other => panic!("Expected Peer source, got {other:?}"),
977            }
978            assert!(matches!(
979                p.convention,
980                Some(PeerConvention::ResponseTerminal {
981                    status: ResponseTerminalStatus::Completed,
982                    ..
983                })
984            ));
985            assert_eq!(p.header.durability, InputDurability::Durable);
986            assert_eq!(
987                p.payload,
988                Some(serde_json::json!({"ok": true})),
989                "terminal response result must remain structured on PeerInput so runtime prompt projection stays runtime-owned"
990            );
991        } else {
992            panic!("Expected PeerInput");
993        }
994        let projection = crate::input::runtime_input_projection_for_machine_batch(&input);
995        let context = projection
996            .context_append
997            .expect("terminal machine-batch context projection");
998        let expected_key = format!("peer_response_terminal:{route_id}:{in_reply_to}");
999        assert_eq!(context.key, expected_key);
1000        let meerkat_core::lifecycle::run_primitive::CoreRenderable::SystemNotice { blocks, .. } =
1001            context.content
1002        else {
1003            panic!("Expected terminal context notice");
1004        };
1005        assert!(matches!(
1006            blocks.first(),
1007            Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. })
1008                if peer.as_ref().and_then(|peer| peer.display_name.as_deref()) == Some("Peer One")
1009        ));
1010    }
1011
1012    #[test]
1013    fn classified_response_uses_ingress_terminal_class() {
1014        let in_reply_to = make_interaction_id();
1015        let id = make_interaction_id();
1016        let classified = PeerInputCandidate {
1017            interaction: InboxInteraction {
1018                from_route: None,
1019                from: "peer-1".into(),
1020                content: InteractionContent::Response {
1021                    status: ResponseStatus::Completed,
1022                    result: serde_json::json!({"ok": true}),
1023                    in_reply_to,
1024                    blocks: None,
1025                },
1026                id,
1027                rendered_text: String::new(),
1028                handling_mode: meerkat_core::types::HandlingMode::Queue,
1029                render_metadata: None,
1030            },
1031            ingress: PeerIngressFact::peer(
1032                id,
1033                PeerInputClass::ResponseProgress,
1034                meerkat_core::PeerIngressKind::Response,
1035                Some(meerkat_core::PeerIngressAuthDecision::Required),
1036                PeerIngressIdentity::new(
1037                    test_peer_id(),
1038                    "peer-1",
1039                    PeerIngressConvention::Response {
1040                        in_reply_to,
1041                        status: ResponseStatus::Completed,
1042                    },
1043                ),
1044            ),
1045            lifecycle_peer: None,
1046            response_terminality: Some(meerkat_core::TerminalityClass::Progress),
1047        };
1048
1049        let input =
1050            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
1051                .expect("classified response should project");
1052        if let Input::Peer(peer) = input {
1053            assert!(
1054                matches!(
1055                    peer.convention,
1056                    Some(PeerConvention::ResponseProgress { .. })
1057                ),
1058                "classified bridge must consume ingress-owned response class"
1059            );
1060        } else {
1061            panic!("Expected PeerInput");
1062        }
1063    }
1064
1065    #[test]
1066    fn classified_response_missing_machine_terminality_fails_closed() {
1067        let in_reply_to = make_interaction_id();
1068        let id = make_interaction_id();
1069        let classified = PeerInputCandidate {
1070            interaction: InboxInteraction {
1071                from_route: None,
1072                from: "peer-1".into(),
1073                content: InteractionContent::Response {
1074                    status: ResponseStatus::Completed,
1075                    result: serde_json::json!({"ok": true}),
1076                    in_reply_to,
1077                    blocks: None,
1078                },
1079                id,
1080                rendered_text: String::new(),
1081                handling_mode: meerkat_core::types::HandlingMode::Queue,
1082                render_metadata: None,
1083            },
1084            ingress: PeerIngressFact::peer(
1085                id,
1086                PeerInputClass::ResponseTerminal,
1087                meerkat_core::PeerIngressKind::Response,
1088                Some(meerkat_core::PeerIngressAuthDecision::Required),
1089                PeerIngressIdentity::new(
1090                    test_peer_id(),
1091                    "peer-1",
1092                    PeerIngressConvention::Response {
1093                        in_reply_to,
1094                        status: ResponseStatus::Completed,
1095                    },
1096                ),
1097            ),
1098            lifecycle_peer: None,
1099            response_terminality: None,
1100        };
1101
1102        let result =
1103            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
1104        assert!(
1105            matches!(
1106                result,
1107                Err(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })
1108                    if interaction_id == id
1109            ),
1110            "runtime projection must not infer public terminality from raw status: {result:?}"
1111        );
1112    }
1113
1114    #[test]
1115    fn response_terminal_without_canonical_peer_id_fails_typed_projection() {
1116        let in_reply_to = make_interaction_id();
1117        let interaction_id = make_interaction_id();
1118        let candidate = PeerInputCandidate {
1119            interaction: InboxInteraction {
1120                from_route: None,
1121                from: "Peer One".into(),
1122                content: InteractionContent::Response {
1123                    status: ResponseStatus::Completed,
1124                    result: serde_json::json!({"ok": true}),
1125                    in_reply_to,
1126                    blocks: None,
1127                },
1128                id: interaction_id,
1129                rendered_text: String::new(),
1130                handling_mode: meerkat_core::types::HandlingMode::Queue,
1131                render_metadata: None,
1132            },
1133            ingress: PeerIngressFact::legacy_peer_label(
1134                interaction_id,
1135                "Peer One",
1136                PeerInputClass::ResponseTerminal,
1137                meerkat_core::PeerIngressKind::Response,
1138                Some(meerkat_core::PeerIngressAuthDecision::Required),
1139                PeerIngressConvention::Response {
1140                    in_reply_to,
1141                    status: ResponseStatus::Completed,
1142                },
1143            ),
1144            lifecycle_peer: None,
1145            response_terminality: Some(meerkat_core::TerminalityClass::Terminal {
1146                disposition: meerkat_core::TerminalDisposition::Completed,
1147            }),
1148        };
1149        let err = peer_input_candidate_to_runtime_input(&candidate, &LogicalRuntimeId::new("test"))
1150            .unwrap_err();
1151        assert!(matches!(
1152            err,
1153            PeerIngressProjectionError::MissingCanonicalPeerId { .. }
1154        ));
1155    }
1156
1157    #[test]
1158    fn response_failed_to_terminal() {
1159        let in_reply_to = make_interaction_id();
1160        let route_id = meerkat_core::comms::PeerId::from_uuid(
1161            uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f3").unwrap(),
1162        );
1163        let interaction = InboxInteraction {
1164            from_route: Some(route_id),
1165            from: "peer-1".into(),
1166            content: InteractionContent::Response {
1167                status: ResponseStatus::Failed,
1168                result: serde_json::json!({"error": "timeout"}),
1169                in_reply_to,
1170                blocks: None,
1171            },
1172            id: make_interaction_id(),
1173            rendered_text: String::new(),
1174            handling_mode: meerkat_core::types::HandlingMode::Queue,
1175            render_metadata: None,
1176        };
1177        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
1178        if let Input::Peer(p) = &input {
1179            assert!(matches!(
1180                p.convention,
1181                Some(PeerConvention::ResponseTerminal {
1182                    status: ResponseTerminalStatus::Failed,
1183                    ..
1184                })
1185            ));
1186        } else {
1187            panic!("Expected PeerInput");
1188        }
1189    }
1190
1191    #[test]
1192    fn response_accepted_to_progress() {
1193        let in_reply_to = make_interaction_id();
1194        let interaction = InboxInteraction {
1195            from_route: None,
1196            from: "peer-1".into(),
1197            content: InteractionContent::Response {
1198                status: ResponseStatus::Accepted,
1199                result: serde_json::json!(null),
1200                in_reply_to,
1201                blocks: None,
1202            },
1203            id: make_interaction_id(),
1204            rendered_text: String::new(),
1205            handling_mode: meerkat_core::types::HandlingMode::Queue,
1206            render_metadata: None,
1207        };
1208        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
1209        if let Input::Peer(p) = &input {
1210            assert!(matches!(
1211                p.convention,
1212                Some(PeerConvention::ResponseProgress {
1213                    phase: ResponseProgressPhase::Accepted,
1214                    ..
1215                })
1216            ));
1217            assert_eq!(p.header.durability, InputDurability::Ephemeral);
1218            assert!(
1219                p.handling_mode.is_none(),
1220                "ResponseProgress inputs must not carry handling_mode"
1221            );
1222        } else {
1223            panic!("Expected PeerInput");
1224        }
1225    }
1226
1227    #[test]
1228    fn classified_response_uses_ingress_terminality_over_raw_status() {
1229        let in_reply_to = make_interaction_id();
1230        let id = make_interaction_id();
1231        let classified = PeerInputCandidate {
1232            interaction: InboxInteraction {
1233                from_route: None,
1234                from: "peer-1".into(),
1235                content: InteractionContent::Response {
1236                    status: ResponseStatus::Completed,
1237                    result: serde_json::json!({"ok": true}),
1238                    in_reply_to,
1239                    blocks: None,
1240                },
1241                id,
1242                rendered_text: String::new(),
1243                handling_mode: meerkat_core::types::HandlingMode::Queue,
1244                render_metadata: None,
1245            },
1246            ingress: PeerIngressFact::peer(
1247                id,
1248                PeerInputClass::ResponseProgress,
1249                meerkat_core::PeerIngressKind::Response,
1250                Some(meerkat_core::PeerIngressAuthDecision::Required),
1251                PeerIngressIdentity::new(
1252                    test_peer_id(),
1253                    "peer-1",
1254                    PeerIngressConvention::Response {
1255                        in_reply_to,
1256                        status: ResponseStatus::Completed,
1257                    },
1258                ),
1259            ),
1260            lifecycle_peer: None,
1261            response_terminality: Some(meerkat_core::TerminalityClass::Progress),
1262        };
1263
1264        let input =
1265            peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
1266                .expect("classified response should project");
1267
1268        if let Input::Peer(p) = &input {
1269            assert!(matches!(
1270                p.convention,
1271                Some(PeerConvention::ResponseProgress {
1272                    phase: ResponseProgressPhase::Accepted,
1273                    ..
1274                })
1275            ));
1276            assert_eq!(p.header.durability, InputDurability::Ephemeral);
1277            assert_eq!(p.handling_mode, None);
1278        } else {
1279            panic!("Expected PeerInput");
1280        }
1281    }
1282
1283    #[test]
1284    fn peer_source_includes_runtime_id() {
1285        let interaction = InboxInteraction {
1286            from_route: None,
1287            from: "peer-1".into(),
1288            content: InteractionContent::Message {
1289                body: "hi".into(),
1290                blocks: None,
1291            },
1292            id: make_interaction_id(),
1293            rendered_text: String::new(),
1294            handling_mode: meerkat_core::types::HandlingMode::Queue,
1295            render_metadata: None,
1296        };
1297        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("agent-runtime-1"));
1298        if let Input::Peer(p) = &input {
1299            if let InputOrigin::Peer {
1300                peer_id,
1301                display_identity,
1302                runtime_id,
1303                ..
1304            } = &p.header.source
1305            {
1306                assert_eq!(peer_id, &test_peer_id().as_str());
1307                assert_eq!(display_identity.as_deref(), Some("peer-1"));
1308                assert_eq!(runtime_id.as_ref().unwrap().0, "agent-runtime-1");
1309            } else {
1310                panic!("Expected Peer source");
1311            }
1312        } else {
1313            panic!("Expected PeerInput");
1314        }
1315    }
1316
1317    #[test]
1318    fn all_interaction_types_produce_valid_inputs() {
1319        let in_reply_to = make_interaction_id();
1320        let interactions = vec![
1321            InboxInteraction {
1322                from_route: None,
1323                from: "p".into(),
1324                content: InteractionContent::Message {
1325                    body: "m".into(),
1326                    blocks: None,
1327                },
1328                id: make_interaction_id(),
1329                rendered_text: String::new(),
1330                handling_mode: meerkat_core::types::HandlingMode::Queue,
1331                render_metadata: None,
1332            },
1333            InboxInteraction {
1334                from_route: None,
1335                from: "p".into(),
1336                content: InteractionContent::Request {
1337                    intent: "i".into(),
1338                    params: serde_json::json!({}),
1339                    blocks: None,
1340                },
1341                id: make_interaction_id(),
1342                rendered_text: String::new(),
1343                handling_mode: meerkat_core::types::HandlingMode::Queue,
1344                render_metadata: None,
1345            },
1346            InboxInteraction {
1347                from_route: Some(meerkat_core::comms::PeerId::from_uuid(
1348                    uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f6").unwrap(),
1349                )),
1350                from: "p".into(),
1351                content: InteractionContent::Response {
1352                    status: ResponseStatus::Completed,
1353                    result: serde_json::json!(null),
1354                    in_reply_to,
1355                    blocks: None,
1356                },
1357                id: make_interaction_id(),
1358                rendered_text: String::new(),
1359                handling_mode: meerkat_core::types::HandlingMode::Queue,
1360                render_metadata: None,
1361            },
1362        ];
1363
1364        let rid = LogicalRuntimeId::new("test");
1365        for interaction in &interactions {
1366            let input = peer_input_for_test(interaction, &rid);
1367            assert!(matches!(input, Input::Peer(_)));
1368        }
1369    }
1370}