1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10 ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11 RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::{
15 HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind, SystemNoticePeer,
16};
17use meerkat_core::{
18 BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
19 PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
20 PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
21 PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
22 PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
23 PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
24};
25use serde::{Deserialize, Serialize};
26
27use crate::identifiers::{
28 CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
29};
30use meerkat_core::types::RenderMetadata;
31
/// Metadata shared by every [`Input`] variant.
///
/// Each variant payload struct in this file embeds one of these as its
/// `header` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
    /// Identifier of this input.
    pub id: InputId,
    /// Timestamp attached to the input; the constructors in this file use the
    /// current UTC time.
    pub timestamp: DateTime<Utc>,
    /// Origin of the input (operator, peer, flow, system or external source).
    pub source: InputOrigin,
    /// Durability class assigned to the input.
    pub durability: InputDurability,
    /// Transcript / operator eligibility flags.
    pub visibility: InputVisibility,
    /// Optional idempotency key; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
    /// Optional supersession key; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supersession_key: Option<SupersessionKey>,
    /// Optional correlation id; left out of the serialized form when `None`.
    /// `peer_response_terminal_input` derives it from the request id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<CorrelationId>,
}
55
/// Where an input came from.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field using snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
    /// Origin used by the `PromptInput` constructors in this file.
    Operator,
    /// Input received from a peer.
    Peer {
        /// Peer identifier; the peer projections in this file treat it as the
        /// canonical / route identity.
        peer_id: String,
        /// Optional human-facing label for the peer. Defaults to `None` when
        /// absent on deserialization and is omitted when `None` on serialization.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        display_identity: Option<String>,
        /// Optional runtime identifier; `peer_response_terminal_fact` parses it
        /// as the transport identity. Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_id: Option<LogicalRuntimeId>,
    },
    /// Input attributed to a flow, identified by flow id and step index.
    Flow { flow_id: String, step_index: usize },
    /// Origin used for runtime-generated inputs such as
    /// `ContinuationInput::detached_background_op_completed`.
    System,
    /// Input from a named external source.
    External { source_name: String },
}
83
/// Durability class of an input, serialized as a snake_case string.
///
/// NOTE(review): the storage semantics of each class are not visible in this
/// file; the per-variant notes only record how this file uses them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
    /// Used by the prompt and peer-response constructors in this file.
    Durable,
    /// Not constructed anywhere in the visible part of this file.
    Ephemeral,
    /// Used by `ContinuationInput::detached_background_op_completed`.
    Derived,
}
96
/// Eligibility flags attached to an input header.
///
/// The `Default` implementation in this file turns both flags on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
    /// Whether the input is eligible for the transcript.
    pub transcript_eligible: bool,
    /// Whether the input is eligible for the operator.
    pub operator_eligible: bool,
}
105
106impl Default for InputVisibility {
107 fn default() -> Self {
108 Self {
109 transcript_eligible: true,
110 operator_eligible: true,
111 }
112 }
113}
114
/// A runtime input, one variant per kind of payload.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored in an `input_type` field next to the payload fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
    /// Prompt text, optionally with content blocks and typed turn appends.
    Prompt(PromptInput),
    /// Message, request or response received from a peer.
    Peer(PeerInput),
    /// Instructions for a flow step.
    FlowStep(FlowStepInput),
    /// Event delivered by an external source.
    ExternalEvent(ExternalEventInput),
    /// Continuation input; the tag `system_generated` is also accepted when
    /// deserializing.
    #[serde(alias = "system_generated")]
    Continuation(ContinuationInput),
    /// Operation event input; the tag `projected` is also accepted when
    /// deserializing.
    #[serde(alias = "projected")]
    Operation(OperationInput),
}
135
136impl Input {
137 pub fn header(&self) -> &InputHeader {
139 match self {
140 Input::Prompt(i) => &i.header,
141 Input::Peer(i) => &i.header,
142 Input::FlowStep(i) => &i.header,
143 Input::ExternalEvent(i) => &i.header,
144 Input::Continuation(i) => &i.header,
145 Input::Operation(i) => &i.header,
146 }
147 }
148
149 pub fn id(&self) -> &InputId {
151 &self.header().id
152 }
153
154 pub fn kind(&self) -> InputKind {
156 match self {
157 Input::Prompt(_) => InputKind::Prompt,
158 Input::Peer(p) => match &p.convention {
159 Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160 Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161 Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162 Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163 },
164 Input::FlowStep(_) => InputKind::FlowStep,
165 Input::ExternalEvent(_) => InputKind::ExternalEvent,
166 Input::Continuation(_) => InputKind::Continuation,
167 Input::Operation(_) => InputKind::Operation,
168 }
169 }
170
171 pub fn kind_id(&self) -> KindId {
173 KindId::new(self.kind())
174 }
175
176 pub fn handling_mode(&self) -> Option<HandlingMode> {
178 match self {
179 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181 Input::ExternalEvent(event) => Some(event.handling_mode),
182 Input::Continuation(continuation) => Some(continuation.handling_mode),
183 Input::Peer(peer) => peer.handling_mode,
184 Input::Operation(_) => None,
185 }
186 }
187}
188
189fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
190 let Some(obj) = event.payload.as_object_mut() else {
191 return Ok(());
192 };
193 let Some(blocks_value) = obj.remove("blocks") else {
194 return Ok(());
195 };
196 if event.blocks.is_some() {
197 return Ok(());
198 }
199 let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
200 .map_err(|err| {
201 BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
202 })?;
203 event.blocks = Some(blocks);
204 Ok(())
205}
206
207pub async fn externalize_input_images(
208 blob_store: &dyn BlobStore,
209 input: &mut Input,
210) -> Result<(), BlobStoreError> {
211 match input {
212 Input::Prompt(prompt) => {
213 if let Some(blocks) = prompt.blocks.as_mut() {
214 externalize_content_blocks(blob_store, blocks).await?;
215 }
216 }
217 Input::Peer(peer) => {
218 if let Some(blocks) = peer.blocks.as_mut() {
219 externalize_content_blocks(blob_store, blocks).await?;
220 }
221 }
222 Input::FlowStep(flow_step) => {
223 if let Some(blocks) = flow_step.blocks.as_mut() {
224 externalize_content_blocks(blob_store, blocks).await?;
225 }
226 }
227 Input::ExternalEvent(event) => {
228 migrate_legacy_payload_blocks(event)?;
229 if let Some(blocks) = event.blocks.as_mut() {
230 externalize_content_blocks(blob_store, blocks).await?;
231 }
232 }
233 Input::Continuation(_) | Input::Operation(_) => {}
234 }
235 Ok(())
236}
237
238pub async fn hydrate_input_images(
239 blob_store: &dyn BlobStore,
240 input: &mut Input,
241 missing_behavior: MissingBlobBehavior,
242) -> Result<(), BlobStoreError> {
243 match input {
244 Input::Prompt(prompt) => {
245 if let Some(blocks) = prompt.blocks.as_mut() {
246 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247 }
248 }
249 Input::Peer(peer) => {
250 if let Some(blocks) = peer.blocks.as_mut() {
251 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
252 }
253 }
254 Input::FlowStep(flow_step) => {
255 if let Some(blocks) = flow_step.blocks.as_mut() {
256 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
257 }
258 }
259 Input::ExternalEvent(event) => {
260 migrate_legacy_payload_blocks(event)?;
261 if let Some(blocks) = event.blocks.as_mut() {
262 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
263 }
264 }
265 Input::Continuation(_) | Input::Operation(_) => {}
266 }
267 Ok(())
268}
269
/// Payload of [`Input::Prompt`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Prompt text.
    pub text: String,
    /// Optional content blocks. `from_content_input` only fills this when the
    /// content contains images. Defaults to `None` and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Typed conversation appends carried with the prompt; projected as
    /// `additional_appends`. Defaults to empty and is omitted from serialized
    /// output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub typed_turn_appends: Vec<ConversationAppend>,
    /// Optional per-turn metadata; `Input::handling_mode` reads the handling
    /// mode from it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
289
290impl PromptInput {
291 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
293 Self {
294 header: InputHeader {
295 id: meerkat_core::lifecycle::InputId::new(),
296 timestamp: chrono::Utc::now(),
297 source: InputOrigin::Operator,
298 durability: InputDurability::Durable,
299 visibility: InputVisibility::default(),
300 idempotency_key: None,
301 supersession_key: None,
302 correlation_id: None,
303 },
304 text: text.into(),
305 blocks: None,
306 typed_turn_appends: Vec::new(),
307 turn_metadata,
308 }
309 }
310
311 pub fn from_content_input(
313 input: meerkat_core::types::ContentInput,
314 turn_metadata: Option<RuntimeTurnMetadata>,
315 ) -> Self {
316 let text = input.text_content();
317 let blocks = if input.has_images() {
318 Some(input.into_blocks())
319 } else {
320 None
321 };
322 Self {
323 header: InputHeader {
324 id: meerkat_core::lifecycle::InputId::new(),
325 timestamp: chrono::Utc::now(),
326 source: InputOrigin::Operator,
327 durability: InputDurability::Durable,
328 visibility: InputVisibility::default(),
329 idempotency_key: None,
330 supersession_key: None,
331 correlation_id: None,
332 },
333 text,
334 blocks,
335 typed_turn_appends: Vec::new(),
336 turn_metadata,
337 }
338 }
339}
340
/// Payload of [`Input::Peer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Peer convention. `Input::kind` and the notice rendering in this file
    /// treat `None` like a plain message. Omitted from serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub convention: Option<PeerConvention>,
    /// Text body; used as the prompt-text fallback and as notice content.
    pub body: String,
    /// Optional JSON payload, copied into projections and notices.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<serde_json::Value>,
    /// Optional content blocks accompanying the body.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional handling mode, returned as-is by `Input::handling_mode`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handling_mode: Option<HandlingMode>,
}
372
/// Convention a peer input follows.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored in a `convention_type` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
    /// Plain peer message.
    Message,
    /// Request from a peer, carrying a request id and an intent string.
    Request { request_id: String, intent: String },
    /// Progress update for a request.
    ResponseProgress {
        /// Id of the request this progress refers to.
        request_id: String,
        /// Progress phase.
        phase: ResponseProgressPhase,
    },
    /// Terminal response for a request.
    ResponseTerminal {
        /// Id of the request this response refers to.
        request_id: String,
        /// Terminal status.
        status: ResponseTerminalStatus,
    },
}
393
/// Alias for the core `PeerResponseProgressProjectionPhase` type, used as the
/// phase of `PeerConvention::ResponseProgress`.
pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
397
/// Alias for the core `PeerResponseTerminalProjectionStatus` type, used as the
/// status of `PeerConvention::ResponseTerminal`.
pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
401
402pub fn response_terminal_status_from_wire(
403 status: meerkat_contracts::PeerResponseTerminalStatusWire,
404) -> ResponseTerminalStatus {
405 match status {
406 meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
407 PeerResponseTerminalProjectionStatus::Completed
408 }
409 meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
410 PeerResponseTerminalProjectionStatus::Failed
411 }
412 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
413 PeerResponseTerminalProjectionStatus::Cancelled
414 }
415 }
416}
417
418pub fn peer_response_terminal_input(
419 peer_id: meerkat_core::comms::PeerId,
420 display_name: Option<meerkat_core::comms::PeerName>,
421 request_id: meerkat_core::PeerCorrelationId,
422 status: meerkat_contracts::PeerResponseTerminalStatusWire,
423 result: serde_json::Value,
424) -> Input {
425 let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
426 let request_id = request_id.to_string();
427 let peer_id = peer_id.to_string();
428 let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
429
430 Input::Peer(PeerInput {
431 header: InputHeader {
432 id: InputId::new(),
433 timestamp: Utc::now(),
434 source: InputOrigin::Peer {
435 peer_id,
436 display_identity: Some(display_identity),
437 runtime_id: None,
438 },
439 durability: InputDurability::Durable,
440 visibility: InputVisibility::default(),
441 idempotency_key: None,
442 supersession_key: None,
443 correlation_id: Some(correlation_id),
444 },
445 convention: Some(PeerConvention::ResponseTerminal {
446 request_id,
447 status: response_terminal_status_from_wire(status),
448 }),
449 body: String::new(),
450 payload: Some(result),
451 blocks: None,
452 handling_mode: None,
453 })
454}
455
/// Payload of [`Input::FlowStep`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Identifier of the flow step; shown in the projected notice body.
    pub step_id: String,
    /// Step instructions; used as the prompt text for this input.
    pub instructions: String,
    /// Optional content blocks. Defaults to `None` and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional per-turn metadata; `Input::handling_mode` reads the handling
    /// mode from it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
472
/// Payload of [`Input::ExternalEvent`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Event type; used as the source label when the origin has no non-blank
    /// external source name.
    pub event_type: String,
    /// Raw JSON payload. A string `"body"` entry is used for projection text,
    /// and a legacy `"blocks"` entry is migrated into `blocks` by
    /// `migrate_legacy_payload_blocks`.
    pub payload: serde_json::Value,
    /// Optional typed content blocks. Defaults to `None` and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Handling mode; falls back to `HandlingMode::default()` when absent on
    /// deserialization.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional render metadata; not read anywhere in the visible part of
    /// this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub render_metadata: Option<RenderMetadata>,
}
494
/// Payload of [`Input::Continuation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Reason for the continuation; included in the prompt text as
    /// `[Continuation] <reason>`.
    pub reason: String,
    /// Handling mode; falls back to `HandlingMode::default()` when absent on
    /// deserialization.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional request id. Defaults to `None` and is omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
510
511impl ContinuationInput {
512 pub fn detached_background_op_completed() -> Self {
518 Self {
519 header: InputHeader {
520 id: meerkat_core::lifecycle::InputId::new(),
521 timestamp: chrono::Utc::now(),
522 source: InputOrigin::System,
523 durability: InputDurability::Derived,
524 visibility: InputVisibility {
525 transcript_eligible: false,
526 operator_eligible: false,
527 },
528 idempotency_key: None,
529 supersession_key: None,
530 correlation_id: None,
531 },
532 reason: "detached_background_op_completed".to_string(),
533 handling_mode: HandlingMode::Steer,
534 request_id: None,
535 }
536 }
537}
538
/// Payload of [`Input::Operation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationInput {
    /// Common input metadata.
    pub header: InputHeader,
    /// Id of the operation the event belongs to.
    pub operation_id: OperationId,
    /// The operation event itself.
    pub event: OpEvent,
}
549
550pub(crate) fn peer_projection_from_peer_input(
557 peer: &PeerInput,
558) -> Option<PeerConversationProjection> {
559 peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
560}
561
562fn peer_projection_from_peer_input_with_id(
563 peer: &PeerInput,
564 peer_id: &str,
565) -> Option<PeerConversationProjection> {
566 let peer_id = peer_id.to_string();
567
568 match &peer.convention {
569 Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
570 Some(PeerConvention::Request { request_id, intent }) => {
571 let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
572 Ok(peer_id) => peer_id,
573 Err(error) => {
574 tracing::warn!(
575 peer_id,
576 error = %error,
577 "dropping peer request projection with non-canonical peer_id"
578 );
579 return None;
580 }
581 };
582 Some(PeerConversationProjection::Request {
583 peer_id,
584 display_name: peer_display_label(peer),
585 request_id: request_id.clone(),
586 intent: intent.clone(),
587 payload: peer.payload.clone(),
588 })
589 }
590 Some(PeerConvention::ResponseProgress { request_id, phase }) => {
591 Some(PeerConversationProjection::ResponseProgress {
592 peer_id,
593 request_id: request_id.clone(),
594 phase: *phase,
595 payload: peer.payload.clone(),
596 })
597 }
598 Some(PeerConvention::ResponseTerminal { .. }) => None,
599 None => None,
600 }
601}
602
603pub(crate) fn peer_response_terminal_fact(
604 peer: &PeerInput,
605) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
606 let InputOrigin::Peer {
607 peer_id,
608 display_identity,
609 runtime_id,
610 } = &peer.header.source
611 else {
612 return Ok(None);
613 };
614 let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
615 return Ok(None);
616 };
617
618 let transport_identity = runtime_id
619 .as_ref()
620 .map(ToString::to_string)
621 .map(PeerResponseTerminalTransportIdentity::parse)
622 .transpose()?;
623 let source = PeerResponseTerminalSource::new(
624 transport_identity,
625 PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
626 PeerResponseTerminalDisplayIdentity::parse(
627 display_identity
628 .as_ref()
629 .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
630 .clone(),
631 )?,
632 );
633 Ok(Some(PeerResponseTerminalFact::new(
634 source,
635 PeerResponseTerminalCorrelationId::parse(request_id)?,
636 *status,
637 PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
638 )))
639}
640
641pub(crate) fn validate_peer_response_terminal_fact(
642 input: &Input,
643) -> Result<(), PeerResponseTerminalFactError> {
644 let Input::Peer(peer) = input else {
645 return Ok(());
646 };
647 peer_response_terminal_fact(peer).map(|_| ())
648}
649
/// Test-only helper: the peer projection of `input`, or `None` when the input
/// is not a peer input.
#[cfg(test)]
pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
    match input {
        Input::Peer(peer) => peer_projection_from_peer_input(peer),
        _ => None,
    }
}
659
660fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
661 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
662 return None;
663 };
664 Some(peer_id.clone())
665}
666
667fn peer_display_label(peer: &PeerInput) -> Option<String> {
668 let InputOrigin::Peer {
669 display_identity, ..
670 } = &peer.header.source
671 else {
672 return None;
673 };
674
675 display_identity
676 .as_ref()
677 .map(|label| label.trim())
678 .filter(|label| !label.is_empty())
679 .map(ToOwned::to_owned)
680}
681
682pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
684 peer_projection_from_peer_input(peer)
685 .map(|projection| {
686 let prompt = projection.prompt_text();
687 if prompt.is_empty() {
688 peer.body.clone()
689 } else {
690 prompt
691 }
692 })
693 .unwrap_or_else(|| peer.body.clone())
694}
695
696pub(crate) fn input_prompt_text(input: &Input) -> String {
697 match input {
698 Input::Prompt(p) => p.text.clone(),
699 Input::Peer(p) => peer_prompt_text(p),
700 Input::FlowStep(f) => f.instructions.clone(),
701 Input::ExternalEvent(e) => external_event_projection_text(e),
702 Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
703 Input::Operation(operation) => {
704 format!(
705 "[Operation {}] {:?}",
706 operation.operation_id, operation.event
707 )
708 }
709 }
710}
711
712fn external_event_projection_text(event: &ExternalEventInput) -> String {
713 let source_name = match &event.header.source {
714 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
715 source_name.as_str()
716 }
717 _ => event.event_type.as_str(),
718 };
719 let body = event
720 .payload
721 .get("body")
722 .and_then(serde_json::Value::as_str)
723 .map(str::trim);
724
725 meerkat_core::interaction::format_external_event_projection(source_name, body)
726}
727
728fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
729 let (peer_id, display_name) = match &peer.header.source {
730 InputOrigin::Peer {
731 peer_id,
732 display_identity,
733 ..
734 } => (peer_id.clone(), display_identity.clone()),
735 _ => return None,
736 };
737 let (kind, request_id, intent, status) = match &peer.convention {
738 Some(PeerConvention::Message) | None => ("message", None, None, None),
739 Some(PeerConvention::Request { request_id, intent }) => (
740 "request",
741 Some(request_id.clone()),
742 Some(intent.clone()),
743 None,
744 ),
745 Some(PeerConvention::ResponseProgress { request_id, phase }) => (
746 "response_progress",
747 Some(request_id.clone()),
748 None,
749 Some(format!("{phase:?}")),
750 ),
751 Some(PeerConvention::ResponseTerminal { request_id, status }) => (
752 "response_terminal",
753 Some(request_id.clone()),
754 None,
755 Some(format!("{status:?}")),
756 ),
757 };
758 let summary = match kind {
759 "request" => intent.as_ref().map_or_else(
760 || "Peer request".to_string(),
761 |intent| format!("Peer request: {intent}"),
762 ),
763 "response_progress" => "Peer response progress".to_string(),
764 "response_terminal" => "Peer response terminal".to_string(),
765 _ => "Peer message".to_string(),
766 };
767 let content = if let Some(blocks) = peer.blocks.clone() {
768 let body_already_in_blocks = blocks.iter().any(|block| {
769 matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == peer.body.trim())
770 });
771 if peer.body.trim().is_empty() || body_already_in_blocks {
772 blocks
773 } else {
774 let mut content = vec![meerkat_core::types::ContentBlock::Text {
775 text: peer.body.clone(),
776 }];
777 content.extend(blocks);
778 content
779 }
780 } else if peer.body.is_empty() {
781 Vec::new()
782 } else {
783 vec![meerkat_core::types::ContentBlock::Text {
784 text: peer.body.clone(),
785 }]
786 };
787 Some(CoreRenderable::SystemNotice {
788 kind: SystemNoticeKind::Comms,
789 body: Some(summary.clone()),
790 blocks: vec![SystemNoticeBlock::Comms {
791 kind: kind.to_string(),
792 direction: SystemNoticeDirection::Incoming,
793 peer: Some(SystemNoticePeer {
794 id: peer_id,
795 display_name,
796 }),
797 request_id,
798 intent,
799 status,
800 summary: Some(summary),
801 payload: peer.payload.clone(),
802 content,
803 }],
804 })
805}
806
807fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
808 let source = match &event.header.source {
809 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
810 source_name.clone()
811 }
812 _ => event.event_type.clone(),
813 };
814 let body = event
815 .payload
816 .get("body")
817 .and_then(serde_json::Value::as_str)
818 .map(str::trim)
819 .filter(|body| !body.is_empty())
820 .map(ToOwned::to_owned);
821 let summary = body.as_ref().map_or_else(
822 || format!("External event via {source}"),
823 std::clone::Clone::clone,
824 );
825 CoreRenderable::SystemNotice {
826 kind: SystemNoticeKind::ExternalEvent,
827 body: Some(summary.clone()),
828 blocks: vec![SystemNoticeBlock::ExternalEvent {
829 source,
830 event_type: event.event_type.clone(),
831 summary: Some(summary),
832 body,
833 payload: Some(event.payload.clone()),
834 content: event.blocks.clone().unwrap_or_default(),
835 }],
836 }
837}
838
839fn input_to_append(input: &Input) -> Option<ConversationAppend> {
840 if matches!(
841 input,
842 Input::Peer(PeerInput {
843 convention: Some(PeerConvention::ResponseTerminal { .. }),
844 blocks: None,
845 ..
846 })
847 ) {
848 return None;
849 }
850
851 let (role, content) = match input {
852 Input::Prompt(p)
853 if !p.typed_turn_appends.is_empty()
854 && p.text.trim().is_empty()
855 && p.blocks.as_ref().is_none_or(Vec::is_empty) =>
856 {
857 return None;
858 }
859 Input::Prompt(p) if p.blocks.is_some() => (
860 ConversationAppendRole::User,
861 CoreRenderable::Blocks {
862 blocks: p.blocks.clone().unwrap_or_default(),
863 },
864 ),
865 Input::Prompt(_) => (
866 ConversationAppendRole::User,
867 CoreRenderable::Text {
868 text: input_prompt_text(input),
869 },
870 ),
871 Input::Peer(p) => peer_notice_renderable(p)
872 .map(|content| (ConversationAppendRole::SystemNotice, content))?,
873 Input::FlowStep(f) => (
874 ConversationAppendRole::SystemNotice,
875 CoreRenderable::SystemNotice {
876 kind: SystemNoticeKind::Generic,
877 body: Some(format!("Flow step {}", f.step_id)),
878 blocks: vec![SystemNoticeBlock::RuntimeNotice {
879 category: "flow_step".to_string(),
880 detail: Some(f.instructions.clone()),
881 payload: None,
882 }],
883 },
884 ),
885 Input::ExternalEvent(e) => (
886 ConversationAppendRole::SystemNotice,
887 external_event_notice_renderable(e),
888 ),
889 Input::Continuation(_) | Input::Operation(_) => return None,
890 };
891
892 Some(ConversationAppend { role, content })
893}
894
895fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
896 let (projection, content) = match input {
897 Input::Peer(peer) => {
898 let projection = peer_projection_from_peer_input(peer)?;
899 let content = peer_notice_renderable(peer)?;
900 (projection, content)
901 }
902 _ => return None,
903 };
904
905 Some(ConversationContextAppend {
906 key: projection.context_key()?,
907 content,
908 })
909}
910
911fn peer_response_terminal_context_append(
912 peer: &PeerInput,
913) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
914 let Some(fact) = peer_response_terminal_fact(peer)? else {
915 return Ok(None);
916 };
917
918 Ok(Some(ConversationContextAppend {
919 key: fact.context_key(),
920 content: CoreRenderable::SystemNotice {
921 kind: SystemNoticeKind::Comms,
922 body: Some("Peer terminal response context".to_string()),
923 blocks: vec![SystemNoticeBlock::Comms {
924 kind: "response_terminal".to_string(),
925 direction: SystemNoticeDirection::Incoming,
926 peer: Some(SystemNoticePeer {
927 id: fact.source.route_identity.to_string(),
928 display_name: Some(fact.source.display_identity.to_string()),
929 }),
930 request_id: Some(fact.correlation_id.to_string()),
931 intent: None,
932 status: Some(
933 match fact.status {
934 PeerResponseTerminalProjectionStatus::Completed => "completed",
935 PeerResponseTerminalProjectionStatus::Failed => "failed",
936 PeerResponseTerminalProjectionStatus::Cancelled => "cancelled",
937 }
938 .to_string(),
939 ),
940 summary: Some("Peer terminal response".to_string()),
941 payload: fact.render_payload.as_ref().cloned(),
942 content: Vec::new(),
943 }],
944 },
945 }))
946}
947
948pub(crate) fn runtime_input_projection(
949 input: &Input,
950) -> crate::ingress_types::RuntimeInputProjection {
951 crate::ingress_types::RuntimeInputProjection {
952 append: input_to_append(input),
953 additional_appends: match input {
954 Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
955 _ => Vec::new(),
956 },
957 context_append: input_to_context_append(input),
958 }
959}
960
961pub(crate) fn runtime_input_projection_for_machine_batch(
962 input: &Input,
963) -> crate::ingress_types::RuntimeInputProjection {
964 let mut projection = runtime_input_projection(input);
965 if let Input::Peer(peer) = input
966 && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
967 {
968 projection.context_append = Some(context_append);
969 }
970 projection
971}
972
973#[cfg(test)]
974#[allow(clippy::unwrap_used, clippy::panic)]
975mod tests {
976 use super::*;
977 use chrono::Utc;
978
979 fn make_header() -> InputHeader {
980 InputHeader {
981 id: InputId::new(),
982 timestamp: Utc::now(),
983 source: InputOrigin::Operator,
984 durability: InputDurability::Durable,
985 visibility: InputVisibility::default(),
986 idempotency_key: None,
987 supersession_key: None,
988 correlation_id: None,
989 }
990 }
991
992 fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
993 ConversationAppend {
994 role: ConversationAppendRole::SystemNotice,
995 content: CoreRenderable::SystemNotice {
996 kind: meerkat_core::types::SystemNoticeKind::Generic,
997 body: Some(detail.to_string()),
998 blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
999 category: "test".to_string(),
1000 detail: Some(detail.to_string()),
1001 payload: None,
1002 }],
1003 },
1004 }
1005 }
1006
1007 #[test]
1008 fn prompt_input_serde() {
1009 let input = Input::Prompt(PromptInput {
1010 header: make_header(),
1011 text: "hello".into(),
1012 blocks: None,
1013 typed_turn_appends: Vec::new(),
1014 turn_metadata: None,
1015 });
1016 let json = serde_json::to_value(&input).unwrap();
1017 assert_eq!(json["input_type"], "prompt");
1018 let parsed: Input = serde_json::from_value(json).unwrap();
1019 assert!(matches!(parsed, Input::Prompt(_)));
1020 }
1021
1022 #[test]
1023 fn prompt_input_typed_turn_appends_project_without_user_text() {
1024 let append = typed_runtime_notice_append("peer delivery");
1025 let input = Input::Prompt(PromptInput {
1026 header: make_header(),
1027 text: String::new(),
1028 blocks: None,
1029 typed_turn_appends: vec![append.clone()],
1030 turn_metadata: None,
1031 });
1032
1033 let projection = runtime_input_projection(&input);
1034 assert!(
1035 projection.append.is_none(),
1036 "empty runtime-authored prompt carrier must not synthesize a user append"
1037 );
1038 assert_eq!(projection.additional_appends, vec![append]);
1039 }
1040
1041 #[test]
1042 fn prompt_input_typed_turn_appends_serde_roundtrip() {
1043 let append = typed_runtime_notice_append("typed appends persist");
1044 let input = Input::Prompt(PromptInput {
1045 header: make_header(),
1046 text: String::new(),
1047 blocks: None,
1048 typed_turn_appends: vec![append.clone()],
1049 turn_metadata: None,
1050 });
1051
1052 let json = serde_json::to_value(&input).unwrap();
1053 let parsed: Input = serde_json::from_value(json).unwrap();
1054 let Input::Prompt(prompt) = parsed else {
1055 panic!("expected prompt input");
1056 };
1057 assert_eq!(prompt.text, "");
1058 assert_eq!(prompt.typed_turn_appends, vec![append]);
1059 }
1060
1061 #[test]
1062 fn peer_input_message_serde() {
1063 let input = Input::Peer(PeerInput {
1064 header: make_header(),
1065 convention: Some(PeerConvention::Message),
1066 body: "hi there".into(),
1067 payload: None,
1068 blocks: None,
1069 handling_mode: None,
1070 });
1071 let json = serde_json::to_value(&input).unwrap();
1072 assert_eq!(json["input_type"], "peer");
1073 let parsed: Input = serde_json::from_value(json).unwrap();
1074 assert!(matches!(parsed, Input::Peer(_)));
1075 }
1076
1077 #[test]
1078 fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
1079 let mut header = make_header();
1080 header.source = InputOrigin::Peer {
1081 peer_id: "canonical-peer-id".into(),
1082 display_identity: Some("display-agent".into()),
1083 runtime_id: None,
1084 };
1085 let input = Input::Peer(PeerInput {
1086 header,
1087 convention: Some(PeerConvention::Message),
1088 body: "caption".into(),
1089 payload: None,
1090 blocks: Some(vec![
1091 meerkat_core::types::ContentBlock::Text {
1092 text: "caption".into(),
1093 },
1094 meerkat_core::types::ContentBlock::Image {
1095 media_type: "image/png".into(),
1096 data: "abc".into(),
1097 },
1098 ]),
1099 handling_mode: None,
1100 });
1101
1102 let Input::Peer(peer) = &input else {
1103 panic!("expected peer input");
1104 };
1105 assert_eq!(
1106 peer_projection_from_peer_input(peer)
1107 .and_then(|projection| projection.block_prefix_text())
1108 .as_deref(),
1109 Some("Peer message from canonical-peer-id")
1110 );
1111
1112 let projection = runtime_input_projection(&input);
1113 let append = projection.append.expect("conversation append");
1114 let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1115 panic!("expected typed system notice");
1116 };
1117 let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1118 blocks.first()
1119 else {
1120 panic!("expected comms block");
1121 };
1122 assert_eq!(
1123 peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1124 Some("display-agent")
1125 );
1126 assert_eq!(
1127 content.first(),
1128 Some(&meerkat_core::types::ContentBlock::Text {
1129 text: "caption".into()
1130 })
1131 );
1132 }
1133
/// A terminal peer response yields no peer projection and no context append from the
/// admission projection; the context append, keyed by route and request id, is only
/// produced by the machine-batch projection.
#[test]
fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
    let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
    let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";

    let mut peer_header = make_header();
    peer_header.source = InputOrigin::Peer {
        peer_id: route_id.into(),
        display_identity: Some("display-agent".into()),
        runtime_id: None,
    };
    let terminal = PeerInput {
        header: peer_header,
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: request_id.into(),
            status: ResponseTerminalStatus::Completed,
        }),
        body: "legacy response body".into(),
        payload: Some(serde_json::json!({"answer":"ok"})),
        blocks: None,
        handling_mode: None,
    };
    let input = Input::Peer(terminal);

    // Before machine batch selection the peer-level projection must be absent.
    match &input {
        Input::Peer(peer) => assert!(
            peer_projection_from_peer_input(peer).is_none(),
            "terminal peer response projection must not be built before machine batch selection"
        ),
        _ => panic!("expected peer input"),
    }

    // The admission-time projection carries no terminal response context either.
    let admission = runtime_input_projection(&input);
    assert!(
        admission.context_append.is_none(),
        "admission projection must not store terminal peer response context"
    );

    // The machine-batch projection is where the keyed context append appears.
    let batch = runtime_input_projection_for_machine_batch(&input);
    let context = batch.context_append.expect("context append");
    let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
    assert_eq!(context.key, expected_canonical_key);

    let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
        panic!("expected typed context");
    };
    let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first() else {
        panic!("expected comms block");
    };
    let sender = peer.as_ref();
    assert_eq!(
        sender.and_then(|peer| peer.display_name.as_deref()),
        Some("display-agent")
    );
    assert_eq!(sender.map(|peer| peer.id.as_str()), Some(route_id));
}
1186
/// A terminal peer response that carries content blocks produces, in the machine-batch
/// projection, both a conversation append with those blocks and a context append.
#[test]
fn peer_response_terminal_with_blocks_projects_append_and_context() {
    let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
    let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";

    let mut peer_header = make_header();
    peer_header.source = InputOrigin::Peer {
        peer_id: route_id.into(),
        display_identity: Some("display-agent".into()),
        runtime_id: None,
    };
    let image_blocks = vec![meerkat_core::types::ContentBlock::Image {
        media_type: "image/jpeg".into(),
        data: "abc".into(),
    }];
    let input = Input::Peer(PeerInput {
        header: peer_header,
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: request_id.into(),
            status: ResponseTerminalStatus::Completed,
        }),
        body: String::new(),
        payload: Some(serde_json::json!({"answer":"ok"})),
        blocks: Some(image_blocks),
        handling_mode: None,
    });

    let batch = runtime_input_projection_for_machine_batch(&input);

    // The conversation append must be a typed system notice led by a comms block.
    let append = batch.append.expect("conversation append");
    let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
        panic!("expected typed append");
    };
    let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
        blocks.first()
    else {
        panic!("expected comms block");
    };

    let display_name = peer.as_ref().and_then(|peer| peer.display_name.as_deref());
    assert_eq!(display_name, Some("display-agent"));

    let leading_block = content.first();
    assert!(matches!(
        leading_block,
        Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
            if media_type == "image/jpeg"
    ));

    assert!(
        batch.context_append.is_some(),
        "terminal response must still apply runtime-owned context"
    );
}
1236
/// A peer request input survives a JSON round trip as a peer input whose convention is
/// still `PeerConvention::Request`.
#[test]
fn peer_input_request_serde() {
    let request = PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Request {
            request_id: "req-1".into(),
            intent: "mob.peer_added".into(),
        }),
        body: "Agent joined".into(),
        payload: Some(serde_json::json!({"name": "agent-1"})),
        blocks: None,
        handling_mode: None,
    };
    let wire = serde_json::to_value(&Input::Peer(request)).unwrap();

    let Input::Peer(restored) = serde_json::from_value::<Input>(wire).unwrap() else {
        panic!("Expected PeerInput");
    };
    assert!(matches!(
        restored.convention,
        Some(PeerConvention::Request { .. })
    ));
}
1258
/// A terminal peer response survives a JSON round trip as a peer input whose convention
/// is still `PeerConvention::ResponseTerminal` with the `Completed` status.
///
/// Checking only the outer `Input::Peer` variant would let a lost or mis-tagged
/// convention slip through, so the convention is asserted as in
/// `peer_input_request_serde`.
#[test]
fn peer_input_response_terminal_serde() {
    let input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: "req-1".into(),
            status: ResponseTerminalStatus::Completed,
        }),
        body: "Done".into(),
        payload: Some(serde_json::json!({"ok": true})),
        blocks: None,
        handling_mode: None,
    });
    let json = serde_json::to_value(&input).unwrap();
    let parsed: Input = serde_json::from_value(json).unwrap();

    let Input::Peer(peer) = parsed else {
        panic!("Expected PeerInput");
    };
    // The convention tag and its status must both come back unchanged.
    assert!(matches!(
        peer.convention,
        Some(PeerConvention::ResponseTerminal {
            status: ResponseTerminalStatus::Completed,
            ..
        })
    ));
}
1276
/// A progress peer response survives a JSON round trip as a peer input whose convention
/// is still `PeerConvention::ResponseProgress` with the `InProgress` phase.
///
/// Checking only the outer `Input::Peer` variant would let a lost or mis-tagged
/// convention slip through, so the convention is asserted as in
/// `peer_input_request_serde`.
#[test]
fn peer_input_response_progress_serde() {
    let input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::ResponseProgress {
            request_id: "req-1".into(),
            phase: ResponseProgressPhase::InProgress,
        }),
        body: "Working...".into(),
        payload: Some(serde_json::json!({"progress": "working"})),
        blocks: None,
        handling_mode: None,
    });
    let json = serde_json::to_value(&input).unwrap();
    let parsed: Input = serde_json::from_value(json).unwrap();

    let Input::Peer(peer) = parsed else {
        panic!("Expected PeerInput");
    };
    // The convention tag and its phase must both come back unchanged.
    assert!(matches!(
        peer.convention,
        Some(PeerConvention::ResponseProgress {
            phase: ResponseProgressPhase::InProgress,
            ..
        })
    ));
}
1294
/// A flow-step input with text and image blocks serializes with the `flow_step` tag and
/// deserializes back into `Input::FlowStep`.
#[test]
fn flow_step_input_serde() {
    let step = FlowStepInput {
        header: make_header(),
        step_id: "step-1".into(),
        instructions: "analyze the data".into(),
        blocks: Some(vec![
            meerkat_core::types::ContentBlock::Text {
                text: "analyze the data".into(),
            },
            meerkat_core::types::ContentBlock::Image {
                media_type: "image/png".into(),
                data: meerkat_core::types::ImageData::Inline {
                    data: "abc123".into(),
                },
            },
        ]),
        turn_metadata: None,
    };

    let wire = serde_json::to_value(&Input::FlowStep(step)).unwrap();
    assert_eq!(wire["input_type"], "flow_step");

    let restored = serde_json::from_value::<Input>(wire).unwrap();
    assert!(matches!(restored, Input::FlowStep(_)));
}
1319
/// An external-event input with text and image blocks serializes with the
/// `external_event` tag and deserializes back into `Input::ExternalEvent`.
#[test]
fn external_event_input_serde() {
    let event = ExternalEventInput {
        header: make_header(),
        event_type: "webhook.received".into(),
        payload: serde_json::json!({"url": "https://example.com"}),
        blocks: Some(vec![
            meerkat_core::types::ContentBlock::Text {
                text: "look".into(),
            },
            meerkat_core::types::ContentBlock::Image {
                media_type: "image/png".into(),
                data: meerkat_core::types::ImageData::Inline {
                    data: "abc123".into(),
                },
            },
        ]),
        handling_mode: HandlingMode::Queue,
        render_metadata: None,
    };

    let wire = serde_json::to_value(&Input::ExternalEvent(event)).unwrap();
    assert_eq!(wire["input_type"], "external_event");

    let restored = serde_json::from_value::<Input>(wire).unwrap();
    assert!(matches!(restored, Input::ExternalEvent(_)));
}
1345
/// `migrate_legacy_payload_blocks` removes `blocks` from the payload, leaves the other
/// payload fields in place, and fills the event's own `blocks` field.
#[test]
fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
    let legacy_payload = serde_json::json!({
        "body": "see image",
        "blocks": [
            { "type": "text", "text": "caption text" },
            { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
        ]
    });
    let mut input = Input::ExternalEvent(ExternalEventInput {
        header: make_header(),
        event_type: "webhook.received".into(),
        payload: legacy_payload,
        blocks: None,
        handling_mode: HandlingMode::Queue,
        render_metadata: None,
    });

    let event = match &mut input {
        Input::ExternalEvent(event) => event,
        other => panic!("Expected ExternalEvent, got {other:?}"),
    };
    migrate_legacy_payload_blocks(event).unwrap();

    // The payload no longer holds the blocks, but its other fields are untouched.
    assert!(event.payload.get("blocks").is_none());
    assert_eq!(event.payload["body"], "see image");
    // Both legacy blocks now sit in the event's `blocks` field.
    let migrated_count = event.blocks.as_ref().map(Vec::len);
    assert_eq!(migrated_count, Some(2));
}
1373
/// The detached-background-op continuation serializes with the `continuation` tag and
/// comes back with steer handling and the same reason.
#[test]
fn continuation_input_serde() {
    let original = Input::Continuation(ContinuationInput::detached_background_op_completed());

    let wire = serde_json::to_value(&original).unwrap();
    assert_eq!(wire["input_type"], "continuation");

    let restored = match serde_json::from_value::<Input>(wire).unwrap() {
        Input::Continuation(continuation) => continuation,
        other => panic!("Expected Continuation, got {other:?}"),
    };
    assert_eq!(restored.handling_mode, HandlingMode::Steer);
    assert_eq!(restored.reason, "detached_background_op_completed");
}
1388
/// A continuation whose `input_type` is rewritten to the legacy `system_generated` tag
/// still deserializes as `Input::Continuation`.
#[test]
fn continuation_input_accepts_legacy_system_generated_tag() {
    let original = Input::Continuation(ContinuationInput::detached_background_op_completed());

    // Swap the current tag for the legacy one before parsing.
    let mut wire = serde_json::to_value(&original).unwrap();
    wire["input_type"] = serde_json::Value::String("system_generated".into());

    let restored = match serde_json::from_value::<Input>(wire).unwrap() {
        Input::Continuation(continuation) => continuation,
        other => panic!("Expected Continuation, got {other:?}"),
    };
    assert_eq!(restored.reason, "detached_background_op_completed");
}
1402
/// An operation input with derived durability serializes with the `operation` tag and
/// deserializes back into `Input::Operation`.
#[test]
fn operation_input_serde() {
    let mut derived_header = make_header();
    derived_header.durability = InputDurability::Derived;

    let operation = OperationInput {
        header: derived_header,
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    };

    let wire = serde_json::to_value(&Input::Operation(operation)).unwrap();
    assert_eq!(wire["input_type"], "operation");

    let restored = serde_json::from_value::<Input>(wire).unwrap();
    assert!(matches!(restored, Input::Operation(_)));
}
1420
/// An operation input whose `input_type` is rewritten to the legacy `projected` tag
/// still deserializes as `Input::Operation`.
#[test]
fn operation_input_accepts_legacy_projected_tag() {
    let mut derived_header = make_header();
    derived_header.durability = InputDurability::Derived;

    let operation = OperationInput {
        header: derived_header,
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    };

    // Swap the current tag for the legacy one before parsing.
    let mut wire = serde_json::to_value(&Input::Operation(operation)).unwrap();
    wire["input_type"] = serde_json::Value::String("projected".into());

    let restored = serde_json::from_value::<Input>(wire).unwrap();
    assert!(matches!(restored, Input::Operation(_)));
}
1438
/// `Input::kind` reports the expected `InputKind` for prompt, peer message, peer request,
/// continuation, and operation inputs.
#[test]
fn input_kind_id() {
    let prompt_kind = Input::Prompt(PromptInput {
        header: make_header(),
        text: "hi".into(),
        blocks: None,
        typed_turn_appends: Vec::new(),
        turn_metadata: None,
    })
    .kind();
    assert_eq!(prompt_kind, InputKind::Prompt);

    let message_kind = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        body: "hi".into(),
        payload: None,
        blocks: None,
        handling_mode: None,
    })
    .kind();
    assert_eq!(message_kind, InputKind::PeerMessage);

    let request_kind = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Request {
            request_id: "r".into(),
            intent: "i".into(),
        }),
        body: "hi".into(),
        payload: Some(serde_json::json!({"subject": "x"})),
        blocks: None,
        handling_mode: None,
    })
    .kind();
    assert_eq!(request_kind, InputKind::PeerRequest);

    let continuation_kind = Input::Continuation(ContinuationInput {
        header: make_header(),
        reason: "continue".into(),
        handling_mode: HandlingMode::Steer,
        request_id: None,
    })
    .kind();
    assert_eq!(continuation_kind, InputKind::Continuation);

    let operation_kind = Input::Operation(OperationInput {
        header: make_header(),
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    })
    .kind();
    assert_eq!(operation_kind, InputKind::Operation);
}
1490
/// Each listed `InputOrigin` variant is equal to itself after a JSON round trip.
#[test]
fn input_source_variants() {
    let origins = [
        InputOrigin::Operator,
        InputOrigin::Peer {
            peer_id: "p1".into(),
            display_identity: None,
            runtime_id: None,
        },
        InputOrigin::Flow {
            flow_id: "f1".into(),
            step_index: 0,
        },
        InputOrigin::System,
        InputOrigin::External {
            source_name: "webhook".into(),
        },
    ];

    for origin in origins {
        let wire = serde_json::to_value(&origin).unwrap();
        let restored = serde_json::from_value::<InputOrigin>(wire).unwrap();
        assert_eq!(origin, restored);
    }
}
1515
/// `Durable`, `Ephemeral`, and `Derived` each equal themselves after a JSON round trip.
#[test]
fn input_durability_serde() {
    let durabilities = [
        InputDurability::Durable,
        InputDurability::Ephemeral,
        InputDurability::Derived,
    ];

    for durability in durabilities {
        let wire = serde_json::to_value(durability).unwrap();
        let restored = serde_json::from_value::<InputDurability>(wire).unwrap();
        assert_eq!(durability, restored);
    }
}
1528
/// Peer-input JSON that has no `handling_mode` key deserializes with
/// `handling_mode == None`.
#[test]
fn peer_input_without_handling_mode_deserializes_as_none() {
    let header = serde_json::to_value(make_header()).unwrap();
    let wire = serde_json::json!({
        "input_type": "peer",
        "header": header,
        "convention": { "convention_type": "message" },
        "body": "hello"
    });

    let peer = match serde_json::from_value::<Input>(wire).unwrap() {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert!(peer.handling_mode.is_none());
}
1544
/// A peer input with `HandlingMode::Queue` serializes `handling_mode` as `"queue"` and
/// comes back with the same mode.
#[test]
fn peer_input_with_queue_handling_mode_roundtrips() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        body: "hi".into(),
        payload: None,
        blocks: None,
        handling_mode: Some(HandlingMode::Queue),
    });

    let wire = serde_json::to_value(&original).unwrap();
    assert_eq!(wire["handling_mode"], "queue");

    let restored = match serde_json::from_value::<Input>(wire).unwrap() {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert_eq!(restored.handling_mode, Some(HandlingMode::Queue));
}
1563
/// `peer_response_terminal_input` called with the `Cancelled` wire status returns a
/// durable peer input whose origin, correlation id, convention, status, and payload
/// match the arguments.
#[test]
fn peer_response_terminal_input_owns_wire_status_mapping() {
    let sender_uuid_text = "00000000-0000-4000-8000-000000000161";
    let request_uuid_text = "00000000-0000-4000-8000-000000000162";

    let sender =
        meerkat_core::comms::PeerId::from_uuid(uuid::Uuid::parse_str(sender_uuid_text).unwrap());
    let sender_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
    let correlation = meerkat_core::PeerCorrelationId::from_uuid(
        uuid::Uuid::parse_str(request_uuid_text).unwrap(),
    );

    let input = peer_response_terminal_input(
        sender,
        Some(sender_name),
        correlation,
        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
        serde_json::json!({"ok": false}),
    );

    // One nested pattern pins the structural expectations (peer origin, durable, terminal
    // convention, payload present, no handling mode); any mismatch reaches the panic arm.
    match input {
        Input::Peer(PeerInput {
            header:
                InputHeader {
                    source:
                        InputOrigin::Peer {
                            peer_id: origin_peer_id,
                            display_identity: origin_display,
                            runtime_id: origin_runtime,
                        },
                    durability: InputDurability::Durable,
                    correlation_id: header_correlation,
                    ..
                },
            convention:
                Some(PeerConvention::ResponseTerminal {
                    request_id: terminal_request_id,
                    status: terminal_status,
                }),
            payload: Some(terminal_payload),
            handling_mode: None,
            ..
        }) => {
            assert_eq!(origin_peer_id, "00000000-0000-4000-8000-000000000161");
            assert_eq!(origin_display.as_deref(), Some("analyst"));
            assert_eq!(origin_runtime, None);
            assert_eq!(terminal_request_id, "00000000-0000-4000-8000-000000000162");
            assert_eq!(
                header_correlation,
                Some(CorrelationId::from_uuid(
                    uuid::Uuid::parse_str(request_uuid_text).unwrap()
                ))
            );
            assert_eq!(terminal_status, ResponseTerminalStatus::Cancelled);
            assert_eq!(terminal_payload["ok"], false);
        }
        other => panic!("expected terminal peer input, got {other:?}"),
    }
}
1616
/// A peer input with `HandlingMode::Steer` serializes `handling_mode` as `"steer"` and
/// comes back with the same mode.
#[test]
fn peer_input_with_steer_handling_mode_roundtrips() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        body: "hi".into(),
        payload: None,
        blocks: None,
        handling_mode: Some(HandlingMode::Steer),
    });

    let wire = serde_json::to_value(&original).unwrap();
    assert_eq!(wire["handling_mode"], "steer");

    let restored = match serde_json::from_value::<Input>(wire).unwrap() {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert_eq!(restored.handling_mode, Some(HandlingMode::Steer));
}
1635
/// A peer input with `handling_mode: None` serializes to JSON that has no
/// `handling_mode` key.
#[test]
fn peer_input_handling_mode_not_serialized_when_none() {
    let message = PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        body: "hi".into(),
        payload: None,
        blocks: None,
        handling_mode: None,
    };

    let wire = serde_json::to_value(&Input::Peer(message)).unwrap();
    let serialized_mode = wire.get("handling_mode");
    assert!(serialized_mode.is_none());
}
1649}