1use chrono::Utc;
7#[cfg(test)]
8use meerkat_core::comms::PeerId;
9use meerkat_core::interaction::{
10 InboxInteraction, InteractionContent, PeerIngressConvention, PeerIngressFact, PeerIngressKind,
11 PeerInputCandidate, PeerInputClass,
12};
13#[cfg(test)]
14use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
15use meerkat_core::lifecycle::InputId;
16
17use crate::identifiers::{CorrelationId, LogicalRuntimeId};
18use crate::input::{
19 ExternalEventInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
20 PeerConvention, PeerInput, ResponseProgressPhase, ResponseTerminalStatus,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
24pub enum PeerIngressProjectionError {
25 #[error(
26 "classified peer ingress {interaction_id} ({kind:?}) cannot project to a runtime PeerInput"
27 )]
28 UnsupportedPeerConvention {
29 interaction_id: meerkat_core::InteractionId,
30 kind: PeerIngressKind,
31 },
32 #[error("classified peer ingress {interaction_id} missing canonical peer id")]
33 MissingCanonicalPeerId {
34 interaction_id: meerkat_core::InteractionId,
35 },
36 #[error("classified peer response {interaction_id} missing machine response terminality")]
37 MissingResponseTerminality {
38 interaction_id: meerkat_core::InteractionId,
39 },
40 #[error(
41 "classified peer response {interaction_id} has unsupported machine response terminality"
42 )]
43 UnsupportedResponseTerminality {
44 interaction_id: meerkat_core::InteractionId,
45 },
46}
47
48pub fn classified_interaction_to_runtime_input(
51 classified: &PeerInputCandidate,
52 runtime_id: &LogicalRuntimeId,
53) -> Result<Input, PeerIngressProjectionError> {
54 let interaction = &classified.interaction;
55
56 if classified.class() == PeerInputClass::PlainEvent {
57 let source_name = classified
58 .ingress
59 .plain_event_source_name()
60 .unwrap_or("unknown");
61 let blocks = external_event_blocks(interaction);
62 return Ok(Input::ExternalEvent(ExternalEventInput {
63 header: InputHeader {
64 id: InputId::new(),
65 timestamp: Utc::now(),
66 source: InputOrigin::External {
67 source_name: source_name.to_string(),
68 },
69 durability: InputDurability::Durable,
70 visibility: InputVisibility {
71 transcript_eligible: true,
72 operator_eligible: true,
73 },
74 idempotency_key: None,
75 supersession_key: None,
76 correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
77 },
78 event_type: source_name.to_string(),
79 payload: external_event_payload(interaction),
80 blocks,
81 handling_mode: interaction.handling_mode,
82 render_metadata: interaction.render_metadata.clone(),
83 }));
84 }
85
86 peer_candidate_to_peer_input(classified, runtime_id)
87}
88
89fn peer_candidate_to_peer_input(
90 classified: &PeerInputCandidate,
91 runtime_id: &LogicalRuntimeId,
92) -> Result<Input, PeerIngressProjectionError> {
93 peer_input_from_ingress_fact(
94 &classified.interaction,
95 runtime_id,
96 &classified.ingress,
97 classified.response_terminality,
98 )
99}
100
101fn peer_input_from_ingress_fact(
102 interaction: &InboxInteraction,
103 runtime_id: &LogicalRuntimeId,
104 ingress: &PeerIngressFact,
105 response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
106) -> Result<Input, PeerIngressProjectionError> {
107 let convention = map_ingress_convention(interaction.id, ingress, response_terminality)?;
108 let durability = map_durability(&convention);
109 let handling_mode = match &convention {
110 PeerConvention::ResponseProgress { .. } => None,
111 _ => Some(interaction.handling_mode),
112 };
113 let peer_id = ingress.canonical_peer_id_string().ok_or(
114 PeerIngressProjectionError::MissingCanonicalPeerId {
115 interaction_id: interaction.id,
116 },
117 )?;
118 let display_identity = ingress
119 .route
120 .as_ref()
121 .map(meerkat_core::PeerRoute::label)
122 .or_else(|| ingress.display_label());
123
124 Ok(Input::Peer(PeerInput {
125 header: InputHeader {
126 id: InputId::new(),
127 timestamp: Utc::now(),
128 source: InputOrigin::Peer {
129 peer_id,
130 display_identity,
131 runtime_id: Some(runtime_id.clone()),
132 },
133 durability,
134 visibility: InputVisibility {
135 transcript_eligible: true,
136 operator_eligible: true,
137 },
138 idempotency_key: None,
139 supersession_key: None,
140 correlation_id: Some(CorrelationId::from_uuid(interaction.id.0)),
141 },
142 convention: Some(convention),
143 content: match peer_blocks(interaction) {
144 Some(blocks) => meerkat_core::types::ContentInput::Blocks(blocks),
145 None => meerkat_core::types::ContentInput::Text(peer_rendered_body(interaction)),
146 },
147 payload: peer_payload(interaction),
148 handling_mode,
149 }))
150}
151
152fn map_ingress_convention(
153 interaction_id: meerkat_core::InteractionId,
154 ingress: &PeerIngressFact,
155 response_terminality: Option<meerkat_core::interaction::TerminalityClass>,
156) -> Result<PeerConvention, PeerIngressProjectionError> {
157 match &ingress.convention {
158 PeerIngressConvention::Message => Ok(PeerConvention::Message),
159 PeerIngressConvention::Request { request_id, intent } => Ok(PeerConvention::Request {
160 request_id: request_id.clone(),
161 intent: intent.clone(),
162 }),
163 PeerIngressConvention::Response {
164 in_reply_to,
165 status: _,
166 } => {
167 let terminality = response_terminality
168 .ok_or(PeerIngressProjectionError::MissingResponseTerminality { interaction_id })?;
169 map_response_convention(interaction_id, *in_reply_to, terminality)
170 }
171 PeerIngressConvention::Lifecycle { kind, .. } => Ok(PeerConvention::Request {
172 request_id: ingress.interaction_id.to_string(),
173 intent: kind.to_string(),
174 }),
175 PeerIngressConvention::Ack { .. } | PeerIngressConvention::PlainEvent { .. } => {
176 Err(PeerIngressProjectionError::UnsupportedPeerConvention {
177 interaction_id,
178 kind: ingress.kind,
179 })
180 }
181 }
182}
183
184fn map_response_convention(
185 interaction_id: meerkat_core::InteractionId,
186 in_reply_to: meerkat_core::InteractionId,
187 terminality: meerkat_core::interaction::TerminalityClass,
188) -> Result<PeerConvention, PeerIngressProjectionError> {
189 let request_id = in_reply_to.to_string();
190 Ok(match terminality {
191 meerkat_core::interaction::TerminalityClass::Progress => PeerConvention::ResponseProgress {
192 request_id,
193 phase: ResponseProgressPhase::Accepted,
194 },
195 meerkat_core::interaction::TerminalityClass::Terminal { disposition } => {
196 let term = match disposition {
197 meerkat_core::interaction::TerminalDisposition::Completed => {
198 ResponseTerminalStatus::Completed
199 }
200 meerkat_core::interaction::TerminalDisposition::Failed => {
201 ResponseTerminalStatus::Failed
202 }
203 _ => {
204 return Err(PeerIngressProjectionError::UnsupportedResponseTerminality {
205 interaction_id,
206 });
207 }
208 };
209 PeerConvention::ResponseTerminal {
210 request_id,
211 status: term,
212 }
213 }
214 _ => {
215 return Err(PeerIngressProjectionError::UnsupportedResponseTerminality {
216 interaction_id,
217 });
218 }
219 })
220}
221
222fn peer_rendered_body(interaction: &InboxInteraction) -> String {
223 if !interaction.rendered_text.trim().is_empty() {
224 return interaction.rendered_text.clone();
225 }
226 match &interaction.content {
227 InteractionContent::Message { body, .. } => body.clone(),
228 InteractionContent::Request { params, .. } => {
229 serde_json::to_string(params).unwrap_or_default()
230 }
231 InteractionContent::Response { result, .. } => {
232 serde_json::to_string(result).unwrap_or_default()
233 }
234 }
235}
236
237fn peer_blocks(interaction: &InboxInteraction) -> Option<Vec<meerkat_core::types::ContentBlock>> {
238 match &interaction.content {
239 InteractionContent::Message { blocks, .. } => blocks.clone(),
240 InteractionContent::Request { blocks, .. } => blocks.clone(),
241 InteractionContent::Response { blocks, .. } => blocks.clone(),
242 }
243}
244
245fn peer_payload(interaction: &InboxInteraction) -> Option<serde_json::Value> {
246 match &interaction.content {
247 InteractionContent::Message { .. } => None,
248 InteractionContent::Request { params, .. } => Some(params.clone()),
249 InteractionContent::Response { result, .. } => Some(result.clone()),
250 }
251}
252
253fn external_event_payload(interaction: &InboxInteraction) -> serde_json::Value {
254 match &interaction.content {
255 InteractionContent::Message { body, .. } => serde_json::json!({ "body": body }),
256 InteractionContent::Request { intent, params, .. } => {
257 serde_json::json!({ "intent": intent, "params": params })
258 }
259 InteractionContent::Response {
260 in_reply_to,
261 status,
262 result,
263 blocks,
264 } => serde_json::json!({
265 "in_reply_to": in_reply_to,
266 "status": status,
267 "result": result,
268 "blocks": blocks,
269 }),
270 }
271}
272
273fn external_event_blocks(
274 interaction: &InboxInteraction,
275) -> Option<Vec<meerkat_core::types::ContentBlock>> {
276 match &interaction.content {
277 InteractionContent::Message { blocks, .. } => blocks.clone(),
278 InteractionContent::Request { blocks, .. } => blocks.clone(),
279 _ => None,
280 }
281}
282
283fn map_durability(convention: &PeerConvention) -> InputDurability {
284 match convention {
285 PeerConvention::ResponseProgress { .. } => InputDurability::Ephemeral,
286 _ => InputDurability::Durable,
287 }
288}
289
290#[cfg(test)]
291#[allow(clippy::unwrap_used, clippy::panic)]
292mod tests {
293 use super::*;
294 use meerkat_core::interaction::{PeerIngressIdentity, ResponseStatus};
295
296 fn make_interaction_id() -> meerkat_core::interaction::InteractionId {
297 meerkat_core::interaction::InteractionId(meerkat_core::time_compat::new_uuid_v7())
298 }
299
300 fn plain_event_ingress(
301 id: meerkat_core::interaction::InteractionId,
302 source_name: &str,
303 ) -> PeerIngressFact {
304 PeerIngressFact::plain_event(
305 id,
306 source_name,
307 PeerInputClass::PlainEvent,
308 meerkat_core::PeerIngressKind::PlainEvent,
309 )
310 }
311
312 fn test_peer_id() -> PeerId {
313 PeerId::parse("22222222-2222-4222-8222-222222222222").expect("canonical test peer id")
314 }
315
316 fn peer_kind_for_convention(
317 convention: &PeerIngressConvention,
318 ) -> meerkat_core::PeerIngressKind {
319 match convention {
320 PeerIngressConvention::Message => meerkat_core::PeerIngressKind::Message,
321 PeerIngressConvention::Request { .. } | PeerIngressConvention::Lifecycle { .. } => {
322 meerkat_core::PeerIngressKind::Request
323 }
324 PeerIngressConvention::Response { .. } => meerkat_core::PeerIngressKind::Response,
325 PeerIngressConvention::Ack { .. } => meerkat_core::PeerIngressKind::Ack,
326 PeerIngressConvention::PlainEvent { .. } => meerkat_core::PeerIngressKind::PlainEvent,
327 }
328 }
329
330 fn peer_ingress(
331 id: meerkat_core::interaction::InteractionId,
332 peer_id: PeerId,
333 label: &str,
334 class: PeerInputClass,
335 convention: PeerIngressConvention,
336 ) -> PeerIngressFact {
337 let kind = peer_kind_for_convention(&convention);
338 PeerIngressFact::peer(
339 id,
340 class,
341 kind,
342 Some(meerkat_core::PeerIngressAuthDecision::Required),
343 PeerIngressIdentity::new(peer_id, label, convention),
344 )
345 }
346
347 fn candidate_for_interaction(interaction: InboxInteraction) -> PeerInputCandidate {
348 let peer_id = interaction.from_route.unwrap_or_else(test_peer_id);
349 crate::test_peer_input_candidate_from_interaction(interaction, peer_id)
350 }
351
352 fn peer_input_for_test(interaction: &InboxInteraction, runtime_id: &LogicalRuntimeId) -> Input {
353 let candidate = candidate_for_interaction(interaction.clone());
354 classified_interaction_to_runtime_input(&candidate, runtime_id)
355 .expect("test candidate should project to runtime input")
356 }
357
358 #[test]
359 fn message_to_peer_input() {
360 let interaction = InboxInteraction {
361 from_route: None,
362 from: "peer-1".into(),
363 content: InteractionContent::Message {
364 body: "hello".into(),
365 blocks: None,
366 },
367 id: make_interaction_id(),
368 rendered_text: String::new(),
369 handling_mode: meerkat_core::types::HandlingMode::Queue,
370 render_metadata: None,
371 };
372 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
373 if let Input::Peer(p) = &input {
374 assert!(matches!(p.convention, Some(PeerConvention::Message)));
375 assert_eq!(p.content.text_content(), "hello");
376 assert_eq!(p.header.durability, InputDurability::Durable);
377 assert_eq!(
378 p.handling_mode,
379 Some(meerkat_core::types::HandlingMode::Queue),
380 "explicit queue must survive comms -> runtime projection so it can suppress running-turn interruption"
381 );
382 } else {
383 panic!("Expected PeerInput");
384 }
385 }
386
387 #[test]
388 fn request_to_peer_input() {
389 let interaction = InboxInteraction {
390 from_route: None,
391 from: "peer-1".into(),
392 content: InteractionContent::Request {
393 intent: "mob.peer_added".into(),
394 params: serde_json::json!({"peer": "agent-1"}),
397 blocks: None,
398 },
399 id: make_interaction_id(),
400 rendered_text: String::new(),
401 handling_mode: meerkat_core::types::HandlingMode::Queue,
402 render_metadata: None,
403 };
404 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
405 if let Input::Peer(p) = &input {
406 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
407 match p.convention.as_ref() {
408 Some(PeerConvention::Request { request_id, .. }) => {
409 assert_eq!(request_id, &interaction.id.0.to_string());
410 }
411 other => panic!("Expected request convention, got {other:?}"),
412 }
413 assert_eq!(p.header.durability, InputDurability::Durable);
414 assert_eq!(
415 p.payload,
416 Some(serde_json::json!({"peer": "agent-1"})),
417 "request params must remain structured on PeerInput so runtime prompt projection does not depend on pre-rendered comms prose"
418 );
419 assert_eq!(
420 p.handling_mode,
421 Some(meerkat_core::types::HandlingMode::Queue),
422 "explicit queue request semantics must not collapse to default policy"
423 );
424 } else {
425 panic!("Expected PeerInput");
426 }
427 }
428
429 #[test]
430 fn classified_request_uses_canonical_peer_id_for_runtime_projection() {
431 let source_peer_id =
432 PeerId::parse("11111111-1111-4111-8111-111111111111").expect("canonical peer id");
433 let request_id = make_interaction_id();
434 let classified = PeerInputCandidate {
435 interaction: InboxInteraction {
436 from_route: None,
437 from: "test-mob/lead/l-requester".into(),
438 content: InteractionContent::Request {
439 intent: "interpret_image".into(),
440 params: serde_json::json!({"description": "tower with a light"}),
441 blocks: None,
442 },
443 id: request_id,
444 rendered_text: "stale helper prose".into(),
445 handling_mode: meerkat_core::types::HandlingMode::Steer,
446 render_metadata: None,
447 },
448 ingress: PeerIngressFact::peer(
449 request_id,
450 PeerInputClass::ActionableRequest,
451 meerkat_core::PeerIngressKind::Request,
452 Some(meerkat_core::PeerIngressAuthDecision::Required),
453 PeerIngressIdentity::new(
454 source_peer_id,
455 "test-mob/lead/l-requester",
456 PeerIngressConvention::Request {
457 request_id: request_id.to_string(),
458 intent: "interpret_image".to_string(),
459 },
460 ),
461 ),
462 lifecycle_peer: None,
463 response_terminality: None,
464 };
465
466 let input =
467 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("worker"))
468 .expect("classified request should project to peer input");
469 let Input::Peer(peer) = &input else {
470 panic!("Expected PeerInput");
471 };
472 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
473 panic!("Expected peer source");
474 };
475 assert_eq!(peer_id, "11111111-1111-4111-8111-111111111111");
476 assert_eq!(peer.content.text_content(), "stale helper prose");
477
478 let prompt = crate::input::input_prompt_text(&input);
479 assert!(prompt.starts_with(
480 "Peer request from peer_id 11111111-1111-4111-8111-111111111111 (display_name: test-mob/lead/l-requester)."
481 ));
482 assert!(prompt.contains("\"peer_id\":\"11111111-1111-4111-8111-111111111111\""));
483 assert!(prompt.contains("\"display_name\":\"test-mob/lead/l-requester\""));
484 assert!(prompt.contains(&format!("\"in_reply_to\":\"{}\"", request_id.0)));
485 assert!(prompt.contains("\"status\":\"completed\""));
486 assert!(!prompt.contains("to=\""));
487 }
488
489 #[test]
490 fn plain_event_to_external_event_input() {
491 let id = make_interaction_id();
492 let classified = PeerInputCandidate {
493 lifecycle_peer: None,
494 response_terminality: None,
495 ingress: plain_event_ingress(id, "webhook"),
496 interaction: InboxInteraction {
497 from_route: None,
498 from: "event:webhook".into(),
499 content: InteractionContent::Message {
500 body: "{\"ok\":true}".into(),
501 blocks: None,
502 },
503 id,
504 rendered_text: String::new(),
505 handling_mode: meerkat_core::types::HandlingMode::Queue,
506 render_metadata: None,
507 },
508 };
509 let input =
510 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
511 .expect("plain event should project to external event input");
512 match input {
513 Input::ExternalEvent(event) => {
514 assert_eq!(event.event_type, "webhook");
515 assert_eq!(event.payload["body"], "{\"ok\":true}");
516 assert_eq!(event.blocks, None);
517 assert_eq!(
518 event.handling_mode,
519 meerkat_core::types::HandlingMode::Queue
520 );
521 assert_eq!(event.render_metadata, None);
522 }
523 other => panic!("Expected ExternalEvent input, got {other:?}"),
524 }
525 }
526
527 #[test]
528 fn peer_named_event_prefix_stays_peer_without_plain_event_class() {
529 let id = make_interaction_id();
530 let classified = PeerInputCandidate {
531 lifecycle_peer: None,
532 response_terminality: None,
533 ingress: peer_ingress(
534 id,
535 test_peer_id(),
536 "event:webhook",
537 PeerInputClass::ActionableMessage,
538 PeerIngressConvention::Message,
539 ),
540 interaction: InboxInteraction {
541 from_route: None,
542 from: "event:webhook".into(),
543 content: InteractionContent::Message {
544 body: "hello".into(),
545 blocks: None,
546 },
547 id,
548 rendered_text: "stale rendered text".into(),
549 handling_mode: meerkat_core::types::HandlingMode::Queue,
550 render_metadata: None,
551 },
552 };
553 let input =
554 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
555 .expect("classified peer event should project to peer input");
556 match input {
557 Input::Peer(peer) => {
558 assert_eq!(peer.content.text_content(), "stale rendered text");
559 match peer.header.source {
560 InputOrigin::Peer { peer_id, .. } => {
561 assert_eq!(peer_id, test_peer_id().as_str());
562 }
563 other => panic!("Expected peer source, got {other:?}"),
564 }
565 }
566 other => panic!("Expected Peer input, got {other:?}"),
567 }
568 }
569
570 #[test]
571 fn classified_peer_projection_uses_ingress_canonical_peer_id_not_display_from() {
572 let id = make_interaction_id();
573 let canonical_peer_id = meerkat_core::comms::PeerId::new();
574 let classified = PeerInputCandidate {
575 lifecycle_peer: None,
576 response_terminality: None,
577 ingress: PeerIngressFact::peer(
578 id,
579 PeerInputClass::ActionableRequest,
580 meerkat_core::PeerIngressKind::Request,
581 Some(meerkat_core::PeerIngressAuthDecision::Required),
582 PeerIngressIdentity::new(
583 canonical_peer_id,
584 "display-agent",
585 PeerIngressConvention::Request {
586 request_id: id.to_string(),
587 intent: "review".to_string(),
588 },
589 ),
590 ),
591 interaction: InboxInteraction {
592 from_route: None,
593 from: "display-agent".into(),
594 content: InteractionContent::Request {
595 intent: "review".into(),
596 params: serde_json::json!({"pr": 42}),
597 blocks: None,
598 },
599 id,
600 rendered_text: "stale rendered text".into(),
601 handling_mode: meerkat_core::types::HandlingMode::Queue,
602 render_metadata: None,
603 },
604 };
605
606 let input =
607 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
608 .expect("classified peer projection should use typed canonical id");
609 let Input::Peer(peer) = input else {
610 panic!("Expected Peer input");
611 };
612 match peer.header.source {
613 InputOrigin::Peer { peer_id, .. } => {
614 assert_eq!(peer_id, canonical_peer_id.as_str());
615 assert_ne!(peer_id, "display-agent");
616 }
617 other => panic!("Expected peer source, got {other:?}"),
618 }
619 assert_eq!(peer.content.text_content(), "stale rendered text");
620 }
621
622 #[test]
623 fn classified_peer_projection_rejects_display_only_ingress_identity() {
624 let id = make_interaction_id();
625 let classified = PeerInputCandidate {
626 lifecycle_peer: None,
627 response_terminality: None,
628 ingress: PeerIngressFact {
629 interaction_id: id,
630 class: PeerInputClass::ActionableMessage,
631 kind: meerkat_core::PeerIngressKind::Message,
632 canonical_peer_id: None,
633 display_name: meerkat_core::comms::PeerName::new("display-agent".to_string()).ok(),
634 signing_pubkey: None,
635 route: None,
636 auth: Some(meerkat_core::PeerIngressAuthDecision::Required),
637 convention: PeerIngressConvention::Message,
638 },
639 interaction: InboxInteraction {
640 from_route: None,
641 from: "display-agent".into(),
642 content: InteractionContent::Message {
643 body: "hello".into(),
644 blocks: None,
645 },
646 id,
647 rendered_text: "stale rendered text".into(),
648 handling_mode: meerkat_core::types::HandlingMode::Queue,
649 render_metadata: None,
650 },
651 };
652
653 let result =
654 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"));
655 assert!(
656 matches!(
657 result,
658 Err(PeerIngressProjectionError::MissingCanonicalPeerId { interaction_id })
659 if interaction_id == id
660 ),
661 "display-only ingress must fail closed, got {result:?}"
662 );
663 }
664
665 #[test]
666 fn request_body_preserves_rendered_text_and_structured_payload() {
667 let interaction = InboxInteraction {
668 from_route: None,
669 from: "event:webhook".into(),
670 content: InteractionContent::Request {
671 intent: "mob.peer_added".into(),
672 params: serde_json::json!({"peer":"agent-1"}),
673 blocks: None,
674 },
675 id: make_interaction_id(),
676 rendered_text: "stale rendered text".into(),
677 handling_mode: meerkat_core::types::HandlingMode::Queue,
678 render_metadata: None,
679 };
680 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
681 if let Input::Peer(peer) = input {
682 assert_eq!(peer.content.text_content(), "stale rendered text");
683 assert_eq!(peer.payload, Some(serde_json::json!({"peer":"agent-1"})));
684 } else {
685 panic!("Expected PeerInput");
686 }
687 }
688
689 #[test]
690 fn message_blocks_are_preserved_on_peer_input() {
691 let blocks = vec![
692 meerkat_core::types::ContentBlock::Text {
693 text: "see image".into(),
694 },
695 meerkat_core::types::ContentBlock::Image {
696 media_type: "image/png".into(),
697 data: "abc".into(),
698 },
699 ];
700 let interaction = InboxInteraction {
701 from_route: None,
702 from: "peer-1".into(),
703 content: InteractionContent::Message {
704 body: "see image".into(),
705 blocks: Some(blocks.clone()),
706 },
707 id: make_interaction_id(),
708 rendered_text: "stale rendered text".into(),
709 handling_mode: meerkat_core::types::HandlingMode::Queue,
710 render_metadata: None,
711 };
712 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
713 if let Input::Peer(peer) = input {
714 assert_eq!(
717 peer.content,
718 meerkat_core::types::ContentInput::Blocks(blocks)
719 );
720 } else {
721 panic!("Expected PeerInput");
722 }
723 }
724
725 #[test]
726 fn request_blocks_are_preserved_on_peer_input() {
727 let blocks = vec![
728 meerkat_core::types::ContentBlock::Text {
729 text: "describe this image".into(),
730 },
731 meerkat_core::types::ContentBlock::Image {
732 media_type: "image/png".into(),
733 data: "abc".into(),
734 },
735 ];
736 let interaction_id = make_interaction_id();
737 let peer_id = PeerId::new();
738 let classified = PeerInputCandidate {
739 interaction: InboxInteraction {
740 from_route: Some(peer_id),
741 from: "vision-peer".into(),
742 content: InteractionContent::Request {
743 intent: "checksum_token".into(),
744 params: serde_json::json!({"subject": "describe-image"}),
745 blocks: Some(blocks.clone()),
746 },
747 id: interaction_id,
748 rendered_text: String::new(),
749 handling_mode: meerkat_core::types::HandlingMode::Steer,
750 render_metadata: None,
751 },
752 ingress: PeerIngressFact::peer(
753 interaction_id,
754 PeerInputClass::ActionableRequest,
755 PeerIngressKind::Request,
756 Some(meerkat_core::interaction::PeerIngressAuthDecision::Required),
757 PeerIngressIdentity::new(
758 peer_id,
759 "vision-peer",
760 meerkat_core::interaction::PeerIngressConvention::Request {
761 request_id: interaction_id.to_string(),
762 intent: "checksum_token".to_string(),
763 },
764 ),
765 ),
766 lifecycle_peer: None,
767 response_terminality: None,
768 };
769
770 let input = classified_interaction_to_runtime_input(
771 &classified,
772 &LogicalRuntimeId::new("runtime-a"),
773 )
774 .expect("classified request should project");
775 if let Input::Peer(peer) = input {
776 assert_eq!(
777 peer.content,
778 meerkat_core::types::ContentInput::Blocks(blocks)
779 );
780 assert_eq!(
781 peer.payload,
782 Some(serde_json::json!({"subject": "describe-image"}))
783 );
784 } else {
785 panic!("Expected PeerInput");
786 }
787 }
788
789 #[test]
790 fn multimodal_message_blocks_own_content_with_derived_text_projection() {
791 let blocks = vec![
792 meerkat_core::types::ContentBlock::Text {
793 text: "caption text".into(),
794 },
795 meerkat_core::types::ContentBlock::Image {
796 media_type: "image/png".into(),
797 data: "abc".into(),
798 },
799 ];
800 let interaction = InboxInteraction {
801 from_route: None,
802 from: "peer-1".into(),
803 content: InteractionContent::Message {
804 body: "please inspect this image".into(),
805 blocks: Some(blocks.clone()),
806 },
807 id: make_interaction_id(),
808 rendered_text: "stale rendered text".into(),
809 handling_mode: meerkat_core::types::HandlingMode::Queue,
810 render_metadata: None,
811 };
812 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
813 if let Input::Peer(peer) = input {
814 assert_eq!(
817 peer.content,
818 meerkat_core::types::ContentInput::Blocks(blocks)
819 );
820 assert_eq!(
821 peer.content.text_content(),
822 "caption text\n[image: image/png]"
823 );
824 } else {
825 panic!("Expected PeerInput");
826 }
827 }
828
829 #[test]
830 fn plain_event_blocks_are_preserved_on_external_event_input() {
831 let blocks = vec![
832 meerkat_core::types::ContentBlock::Text {
833 text: "see image".into(),
834 },
835 meerkat_core::types::ContentBlock::Image {
836 media_type: "image/png".into(),
837 data: "abc".into(),
838 },
839 ];
840 let id = make_interaction_id();
841 let classified = PeerInputCandidate {
842 lifecycle_peer: None,
843 response_terminality: None,
844 ingress: plain_event_ingress(id, "webhook"),
845 interaction: InboxInteraction {
846 from_route: None,
847 from: "event:webhook".into(),
848 content: InteractionContent::Message {
849 body: "see image".into(),
850 blocks: Some(blocks.clone()),
851 },
852 id,
853 rendered_text: "stale rendered text".into(),
854 handling_mode: meerkat_core::types::HandlingMode::Queue,
855 render_metadata: None,
856 },
857 };
858 let input =
859 classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
860 .expect("plain event with blocks should project");
861 match input {
862 Input::ExternalEvent(event) => {
863 assert_eq!(event.payload["body"], "see image");
864 assert!(event.payload.get("blocks").is_none());
865 assert_eq!(event.blocks, Some(blocks));
866 assert_eq!(
867 event.handling_mode,
868 meerkat_core::types::HandlingMode::Queue
869 );
870 assert_eq!(event.render_metadata, None);
871 }
872 other => panic!("Expected ExternalEvent input, got {other:?}"),
873 }
874 }
875
876 #[test]
877 fn plain_event_preserves_handling_mode_and_render_metadata() {
878 let render_metadata = meerkat_core::types::RenderMetadata {
879 class: meerkat_core::types::RenderClass::ExternalEvent,
880 salience: meerkat_core::types::RenderSalience::Urgent,
881 };
882 let id = make_interaction_id();
883 let classified = PeerInputCandidate {
884 lifecycle_peer: None,
885 response_terminality: None,
886 ingress: plain_event_ingress(id, "webhook"),
887 interaction: InboxInteraction {
888 from_route: None,
889 from: "event:webhook".into(),
890 content: InteractionContent::Message {
891 body: "urgent".into(),
892 blocks: None,
893 },
894 id,
895 rendered_text: "stale rendered text".into(),
896 handling_mode: meerkat_core::types::HandlingMode::Steer,
897 render_metadata: Some(render_metadata.clone()),
898 },
899 };
900
901 match classified_interaction_to_runtime_input(&classified, &LogicalRuntimeId::new("test"))
902 .expect("plain event should preserve render metadata")
903 {
904 Input::ExternalEvent(event) => {
905 assert_eq!(
906 event.handling_mode,
907 meerkat_core::types::HandlingMode::Steer
908 );
909 assert_eq!(event.render_metadata, Some(render_metadata));
910 }
911 other => panic!("Expected ExternalEvent input, got {other:?}"),
912 }
913 }
914
915 #[test]
916 fn response_completed_to_terminal() {
917 let in_reply_to = make_interaction_id();
918 let route_id = meerkat_core::comms::PeerId::from_uuid(
919 uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f2").unwrap(),
920 );
921 let interaction = InboxInteraction {
922 from_route: Some(route_id),
923 from: "Peer One".into(),
924 content: InteractionContent::Response {
925 status: ResponseStatus::Completed,
926 result: serde_json::json!({"ok": true}),
927 in_reply_to,
928 blocks: None,
929 },
930 id: make_interaction_id(),
931 rendered_text: String::new(),
932 handling_mode: meerkat_core::types::HandlingMode::Queue,
933 render_metadata: None,
934 };
935 let input = peer_input_for_test(&interaction, &LogicalRuntimeId::new("test"));
936 if let Input::Peer(p) = &input {
937 match &p.header.source {
938 InputOrigin::Peer {
939 peer_id,
940 display_identity,
941 ..
942 } => {
943 assert_eq!(peer_id, &route_id.to_string());
944 assert_eq!(display_identity.as_deref(), Some("Peer One"));
945 }
946 other => panic!("Expected Peer source, got {other:?}"),
947 }
948 assert!(matches!(
949 p.convention,
950 Some(PeerConvention::ResponseTerminal {
951 status: ResponseTerminalStatus::Completed,
952 ..
953 })
954 ));
955 assert_eq!(p.header.durability, InputDurability::Durable);
956 assert_eq!(
957 p.payload,
958 Some(serde_json::json!({"ok": true})),
959 "terminal response result must remain structured on PeerInput so runtime prompt projection stays runtime-owned"
960 );
961 } else {
962 panic!("Expected PeerInput");
963 }
964 let projection = crate::input::runtime_input_projection_for_machine_batch(&input);
965 let context = projection
966 .context_append
967 .expect("terminal machine-batch context projection");
968 let expected_key = format!("peer_response_terminal:{route_id}:{in_reply_to}");
969 assert_eq!(context.key, expected_key);
970 let meerkat_core::lifecycle::run_primitive::CoreRenderable::SystemNotice { blocks, .. } =
971 context.content
972 else {
973 panic!("Expected terminal context notice");
974 };
975 assert!(matches!(
976 blocks.first(),
977 Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. })
978 if peer.as_ref().and_then(|peer| peer.display_name.as_deref()) == Some("Peer One")
979 ));
980 }
981
/// A response whose ingress fact is classed as `ResponseProgress` must project to a
/// `PeerConvention::ResponseProgress` input, even though the interaction content
/// itself reports `ResponseStatus::Completed`.
#[test]
fn classified_response_uses_ingress_terminal_class() {
    let request_id = make_interaction_id();
    let response_id = make_interaction_id();

    // Interaction content: the raw status says `Completed`.
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: request_id,
            blocks: None,
        },
        id: response_id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    // Ingress classification: the same response is classed as progress.
    let identity = PeerIngressIdentity::new(
        test_peer_id(),
        "peer-1",
        PeerIngressConvention::Response {
            in_reply_to: request_id,
            status: ResponseStatus::Completed,
        },
    );
    let ingress = PeerIngressFact::peer(
        response_id,
        PeerInputClass::ResponseProgress,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        identity,
    );
    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: Some(meerkat_core::TerminalityClass::Progress),
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let projected = classified_interaction_to_runtime_input(&classified, &runtime_id)
        .expect("classified response should project");
    let Input::Peer(peer) = projected else {
        panic!("Expected PeerInput");
    };
    assert!(
        matches!(
            peer.convention,
            Some(PeerConvention::ResponseProgress { .. })
        ),
        "classified bridge must consume ingress-owned response class"
    );
}
1034
/// A response candidate with `response_terminality: None` must be rejected with
/// `PeerIngressProjectionError::MissingResponseTerminality` carrying the candidate's
/// interaction id, rather than being projected from its raw `Completed` status.
#[test]
fn classified_response_missing_machine_terminality_fails_closed() {
    let request_id = make_interaction_id();
    let id = make_interaction_id();

    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: request_id,
            blocks: None,
        },
        id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };
    let identity = PeerIngressIdentity::new(
        test_peer_id(),
        "peer-1",
        PeerIngressConvention::Response {
            in_reply_to: request_id,
            status: ResponseStatus::Completed,
        },
    );
    let ingress = PeerIngressFact::peer(
        id,
        PeerInputClass::ResponseTerminal,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        identity,
    );
    // The machine terminality is deliberately absent.
    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: None,
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let result = classified_interaction_to_runtime_input(&classified, &runtime_id);

    // Only the typed "missing terminality" error for this exact interaction counts.
    let failed_closed = match &result {
        Err(PeerIngressProjectionError::MissingResponseTerminality { interaction_id }) => {
            *interaction_id == id
        }
        _ => false,
    };
    assert!(
        failed_closed,
        "runtime projection must not infer public terminality from raw status: {result:?}"
    );
}
1083
/// A terminal response whose ingress fact has `canonical_peer_id: None` must fail
/// projection with `PeerIngressProjectionError::MissingCanonicalPeerId`.
#[test]
fn response_terminal_without_canonical_peer_id_fails_typed_projection() {
    let request_id = make_interaction_id();
    let response_id = make_interaction_id();

    let interaction = InboxInteraction {
        from_route: None,
        from: "Peer One".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: request_id,
            blocks: None,
        },
        id: response_id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    // Built field-by-field so the canonical peer id can be left unset while the
    // display name is still populated.
    let ingress = PeerIngressFact {
        interaction_id: response_id,
        class: PeerInputClass::ResponseTerminal,
        kind: meerkat_core::PeerIngressKind::Response,
        canonical_peer_id: None,
        display_name: meerkat_core::comms::PeerName::new("Peer One".to_string()).ok(),
        signing_pubkey: None,
        route: None,
        auth: Some(meerkat_core::PeerIngressAuthDecision::Required),
        convention: PeerIngressConvention::Response {
            in_reply_to: request_id,
            status: ResponseStatus::Completed,
        },
    };

    let terminality = meerkat_core::TerminalityClass::Terminal {
        disposition: meerkat_core::TerminalDisposition::Completed,
    };
    let candidate = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: Some(terminality),
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let projection_error =
        classified_interaction_to_runtime_input(&candidate, &runtime_id).unwrap_err();
    assert!(matches!(
        projection_error,
        PeerIngressProjectionError::MissingCanonicalPeerId { .. }
    ));
}
1130
/// A routed response with `ResponseStatus::Failed` must project to a
/// `PeerConvention::ResponseTerminal` whose status is `ResponseTerminalStatus::Failed`.
#[test]
fn response_failed_to_terminal() {
    let request_id = make_interaction_id();
    let route_uuid = uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f3").unwrap();
    let route_id = meerkat_core::comms::PeerId::from_uuid(route_uuid);

    let failed_response = InteractionContent::Response {
        status: ResponseStatus::Failed,
        result: serde_json::json!({"error": "timeout"}),
        in_reply_to: request_id,
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: Some(route_id),
        from: "peer-1".into(),
        content: failed_response,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let input = peer_input_for_test(&interaction, &runtime_id);
    let Input::Peer(peer) = &input else {
        panic!("Expected PeerInput");
    };
    assert!(matches!(
        peer.convention,
        Some(PeerConvention::ResponseTerminal {
            status: ResponseTerminalStatus::Failed,
            ..
        })
    ));
}
1164
/// A response with `ResponseStatus::Accepted` must project to an ephemeral
/// `PeerConvention::ResponseProgress` input in the `Accepted` phase, with no
/// handling mode attached.
#[test]
fn response_accepted_to_progress() {
    let request_id = make_interaction_id();
    let accepted_response = InteractionContent::Response {
        status: ResponseStatus::Accepted,
        result: serde_json::json!(null),
        in_reply_to: request_id,
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: accepted_response,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let input = peer_input_for_test(&interaction, &runtime_id);
    let Input::Peer(peer) = &input else {
        panic!("Expected PeerInput");
    };

    assert!(matches!(
        peer.convention,
        Some(PeerConvention::ResponseProgress {
            phase: ResponseProgressPhase::Accepted,
            ..
        })
    ));
    assert_eq!(peer.header.durability, InputDurability::Ephemeral);
    // The interaction itself was built with `HandlingMode::Queue`; the projected
    // progress input is expected to drop it.
    assert!(
        peer.handling_mode.is_none(),
        "ResponseProgress inputs must not carry handling_mode"
    );
}
1200
/// When the candidate's terminality is `Progress` but the raw response status is
/// `Completed`, the projected input must follow the terminality: an ephemeral
/// `ResponseProgress` input in the `Accepted` phase with no handling mode.
#[test]
fn classified_response_uses_ingress_terminality_over_raw_status() {
    let request_id = make_interaction_id();
    let response_id = make_interaction_id();

    // Raw status and ingress convention both say `Completed` ...
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!({"ok": true}),
            in_reply_to: request_id,
            blocks: None,
        },
        id: response_id,
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };
    let identity = PeerIngressIdentity::new(
        test_peer_id(),
        "peer-1",
        PeerIngressConvention::Response {
            in_reply_to: request_id,
            status: ResponseStatus::Completed,
        },
    );
    // ... while the class and terminality mark the response as progress.
    let ingress = PeerIngressFact::peer(
        response_id,
        PeerInputClass::ResponseProgress,
        meerkat_core::PeerIngressKind::Response,
        Some(meerkat_core::PeerIngressAuthDecision::Required),
        identity,
    );
    let classified = PeerInputCandidate {
        interaction,
        ingress,
        lifecycle_peer: None,
        response_terminality: Some(meerkat_core::TerminalityClass::Progress),
    };

    let runtime_id = LogicalRuntimeId::new("test");
    let input = classified_interaction_to_runtime_input(&classified, &runtime_id)
        .expect("classified response should project");
    let Input::Peer(peer) = &input else {
        panic!("Expected PeerInput");
    };

    assert!(matches!(
        peer.convention,
        Some(PeerConvention::ResponseProgress {
            phase: ResponseProgressPhase::Accepted,
            ..
        })
    ));
    assert_eq!(peer.header.durability, InputDurability::Ephemeral);
    assert_eq!(peer.handling_mode, None);
}
1256
/// The `InputOrigin::Peer` source on a projected message must carry the test peer id,
/// the sender's display identity, and the runtime id the projection was called with.
#[test]
fn peer_source_includes_runtime_id() {
    let message = InteractionContent::Message {
        body: "hi".into(),
        blocks: None,
    };
    let interaction = InboxInteraction {
        from_route: None,
        from: "peer-1".into(),
        content: message,
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let target_runtime = LogicalRuntimeId::new("agent-runtime-1");
    let input = peer_input_for_test(&interaction, &target_runtime);

    // Guard clauses: anything other than a peer input with a peer origin is a failure.
    let Input::Peer(peer) = &input else {
        panic!("Expected PeerInput");
    };
    let InputOrigin::Peer {
        peer_id,
        display_identity,
        runtime_id,
        ..
    } = &peer.header.source
    else {
        panic!("Expected Peer source");
    };

    assert_eq!(peer_id, &test_peer_id().as_str());
    assert_eq!(display_identity.as_deref(), Some("peer-1"));
    assert_eq!(runtime_id.as_ref().unwrap().0, "agent-runtime-1");
}
1290
/// A message, a request, and a routed completed response must each project to
/// `Input::Peer` through `peer_input_for_test`.
#[test]
fn all_interaction_types_produce_valid_inputs() {
    let in_reply_to = make_interaction_id();

    let message = InboxInteraction {
        from_route: None,
        from: "p".into(),
        content: InteractionContent::Message {
            body: "m".into(),
            blocks: None,
        },
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let request = InboxInteraction {
        from_route: None,
        from: "p".into(),
        content: InteractionContent::Request {
            intent: "i".into(),
            params: serde_json::json!({}),
            blocks: None,
        },
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    // The response is the only fixture that carries a route.
    let response_route = meerkat_core::comms::PeerId::from_uuid(
        uuid::Uuid::parse_str("018f6f79-7a82-7c4e-a552-a3b86f9630f6").unwrap(),
    );
    let response = InboxInteraction {
        from_route: Some(response_route),
        from: "p".into(),
        content: InteractionContent::Response {
            status: ResponseStatus::Completed,
            result: serde_json::json!(null),
            in_reply_to,
            blocks: None,
        },
        id: make_interaction_id(),
        rendered_text: String::new(),
        handling_mode: meerkat_core::types::HandlingMode::Queue,
        render_metadata: None,
    };

    let rid = LogicalRuntimeId::new("test");
    [message, request, response]
        .iter()
        .map(|interaction| peer_input_for_test(interaction, &rid))
        .for_each(|input| assert!(matches!(input, Input::Peer(_))));
}
1344}