Skip to main content

meerkat_runtime/
comms_bridge.rs

//! Runtime comms bridge helpers.
//!
//! These helpers translate drained comms interactions into the runtime-owned
//! input families used by the comms classification bridge.
5
6use chrono::Utc;
7#[cfg(test)]
8use meerkat_core::comms::PeerId;
9use meerkat_core::interaction::{
10    InboxInteraction, InteractionContent, PeerIngressConvention, PeerIngressFact, PeerIngressKind,
11    PeerInputCandidate, PeerInputClass,
12};
13#[cfg(test)]
14use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
15use meerkat_core::lifecycle::InputId;
16
17use crate::identifiers::{CorrelationId, LogicalRuntimeId};
18use crate::input::{
19    ExternalEventInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
20    PeerConvention, PeerInput, ResponseProgressPhase, ResponseTerminalStatus,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
24pub enum PeerIngressProjectionError {
25    #[error(
26        "classified peer ingress {interaction_id} ({kind:?}) cannot project to a runtime PeerInput"
27    )]
28    UnsupportedPeerConvention {
29        interaction_id: meerkat_core::InteractionId,
30        kind: PeerIngressKind,
31    },
32    #[error("classified peer ingress {interaction_id} missing canonical peer id")]
33    MissingCanonicalPeerId {
34        interaction_id: meerkat_core::InteractionId,
35    },
36    #[error("classified peer response {interaction_id} missing machine response terminality")]
37    MissingResponseTerminality {
38        interaction_id: meerkat_core::InteractionId,
39    },
40    #[error(
41        "classified peer response {interaction_id} has unsupported machine response terminality"
42    )]
43    UnsupportedResponseTerminality {
44        interaction_id: meerkat_core::InteractionId,
45    },
46}
47
48/// Convert a classified comms interaction into the appropriate runtime-owned
49/// input family.
50pub fn classified_interaction_to_runtime_input(
51    classified: &PeerInputCandidate,
52    runtime_id: &LogicalRuntimeId,
53) -> Result<Input, PeerIngressProjectionError> {
54    let interaction = &classified.interaction;
55
56    if classified.class() == PeerInputClass::PlainEvent {
57        let source_name = classified
58            .ingress
59            .plain_event_source_name()
60            .unwrap_or("unknown");
61        let blocks = external_event_blocks(interaction);
62        return Ok(Input::ExternalEvent(ExternalEventInput {
63            header: InputHeader {
64                id: InputId::new(),
65                timestamp: Utc::now(),
66                source: InputOrigin::External {
67                    source_name: source_name.to_string(),
68                },
69                durability: InputDurability::Durable,
70                visibility: InputVisibility {
71                    transcript_eligible: true,
72                    operator_eligible: true,
73                },
74                idempotency_key: None,
75                supersession_key: None,
76                correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
77            },
78            event_type: source_name.to_string(),
79            payload: external_event_payload(interaction),
80            blocks,
81            handling_mode: interaction.handling_mode,
82            render_metadata: interaction.render_metadata.clone(),
83        }));
84    }
85
86    peer_candidate_to_peer_input(classified, runtime_id)
87}
88
89fn peer_candidate_to_peer_input(
90    classified: &PeerInputCandidate,
91    runtime_id: &LogicalRuntimeId,
92) -> Result<Input, PeerIngressProjectionError> {
93    peer_input_from_ingress_fact(
94        &classified.interaction,
95        runtime_id,
96        &classified.ingress,
97        classified.response_terminality,
98    )
99}
100
101fn peer_input_from_ingress_fact(
102    interaction: &InboxInteraction,
103    runtime_id: &LogicalRuntimeId,
104    ingress: &PeerIngressFact,
105    response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
106) -> Result<Input, PeerIngressProjectionError> {
107    let convention = map_ingress_convention(interaction.id, ingress, response_terminality)?;
108    let durability = map_durability(&convention);
109    let handling_mode = match &convention {
110        PeerConvention::ResponseProgress { .. } => None,
111        _ => Some(interaction.handling_mode),
112    };
113    let peer_id = ingress.canonical_peer_id_string().ok_or(
114        PeerIngressProjectionError::MissingCanonicalPeerId {
115            interaction_id: interaction.id,
116        },
117    )?;
118    let display_identity = ingress
119        .route
120        .as_ref()
121        .map(meerkat_core::PeerRoute::label)
122        .or_else(|| ingress.display_label());
123
124    Ok(Input::Peer(PeerInput {
125        header: InputHeader {
126            id: InputId::new(),
127            timestamp: Utc::now(),
128            source: InputOrigin::Peer {
129                peer_id,
130                display_identity,
131                runtime_id: Some(runtime_id.clone()),
132            },
133            durability,
134            visibility: InputVisibility {
135                transcript_eligible: true,
136                operator_eligible: true,
137            },
138            idempotency_key: None,
139            supersession_key: None,
140            correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
141        },
142        convention: Some(convention),
143        content: match peer_blocks(interaction) {
144            Some(blocks) => meerkat_core::types::ContentInput::Blocks(blocks),
145            None => meerkat_core::types::ContentInput::Text(peer_rendered_body(interaction)),
146        },
147        payload: peer_payload(interaction),
148        handling_mode,
149    }))
150}
151
152fn map_ingress_convention(
153    interaction_id: meerkat_core::InteractionId,
154    ingress: &PeerIngressFact,
155    response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
156) -> Result<PeerConvention, PeerIngressProjectionError> {
157    match &ingress.convention {
158        PeerIngressConvention::Message => Ok(PeerConvention::Message),
159        PeerIngressConvention::Request { request_id, intent } => Ok(PeerConvention::Request {
160            request_id: request_id.clone(),
161            intent: intent.clone(),
162        }),
163        PeerIngressConvention::Response {
164            in_reply_to,
165            status: _,
166        } => {
167            let terminality = response_terminality
168                .ok_or(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })?;
169            map_response_convention(interaction_id, *in_reply_to, terminality)
170        }
171        PeerIngressConvention::Lifecycle { kind, .. } => Ok(PeerConvention::Request {
172            request_id: ingress.interaction_id.to_string(),
173            intent: kind.to_string(),
174        }),
175        PeerIngressConvention::Ack { .. } | PeerIngressConvention::PlainEvent { .. } => {
176            Err(PeerIngressProjectionError::UnsupportedPeerConvention {
177                interaction_id,
178                kind: ingress.kind,
179            })
180        }
181    }
182}
183
184fn map_response_convention(
185    interaction_id: meerkat_core::InteractionId,
186    in_reply_to: meerkat_core::InteractionId,
187    terminality: meerkat_core::interaction::TerminalityClass,
188) -> Result<PeerConvention, PeerIngressProjectionError> {
189    let request_id = in_reply_to.to_string();
190    Ok(match terminality {
191        meerkat_core::interaction::TerminalityClass::Progress => PeerConvention::ResponseProgress {
192            request_id,
193            phase: ResponseProgressPhase::Accepted,
194        },
195        meerkat_core::interaction::TerminalityClass::Terminal { disposition } => {
196            let term = match disposition {
197                meerkat_core::interaction::TerminalDisposition::Completed => {
198                    ResponseTerminalStatus::Completed
199                }
200                meerkat_core::interaction::TerminalDisposition::Failed => {
201                    ResponseTerminalStatus::Failed
202                }
203                _ => {
204                    return Err(PeerIngressProjectionError::UnsupportedResponseTerminality {
205                        interaction_id,
206                    });
207                }
208            };
209            PeerConvention::ResponseTerminal {
210                request_id,
211                status: term,
212            }
213        }
214        _ => {
215            return Err(PeerIngressProjectionError::UnsupportedResponseTerminality {
216                interaction_id,
217            });
218        }
219    })
220}
221
222fn peer_rendered_body(interaction: &InboxInteraction) -> String {
223    if !interaction.rendered_text.trim().is_empty() {
224        return interaction.rendered_text.clone();
225    }
226    match &interaction.content {
227        InteractionContent::Message { body, .. } => body.clone(),
228        InteractionContent::Request { params, .. } => {
229            serde_json::to_string(params).unwrap_or_default()
230        }
231        InteractionContent::Response { result, .. } => {
232            serde_json::to_string(result).unwrap_or_default()
233        }
234    }
235}
236
237fn peer_blocks(interaction: &InboxInteraction) -> Option<Vec<meerkat_core::types::ContentBlock>> {
238    match &interaction.content {
239        InteractionContent::Message { blocks, .. } => blocks.clone(),
240        InteractionContent::Request { blocks, .. } => blocks.clone(),
241        InteractionContent::Response { blocks, .. } => blocks.clone(),
242    }
243}
244
245fn peer_payload(interaction: &InboxInteraction) -> Option<serde_json::Value> {
246    match &interaction.content {
247        InteractionContent::Message { .. } => None,
248        InteractionContent::Request { params, .. } => Some(params.clone()),
249        InteractionContent::Response { result, .. } => Some(result.clone()),
250    }
251}
252
253fn external_event_payload(interaction: &InboxInteraction) -> serde_json::Value {
254    match &interaction.content {
255        InteractionContent::Message { body, .. } => serde_json::json!({ "body": body }),
256        InteractionContent::Request { intent, params, .. } => {
257            serde_json::json!({ "intent": intent, "params": params })
258        }
259        InteractionContent::Response {
260            in_reply_to,
261            status,
262            result,
263            blocks,
264        } => serde_json::json!({
265            "in_reply_to": in_reply_to,
266            "status": status,
267            "result": result,
268            "blocks": blocks,
269        }),
270    }
271}
272
273fn external_event_blocks(
274    interaction: &InboxInteraction,
275) -> Option<Vec<meerkat_core::types::ContentBlock>> {
276    match &interaction.content {
277        InteractionContent::Message { blocks, .. } => blocks.clone(),
278        InteractionContent::Request { blocks, .. } => blocks.clone(),
279        _ => None,
280    }
281}
282
283fn map_durability(convention: &PeerConvention) -> InputDurability {
284    match convention {
285        PeerConvention::ResponseProgress { .. } => InputDurability::Ephemeral,
286        _ => InputDurability::Durable,
287    }
288}
289
290#[cfg(test)]
291#[allow(clippy::unwrap_used, clippy::panic)]
292mod tests {
293    use super::*;
294    use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
295
296    fn make_interaction_id() -> meerkat_core::interaction::InteractionId {
297        meerkat_core::interaction::InteractionId(meerkat_core::time_compat::new_uuid_v7())
298    }
299
300    fn plain_event_ingress(
301        id: meerkat_core::interaction::InteractionId,
302        source_name: &str,
303    ) -> PeerIngressFact {
304        PeerIngressFact::plain_event(
305            id,
306            source_name,
307            PeerInputClass::PlainEvent,
308            meerkat_core::PeerIngressKind::PlainEvent,
309        )
310    }
311
312    fn test_peer_id() -> PeerId {
313        PeerId::parse("22222222-2222-4222-8222-222222222222").expect("canonical test peer id")
314    }
315
316    fn peer_kind_for_convention(
317        convention: &PeerIngressConvention,
318    ) -> meerkat_core::PeerIngressKind {
319        match convention {
320            PeerIngressConvention::Message => meerkat_core::PeerIngressKind::Message,
321            PeerIngressConvention::Request { .. } | PeerIngressConvention::Lifecycle { .. } => {
322                meerkat_core::PeerIngressKind::Request
323            }
324            PeerIngressConvention::Response { .. } => meerkat_core::PeerIngressKind::Response,
325            PeerIngressConvention::Ack { .. } => meerkat_core::PeerIngressKind::Ack,
326            PeerIngressConvention::PlainEvent { .. } => meerkat_core::PeerIngressKind::PlainEvent,
327        }
328    }
329
330    fn peer_ingress(
331        id: meerkat_core::interaction::InteractionId,
332        peer_id: PeerId,
333        label: &str,
334        class: PeerInputClass,
335        convention: PeerIngressConvention,
336    ) -> PeerIngressFact {
337        let kind = peer_kind_for_convention(&convention);
338        PeerIngressFact::peer(
339            id,
340            class,
341            kind,
342            Some(meerkat_core::PeerIngressAuthDecision::Required),
343            PeerIngressIdentity::new(peer_id, label, convention),
344        )
345    }
346
347    fn candidate_for_interaction(interaction: InboxInteraction) -> PeerInputCandidate {
348        let peer_id = interaction.from_route.unwrap_or_else(test_peer_id);
349        crate::test_peer_input_candidate_from_interaction(interaction, peer_id)
350    }
351
352    fn peer_input_for_test(interaction: &InboxInteraction, runtime_id: &LogicalRuntimeId) -> Input {
353        let candidate = candidate_for_interaction(interaction.clone());
354        classified_interaction_to_runtime_input(&candidate, runtime_id)
355            .expect("test candidate should project to runtime input")
356    }
357
358    #[test]
359    fn message_to_peer_input() {
360        let interaction = InboxInteraction {
361            from_route: None,
362            from: "peer-1".into(),
363            content: InteractionContent::Message {
364                body: "hello".into(),
365                blocks: None,
366            },
367            id: make_interaction_id(),
368            rendered_text: String::new(),
369            handling_mode: meerkat_core::types::HandlingMode::Queue,
370            render_metadata: None,
371        };
372        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
373        if let Input::Peer(p) = &input {
374            assert!(matches!(p.convention, Some(PeerConvention::Message)));
375            assert_eq!(p.content.text_content(), "hello");
376            assert_eq!(p.header.durability, InputDurability::Durable);
377            assert_eq!(
378                p.handling_mode,
379                Some(meerkat_core::types::HandlingMode::Queue),
380                "explicit queue must survive comms -> runtime projection so it can suppress running-turn interruption"
381            );
382        } else {
383            panic!("Expected PeerInput");
384        }
385    }
386
387    #[test]
388    fn request_to_peer_input() {
389        let interaction = InboxInteraction {
390            from_route: None,
391            from: "peer-1".into(),
392            content: InteractionContent::Request {
393                intent: "mob.peer_added".into(),
394                // K15: lifecycle-classed requests carry the typed peer
395                // subject; a missing subject is rejected at ingress.
396                params: serde_json::json!({"peer": "agent-1"}),
397                blocks: None,
398            },
399            id: make_interaction_id(),
400            rendered_text: String::new(),
401            handling_mode: meerkat_core::types::HandlingMode::Queue,
402            render_metadata: None,
403        };
404        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
405        if let Input::Peer(p) = &input {
406            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
407            match p.convention.as_ref() {
408                Some(PeerConvention::Request { request_id, .. }) => {
409                    assert_eq!(request_id, &interaction.id.0.to_string());
410                }
411                other => panic!("Expected request convention, got {other:?}"),
412            }
413            assert_eq!(p.header.durability, InputDurability::Durable);
414            assert_eq!(
415                p.payload,
416                Some(serde_json::json!({"peer": "agent-1"})),
417                "request params must remain structured on PeerInput so runtime prompt projection does not depend on pre-rendered comms prose"
418            );
419            assert_eq!(
420                p.handling_mode,
421                Some(meerkat_core::types::HandlingMode::Queue),
422                "explicit queue request semantics must not collapse to default policy"
423            );
424        } else {
425            panic!("Expected PeerInput");
426        }
427    }
428
429    #[test]
430    fn classified_request_uses_canonical_peer_id_for_runtime_projection() {
431        let source_peer_id =
432            PeerId::parse("11111111-1111-4111-8111-111111111111").expect("canonical peer id");
433        let request_id = make_interaction_id();
434        let classified = PeerInputCandidate {
435            interaction: InboxInteraction {
436                from_route: None,
437                from: "test-mob/lead/l-requester".into(),
438                content: InteractionContent::Request {
439                    intent: "interpret_image".into(),
440                    params: serde_json::json!({"description": "tower with a light"}),
441                    blocks: None,
442                },
443                id: request_id,
444                rendered_text: "stale helper prose".into(),
445                handling_mode: meerkat_core::types::HandlingMode::Steer,
446                render_metadata: None,
447            },
448            ingress: PeerIngressFact::peer(
449                request_id,
450                PeerInputClass::ActionableRequest,
451                meerkat_core::PeerIngressKind::Request,
452                Some(meerkat_core::PeerIngressAuthDecision::Required),
453                PeerIngressIdentity::new(
454                    source_peer_id,
455                    "test-mob/lead/l-requester",
456                    PeerIngressConvention::Request {
457                        request_id: request_id.to_string(),
458                        intent: "interpret_image".to_string(),
459                    },
460                ),
461            ),
462            lifecycle_peer: None,
463            response_terminality: None,
464        };
465
466        let input =
467            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("worker"))
468                .expect("classified request should project to peer input");
469        let Input::Peer(peer) = &input else {
470            panic!("Expected PeerInput");
471        };
472        let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
473            panic!("Expected peer source");
474        };
475        assert_eq!(peer_id, "11111111-1111-4111-8111-111111111111");
476        assert_eq!(peer.content.text_content(), "stale helper prose");
477
478        let prompt = crate::input::input_prompt_text(&input);
479        assert!(prompt.starts_with(
480            "Peer request from peer_id 11111111-1111-4111-8111-111111111111 (display_name: test-mob/lead/l-requester)."
481        ));
482        assert!(prompt.contains("\"peer_id\":\"11111111-1111-4111-8111-111111111111\""));
483        assert!(prompt.contains("\"display_name\":\"test-mob/lead/l-requester\""));
484        assert!(prompt.contains(&format!("\"in_reply_to\":\"{}\"", request_id.0)));
485        assert!(prompt.contains("\"status\":\"completed\""));
486        assert!(!prompt.contains("to=\""));
487    }
488
489    #[test]
490    fn plain_event_to_external_event_input() {
491        let id = make_interaction_id();
492        let classified = PeerInputCandidate {
493            lifecycle_peer: None,
494            response_terminality: None,
495            ingress: plain_event_ingress(id, "webhook"),
496            interaction: InboxInteraction {
497                from_route: None,
498                from: "event:webhook".into(),
499                content: InteractionContent::Message {
500                    body: "{\"ok\":true}".into(),
501                    blocks: None,
502                },
503                id,
504                rendered_text: String::new(),
505                handling_mode: meerkat_core::types::HandlingMode::Queue,
506                render_metadata: None,
507            },
508        };
509        let input =
510            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
511                .expect("plain event should project to external event input");
512        match input {
513            Input::ExternalEvent(event) => {
514                assert_eq!(event.event_type, "webhook");
515                assert_eq!(event.payload["body"], "{\"ok\":true}");
516                assert_eq!(event.blocks, None);
517                assert_eq!(
518                    event.handling_mode,
519                    meerkat_core::types::HandlingMode::Queue
520                );
521                assert_eq!(event.render_metadata, None);
522            }
523            other => panic!("Expected ExternalEvent input, got {other:?}"),
524        }
525    }
526
527    #[test]
528    fn peer_named_event_prefix_stays_peer_without_plain_event_class() {
529        let id = make_interaction_id();
530        let classified = PeerInputCandidate {
531            lifecycle_peer: None,
532            response_terminality: None,
533            ingress: peer_ingress(
534                id,
535                test_peer_id(),
536                "event:webhook",
537                PeerInputClass::ActionableMessage,
538                PeerIngressConvention::Message,
539            ),
540            interaction: InboxInteraction {
541                from_route: None,
542                from: "event:webhook".into(),
543                content: InteractionContent::Message {
544                    body: "hello".into(),
545                    blocks: None,
546                },
547                id,
548                rendered_text: "stale rendered text".into(),
549                handling_mode: meerkat_core::types::HandlingMode::Queue,
550                render_metadata: None,
551            },
552        };
553        let input =
554            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
555                .expect("classified peer event should project to peer input");
556        match input {
557            Input::Peer(peer) => {
558                assert_eq!(peer.content.text_content(), "stale rendered text");
559                match peer.header.source {
560                    InputOrigin::Peer { peer_id, .. } => {
561                        assert_eq!(peer_id, test_peer_id().as_str());
562                    }
563                    other => panic!("Expected peer source, got {other:?}"),
564                }
565            }
566            other => panic!("Expected Peer input, got {other:?}"),
567        }
568    }
569
570    #[test]
571    fn classified_peer_projection_uses_ingress_canonical_peer_id_not_display_from() {
572        let id = make_interaction_id();
573        let canonical_peer_id = meerkat_core::comms::PeerId::new();
574        let classified = PeerInputCandidate {
575            lifecycle_peer: None,
576            response_terminality: None,
577            ingress: PeerIngressFact::peer(
578                id,
579                PeerInputClass::ActionableRequest,
580                meerkat_core::PeerIngressKind::Request,
581                Some(meerkat_core::PeerIngressAuthDecision::Required),
582                PeerIngressIdentity::new(
583                    canonical_peer_id,
584                    "display-agent",
585                    PeerIngressConvention::Request {
586                        request_id: id.to_string(),
587                        intent: "review".to_string(),
588                    },
589                ),
590            ),
591            interaction: InboxInteraction {
592                from_route: None,
593                from: "display-agent".into(),
594                content: InteractionContent::Request {
595                    intent: "review".into(),
596                    params: serde_json::json!({"pr": 42}),
597                    blocks: None,
598                },
599                id,
600                rendered_text: "stale rendered text".into(),
601                handling_mode: meerkat_core::types::HandlingMode::Queue,
602                render_metadata: None,
603            },
604        };
605
606        let input =
607            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
608                .expect("classified peer projection should use typed canonical id");
609        let Input::Peer(peer) = input else {
610            panic!("Expected Peer input");
611        };
612        match peer.header.source {
613            InputOrigin::Peer { peer_id, .. } => {
614                assert_eq!(peer_id, canonical_peer_id.as_str());
615                assert_ne!(peer_id, "display-agent");
616            }
617            other => panic!("Expected peer source, got {other:?}"),
618        }
619        assert_eq!(peer.content.text_content(), "stale rendered text");
620    }
621
622    #[test]
623    fn classified_peer_projection_rejects_display_only_ingress_identity() {
624        let id = make_interaction_id();
625        let classified = PeerInputCandidate {
626            lifecycle_peer: None,
627            response_terminality: None,
628            ingress: PeerIngressFact {
629                interaction_id: id,
630                class: PeerInputClass::ActionableMessage,
631                kind: meerkat_core::PeerIngressKind::Message,
632                canonical_peer_id: None,
633                display_name: meerkat_core::comms::PeerName::new("display-agent".to_string()).ok(),
634                signing_pubkey: None,
635                route: None,
636                auth: Some(meerkat_core::PeerIngressAuthDecision::Required),
637                convention: PeerIngressConvention::Message,
638            },
639            interaction: InboxInteraction {
640                from_route: None,
641                from: "display-agent".into(),
642                content: InteractionContent::Message {
643                    body: "hello".into(),
644                    blocks: None,
645                },
646                id,
647                rendered_text: "stale rendered text".into(),
648                handling_mode: meerkat_core::types::HandlingMode::Queue,
649                render_metadata: None,
650            },
651        };
652
653        let result =
654            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
655        assert!(
656            matches!(
657                result,
658                Err(PeerIngressProjectionError::MissingCanonicalPeerId { interaction_id })
659                    if interaction_id == id
660            ),
661            "display-only ingress must fail closed, got {result:?}"
662        );
663    }
664
665    #[test]
666    fn request_body_preserves_rendered_text_and_structured_payload() {
667        let interaction = InboxInteraction {
668            from_route: None,
669            from: "event:webhook".into(),
670            content: InteractionContent::Request {
671                intent: "mob.peer_added".into(),
672                params: serde_json::json!({"peer":"agent-1"}),
673                blocks: None,
674            },
675            id: make_interaction_id(),
676            rendered_text: "stale rendered text".into(),
677            handling_mode: meerkat_core::types::HandlingMode::Queue,
678            render_metadata: None,
679        };
680        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
681        if let Input::Peer(peer) = input {
682            assert_eq!(peer.content.text_content(), "stale rendered text");
683            assert_eq!(peer.payload, Some(serde_json::json!({"peer":"agent-1"})));
684        } else {
685            panic!("Expected PeerInput");
686        }
687    }
688
689    #[test]
690    fn message_blocks_are_preserved_on_peer_input() {
691        let blocks = vec![
692            meerkat_core::types::ContentBlock::Text {
693                text: "see image".into(),
694            },
695            meerkat_core::types::ContentBlock::Image {
696                media_type: "image/png".into(),
697                data: "abc".into(),
698            },
699        ];
700        let interaction = InboxInteraction {
701            from_route: None,
702            from: "peer-1".into(),
703            content: InteractionContent::Message {
704                body: "see image".into(),
705                blocks: Some(blocks.clone()),
706            },
707            id: make_interaction_id(),
708            rendered_text: "stale rendered text".into(),
709            handling_mode: meerkat_core::types::HandlingMode::Queue,
710            render_metadata: None,
711        };
712        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
713        if let Input::Peer(peer) = input {
714            // Single-owner semantics: when typed blocks are present they ARE
715            // the content; the stale rendered text is not stored beside them.
716            assert_eq!(
717                peer.content,
718                meerkat_core::types::ContentInput::Blocks(blocks)
719            );
720        } else {
721            panic!("Expected PeerInput");
722        }
723    }
724
725    #[test]
726    fn request_blocks_are_preserved_on_peer_input() {
727        let blocks = vec![
728            meerkat_core::types::ContentBlock::Text {
729                text: "describe this image".into(),
730            },
731            meerkat_core::types::ContentBlock::Image {
732                media_type: "image/png".into(),
733                data: "abc".into(),
734            },
735        ];
736        let interaction_id = make_interaction_id();
737        let peer_id = PeerId::new();
738        let classified = PeerInputCandidate {
739            interaction: InboxInteraction {
740                from_route: Some(peer_id),
741                from: "vision-peer".into(),
742                content: InteractionContent::Request {
743                    intent: "checksum_token".into(),
744                    params: serde_json::json!({"subject": "describe-image"}),
745                    blocks: Some(blocks.clone()),
746                },
747                id: interaction_id,
748                rendered_text: String::new(),
749                handling_mode: meerkat_core::types::HandlingMode::Steer,
750                render_metadata: None,
751            },
752            ingress: PeerIngressFact::peer(
753                interaction_id,
754                PeerInputClass::ActionableRequest,
755                PeerIngressKind::Request,
756                Some(meerkat_core::interaction::PeerIngressAuthDecision::Required),
757                PeerIngressIdentity::new(
758                    peer_id,
759                    "vision-peer",
760                    meerkat_core::interaction::PeerIngressConvention::Request {
761                        request_id: interaction_id.to_string(),
762                        intent: "checksum_token".to_string(),
763                    },
764                ),
765            ),
766            lifecycle_peer: None,
767            response_terminality: None,
768        };
769
770        let input = classified_interaction_to_runtime_input(
771            &classified,
772            &LogicalRuntimeId::new("runtime-a"),
773        )
774        .expect("classified request should project");
775        if let Input::Peer(peer) = input {
776            assert_eq!(
777                peer.content,
778                meerkat_core::types::ContentInput::Blocks(blocks)
779            );
780            assert_eq!(
781                peer.payload,
782                Some(serde_json::json!({"subject": "describe-image"}))
783            );
784        } else {
785            panic!("Expected PeerInput");
786        }
787    }
788
789    #[test]
790    fn multimodal_message_blocks_own_content_with_derived_text_projection() {
791        let blocks = vec![
792            meerkat_core::types::ContentBlock::Text {
793                text: "caption text".into(),
794            },
795            meerkat_core::types::ContentBlock::Image {
796                media_type: "image/png".into(),
797                data: "abc".into(),
798            },
799        ];
800        let interaction = InboxInteraction {
801            from_route: None,
802            from: "peer-1".into(),
803            content: InteractionContent::Message {
804                body: "please inspect this image".into(),
805                blocks: Some(blocks.clone()),
806            },
807            id: make_interaction_id(),
808            rendered_text: "stale rendered text".into(),
809            handling_mode: meerkat_core::types::HandlingMode::Queue,
810            render_metadata: None,
811        };
812        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
813        if let Input::Peer(peer) = input {
814            // Single-owner semantics: typed blocks ARE the content; the text
815            // projection is derived from them at read time, never stored.
816            assert_eq!(
817                peer.content,
818                meerkat_core::types::ContentInput::Blocks(blocks)
819            );
820            assert_eq!(
821                peer.content.text_content(),
822                "caption text\n[image: image/png]"
823            );
824        } else {
825            panic!("Expected PeerInput");
826        }
827    }
828
829    #[test]
830    fn plain_event_blocks_are_preserved_on_external_event_input() {
831        let blocks = vec![
832            meerkat_core::types::ContentBlock::Text {
833                text: "see image".into(),
834            },
835            meerkat_core::types::ContentBlock::Image {
836                media_type: "image/png".into(),
837                data: "abc".into(),
838            },
839        ];
840        let id = make_interaction_id();
841        let classified = PeerInputCandidate {
842            lifecycle_peer: None,
843            response_terminality: None,
844            ingress: plain_event_ingress(id, "webhook"),
845            interaction: InboxInteraction {
846                from_route: None,
847                from: "event:webhook".into(),
848                content: InteractionContent::Message {
849                    body: "see image".into(),
850                    blocks: Some(blocks.clone()),
851                },
852                id,
853                rendered_text: "stale rendered text".into(),
854                handling_mode: meerkat_core::types::HandlingMode::Queue,
855                render_metadata: None,
856            },
857        };
858        let input =
859            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
860                .expect("plain event with blocks should project");
861        match input {
862            Input::ExternalEvent(event) => {
863                assert_eq!(event.payload["body"], "see image");
864                assert!(event.payload.get("blocks").is_none());
865                assert_eq!(event.blocks, Some(blocks));
866                assert_eq!(
867                    event.handling_mode,
868                    meerkat_core::types::HandlingMode::Queue
869                );
870                assert_eq!(event.render_metadata, None);
871            }
872            other => panic!("Expected ExternalEvent input, got {other:?}"),
873        }
874    }
875
876    #[test]
877    fn plain_event_preserves_handling_mode_and_render_metadata() {
878        let render_metadata = meerkat_core::types::RenderMetadata {
879            class: meerkat_core::types::RenderClass::ExternalEvent,
880            salience: meerkat_core::types::RenderSalience::Urgent,
881        };
882        let id = make_interaction_id();
883        let classified = PeerInputCandidate {
884            lifecycle_peer: None,
885            response_terminality: None,
886            ingress: plain_event_ingress(id, "webhook"),
887            interaction: InboxInteraction {
888                from_route: None,
889                from: "event:webhook".into(),
890                content: InteractionContent::Message {
891                    body: "urgent".into(),
892                    blocks: None,
893                },
894                id,
895                rendered_text: "stale rendered text".into(),
896                handling_mode: meerkat_core::types::HandlingMode::Steer,
897                render_metadata: Some(render_metadata.clone()),
898            },
899        };
900
901        match classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
902            .expect("plain event should preserve render metadata")
903        {
904            Input::ExternalEvent(event) => {
905                assert_eq!(
906                    event.handling_mode,
907                    meerkat_core::types::HandlingMode::Steer
908                );
909                assert_eq!(event.render_metadata, Some(render_metadata));
910            }
911            other => panic!("Expected ExternalEvent input, got {other:?}"),
912        }
913    }
914
915    #[test]
916    fn response_completed_to_terminal() {
917        let in_reply_to = make_interaction_id();
918        let route_id = meerkat_core::comms::PeerId::from_uuid(
919            uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f2").unwrap(),
920        );
921        let interaction = InboxInteraction {
922            from_route: Some(route_id),
923            from: "Peer One".into(),
924            content: InteractionContent::Response {
925                status: ResponseStatus::Completed,
926                result: serde_json::json!({"ok": true}),
927                in_reply_to,
928                blocks: None,
929            },
930            id: make_interaction_id(),
931            rendered_text: String::new(),
932            handling_mode: meerkat_core::types::HandlingMode::Queue,
933            render_metadata: None,
934        };
935        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
936        if let Input::Peer(p) = &input {
937            match &p.header.source {
938                InputOrigin::Peer {
939                    peer_id,
940                    display_identity,
941                    ..
942                } => {
943                    assert_eq!(peer_id, &route_id.to_string());
944                    assert_eq!(display_identity.as_deref(), Some("Peer One"));
945                }
946                other => panic!("Expected Peer source, got {other:?}"),
947            }
948            assert!(matches!(
949                p.convention,
950                Some(PeerConvention::ResponseTerminal {
951                    status: ResponseTerminalStatus::Completed,
952                    ..
953                })
954            ));
955            assert_eq!(p.header.durability, InputDurability::Durable);
956            assert_eq!(
957                p.payload,
958                Some(serde_json::json!({"ok": true})),
959                "terminal response result must remain structured on PeerInput so runtime prompt projection stays runtime-owned"
960            );
961        } else {
962            panic!("Expected PeerInput");
963        }
964        let projection = crate::input::runtime_input_projection_for_machine_batch(&input);
965        let context = projection
966            .context_append
967            .expect("terminal machine-batch context projection");
968        let expected_key = format!("peer_response_terminal:{route_id}:{in_reply_to}");
969        assert_eq!(context.key, expected_key);
970        let meerkat_core::lifecycle::run_primitive::CoreRenderable::SystemNotice { blocks, .. } =
971            context.content
972        else {
973            panic!("Expected terminal context notice");
974        };
975        assert!(matches!(
976            blocks.first(),
977            Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. })
978                if peer.as_ref().and_then(|peer| peer.display_name.as_deref()) == Some("Peer One")
979        ));
980    }
981
982    #[test]
983    fn classified_response_uses_ingress_terminal_class() {
984        let in_reply_to = make_interaction_id();
985        let id = make_interaction_id();
986        let classified = PeerInputCandidate {
987            interaction: InboxInteraction {
988                from_route: None,
989                from: "peer-1".into(),
990                content: InteractionContent::Response {
991                    status: ResponseStatus::Completed,
992                    result: serde_json::json!({"ok": true}),
993                    in_reply_to,
994                    blocks: None,
995                },
996                id,
997                rendered_text: String::new(),
998                handling_mode: meerkat_core::types::HandlingMode::Queue,
999                render_metadata: None,
1000            },
1001            ingress: PeerIngressFact::peer(
1002                id,
1003                PeerInputClass::ResponseProgress,
1004                meerkat_core::PeerIngressKind::Response,
1005                Some(meerkat_core::PeerIngressAuthDecision::Required),
1006                PeerIngressIdentity::new(
1007                    test_peer_id(),
1008                    "peer-1",
1009                    PeerIngressConvention::Response {
1010                        in_reply_to,
1011                        status: ResponseStatus::Completed,
1012                    },
1013                ),
1014            ),
1015            lifecycle_peer: None,
1016            response_terminality: Some(meerkat_core::TerminalityClass::Progress),
1017        };
1018
1019        let input =
1020            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
1021                .expect("classified response should project");
1022        if let Input::Peer(peer) = input {
1023            assert!(
1024                matches!(
1025                    peer.convention,
1026                    Some(PeerConvention::ResponseProgress { .. })
1027                ),
1028                "classified bridge must consume ingress-owned response class"
1029            );
1030        } else {
1031            panic!("Expected PeerInput");
1032        }
1033    }
1034
1035    #[test]
1036    fn classified_response_missing_machine_terminality_fails_closed() {
1037        let in_reply_to = make_interaction_id();
1038        let id = make_interaction_id();
1039        let classified = PeerInputCandidate {
1040            interaction: InboxInteraction {
1041                from_route: None,
1042                from: "peer-1".into(),
1043                content: InteractionContent::Response {
1044                    status: ResponseStatus::Completed,
1045                    result: serde_json::json!({"ok": true}),
1046                    in_reply_to,
1047                    blocks: None,
1048                },
1049                id,
1050                rendered_text: String::new(),
1051                handling_mode: meerkat_core::types::HandlingMode::Queue,
1052                render_metadata: None,
1053            },
1054            ingress: PeerIngressFact::peer(
1055                id,
1056                PeerInputClass::ResponseTerminal,
1057                meerkat_core::PeerIngressKind::Response,
1058                Some(meerkat_core::PeerIngressAuthDecision::Required),
1059                PeerIngressIdentity::new(
1060                    test_peer_id(),
1061                    "peer-1",
1062                    PeerIngressConvention::Response {
1063                        in_reply_to,
1064                        status: ResponseStatus::Completed,
1065                    },
1066                ),
1067            ),
1068            lifecycle_peer: None,
1069            response_terminality: None,
1070        };
1071
1072        let result =
1073            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
1074        assert!(
1075            matches!(
1076                result,
1077                Err(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })
1078                    if interaction_id == id
1079            ),
1080            "runtime projection must not infer public terminality from raw status: {result:?}"
1081        );
1082    }
1083
1084    #[test]
1085    fn response_terminal_without_canonical_peer_id_fails_typed_projection() {
1086        let in_reply_to = make_interaction_id();
1087        let interaction_id = make_interaction_id();
1088        let candidate = PeerInputCandidate {
1089            interaction: InboxInteraction {
1090                from_route: None,
1091                from: "Peer One".into(),
1092                content: InteractionContent::Response {
1093                    status: ResponseStatus::Completed,
1094                    result: serde_json::json!({"ok": true}),
1095                    in_reply_to,
1096                    blocks: None,
1097                },
1098                id: interaction_id,
1099                rendered_text: String::new(),
1100                handling_mode: meerkat_core::types::HandlingMode::Queue,
1101                render_metadata: None,
1102            },
1103            ingress: PeerIngressFact {
1104                interaction_id,
1105                class: PeerInputClass::ResponseTerminal,
1106                kind: meerkat_core::PeerIngressKind::Response,
1107                canonical_peer_id: None,
1108                display_name: meerkat_core::comms::PeerName::new("Peer One".to_string()).ok(),
1109                signing_pubkey: None,
1110                route: None,
1111                auth: Some(meerkat_core::PeerIngressAuthDecision::Required),
1112                convention: PeerIngressConvention::Response {
1113                    in_reply_to,
1114                    status: ResponseStatus::Completed,
1115                },
1116            },
1117            lifecycle_peer: None,
1118            response_terminality: Some(meerkat_core::TerminalityClass::Terminal {
1119                disposition: meerkat_core::TerminalDisposition::Completed,
1120            }),
1121        };
1122        let err =
1123            classified_interaction_to_runtime_input(&candidate, &LogicalRuntimeId::new("test"))
1124                .unwrap_err();
1125        assert!(matches!(
1126            err,
1127            PeerIngressProjectionError::MissingCanonicalPeerId { .. }
1128        ));
1129    }
1130
1131    #[test]
1132    fn response_failed_to_terminal() {
1133        let in_reply_to = make_interaction_id();
1134        let route_id = meerkat_core::comms::PeerId::from_uuid(
1135            uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f3").unwrap(),
1136        );
1137        let interaction = InboxInteraction {
1138            from_route: Some(route_id),
1139            from: "peer-1".into(),
1140            content: InteractionContent::Response {
1141                status: ResponseStatus::Failed,
1142                result: serde_json::json!({"error": "timeout"}),
1143                in_reply_to,
1144                blocks: None,
1145            },
1146            id: make_interaction_id(),
1147            rendered_text: String::new(),
1148            handling_mode: meerkat_core::types::HandlingMode::Queue,
1149            render_metadata: None,
1150        };
1151        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
1152        if let Input::Peer(p) = &input {
1153            assert!(matches!(
1154                p.convention,
1155                Some(PeerConvention::ResponseTerminal {
1156                    status: ResponseTerminalStatus::Failed,
1157                    ..
1158                })
1159            ));
1160        } else {
1161            panic!("Expected PeerInput");
1162        }
1163    }
1164
1165    #[test]
1166    fn response_accepted_to_progress() {
1167        let in_reply_to = make_interaction_id();
1168        let interaction = InboxInteraction {
1169            from_route: None,
1170            from: "peer-1".into(),
1171            content: InteractionContent::Response {
1172                status: ResponseStatus::Accepted,
1173                result: serde_json::json!(null),
1174                in_reply_to,
1175                blocks: None,
1176            },
1177            id: make_interaction_id(),
1178            rendered_text: String::new(),
1179            handling_mode: meerkat_core::types::HandlingMode::Queue,
1180            render_metadata: None,
1181        };
1182        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
1183        if let Input::Peer(p) = &input {
1184            assert!(matches!(
1185                p.convention,
1186                Some(PeerConvention::ResponseProgress {
1187                    phase: ResponseProgressPhase::Accepted,
1188                    ..
1189                })
1190            ));
1191            assert_eq!(p.header.durability, InputDurability::Ephemeral);
1192            assert!(
1193                p.handling_mode.is_none(),
1194                "ResponseProgress inputs must not carry handling_mode"
1195            );
1196        } else {
1197            panic!("Expected PeerInput");
1198        }
1199    }
1200
1201    #[test]
1202    fn classified_response_uses_ingress_terminality_over_raw_status() {
1203        let in_reply_to = make_interaction_id();
1204        let id = make_interaction_id();
1205        let classified = PeerInputCandidate {
1206            interaction: InboxInteraction {
1207                from_route: None,
1208                from: "peer-1".into(),
1209                content: InteractionContent::Response {
1210                    status: ResponseStatus::Completed,
1211                    result: serde_json::json!({"ok": true}),
1212                    in_reply_to,
1213                    blocks: None,
1214                },
1215                id,
1216                rendered_text: String::new(),
1217                handling_mode: meerkat_core::types::HandlingMode::Queue,
1218                render_metadata: None,
1219            },
1220            ingress: PeerIngressFact::peer(
1221                id,
1222                PeerInputClass::ResponseProgress,
1223                meerkat_core::PeerIngressKind::Response,
1224                Some(meerkat_core::PeerIngressAuthDecision::Required),
1225                PeerIngressIdentity::new(
1226                    test_peer_id(),
1227                    "peer-1",
1228                    PeerIngressConvention::Response {
1229                        in_reply_to,
1230                        status: ResponseStatus::Completed,
1231                    },
1232                ),
1233            ),
1234            lifecycle_peer: None,
1235            response_terminality: Some(meerkat_core::TerminalityClass::Progress),
1236        };
1237
1238        let input =
1239            classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
1240                .expect("classified response should project");
1241
1242        if let Input::Peer(p) = &input {
1243            assert!(matches!(
1244                p.convention,
1245                Some(PeerConvention::ResponseProgress {
1246                    phase: ResponseProgressPhase::Accepted,
1247                    ..
1248                })
1249            ));
1250            assert_eq!(p.header.durability, InputDurability::Ephemeral);
1251            assert_eq!(p.handling_mode, None);
1252        } else {
1253            panic!("Expected PeerInput");
1254        }
1255    }
1256
1257    #[test]
1258    fn peer_source_includes_runtime_id() {
1259        let interaction = InboxInteraction {
1260            from_route: None,
1261            from: "peer-1".into(),
1262            content: InteractionContent::Message {
1263                body: "hi".into(),
1264                blocks: None,
1265            },
1266            id: make_interaction_id(),
1267            rendered_text: String::new(),
1268            handling_mode: meerkat_core::types::HandlingMode::Queue,
1269            render_metadata: None,
1270        };
1271        let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("agent-runtime-1"));
1272        if let Input::Peer(p) = &input {
1273            if let InputOrigin::Peer {
1274                peer_id,
1275                display_identity,
1276                runtime_id,
1277                ..
1278            } = &p.header.source
1279            {
1280                assert_eq!(peer_id, &test_peer_id().as_str());
1281                assert_eq!(display_identity.as_deref(), Some("peer-1"));
1282                assert_eq!(runtime_id.as_ref().unwrap().0, "agent-runtime-1");
1283            } else {
1284                panic!("Expected Peer source");
1285            }
1286        } else {
1287            panic!("Expected PeerInput");
1288        }
1289    }
1290
1291    #[test]
1292    fn all_interaction_types_produce_valid_inputs() {
1293        let in_reply_to = make_interaction_id();
1294        let interactions = vec![
1295            InboxInteraction {
1296                from_route: None,
1297                from: "p".into(),
1298                content: InteractionContent::Message {
1299                    body: "m".into(),
1300                    blocks: None,
1301                },
1302                id: make_interaction_id(),
1303                rendered_text: String::new(),
1304                handling_mode: meerkat_core::types::HandlingMode::Queue,
1305                render_metadata: None,
1306            },
1307            InboxInteraction {
1308                from_route: None,
1309                from: "p".into(),
1310                content: InteractionContent::Request {
1311                    intent: "i".into(),
1312                    params: serde_json::json!({}),
1313                    blocks: None,
1314                },
1315                id: make_interaction_id(),
1316                rendered_text: String::new(),
1317                handling_mode: meerkat_core::types::HandlingMode::Queue,
1318                render_metadata: None,
1319            },
1320            InboxInteraction {
1321                from_route: Some(meerkat_core::comms::PeerId::from_uuid(
1322                    uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f6").unwrap(),
1323                )),
1324                from: "p".into(),
1325                content: InteractionContent::Response {
1326                    status: ResponseStatus::Completed,
1327                    result: serde_json::json!(null),
1328                    in_reply_to,
1329                    blocks: None,
1330                },
1331                id: make_interaction_id(),
1332                rendered_text: String::new(),
1333                handling_mode: meerkat_core::types::HandlingMode::Queue,
1334                render_metadata: None,
1335            },
1336        ];
1337
1338        let rid = LogicalRuntimeId::new("test");
1339        for interaction in &interactions {
1340            let input = peer_input_for_test(interaction, &rid);
1341            assert!(matches!(input, Input::Peer(_)));
1342        }
1343    }
1344}