1use chrono::Utc;
7#[cfg(test)]
8use meerkat_core::comms::PeerId;
9use meerkat_core::interaction::{
10 InboxInteraction, InteractionContent, PeerIngressConvention, PeerIngressFact, PeerIngressKind,
11 PeerInputCandidate, PeerInputClass,
12};
13#[cfg(test)]
14use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
15use meerkat_core::lifecycle::InputId;
16
17use crate::identifiers::{CorrelationId, LogicalRuntimeId};
18use crate::input::{
19 ExternalEventInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
20 PeerConvention, PeerInput, ResponseProgressPhase, ResponseTerminalStatus,
21};
22
/// Reasons a classified peer ingress cannot be projected into a runtime input.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum PeerIngressProjectionError {
    /// The ingress convention has no runtime `PeerInput` representation.
    #[error(
        "classified peer ingress {interaction_id} ({kind:?}) cannot project to a runtime PeerInput"
    )]
    UnsupportedPeerConvention {
        /// Interaction whose projection was refused.
        interaction_id: meerkat_core::InteractionId,
        /// Ingress kind recorded on the rejected fact.
        kind: PeerIngressKind,
    },
    /// The ingress fact did not yield a canonical peer id.
    #[error("classified peer ingress {interaction_id} missing canonical peer id")]
    MissingCanonicalPeerId {
        /// Interaction whose ingress lacked the canonical peer id.
        interaction_id: meerkat_core::InteractionId,
    },
    /// A response-convention ingress arrived without a terminality classification.
    #[error("classified peer response {interaction_id} missing machine response terminality")]
    MissingResponseTerminality {
        /// Response interaction that lacked the terminality classification.
        interaction_id: meerkat_core::InteractionId,
    },
}
41
42pub fn classified_interaction_to_runtime_input(
45 classified: &PeerInputCandidate,
46 runtime_id: &LogicalRuntimeId,
47) -> Result<Input, PeerIngressProjectionError> {
48 let interaction = &classified.interaction;
49
50 if classified.class() == PeerInputClass::PlainEvent {
51 let source_name = classified
52 .ingress
53 .plain_event_source_name()
54 .unwrap_or("unknown");
55 let blocks = external_event_blocks(interaction);
56 return Ok(Input::ExternalEvent(ExternalEventInput {
57 header: InputHeader {
58 id: InputId::new(),
59 timestamp: Utc::now(),
60 source: InputOrigin::External {
61 source_name: source_name.to_string(),
62 },
63 durability: InputDurability::Durable,
64 visibility: InputVisibility {
65 transcript_eligible: true,
66 operator_eligible: true,
67 },
68 idempotency_key: None,
69 supersession_key: None,
70 correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
71 },
72 event_type: source_name.to_string(),
73 payload: external_event_payload(interaction),
74 blocks,
75 handling_mode: interaction.handling_mode,
76 render_metadata: interaction.render_metadata.clone(),
77 }));
78 }
79
80 peer_candidate_to_peer_input(classified, runtime_id)
81}
82
/// Projects a classified peer candidate into a runtime [`Input`].
///
/// This is a direct delegate of [`classified_interaction_to_runtime_input`];
/// both entry points return the same result for the same arguments.
///
/// # Errors
///
/// Returns the [`PeerIngressProjectionError`] reported by the delegate.
pub fn peer_input_candidate_to_runtime_input(
    classified: &PeerInputCandidate,
    runtime_id: &LogicalRuntimeId,
) -> Result<Input, PeerIngressProjectionError> {
    classified_interaction_to_runtime_input(classified, runtime_id)
}
90
91fn peer_candidate_to_peer_input(
92 classified: &PeerInputCandidate,
93 runtime_id: &LogicalRuntimeId,
94) -> Result<Input, PeerIngressProjectionError> {
95 peer_input_from_ingress_fact(
96 &classified.interaction,
97 runtime_id,
98 &classified.ingress,
99 classified.response_terminality,
100 )
101}
102
103fn peer_input_from_ingress_fact(
104 interaction: &InboxInteraction,
105 runtime_id: &LogicalRuntimeId,
106 ingress: &PeerIngressFact,
107 response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
108) -> Result<Input, PeerIngressProjectionError> {
109 let convention = map_ingress_convention(interaction.id, ingress, response_terminality)?;
110 let durability = map_durability(&convention);
111 let handling_mode = match &convention {
112 PeerConvention::ResponseProgress { .. } => None,
113 _ => Some(interaction.handling_mode),
114 };
115 let peer_id = ingress.canonical_peer_id_string().ok_or(
116 PeerIngressProjectionError::MissingCanonicalPeerId {
117 interaction_id: interaction.id,
118 },
119 )?;
120 let display_identity = ingress
121 .route
122 .as_ref()
123 .map(meerkat_core::PeerRoute::label)
124 .or_else(|| ingress.display_label());
125
126 Ok(Input::Peer(PeerInput {
127 header: InputHeader {
128 id: InputId::new(),
129 timestamp: Utc::now(),
130 source: InputOrigin::Peer {
131 peer_id,
132 display_identity,
133 runtime_id: Some(runtime_id.clone()),
134 },
135 durability,
136 visibility: InputVisibility {
137 transcript_eligible: true,
138 operator_eligible: true,
139 },
140 idempotency_key: None,
141 supersession_key: None,
142 correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
143 },
144 convention: Some(convention),
145 body: peer_rendered_body(interaction),
146 payload: peer_payload(interaction),
147 blocks: peer_blocks(interaction),
148 handling_mode,
149 }))
150}
151
152fn map_ingress_convention(
153 interaction_id: meerkat_core::InteractionId,
154 ingress: &PeerIngressFact,
155 response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
156) -> Result<PeerConvention, PeerIngressProjectionError> {
157 match &ingress.convention {
158 PeerIngressConvention::Message => Ok(PeerConvention::Message),
159 PeerIngressConvention::Request { request_id, intent } => Ok(PeerConvention::Request {
160 request_id: request_id.clone(),
161 intent: intent.clone(),
162 }),
163 PeerIngressConvention::Response {
164 in_reply_to,
165 status: _,
166 } => {
167 let terminality = response_terminality
168 .ok_or(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })?;
169 Ok(map_response_convention(*in_reply_to, terminality))
170 }
171 PeerIngressConvention::Lifecycle { kind, .. } => Ok(PeerConvention::Request {
172 request_id: ingress.interaction_id.to_string(),
173 intent: kind.to_string(),
174 }),
175 PeerIngressConvention::Ack { .. } | PeerIngressConvention::PlainEvent { .. } => {
176 Err(PeerIngressProjectionError::UnsupportedPeerConvention {
177 interaction_id,
178 kind: ingress.kind,
179 })
180 }
181 }
182}
183
184fn map_response_convention(
185 in_reply_to: meerkat_core::InteractionId,
186 terminality: meerkat_core::interaction::TerminalityClass,
187) -> PeerConvention {
188 let request_id = in_reply_to.to_string();
189 match terminality {
190 meerkat_core::interaction::TerminalityClass::Progress => PeerConvention::ResponseProgress {
191 request_id,
192 phase: ResponseProgressPhase::Accepted,
193 },
194 meerkat_core::interaction::TerminalityClass::Terminal { disposition } => {
195 let term = match disposition {
196 meerkat_core::interaction::TerminalDisposition::Completed => {
197 ResponseTerminalStatus::Completed
198 }
199 meerkat_core::interaction::TerminalDisposition::Failed => {
200 ResponseTerminalStatus::Failed
201 }
202 other => {
203 tracing::warn!(
204 disposition = ?other,
205 "unknown terminal disposition; treating as Failed"
206 );
207 ResponseTerminalStatus::Failed
208 }
209 };
210 PeerConvention::ResponseTerminal {
211 request_id,
212 status: term,
213 }
214 }
215 other => {
216 tracing::warn!(
217 class = ?other,
218 "unknown terminality class; routing response as progress (non-terminal)"
219 );
220 PeerConvention::ResponseProgress {
221 request_id,
222 phase: ResponseProgressPhase::Accepted,
223 }
224 }
225 }
226}
227
228fn peer_rendered_body(interaction: &InboxInteraction) -> String {
229 if !interaction.rendered_text.trim().is_empty() {
230 return interaction.rendered_text.clone();
231 }
232 match &interaction.content {
233 InteractionContent::Message { body, .. } => body.clone(),
234 InteractionContent::Request { params, .. } => {
235 serde_json::to_string(params).unwrap_or_default()
236 }
237 InteractionContent::Response { result, .. } => {
238 serde_json::to_string(result).unwrap_or_default()
239 }
240 }
241}
242
243fn peer_blocks(interaction: &InboxInteraction) -> Option<Vec<meerkat_core::types::ContentBlock>> {
244 match &interaction.content {
245 InteractionContent::Message { blocks, .. } => blocks.clone(),
246 InteractionContent::Request { blocks, .. } => blocks.clone(),
247 InteractionContent::Response { blocks, .. } => blocks.clone(),
248 }
249}
250
251fn peer_payload(interaction: &InboxInteraction) -> Option<serde_json::Value> {
252 match &interaction.content {
253 InteractionContent::Message { .. } => None,
254 InteractionContent::Request { params, .. } => Some(params.clone()),
255 InteractionContent::Response { result, .. } => Some(result.clone()),
256 }
257}
258
259fn external_event_payload(interaction: &InboxInteraction) -> serde_json::Value {
260 match &interaction.content {
261 InteractionContent::Message { body, .. } => serde_json::json!({ "body": body }),
262 InteractionContent::Request { intent, params, .. } => {
263 serde_json::json!({ "intent": intent, "params": params })
264 }
265 InteractionContent::Response {
266 in_reply_to,
267 status,
268 result,
269 blocks,
270 } => serde_json::json!({
271 "in_reply_to": in_reply_to,
272 "status": status,
273 "result": result,
274 "blocks": blocks,
275 }),
276 }
277}
278
279fn external_event_blocks(
280 interaction: &InboxInteraction,
281) -> Option<Vec<meerkat_core::types::ContentBlock>> {
282 match &interaction.content {
283 InteractionContent::Message { blocks, .. } => blocks.clone(),
284 InteractionContent::Request { blocks, .. } => blocks.clone(),
285 _ => None,
286 }
287}
288
289fn map_durability(convention: &PeerConvention) -> InputDurability {
290 match convention {
291 PeerConvention::ResponseProgress { .. } => InputDurability::Ephemeral,
292 _ => InputDurability::Durable,
293 }
294}
295
296#[cfg(test)]
297#[allow(clippy::unwrap_used, clippy::panic)]
298mod tests {
299 use super::*;
300 use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
301
302 fn make_interaction_id() -> meerkat_core::interaction::InteractionId {
303 meerkat_core::interaction::InteractionId(meerkat_core::time_compat::new_uuid_v7())
304 }
305
306 fn plain_event_ingress(
307 id: meerkat_core::interaction::InteractionId,
308 source_name: &str,
309 ) -> PeerIngressFact {
310 PeerIngressFact::plain_event(
311 id,
312 source_name,
313 PeerInputClass::PlainEvent,
314 meerkat_core::PeerIngressKind::PlainEvent,
315 )
316 }
317
318 fn test_peer_id() -> PeerId {
319 PeerId::parse("22222222-2222-4222-8222-222222222222").expect("canonical test peer id")
320 }
321
322 fn peer_kind_for_convention(
323 convention: &PeerIngressConvention,
324 ) -> meerkat_core::PeerIngressKind {
325 match convention {
326 PeerIngressConvention::Message => meerkat_core::PeerIngressKind::Message,
327 PeerIngressConvention::Request { .. } | PeerIngressConvention::Lifecycle { .. } => {
328 meerkat_core::PeerIngressKind::Request
329 }
330 PeerIngressConvention::Response { .. } => meerkat_core::PeerIngressKind::Response,
331 PeerIngressConvention::Ack { .. } => meerkat_core::PeerIngressKind::Ack,
332 PeerIngressConvention::PlainEvent { .. } => meerkat_core::PeerIngressKind::PlainEvent,
333 }
334 }
335
336 fn peer_ingress(
337 id: meerkat_core::interaction::InteractionId,
338 peer_id: PeerId,
339 label: &str,
340 class: PeerInputClass,
341 convention: PeerIngressConvention,
342 ) -> PeerIngressFact {
343 let kind = peer_kind_for_convention(&convention);
344 PeerIngressFact::peer(
345 id,
346 class,
347 kind,
348 Some(meerkat_core::PeerIngressAuthDecision::Required),
349 PeerIngressIdentity::new(peer_id, label, convention),
350 )
351 }
352
353 fn candidate_for_interaction(interaction: InboxInteraction) -> PeerInputCandidate {
354 let id = interaction.id;
355 let label = interaction.from.clone();
356 let peer_id = interaction.from_route.unwrap_or_else(test_peer_id);
357 let (class, convention, response_terminality) = match &interaction.content {
358 InteractionContent::Message { .. } => (
359 PeerInputClass::ActionableMessage,
360 PeerIngressConvention::Message,
361 None,
362 ),
363 InteractionContent::Request { intent, .. } => (
364 PeerInputClass::ActionableRequest,
365 PeerIngressConvention::Request {
366 request_id: id.to_string(),
367 intent: intent.clone(),
368 },
369 None,
370 ),
371 InteractionContent::Response {
372 in_reply_to,
373 status,
374 ..
375 } => {
376 let terminality = meerkat_core::interaction::classify_response_terminality(*status);
377 let class = match terminality {
378 meerkat_core::TerminalityClass::Progress => PeerInputClass::ResponseProgress,
379 meerkat_core::TerminalityClass::Terminal { .. } => {
380 PeerInputClass::ResponseTerminal
381 }
382 _ => PeerInputClass::ResponseTerminal,
383 };
384 (
385 class,
386 PeerIngressConvention::Response {
387 in_reply_to: *in_reply_to,
388 status: *status,
389 },
390 Some(terminality),
391 )
392 }
393 };
394 let ingress = peer_ingress(id, peer_id, &label, class, convention);
395 PeerInputCandidate {
396 interaction,
397 ingress,
398 lifecycle_peer: None,
399 response_terminality,
400 }
401 }
402
403 fn peer_input_for_test(interaction: &InboxInteraction, runtime_id: &LogicalRuntimeId) -> Input {
404 let candidate = candidate_for_interaction(interaction.clone());
405 peer_input_candidate_to_runtime_input(&candidate, runtime_id)
406 .expect("test candidate should project to runtime input")
407 }
408
409 #[test]
410 fn message_to_peer_input() {
411 let interaction = InboxInteraction {
412 from_route: None,
413 from: "peer-1".into(),
414 content: InteractionContent::Message {
415 body: "hello".into(),
416 blocks: None,
417 },
418 id: make_interaction_id(),
419 rendered_text: String::new(),
420 handling_mode: meerkat_core::types::HandlingMode::Queue,
421 render_metadata: None,
422 };
423 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
424 if let Input::Peer(p) = &input {
425 assert!(matches!(p.convention, Some(PeerConvention::Message)));
426 assert_eq!(p.body, "hello");
427 assert_eq!(p.header.durability, InputDurability::Durable);
428 assert_eq!(
429 p.handling_mode,
430 Some(meerkat_core::types::HandlingMode::Queue),
431 "explicit queue must survive comms -> runtime projection so it can suppress running-turn interruption"
432 );
433 } else {
434 panic!("Expected PeerInput");
435 }
436 }
437
438 #[test]
439 fn request_to_peer_input() {
440 let interaction = InboxInteraction {
441 from_route: None,
442 from: "peer-1".into(),
443 content: InteractionContent::Request {
444 intent: "mob.peer_added".into(),
445 params: serde_json::json!({"name": "agent-1"}),
446 blocks: None,
447 },
448 id: make_interaction_id(),
449 rendered_text: String::new(),
450 handling_mode: meerkat_core::types::HandlingMode::Queue,
451 render_metadata: None,
452 };
453 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
454 if let Input::Peer(p) = &input {
455 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
456 match p.convention.as_ref() {
457 Some(PeerConvention::Request { request_id, .. }) => {
458 assert_eq!(request_id, &interaction.id.0.to_string());
459 }
460 other => panic!("Expected request convention, got {other:?}"),
461 }
462 assert_eq!(p.header.durability, InputDurability::Durable);
463 assert_eq!(
464 p.payload,
465 Some(serde_json::json!({"name": "agent-1"})),
466 "request params must remain structured on PeerInput so runtime prompt projection does not depend on pre-rendered comms prose"
467 );
468 assert_eq!(
469 p.handling_mode,
470 Some(meerkat_core::types::HandlingMode::Queue),
471 "explicit queue request semantics must not collapse to default policy"
472 );
473 } else {
474 panic!("Expected PeerInput");
475 }
476 }
477
478 #[test]
479 fn classified_request_uses_canonical_peer_id_for_runtime_projection() {
480 let source_peer_id =
481 PeerId::parse("11111111-1111-4111-8111-111111111111").expect("canonical peer id");
482 let request_id = make_interaction_id();
483 let classified = PeerInputCandidate {
484 interaction: InboxInteraction {
485 from_route: None,
486 from: "test-mob/lead/l-requester".into(),
487 content: InteractionContent::Request {
488 intent: "interpret_image".into(),
489 params: serde_json::json!({"description": "tower with a light"}),
490 blocks: None,
491 },
492 id: request_id,
493 rendered_text: "stale helper prose".into(),
494 handling_mode: meerkat_core::types::HandlingMode::Steer,
495 render_metadata: None,
496 },
497 ingress: PeerIngressFact::peer(
498 request_id,
499 PeerInputClass::ActionableRequest,
500 meerkat_core::PeerIngressKind::Request,
501 Some(meerkat_core::PeerIngressAuthDecision::Required),
502 PeerIngressIdentity::new(
503 source_peer_id,
504 "test-mob/lead/l-requester",
505 PeerIngressConvention::Request {
506 request_id: request_id.to_string(),
507 intent: "interpret_image".to_string(),
508 },
509 ),
510 ),
511 lifecycle_peer: None,
512 response_terminality: None,
513 };
514
515 let input =
516 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("worker"))
517 .expect("classified request should project to peer input");
518 let Input::Peer(peer) = &input else {
519 panic!("Expected PeerInput");
520 };
521 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
522 panic!("Expected peer source");
523 };
524 assert_eq!(peer_id, "11111111-1111-4111-8111-111111111111");
525 assert_eq!(peer.body, "stale helper prose");
526
527 let prompt = crate::input::input_prompt_text(&input);
528 assert!(prompt.starts_with(
529 "Peer request from peer_id 11111111-1111-4111-8111-111111111111 (display_name: test-mob/lead/l-requester)."
530 ));
531 assert!(prompt.contains("\"peer_id\":\"11111111-1111-4111-8111-111111111111\""));
532 assert!(prompt.contains("\"display_name\":\"test-mob/lead/l-requester\""));
533 assert!(prompt.contains(&format!("\"in_reply_to\":\"{}\"", request_id.0)));
534 assert!(prompt.contains("\"status\":\"completed\""));
535 assert!(!prompt.contains("to=\""));
536 }
537
538 #[test]
539 fn plain_event_to_external_event_input() {
540 let id = make_interaction_id();
541 let classified = PeerInputCandidate {
542 lifecycle_peer: None,
543 response_terminality: None,
544 ingress: plain_event_ingress(id, "webhook"),
545 interaction: InboxInteraction {
546 from_route: None,
547 from: "event:webhook".into(),
548 content: InteractionContent::Message {
549 body: "{\"ok\":true}".into(),
550 blocks: None,
551 },
552 id,
553 rendered_text: String::new(),
554 handling_mode: meerkat_core::types::HandlingMode::Queue,
555 render_metadata: None,
556 },
557 };
558 let input =
559 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
560 .expect("plain event should project to external event input");
561 match input {
562 Input::ExternalEvent(event) => {
563 assert_eq!(event.event_type, "webhook");
564 assert_eq!(event.payload["body"], "{\"ok\":true}");
565 assert_eq!(event.blocks, None);
566 assert_eq!(
567 event.handling_mode,
568 meerkat_core::types::HandlingMode::Queue
569 );
570 assert_eq!(event.render_metadata, None);
571 }
572 other => panic!("Expected ExternalEvent input, got {other:?}"),
573 }
574 }
575
576 #[test]
577 fn peer_named_event_prefix_stays_peer_without_plain_event_class() {
578 let id = make_interaction_id();
579 let classified = PeerInputCandidate {
580 lifecycle_peer: None,
581 response_terminality: None,
582 ingress: peer_ingress(
583 id,
584 test_peer_id(),
585 "event:webhook",
586 PeerInputClass::ActionableMessage,
587 PeerIngressConvention::Message,
588 ),
589 interaction: InboxInteraction {
590 from_route: None,
591 from: "event:webhook".into(),
592 content: InteractionContent::Message {
593 body: "hello".into(),
594 blocks: None,
595 },
596 id,
597 rendered_text: "stale rendered text".into(),
598 handling_mode: meerkat_core::types::HandlingMode::Queue,
599 render_metadata: None,
600 },
601 };
602 let input =
603 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
604 .expect("classified peer event should project to peer input");
605 match input {
606 Input::Peer(peer) => {
607 assert_eq!(peer.body, "stale rendered text");
608 match peer.header.source {
609 InputOrigin::Peer { peer_id, .. } => {
610 assert_eq!(peer_id, test_peer_id().as_str());
611 }
612 other => panic!("Expected peer source, got {other:?}"),
613 }
614 }
615 other => panic!("Expected Peer input, got {other:?}"),
616 }
617 }
618
619 #[test]
620 fn classified_peer_projection_uses_ingress_canonical_peer_id_not_display_from() {
621 let id = make_interaction_id();
622 let canonical_peer_id = meerkat_core::comms::PeerId::new();
623 let classified = PeerInputCandidate {
624 lifecycle_peer: None,
625 response_terminality: None,
626 ingress: PeerIngressFact::peer(
627 id,
628 PeerInputClass::ActionableRequest,
629 meerkat_core::PeerIngressKind::Request,
630 Some(meerkat_core::PeerIngressAuthDecision::Required),
631 PeerIngressIdentity::new(
632 canonical_peer_id,
633 "display-agent",
634 PeerIngressConvention::Request {
635 request_id: id.to_string(),
636 intent: "review".to_string(),
637 },
638 ),
639 ),
640 interaction: InboxInteraction {
641 from_route: None,
642 from: "display-agent".into(),
643 content: InteractionContent::Request {
644 intent: "review".into(),
645 params: serde_json::json!({"pr": 42}),
646 blocks: None,
647 },
648 id,
649 rendered_text: "stale rendered text".into(),
650 handling_mode: meerkat_core::types::HandlingMode::Queue,
651 render_metadata: None,
652 },
653 };
654
655 let input =
656 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
657 .expect("classified peer projection should use typed canonical id");
658 let Input::Peer(peer) = input else {
659 panic!("Expected Peer input");
660 };
661 match peer.header.source {
662 InputOrigin::Peer { peer_id, .. } => {
663 assert_eq!(peer_id, canonical_peer_id.as_str());
664 assert_ne!(peer_id, "display-agent");
665 }
666 other => panic!("Expected peer source, got {other:?}"),
667 }
668 assert_eq!(peer.body, "stale rendered text");
669 }
670
671 #[test]
672 fn classified_peer_projection_rejects_display_only_ingress_identity() {
673 let id = make_interaction_id();
674 let classified = PeerInputCandidate {
675 lifecycle_peer: None,
676 response_terminality: None,
677 ingress: PeerIngressFact::legacy_peer_label(
678 id,
679 "display-agent",
680 PeerInputClass::ActionableMessage,
681 meerkat_core::PeerIngressKind::Message,
682 Some(meerkat_core::PeerIngressAuthDecision::Required),
683 PeerIngressConvention::Message,
684 ),
685 interaction: InboxInteraction {
686 from_route: None,
687 from: "display-agent".into(),
688 content: InteractionContent::Message {
689 body: "hello".into(),
690 blocks: None,
691 },
692 id,
693 rendered_text: "stale rendered text".into(),
694 handling_mode: meerkat_core::types::HandlingMode::Queue,
695 render_metadata: None,
696 },
697 };
698
699 let result =
700 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
701 assert!(
702 matches!(
703 result,
704 Err(PeerIngressProjectionError::MissingCanonicalPeerId { interaction_id })
705 if interaction_id == id
706 ),
707 "display-only ingress must fail closed, got {result:?}"
708 );
709 }
710
711 #[test]
712 fn request_body_preserves_rendered_text_and_structured_payload() {
713 let interaction = InboxInteraction {
714 from_route: None,
715 from: "event:webhook".into(),
716 content: InteractionContent::Request {
717 intent: "mob.peer_added".into(),
718 params: serde_json::json!({"peer":"agent-1"}),
719 blocks: None,
720 },
721 id: make_interaction_id(),
722 rendered_text: "stale rendered text".into(),
723 handling_mode: meerkat_core::types::HandlingMode::Queue,
724 render_metadata: None,
725 };
726 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
727 if let Input::Peer(peer) = input {
728 assert_eq!(peer.body, "stale rendered text");
729 assert_eq!(peer.payload, Some(serde_json::json!({"peer":"agent-1"})));
730 } else {
731 panic!("Expected PeerInput");
732 }
733 }
734
735 #[test]
736 fn message_blocks_are_preserved_on_peer_input() {
737 let blocks = vec![
738 meerkat_core::types::ContentBlock::Text {
739 text: "see image".into(),
740 },
741 meerkat_core::types::ContentBlock::Image {
742 media_type: "image/png".into(),
743 data: "abc".into(),
744 },
745 ];
746 let interaction = InboxInteraction {
747 from_route: None,
748 from: "peer-1".into(),
749 content: InteractionContent::Message {
750 body: "see image".into(),
751 blocks: Some(blocks.clone()),
752 },
753 id: make_interaction_id(),
754 rendered_text: "stale rendered text".into(),
755 handling_mode: meerkat_core::types::HandlingMode::Queue,
756 render_metadata: None,
757 };
758 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
759 if let Input::Peer(peer) = input {
760 assert_eq!(peer.body, "stale rendered text");
761 assert_eq!(peer.blocks, Some(blocks));
762 } else {
763 panic!("Expected PeerInput");
764 }
765 }
766
767 #[test]
768 fn request_blocks_are_preserved_on_peer_input() {
769 let blocks = vec![
770 meerkat_core::types::ContentBlock::Text {
771 text: "describe this image".into(),
772 },
773 meerkat_core::types::ContentBlock::Image {
774 media_type: "image/png".into(),
775 data: "abc".into(),
776 },
777 ];
778 let interaction_id = make_interaction_id();
779 let peer_id = PeerId::new();
780 let classified = PeerInputCandidate {
781 interaction: InboxInteraction {
782 from_route: Some(peer_id),
783 from: "vision-peer".into(),
784 content: InteractionContent::Request {
785 intent: "checksum_token".into(),
786 params: serde_json::json!({"subject": "describe-image"}),
787 blocks: Some(blocks.clone()),
788 },
789 id: interaction_id,
790 rendered_text: String::new(),
791 handling_mode: meerkat_core::types::HandlingMode::Steer,
792 render_metadata: None,
793 },
794 ingress: PeerIngressFact::peer(
795 interaction_id,
796 PeerInputClass::ActionableRequest,
797 PeerIngressKind::Request,
798 Some(meerkat_core::interaction::PeerIngressAuthDecision::Required),
799 PeerIngressIdentity::new(
800 peer_id,
801 "vision-peer",
802 meerkat_core::interaction::PeerIngressConvention::Request {
803 request_id: interaction_id.to_string(),
804 intent: "checksum_token".to_string(),
805 },
806 ),
807 ),
808 lifecycle_peer: None,
809 response_terminality: None,
810 };
811
812 let input = classified_interaction_to_runtime_input(
813 &classified,
814 &LogicalRuntimeId::new("runtime-a"),
815 )
816 .expect("classified request should project");
817 if let Input::Peer(peer) = input {
818 assert_eq!(peer.blocks, Some(blocks));
819 assert_eq!(
820 peer.payload,
821 Some(serde_json::json!({"subject": "describe-image"}))
822 );
823 } else {
824 panic!("Expected PeerInput");
825 }
826 }
827
828 #[test]
829 fn multimodal_message_uses_rendered_projection_while_preserving_blocks() {
830 let blocks = vec![
831 meerkat_core::types::ContentBlock::Text {
832 text: "caption text".into(),
833 },
834 meerkat_core::types::ContentBlock::Image {
835 media_type: "image/png".into(),
836 data: "abc".into(),
837 },
838 ];
839 let interaction = InboxInteraction {
840 from_route: None,
841 from: "peer-1".into(),
842 content: InteractionContent::Message {
843 body: "please inspect this image".into(),
844 blocks: Some(blocks),
845 },
846 id: make_interaction_id(),
847 rendered_text: "stale rendered text".into(),
848 handling_mode: meerkat_core::types::HandlingMode::Queue,
849 render_metadata: None,
850 };
851 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
852 if let Input::Peer(peer) = input {
853 assert_eq!(peer.body, "stale rendered text");
854 } else {
855 panic!("Expected PeerInput");
856 }
857 }
858
859 #[test]
860 fn plain_event_blocks_are_preserved_on_external_event_input() {
861 let blocks = vec![
862 meerkat_core::types::ContentBlock::Text {
863 text: "see image".into(),
864 },
865 meerkat_core::types::ContentBlock::Image {
866 media_type: "image/png".into(),
867 data: "abc".into(),
868 },
869 ];
870 let id = make_interaction_id();
871 let classified = PeerInputCandidate {
872 lifecycle_peer: None,
873 response_terminality: None,
874 ingress: plain_event_ingress(id, "webhook"),
875 interaction: InboxInteraction {
876 from_route: None,
877 from: "event:webhook".into(),
878 content: InteractionContent::Message {
879 body: "see image".into(),
880 blocks: Some(blocks.clone()),
881 },
882 id,
883 rendered_text: "stale rendered text".into(),
884 handling_mode: meerkat_core::types::HandlingMode::Queue,
885 render_metadata: None,
886 },
887 };
888 let input =
889 peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
890 .expect("plain event with blocks should project");
891 match input {
892 Input::ExternalEvent(event) => {
893 assert_eq!(event.payload["body"], "see image");
894 assert!(event.payload.get("blocks").is_none());
895 assert_eq!(event.blocks, Some(blocks));
896 assert_eq!(
897 event.handling_mode,
898 meerkat_core::types::HandlingMode::Queue
899 );
900 assert_eq!(event.render_metadata, None);
901 }
902 other => panic!("Expected ExternalEvent input, got {other:?}"),
903 }
904 }
905
906 #[test]
907 fn plain_event_preserves_handling_mode_and_render_metadata() {
908 let render_metadata = meerkat_core::types::RenderMetadata {
909 class: meerkat_core::types::RenderClass::ExternalEvent,
910 salience: meerkat_core::types::RenderSalience::Urgent,
911 };
912 let id = make_interaction_id();
913 let classified = PeerInputCandidate {
914 lifecycle_peer: None,
915 response_terminality: None,
916 ingress: plain_event_ingress(id, "webhook"),
917 interaction: InboxInteraction {
918 from_route: None,
919 from: "event:webhook".into(),
920 content: InteractionContent::Message {
921 body: "urgent".into(),
922 blocks: None,
923 },
924 id,
925 rendered_text: "stale rendered text".into(),
926 handling_mode: meerkat_core::types::HandlingMode::Steer,
927 render_metadata: Some(render_metadata.clone()),
928 },
929 };
930
931 match peer_input_candidate_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
932 .expect("plain event should preserve render metadata")
933 {
934 Input::ExternalEvent(event) => {
935 assert_eq!(
936 event.handling_mode,
937 meerkat_core::types::HandlingMode::Steer
938 );
939 assert_eq!(event.render_metadata, Some(render_metadata));
940 }
941 other => panic!("Expected ExternalEvent input, got {other:?}"),
942 }
943 }
944
945 #[test]
946 fn response_completed_to_terminal() {
947 let in_reply_to = make_interaction_id();
948 let route_id = meerkat_core::comms::PeerId::from_uuid(
949 uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f2").unwrap(),
950 );
951 let interaction = InboxInteraction {
952 from_route: Some(route_id),
953 from: "Peer One".into(),
954 content: InteractionContent::Response {
955 status: ResponseStatus::Completed,
956 result: serde_json::json!({"ok": true}),
957 in_reply_to,
958 blocks: None,
959 },
960 id: make_interaction_id(),
961 rendered_text: String::new(),
962 handling_mode: meerkat_core::types::HandlingMode::Queue,
963 render_metadata: None,
964 };
965 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
966 if let Input::Peer(p) = &input {
967 match &p.header.source {
968 InputOrigin::Peer {
969 peer_id,
970 display_identity,
971 ..
972 } => {
973 assert_eq!(peer_id, &route_id.to_string());
974 assert_eq!(display_identity.as_deref(), Some("Peer One"));
975 }
976 other => panic!("Expected Peer source, got {other:?}"),
977 }
978 assert!(matches!(
979 p.convention,
980 Some(PeerConvention::ResponseTerminal {
981 status: ResponseTerminalStatus::Completed,
982 ..
983 })
984 ));
985 assert_eq!(p.header.durability, InputDurability::Durable);
986 assert_eq!(
987 p.payload,
988 Some(serde_json::json!({"ok": true})),
989 "terminal response result must remain structured on PeerInput so runtime prompt projection stays runtime-owned"
990 );
991 } else {
992 panic!("Expected PeerInput");
993 }
994 let projection = crate::input::runtime_input_projection_for_machine_batch(&input);
995 let context = projection
996 .context_append
997 .expect("terminal machine-batch context projection");
998 let expected_key = format!("peer_response_terminal:{route_id}:{in_reply_to}");
999 assert_eq!(context.key, expected_key);
1000 let meerkat_core::lifecycle::run_primitive::CoreRenderable::SystemNotice { blocks, .. } =
1001 context.content
1002 else {
1003 panic!("Expected terminal context notice");
1004 };
1005 assert!(matches!(
1006 blocks.first(),
1007 Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. })
1008 if peer.as_ref().and_then(|peer| peer.display_name.as_deref()) == Some("Peer One")
1009 ));
1010 }
1011
/// The classified bridge must honour the ingress-owned response class.
///
/// The candidate carries a raw `ResponseStatus::Completed`, but its ingress
/// fact is classified as `PeerInputClass::ResponseProgress` and its
/// `response_terminality` is `TerminalityClass::Progress`. The projected
/// input is expected to be a peer input with a `ResponseProgress` convention.
#[test]
fn classified_response_uses_ingress_terminal_class() {
    let reply_target = make_interaction_id();
    let interaction_id = make_interaction_id();

    // Raw interaction as it would arrive in the inbox: a completed response.
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: reply_target,
            blocks: None,
        },
        id: interaction_id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    // Ingress classification deliberately disagrees with the raw status.
    let identity = PeerIngressIdentity::new(
        test_peer_id(),
        "peer-1",
        PeerIngressConvention::Response {
            in_reply_to: reply_target,
            status: ResponseStatus::Completed,
        },
    );
    let ingress = PeerIngressFact::peer(
        interaction_id,
        PeerInputClass::ResponseProgress,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        identity,
    );

    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: Some(meerkat_core::TerminalityClass::Progress),
    };

    let input =
        classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
            .expect("classified response should project");

    let Input::Peer(peer) = input else {
        panic!("Expected PeerInput");
    };
    assert!(
        matches!(
            peer.convention,
            Some(PeerConvention::ResponseProgress { .. })
        ),
        "classified bridge must consume ingress-owned response class"
    );
}
1064
/// A classified response without machine terminality must fail closed.
///
/// The candidate is classified as `PeerInputClass::ResponseTerminal` and has
/// a raw `Completed` status, but `response_terminality` is `None`. Projection
/// is expected to return `MissingResponseTerminality` carrying this
/// interaction's id rather than deriving terminality from the raw status.
#[test]
fn classified_response_missing_machine_terminality_fails_closed() {
    let reply_target = make_interaction_id();
    let id = make_interaction_id();

    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: reply_target,
            blocks: None,
        },
        id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };
    let ingress = PeerIngressFact::peer(
        id,
        PeerInputClass::ResponseTerminal,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        PeerIngressIdentity::new(
            test_peer_id(),
            "peer-1",
            PeerIngressConvention::Response {
                in_reply_to: reply_target,
                status: ResponseStatus::Completed,
            },
        ),
    );
    // The missing terminality is the condition under test.
    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: None,
    };

    let runtime = LogicalRuntimeId::new("test");
    let result = classified_interaction_to_runtime_input(&classified, &runtime);

    let failed_closed = matches!(
        &result,
        Err(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })
            if *interaction_id == id
    );
    assert!(
        failed_closed,
        "runtime projection must not infer public terminality from raw status: {result:?}"
    );
}
1113
/// A terminal response without a canonical peer id must be rejected.
///
/// The ingress fact is built with `PeerIngressFact::legacy_peer_label`, which
/// is given only the display label `"Peer One"` and no `PeerId`; the
/// interaction also has `from_route: None`. Projection must fail with
/// `MissingCanonicalPeerId`, and the error must name the interaction that was
/// rejected, not just any interaction.
#[test]
fn response_terminal_without_canonical_peer_id_fails_typed_projection() {
    let in_reply_to = make_interaction_id();
    let interaction_id = make_interaction_id();
    let candidate = PeerInputCandidate {
        interaction: InboxInteraction {
            from_route: None,
            from: "Peer One".into(),
            content: InteractionContent::Response {
                status: ResponseStatus::Completed,
                result: serde_json::json!({"ok": true}),
                in_reply_to,
                blocks: None,
            },
            id: interaction_id,
            rendered_text: String::new(),
            handling_mode: meerkat_core::types::HandlingMode::Queue,
            render_metadata: None,
        },
        ingress: PeerIngressFact::legacy_peer_label(
            interaction_id,
            "Peer One",
            PeerInputClass::ResponseTerminal,
            meerkat_core::PeerIngressKind::Response,
            Some(meerkat_core::PeerIngressAuthDecision::Required),
            PeerIngressConvention::Response {
                in_reply_to,
                status: ResponseStatus::Completed,
            },
        ),
        lifecycle_peer: None,
        response_terminality: Some(meerkat_core::TerminalityClass::Terminal {
            disposition: meerkat_core::TerminalDisposition::Completed,
        }),
    };

    let err = peer_input_candidate_to_runtime_input(&candidate, &LogicalRuntimeId::new("test"))
        .expect_err("candidate without a canonical peer id must not project");

    // Pin the reported id as well as the variant, mirroring the
    // missing-terminality test, and show the actual error on failure.
    assert!(
        matches!(
            err,
            PeerIngressProjectionError::MissingCanonicalPeerId { interaction_id: reported }
                if reported == interaction_id
        ),
        "expected MissingCanonicalPeerId for the rejected interaction, got {err:?}"
    );
}
1156
/// A response with raw status `Failed` is expected to surface as a
/// `ResponseTerminal` convention whose status is
/// `ResponseTerminalStatus::Failed`.
///
/// The input is produced by the module's `peer_input_for_test` helper from an
/// interaction that carries an explicit `from_route`.
#[test]
fn response_failed_to_terminal() {
    let reply_target = make_interaction_id();
    let sender_route = meerkat_core::comms::PeerId::from_uuid(
        uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f3").unwrap(),
    );
    let failed_response = InteractionContent::Response {
        status: ResponseStatus::Failed,
        result: serde_json::json!({"error": "timeout"}),
        in_reply_to: reply_target,
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: Some(sender_route),
        from: "peer-1".into(),
        content: failed_response,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));

    let Input::Peer(p) = &input else {
        panic!("Expected PeerInput");
    };
    assert!(matches!(
        p.convention,
        Some(PeerConvention::ResponseTerminal {
            status: ResponseTerminalStatus::Failed,
            ..
        })
    ));
}
1190
/// A response with raw status `Accepted` is expected to surface as a
/// `ResponseProgress` convention in the `Accepted` phase.
///
/// The test also checks that the resulting input is `Ephemeral` and carries
/// no `handling_mode`. The input is produced by the module's
/// `peer_input_for_test` helper.
#[test]
fn response_accepted_to_progress() {
    let reply_target = make_interaction_id();
    let accepted_response = InteractionContent::Response {
        status: ResponseStatus::Accepted,
        result: serde_json::json!(null),
        in_reply_to: reply_target,
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: accepted_response,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));

    let Input::Peer(p) = &input else {
        panic!("Expected PeerInput");
    };

    let is_accepted_progress = matches!(
        p.convention,
        Some(PeerConvention::ResponseProgress {
            phase: ResponseProgressPhase::Accepted,
            ..
        })
    );
    assert!(is_accepted_progress);
    assert_eq!(p.header.durability, InputDurability::Ephemeral);
    assert!(
        p.handling_mode.is_none(),
        "ResponseProgress inputs must not carry handling_mode"
    );
}
1226
/// Ingress-owned terminality takes precedence over the raw response status.
///
/// The raw status is `Completed`, but the candidate is classified as
/// `ResponseProgress` with `TerminalityClass::Progress`. Projecting it via
/// `peer_input_candidate_to_runtime_input` is expected to give a
/// `ResponseProgress` convention in the `Accepted` phase, `Ephemeral`
/// durability and no handling mode.
#[test]
fn classified_response_uses_ingress_terminality_over_raw_status() {
    let reply_target = make_interaction_id();
    let id = make_interaction_id();

    // Raw side: a completed response.
    let raw_content = InteractionContent::Response {
        status: ResponseStatus::Completed,
        result: serde_json::json!({"ok": true}),
        in_reply_to: reply_target,
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: raw_content,
        id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    // Classified side: progress, not terminal.
    let ingress = PeerIngressFact::peer(
        id,
        PeerInputClass::ResponseProgress,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        PeerIngressIdentity::new(
            test_peer_id(),
            "peer-1",
            PeerIngressConvention::Response {
                in_reply_to: reply_target,
                status: ResponseStatus::Completed,
            },
        ),
    );
    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: Some(meerkat_core::TerminalityClass::Progress),
    };

    let runtime = LogicalRuntimeId::new("test");
    let input = peer_input_candidate_to_runtime_input(&classified, &runtime)
        .expect("classified response should project");

    let Input::Peer(p) = &input else {
        panic!("Expected PeerInput");
    };
    assert!(matches!(
        p.convention,
        Some(PeerConvention::ResponseProgress {
            phase: ResponseProgressPhase::Accepted,
            ..
        })
    ));
    assert_eq!(p.header.durability, InputDurability::Ephemeral);
    assert_eq!(p.handling_mode, None);
}
1282
/// The peer origin of a projected message must carry the runtime id passed
/// to the projection.
///
/// A plain message from `"peer-1"` is projected with the runtime id
/// `"agent-runtime-1"`. The resulting `InputOrigin::Peer` is checked for the
/// peer id from `test_peer_id()`, the display identity `"peer-1"` and that
/// runtime id.
#[test]
fn peer_source_includes_runtime_id() {
    let greeting = InteractionContent::Message {
        body: "hi".into(),
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: greeting,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("agent-runtime-1"));

    // Guard clauses keep the same failure messages as before.
    let Input::Peer(p) = &input else {
        panic!("Expected PeerInput");
    };
    let InputOrigin::Peer {
        peer_id,
        display_identity,
        runtime_id,
        ..
    } = &p.header.source
    else {
        panic!("Expected Peer source");
    };

    assert_eq!(peer_id, &test_peer_id().as_str());
    assert_eq!(display_identity.as_deref(), Some("peer-1"));
    assert_eq!(runtime_id.as_ref().unwrap().0, "agent-runtime-1");
}
1316
/// Smoke test: a message, a request and a completed response must each
/// project to `Input::Peer` through the module's `peer_input_for_test` helper.
///
/// The fixtures live in a fixed-size array because they are only iterated by
/// reference. A failing assertion reports the position of the offending
/// interaction and the input that was produced instead.
#[test]
fn all_interaction_types_produce_valid_inputs() {
    let in_reply_to = make_interaction_id();
    let interactions = [
        // 0: plain message
        InboxInteraction {
            from_route: None,
            from: "p".into(),
            content: InteractionContent::Message {
                body: "m".into(),
                blocks: None,
            },
            id: make_interaction_id(),
            rendered_text: String::new(),
            handling_mode: meerkat_core::types::HandlingMode::Queue,
            render_metadata: None,
        },
        // 1: request
        InboxInteraction {
            from_route: None,
            from: "p".into(),
            content: InteractionContent::Request {
                intent: "i".into(),
                params: serde_json::json!({}),
                blocks: None,
            },
            id: make_interaction_id(),
            rendered_text: String::new(),
            handling_mode: meerkat_core::types::HandlingMode::Queue,
            render_metadata: None,
        },
        // 2: completed response, the only fixture with an explicit route
        InboxInteraction {
            from_route: Some(meerkat_core::comms::PeerId::from_uuid(
                uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f6").unwrap(),
            )),
            from: "p".into(),
            content: InteractionContent::Response {
                status: ResponseStatus::Completed,
                result: serde_json::json!(null),
                in_reply_to,
                blocks: None,
            },
            id: make_interaction_id(),
            rendered_text: String::new(),
            handling_mode: meerkat_core::types::HandlingMode::Queue,
            render_metadata: None,
        },
    ];

    let rid = LogicalRuntimeId::new("test");
    for (index, interaction) in interactions.iter().enumerate() {
        let input = peer_input_for_test(interaction, &rid);
        assert!(
            matches!(input, Input::Peer(_)),
            "interaction #{index} did not project to Input::Peer: {input:?}"
        );
    }
}
1370}