1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10 ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11 RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::{
15 HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind, SystemNoticePeer,
16};
17use meerkat_core::{
18 BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
19 PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
20 PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
21 PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
22 PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
23 PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
24};
25use serde::{Deserialize, Serialize};
26
27use crate::identifiers::{
28 CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
29};
30use meerkat_core::types::RenderMetadata;
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct InputHeader {
35 pub id: InputId,
37 pub timestamp: DateTime<Utc>,
39 pub source: InputOrigin,
41 pub durability: InputDurability,
43 pub visibility: InputVisibility,
45 #[serde(skip_serializing_if = "Option::is_none")]
47 pub idempotency_key: Option<IdempotencyKey>,
48 #[serde(skip_serializing_if = "Option::is_none")]
50 pub supersession_key: Option<SupersessionKey>,
51 #[serde(skip_serializing_if = "Option::is_none")]
53 pub correlation_id: Option<CorrelationId>,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(tag = "type", rename_all = "snake_case")]
59#[non_exhaustive]
60pub enum InputOrigin {
61 Operator,
63 Peer {
65 peer_id: String,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
72 display_identity: Option<String>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 runtime_id: Option<LogicalRuntimeId>,
75 },
76 Flow { flow_id: String, step_index: usize },
78 System,
80 External { source_name: String },
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87#[non_exhaustive]
88pub enum InputDurability {
89 Durable,
91 Ephemeral,
93 Derived,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99pub struct InputVisibility {
100 pub transcript_eligible: bool,
102 pub operator_eligible: bool,
104}
105
106impl Default for InputVisibility {
107 fn default() -> Self {
108 Self {
109 transcript_eligible: true,
110 operator_eligible: true,
111 }
112 }
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "input_type", rename_all = "snake_case")]
118#[non_exhaustive]
119pub enum Input {
120 Prompt(PromptInput),
122 Peer(PeerInput),
124 FlowStep(FlowStepInput),
126 ExternalEvent(ExternalEventInput),
128 #[serde(alias = "system_generated")]
130 Continuation(ContinuationInput),
131 #[serde(alias = "projected")]
133 Operation(OperationInput),
134}
135
136impl Input {
137 pub fn header(&self) -> &InputHeader {
139 match self {
140 Input::Prompt(i) => &i.header,
141 Input::Peer(i) => &i.header,
142 Input::FlowStep(i) => &i.header,
143 Input::ExternalEvent(i) => &i.header,
144 Input::Continuation(i) => &i.header,
145 Input::Operation(i) => &i.header,
146 }
147 }
148
149 pub fn id(&self) -> &InputId {
151 &self.header().id
152 }
153
154 pub fn kind(&self) -> InputKind {
156 match self {
157 Input::Prompt(_) => InputKind::Prompt,
158 Input::Peer(p) => match &p.convention {
159 Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160 Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161 Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162 Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163 },
164 Input::FlowStep(_) => InputKind::FlowStep,
165 Input::ExternalEvent(_) => InputKind::ExternalEvent,
166 Input::Continuation(_) => InputKind::Continuation,
167 Input::Operation(_) => InputKind::Operation,
168 }
169 }
170
171 pub fn kind_id(&self) -> KindId {
173 KindId::new(self.kind())
174 }
175
176 pub fn handling_mode(&self) -> Option<HandlingMode> {
178 match self {
179 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181 Input::ExternalEvent(event) => Some(event.handling_mode),
182 Input::Continuation(continuation) => Some(continuation.handling_mode),
183 Input::Peer(peer) => peer.handling_mode,
184 Input::Operation(_) => None,
185 }
186 }
187}
188
189fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
190 let Some(obj) = event.payload.as_object_mut() else {
191 return Ok(());
192 };
193 let Some(blocks_value) = obj.remove("blocks") else {
194 return Ok(());
195 };
196 if event.blocks.is_some() {
197 return Ok(());
198 }
199 let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
200 .map_err(|err| {
201 BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
202 })?;
203 event.blocks = Some(blocks);
204 Ok(())
205}
206
207pub async fn externalize_input_images(
208 blob_store: &dyn BlobStore,
209 input: &mut Input,
210) -> Result<(), BlobStoreError> {
211 match input {
212 Input::Prompt(prompt) => {
213 if let Some(blocks) = prompt.blocks.as_mut() {
214 externalize_content_blocks(blob_store, blocks).await?;
215 }
216 }
217 Input::Peer(peer) => {
218 if let Some(blocks) = peer.blocks.as_mut() {
219 externalize_content_blocks(blob_store, blocks).await?;
220 }
221 }
222 Input::FlowStep(flow_step) => {
223 if let Some(blocks) = flow_step.blocks.as_mut() {
224 externalize_content_blocks(blob_store, blocks).await?;
225 }
226 }
227 Input::ExternalEvent(event) => {
228 migrate_legacy_payload_blocks(event)?;
229 if let Some(blocks) = event.blocks.as_mut() {
230 externalize_content_blocks(blob_store, blocks).await?;
231 }
232 }
233 Input::Continuation(_) | Input::Operation(_) => {}
234 }
235 Ok(())
236}
237
238pub async fn hydrate_input_images(
239 blob_store: &dyn BlobStore,
240 input: &mut Input,
241 missing_behavior: MissingBlobBehavior,
242) -> Result<(), BlobStoreError> {
243 match input {
244 Input::Prompt(prompt) => {
245 if let Some(blocks) = prompt.blocks.as_mut() {
246 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247 }
248 }
249 Input::Peer(peer) => {
250 if let Some(blocks) = peer.blocks.as_mut() {
251 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
252 }
253 }
254 Input::FlowStep(flow_step) => {
255 if let Some(blocks) = flow_step.blocks.as_mut() {
256 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
257 }
258 }
259 Input::ExternalEvent(event) => {
260 migrate_legacy_payload_blocks(event)?;
261 if let Some(blocks) = event.blocks.as_mut() {
262 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
263 }
264 }
265 Input::Continuation(_) | Input::Operation(_) => {}
266 }
267 Ok(())
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct PromptInput {
273 pub header: InputHeader,
274 pub text: String,
276 #[serde(default, skip_serializing_if = "Option::is_none")]
279 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
280 #[serde(default, skip_serializing_if = "Vec::is_empty")]
285 pub typed_turn_appends: Vec<ConversationAppend>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
287 pub turn_metadata: Option<RuntimeTurnMetadata>,
288}
289
290impl PromptInput {
291 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
293 Self {
294 header: InputHeader {
295 id: meerkat_core::lifecycle::InputId::new(),
296 timestamp: chrono::Utc::now(),
297 source: InputOrigin::Operator,
298 durability: InputDurability::Durable,
299 visibility: InputVisibility::default(),
300 idempotency_key: None,
301 supersession_key: None,
302 correlation_id: None,
303 },
304 text: text.into(),
305 blocks: None,
306 typed_turn_appends: Vec::new(),
307 turn_metadata,
308 }
309 }
310
311 pub fn from_content_input(
313 input: meerkat_core::types::ContentInput,
314 turn_metadata: Option<RuntimeTurnMetadata>,
315 ) -> Self {
316 let text = input.text_content();
317 let blocks = if input.has_images() {
318 Some(input.into_blocks())
319 } else {
320 None
321 };
322 Self {
323 header: InputHeader {
324 id: meerkat_core::lifecycle::InputId::new(),
325 timestamp: chrono::Utc::now(),
326 source: InputOrigin::Operator,
327 durability: InputDurability::Durable,
328 visibility: InputVisibility::default(),
329 idempotency_key: None,
330 supersession_key: None,
331 correlation_id: None,
332 },
333 text,
334 blocks,
335 typed_turn_appends: Vec::new(),
336 turn_metadata,
337 }
338 }
339}
340
341#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct PeerInput {
344 pub header: InputHeader,
345 #[serde(skip_serializing_if = "Option::is_none")]
347 pub convention: Option<PeerConvention>,
348 pub body: String,
354 #[serde(default, skip_serializing_if = "Option::is_none")]
359 pub payload: Option<serde_json::Value>,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
363 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
364 #[serde(default, skip_serializing_if = "Option::is_none")]
370 pub handling_mode: Option<HandlingMode>,
371}
372
373#[derive(Debug, Clone, Serialize, Deserialize)]
375#[serde(tag = "convention_type", rename_all = "snake_case")]
376#[non_exhaustive]
377pub enum PeerConvention {
378 Message,
380 Request { request_id: String, intent: String },
382 ResponseProgress {
384 request_id: String,
385 phase: ResponseProgressPhase,
386 },
387 ResponseTerminal {
389 request_id: String,
390 status: ResponseTerminalStatus,
391 },
392}
393
394pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
397
398pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
401
402pub fn response_terminal_status_from_wire(
403 status: meerkat_contracts::PeerResponseTerminalStatusWire,
404) -> ResponseTerminalStatus {
405 match status {
406 meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
407 PeerResponseTerminalProjectionStatus::Completed
408 }
409 meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
410 PeerResponseTerminalProjectionStatus::Failed
411 }
412 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
413 PeerResponseTerminalProjectionStatus::Cancelled
414 }
415 }
416}
417
418pub fn peer_response_terminal_input(
419 peer_id: meerkat_core::comms::PeerId,
420 display_name: Option<meerkat_core::comms::PeerName>,
421 request_id: meerkat_core::PeerCorrelationId,
422 status: meerkat_contracts::PeerResponseTerminalStatusWire,
423 result: serde_json::Value,
424) -> Input {
425 let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
426 let request_id = request_id.to_string();
427 let peer_id = peer_id.to_string();
428 let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
429
430 Input::Peer(PeerInput {
431 header: InputHeader {
432 id: InputId::new(),
433 timestamp: Utc::now(),
434 source: InputOrigin::Peer {
435 peer_id,
436 display_identity: Some(display_identity),
437 runtime_id: None,
438 },
439 durability: InputDurability::Durable,
440 visibility: InputVisibility::default(),
441 idempotency_key: None,
442 supersession_key: None,
443 correlation_id: Some(correlation_id),
444 },
445 convention: Some(PeerConvention::ResponseTerminal {
446 request_id,
447 status: response_terminal_status_from_wire(status),
448 }),
449 body: String::new(),
450 payload: Some(result),
451 blocks: None,
452 handling_mode: None,
453 })
454}
455
456#[derive(Debug, Clone, Serialize, Deserialize)]
458pub struct FlowStepInput {
459 pub header: InputHeader,
460 pub step_id: String,
462 pub instructions: String,
464 #[serde(default, skip_serializing_if = "Option::is_none")]
468 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
469 #[serde(default, skip_serializing_if = "Option::is_none")]
470 pub turn_metadata: Option<RuntimeTurnMetadata>,
471}
472
473#[derive(Debug, Clone, Serialize, Deserialize)]
475pub struct ExternalEventInput {
476 pub header: InputHeader,
477 pub event_type: String,
479 pub payload: serde_json::Value,
483 #[serde(default, skip_serializing_if = "Option::is_none")]
486 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
487 #[serde(default)]
489 pub handling_mode: HandlingMode,
490 #[serde(default, skip_serializing_if = "Option::is_none")]
492 pub render_metadata: Option<RenderMetadata>,
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
499pub struct ContinuationInput {
500 pub header: InputHeader,
501 pub reason: String,
503 #[serde(default)]
505 pub handling_mode: HandlingMode,
506 #[serde(default, skip_serializing_if = "Option::is_none")]
508 pub request_id: Option<String>,
509}
510
511impl ContinuationInput {
512 pub fn detached_background_op_completed() -> Self {
518 Self {
519 header: InputHeader {
520 id: meerkat_core::lifecycle::InputId::new(),
521 timestamp: chrono::Utc::now(),
522 source: InputOrigin::System,
523 durability: InputDurability::Derived,
524 visibility: InputVisibility {
525 transcript_eligible: false,
526 operator_eligible: false,
527 },
528 idempotency_key: None,
529 supersession_key: None,
530 correlation_id: None,
531 },
532 reason: "detached_background_op_completed".to_string(),
533 handling_mode: HandlingMode::Steer,
534 request_id: None,
535 }
536 }
537}
538
539#[derive(Debug, Clone, Serialize, Deserialize)]
542pub struct OperationInput {
543 pub header: InputHeader,
544 pub operation_id: OperationId,
546 pub event: OpEvent,
548}
549
550pub(crate) fn peer_projection_from_peer_input(
557 peer: &PeerInput,
558) -> Option<PeerConversationProjection> {
559 peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
560}
561
562fn peer_projection_from_peer_input_with_id(
563 peer: &PeerInput,
564 peer_id: &str,
565) -> Option<PeerConversationProjection> {
566 let peer_id = peer_id.to_string();
567
568 match &peer.convention {
569 Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
570 Some(PeerConvention::Request { request_id, intent }) => {
571 let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
572 Ok(peer_id) => peer_id,
573 Err(error) => {
574 tracing::warn!(
575 peer_id,
576 error = %error,
577 "dropping peer request projection with non-canonical peer_id"
578 );
579 return None;
580 }
581 };
582 Some(PeerConversationProjection::Request {
583 peer_id,
584 display_name: peer_display_label(peer),
585 request_id: request_id.clone(),
586 intent: intent.clone(),
587 payload: peer.payload.clone(),
588 })
589 }
590 Some(PeerConvention::ResponseProgress { request_id, phase }) => {
591 Some(PeerConversationProjection::ResponseProgress {
592 peer_id,
593 request_id: request_id.clone(),
594 phase: *phase,
595 payload: peer.payload.clone(),
596 })
597 }
598 Some(PeerConvention::ResponseTerminal { .. }) => None,
599 None => None,
600 }
601}
602
603pub(crate) fn peer_response_terminal_fact(
604 peer: &PeerInput,
605) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
606 let InputOrigin::Peer {
607 peer_id,
608 display_identity,
609 runtime_id,
610 } = &peer.header.source
611 else {
612 return Ok(None);
613 };
614 let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
615 return Ok(None);
616 };
617
618 let transport_identity = runtime_id
619 .as_ref()
620 .map(ToString::to_string)
621 .map(PeerResponseTerminalTransportIdentity::parse)
622 .transpose()?;
623 let source = PeerResponseTerminalSource::new(
624 transport_identity,
625 PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
626 PeerResponseTerminalDisplayIdentity::parse(
627 display_identity
628 .as_ref()
629 .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
630 .clone(),
631 )?,
632 );
633 Ok(Some(PeerResponseTerminalFact::new(
634 source,
635 PeerResponseTerminalCorrelationId::parse(request_id)?,
636 *status,
637 PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
638 )))
639}
640
641pub(crate) fn validate_peer_response_terminal_fact(
642 input: &Input,
643) -> Result<(), PeerResponseTerminalFactError> {
644 let Input::Peer(peer) = input else {
645 return Ok(());
646 };
647 peer_response_terminal_fact(peer).map(|_| ())
648}
649
650#[cfg(test)]
653pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
654 let Input::Peer(peer) = input else {
655 return None;
656 };
657 peer_projection_from_peer_input(peer)
658}
659
660fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
661 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
662 return None;
663 };
664 Some(peer_id.clone())
665}
666
667fn peer_display_label(peer: &PeerInput) -> Option<String> {
668 let InputOrigin::Peer {
669 display_identity, ..
670 } = &peer.header.source
671 else {
672 return None;
673 };
674
675 display_identity
676 .as_ref()
677 .map(|label| label.trim())
678 .filter(|label| !label.is_empty())
679 .map(ToOwned::to_owned)
680}
681
682pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
684 peer_projection_from_peer_input(peer)
685 .map(|projection| {
686 let prompt = projection.prompt_text();
687 if prompt.is_empty() {
688 peer.body.clone()
689 } else {
690 prompt
691 }
692 })
693 .unwrap_or_else(|| peer.body.clone())
694}
695
696pub(crate) fn input_prompt_text(input: &Input) -> String {
697 match input {
698 Input::Prompt(p) => p.text.clone(),
699 Input::Peer(p) => peer_prompt_text(p),
700 Input::FlowStep(f) => f.instructions.clone(),
701 Input::ExternalEvent(e) => external_event_projection_text(e),
702 Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
703 Input::Operation(operation) => {
704 format!(
705 "[Operation {}] {:?}",
706 operation.operation_id, operation.event
707 )
708 }
709 }
710}
711
712fn external_event_projection_text(event: &ExternalEventInput) -> String {
713 let source_name = match &event.header.source {
714 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
715 source_name.as_str()
716 }
717 _ => event.event_type.as_str(),
718 };
719 let body = event
720 .payload
721 .get("body")
722 .and_then(serde_json::Value::as_str)
723 .map(str::trim);
724
725 meerkat_core::interaction::format_external_event_projection(source_name, body)
726}
727
728fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
729 let (peer_id, display_name) = match &peer.header.source {
730 InputOrigin::Peer {
731 peer_id,
732 display_identity,
733 ..
734 } => (peer_id.clone(), display_identity.clone()),
735 _ => return None,
736 };
737 let (kind, request_id, intent, status) = match &peer.convention {
738 Some(PeerConvention::Message) | None => ("message", None, None, None),
739 Some(PeerConvention::Request { request_id, intent }) => (
740 "request",
741 Some(request_id.clone()),
742 Some(intent.clone()),
743 None,
744 ),
745 Some(PeerConvention::ResponseProgress { request_id, phase }) => (
746 "response_progress",
747 Some(request_id.clone()),
748 None,
749 Some(format!("{phase:?}")),
750 ),
751 Some(PeerConvention::ResponseTerminal { request_id, status }) => (
752 "response_terminal",
753 Some(request_id.clone()),
754 None,
755 Some(format!("{status:?}")),
756 ),
757 };
758 let summary = match kind {
759 "request" => intent.as_ref().map_or_else(
760 || "Peer request".to_string(),
761 |intent| format!("Peer request: {intent}"),
762 ),
763 "response_progress" => "Peer response progress".to_string(),
764 "response_terminal" => "Peer response terminal".to_string(),
765 _ => "Peer message".to_string(),
766 };
767 let content = if let Some(blocks) = peer.blocks.clone() {
768 let body_already_in_blocks = blocks.iter().any(|block| {
769 matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == peer.body.trim())
770 });
771 if peer.body.trim().is_empty() || body_already_in_blocks {
772 blocks
773 } else {
774 let mut content = vec![meerkat_core::types::ContentBlock::Text {
775 text: peer.body.clone(),
776 }];
777 content.extend(blocks);
778 content
779 }
780 } else if peer.body.is_empty() {
781 Vec::new()
782 } else {
783 vec![meerkat_core::types::ContentBlock::Text {
784 text: peer.body.clone(),
785 }]
786 };
787 Some(CoreRenderable::SystemNotice {
788 kind: SystemNoticeKind::Comms,
789 body: Some(summary.clone()),
790 blocks: vec![SystemNoticeBlock::Comms {
791 kind: kind.to_string(),
792 direction: SystemNoticeDirection::Incoming,
793 peer: Some(SystemNoticePeer {
794 id: peer_id,
795 display_name,
796 }),
797 request_id,
798 intent,
799 status,
800 summary: Some(summary),
801 payload: peer.payload.clone(),
802 content,
803 }],
804 })
805}
806
807fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
808 let source = match &event.header.source {
809 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
810 source_name.clone()
811 }
812 _ => event.event_type.clone(),
813 };
814 let body = event
815 .payload
816 .get("body")
817 .and_then(serde_json::Value::as_str)
818 .map(str::trim)
819 .filter(|body| !body.is_empty())
820 .map(ToOwned::to_owned);
821 let summary = body.as_ref().map_or_else(
822 || format!("External event via {source}"),
823 std::clone::Clone::clone,
824 );
825 CoreRenderable::SystemNotice {
826 kind: SystemNoticeKind::ExternalEvent,
827 body: Some(summary.clone()),
828 blocks: vec![SystemNoticeBlock::ExternalEvent {
829 source,
830 event_type: event.event_type.clone(),
831 summary: Some(summary),
832 body,
833 payload: Some(event.payload.clone()),
834 content: event.blocks.clone().unwrap_or_default(),
835 }],
836 }
837}
838
839fn input_to_append(input: &Input) -> Option<ConversationAppend> {
840 if matches!(
841 input,
842 Input::Peer(PeerInput {
843 convention: Some(PeerConvention::ResponseTerminal { .. }),
844 blocks: None,
845 ..
846 })
847 ) {
848 return None;
849 }
850
851 let (role, content) = match input {
852 Input::Prompt(p)
853 if !p.typed_turn_appends.is_empty()
854 && p.text.trim().is_empty()
855 && p.blocks.as_ref().is_none_or(Vec::is_empty) =>
856 {
857 return None;
858 }
859 Input::Prompt(p) if p.blocks.is_some() => (
860 ConversationAppendRole::User,
861 CoreRenderable::Blocks {
862 blocks: p.blocks.clone().unwrap_or_default(),
863 },
864 ),
865 Input::Prompt(_) => (
866 ConversationAppendRole::User,
867 CoreRenderable::Text {
868 text: input_prompt_text(input),
869 },
870 ),
871 Input::Peer(p) => peer_notice_renderable(p)
872 .map(|content| (ConversationAppendRole::SystemNotice, content))?,
873 Input::FlowStep(f) => (
874 ConversationAppendRole::SystemNotice,
875 CoreRenderable::SystemNotice {
876 kind: SystemNoticeKind::Generic,
877 body: Some(format!("Flow step {}", f.step_id)),
878 blocks: vec![SystemNoticeBlock::RuntimeNotice {
879 category: "flow_step".to_string(),
880 detail: Some(f.instructions.clone()),
881 payload: None,
882 }],
883 },
884 ),
885 Input::ExternalEvent(e) => (
886 ConversationAppendRole::SystemNotice,
887 external_event_notice_renderable(e),
888 ),
889 Input::Continuation(_) | Input::Operation(_) => return None,
890 };
891
892 Some(ConversationAppend { role, content })
893}
894
895fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
896 let (projection, content) = match input {
897 Input::Peer(peer) => {
898 let projection = peer_projection_from_peer_input(peer)?;
899 let content = peer_notice_renderable(peer)?;
900 (projection, content)
901 }
902 _ => return None,
903 };
904
905 Some(ConversationContextAppend {
906 key: projection.context_key()?,
907 content,
908 })
909}
910
911fn peer_response_terminal_context_append(
912 peer: &PeerInput,
913) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
914 let Some(fact) = peer_response_terminal_fact(peer)? else {
915 return Ok(None);
916 };
917
918 Ok(Some(ConversationContextAppend {
919 key: fact.context_key(),
920 content: CoreRenderable::SystemNotice {
921 kind: SystemNoticeKind::Comms,
922 body: Some("Peer terminal response context".to_string()),
923 blocks: vec![SystemNoticeBlock::Comms {
924 kind: "response_terminal".to_string(),
925 direction: SystemNoticeDirection::Incoming,
926 peer: Some(SystemNoticePeer {
927 id: fact.source.route_identity.to_string(),
928 display_name: Some(fact.source.display_identity.to_string()),
929 }),
930 request_id: Some(fact.correlation_id.to_string()),
931 intent: None,
932 status: Some(
933 match fact.status {
934 PeerResponseTerminalProjectionStatus::Completed => "completed",
935 PeerResponseTerminalProjectionStatus::Failed => "failed",
936 PeerResponseTerminalProjectionStatus::Cancelled => "cancelled",
937 }
938 .to_string(),
939 ),
940 summary: Some("Peer terminal response".to_string()),
941 payload: fact.render_payload.as_ref().cloned(),
942 content: Vec::new(),
943 }],
944 },
945 }))
946}
947
948pub(crate) fn runtime_input_projection(
949 input: &Input,
950) -> crate::ingress_types::RuntimeInputProjection {
951 crate::ingress_types::RuntimeInputProjection {
952 append: input_to_append(input),
953 additional_appends: match input {
954 Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
955 _ => Vec::new(),
956 },
957 context_append: input_to_context_append(input),
958 }
959}
960
961pub(crate) fn runtime_input_projection_for_machine_batch(
962 input: &Input,
963) -> crate::ingress_types::RuntimeInputProjection {
964 let mut projection = runtime_input_projection(input);
965 if let Input::Peer(peer) = input
966 && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
967 {
968 projection.context_append = Some(context_append);
969 }
970 projection
971}
972
973pub(crate) fn context_append_to_pending_system_context_append(
974 append: &ConversationContextAppend,
975) -> meerkat_core::PendingSystemContextAppend {
976 let text = render_core_context_for_pending_system_context(&append.content);
977 meerkat_core::PendingSystemContextAppend {
978 text,
979 source: Some(append.key.clone()),
980 idempotency_key: Some(append.key.clone()),
981 accepted_at: meerkat_core::time_compat::SystemTime::now(),
982 }
983}
984
985pub(crate) fn projection_to_pending_system_context_appends(
986 input_id: &InputId,
987 projection: &crate::ingress_types::RuntimeInputProjection,
988) -> Vec<meerkat_core::PendingSystemContextAppend> {
989 if let Some(append) = projection.context_append.as_ref() {
990 return std::iter::once(context_append_to_pending_system_context_append(append))
991 .filter(|append| !append.text.trim().is_empty())
992 .collect();
993 }
994
995 projection
996 .append
997 .as_ref()
998 .map(|append| {
999 let key = format!("runtime:steer:{input_id}");
1000 meerkat_core::PendingSystemContextAppend {
1001 text: render_core_context_for_pending_system_context(&append.content),
1002 source: Some(key.clone()),
1003 idempotency_key: Some(key),
1004 accepted_at: meerkat_core::time_compat::SystemTime::now(),
1005 }
1006 })
1007 .into_iter()
1008 .filter(|append| !append.text.trim().is_empty())
1009 .collect()
1010}
1011
1012fn render_core_context_for_pending_system_context(content: &CoreRenderable) -> String {
1013 match content {
1014 CoreRenderable::Text { text } => text.clone(),
1015 CoreRenderable::Blocks { blocks } => meerkat_core::types::text_content(blocks),
1016 CoreRenderable::Json { value } => {
1017 serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string())
1018 }
1019 CoreRenderable::Reference { uri, label } => match label {
1020 Some(label) => format!("{label}: {uri}"),
1021 None => uri.clone(),
1022 },
1023 CoreRenderable::SystemNotice { kind, body, blocks } => {
1024 meerkat_core::types::SystemNoticeMessage::with_blocks(
1025 *kind,
1026 body.clone(),
1027 blocks.clone(),
1028 )
1029 .model_projection_text()
1030 }
1031 _ => String::new(),
1032 }
1033}
1034
1035#[cfg(test)]
1036#[allow(clippy::unwrap_used, clippy::panic)]
1037mod tests {
1038 use super::*;
1039 use chrono::Utc;
1040
1041 fn make_header() -> InputHeader {
1042 InputHeader {
1043 id: InputId::new(),
1044 timestamp: Utc::now(),
1045 source: InputOrigin::Operator,
1046 durability: InputDurability::Durable,
1047 visibility: InputVisibility::default(),
1048 idempotency_key: None,
1049 supersession_key: None,
1050 correlation_id: None,
1051 }
1052 }
1053
1054 fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
1055 ConversationAppend {
1056 role: ConversationAppendRole::SystemNotice,
1057 content: CoreRenderable::SystemNotice {
1058 kind: meerkat_core::types::SystemNoticeKind::Generic,
1059 body: Some(detail.to_string()),
1060 blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
1061 category: "test".to_string(),
1062 detail: Some(detail.to_string()),
1063 payload: None,
1064 }],
1065 },
1066 }
1067 }
1068
1069 #[test]
1070 fn prompt_input_serde() {
1071 let input = Input::Prompt(PromptInput {
1072 header: make_header(),
1073 text: "hello".into(),
1074 blocks: None,
1075 typed_turn_appends: Vec::new(),
1076 turn_metadata: None,
1077 });
1078 let json = serde_json::to_value(&input).unwrap();
1079 assert_eq!(json["input_type"], "prompt");
1080 let parsed: Input = serde_json::from_value(json).unwrap();
1081 assert!(matches!(parsed, Input::Prompt(_)));
1082 }
1083
1084 #[test]
1085 fn prompt_input_typed_turn_appends_project_without_user_text() {
1086 let append = typed_runtime_notice_append("peer delivery");
1087 let input = Input::Prompt(PromptInput {
1088 header: make_header(),
1089 text: String::new(),
1090 blocks: None,
1091 typed_turn_appends: vec![append.clone()],
1092 turn_metadata: None,
1093 });
1094
1095 let projection = runtime_input_projection(&input);
1096 assert!(
1097 projection.append.is_none(),
1098 "empty runtime-authored prompt carrier must not synthesize a user append"
1099 );
1100 assert_eq!(projection.additional_appends, vec![append]);
1101 }
1102
1103 #[test]
1104 fn prompt_input_typed_turn_appends_serde_roundtrip() {
1105 let append = typed_runtime_notice_append("typed appends persist");
1106 let input = Input::Prompt(PromptInput {
1107 header: make_header(),
1108 text: String::new(),
1109 blocks: None,
1110 typed_turn_appends: vec![append.clone()],
1111 turn_metadata: None,
1112 });
1113
1114 let json = serde_json::to_value(&input).unwrap();
1115 let parsed: Input = serde_json::from_value(json).unwrap();
1116 let Input::Prompt(prompt) = parsed else {
1117 panic!("expected prompt input");
1118 };
1119 assert_eq!(prompt.text, "");
1120 assert_eq!(prompt.typed_turn_appends, vec![append]);
1121 }
1122
1123 #[test]
1124 fn peer_input_message_serde() {
1125 let input = Input::Peer(PeerInput {
1126 header: make_header(),
1127 convention: Some(PeerConvention::Message),
1128 body: "hi there".into(),
1129 payload: None,
1130 blocks: None,
1131 handling_mode: None,
1132 });
1133 let json = serde_json::to_value(&input).unwrap();
1134 assert_eq!(json["input_type"], "peer");
1135 let parsed: Input = serde_json::from_value(json).unwrap();
1136 assert!(matches!(parsed, Input::Peer(_)));
1137 }
1138
1139 #[test]
1140 fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
1141 let mut header = make_header();
1142 header.source = InputOrigin::Peer {
1143 peer_id: "canonical-peer-id".into(),
1144 display_identity: Some("display-agent".into()),
1145 runtime_id: None,
1146 };
1147 let input = Input::Peer(PeerInput {
1148 header,
1149 convention: Some(PeerConvention::Message),
1150 body: "caption".into(),
1151 payload: None,
1152 blocks: Some(vec![
1153 meerkat_core::types::ContentBlock::Text {
1154 text: "caption".into(),
1155 },
1156 meerkat_core::types::ContentBlock::Image {
1157 media_type: "image/png".into(),
1158 data: "abc".into(),
1159 },
1160 ]),
1161 handling_mode: None,
1162 });
1163
1164 let Input::Peer(peer) = &input else {
1165 panic!("expected peer input");
1166 };
1167 assert_eq!(
1168 peer_projection_from_peer_input(peer)
1169 .and_then(|projection| projection.block_prefix_text())
1170 .as_deref(),
1171 Some("Peer message from canonical-peer-id")
1172 );
1173
1174 let projection = runtime_input_projection(&input);
1175 let append = projection.append.expect("conversation append");
1176 let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1177 panic!("expected typed system notice");
1178 };
1179 let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1180 blocks.first()
1181 else {
1182 panic!("expected comms block");
1183 };
1184 assert_eq!(
1185 peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1186 Some("display-agent")
1187 );
1188 assert_eq!(
1189 content.first(),
1190 Some(&meerkat_core::types::ContentBlock::Text {
1191 text: "caption".into()
1192 })
1193 );
1194 }
1195
1196 #[test]
1197 fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
1198 let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1199 let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1200 let mut header = make_header();
1201 header.source = InputOrigin::Peer {
1202 peer_id: route_id.into(),
1203 display_identity: Some("display-agent".into()),
1204 runtime_id: None,
1205 };
1206 let input = Input::Peer(PeerInput {
1207 header,
1208 convention: Some(PeerConvention::ResponseTerminal {
1209 request_id: request_id.into(),
1210 status: ResponseTerminalStatus::Completed,
1211 }),
1212 body: "legacy response body".into(),
1213 payload: Some(serde_json::json!({"answer":"ok"})),
1214 blocks: None,
1215 handling_mode: None,
1216 });
1217
1218 let Input::Peer(peer) = &input else {
1219 panic!("expected peer input");
1220 };
1221 let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
1222 assert!(
1223 peer_projection_from_peer_input(peer).is_none(),
1224 "terminal peer response projection must not be built before machine batch selection"
1225 );
1226
1227 let projection = runtime_input_projection(&input);
1228 assert!(
1229 projection.context_append.is_none(),
1230 "admission projection must not store terminal peer response context"
1231 );
1232 let projection = runtime_input_projection_for_machine_batch(&input);
1233 let context = projection.context_append.expect("context append");
1234 assert_eq!(context.key, expected_canonical_key);
1235 let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
1236 panic!("expected typed context");
1237 };
1238 let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
1239 else {
1240 panic!("expected comms block");
1241 };
1242 assert_eq!(
1243 peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1244 Some("display-agent")
1245 );
1246 assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some(route_id));
1247 }
1248
1249 #[test]
1250 fn steer_projection_uses_context_append_as_pending_system_context() {
1251 let input_id = InputId::new();
1252 let projection = crate::ingress_types::RuntimeInputProjection {
1253 append: Some(ConversationAppend {
1254 role: ConversationAppendRole::SystemNotice,
1255 content: CoreRenderable::Text {
1256 text: "ordinary append must lose to context append".into(),
1257 },
1258 }),
1259 additional_appends: Vec::new(),
1260 context_append: Some(ConversationContextAppend {
1261 key: "peer_response_terminal:peer:req".into(),
1262 content: CoreRenderable::Text {
1263 text: "terminal response is ready".into(),
1264 },
1265 }),
1266 };
1267
1268 let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1269
1270 assert_eq!(appends.len(), 1);
1271 assert_eq!(appends[0].text, "terminal response is ready");
1272 assert_eq!(
1273 appends[0].source.as_deref(),
1274 Some("peer_response_terminal:peer:req")
1275 );
1276 assert_eq!(
1277 appends[0].idempotency_key.as_deref(),
1278 Some("peer_response_terminal:peer:req")
1279 );
1280 }
1281
1282 #[test]
1283 fn steer_projection_falls_back_to_ordinary_peer_append() {
1284 let mut header = make_header();
1285 header.source = InputOrigin::Peer {
1286 peer_id: "peer-a".into(),
1287 display_identity: Some("Peer A".into()),
1288 runtime_id: None,
1289 };
1290 let input = Input::Peer(PeerInput {
1291 header,
1292 convention: Some(PeerConvention::Message),
1293 body: "please look at this while you work".into(),
1294 payload: None,
1295 blocks: None,
1296 handling_mode: Some(HandlingMode::Steer),
1297 });
1298 let input_id = input.id().clone();
1299 let projection = runtime_input_projection(&input);
1300
1301 let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1302
1303 assert_eq!(appends.len(), 1);
1304 assert!(
1305 appends[0]
1306 .text
1307 .contains("please look at this while you work"),
1308 "peer message append should be renderable as live system context: {:?}",
1309 appends[0].text
1310 );
1311 assert_eq!(
1312 appends[0].source.as_deref(),
1313 Some(format!("runtime:steer:{input_id}").as_str())
1314 );
1315 assert_eq!(
1316 appends[0].idempotency_key.as_deref(),
1317 Some(format!("runtime:steer:{input_id}").as_str())
1318 );
1319 }
1320
1321 #[test]
1322 fn steer_projection_filters_empty_context_and_empty_append() {
1323 let input_id = InputId::new();
1324 let context_projection = crate::ingress_types::RuntimeInputProjection {
1325 append: None,
1326 additional_appends: Vec::new(),
1327 context_append: Some(ConversationContextAppend {
1328 key: "empty-context".into(),
1329 content: CoreRenderable::Text { text: " ".into() },
1330 }),
1331 };
1332 assert!(
1333 projection_to_pending_system_context_appends(&input_id, &context_projection).is_empty()
1334 );
1335
1336 let append_projection = crate::ingress_types::RuntimeInputProjection {
1337 append: Some(ConversationAppend {
1338 role: ConversationAppendRole::SystemNotice,
1339 content: CoreRenderable::Text { text: "\n".into() },
1340 }),
1341 additional_appends: Vec::new(),
1342 context_append: None,
1343 };
1344 assert!(
1345 projection_to_pending_system_context_appends(&input_id, &append_projection).is_empty()
1346 );
1347 }
1348
1349 #[test]
1350 fn peer_response_terminal_with_blocks_projects_append_and_context() {
1351 let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1352 let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1353 let mut header = make_header();
1354 header.source = InputOrigin::Peer {
1355 peer_id: route_id.into(),
1356 display_identity: Some("display-agent".into()),
1357 runtime_id: None,
1358 };
1359 let input = Input::Peer(PeerInput {
1360 header,
1361 convention: Some(PeerConvention::ResponseTerminal {
1362 request_id: request_id.into(),
1363 status: ResponseTerminalStatus::Completed,
1364 }),
1365 body: String::new(),
1366 payload: Some(serde_json::json!({"answer":"ok"})),
1367 blocks: Some(vec![meerkat_core::types::ContentBlock::Image {
1368 media_type: "image/jpeg".into(),
1369 data: "abc".into(),
1370 }]),
1371 handling_mode: None,
1372 });
1373
1374 let projection = runtime_input_projection_for_machine_batch(&input);
1375 let append = projection.append.expect("conversation append");
1376 let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1377 panic!("expected typed append");
1378 };
1379 let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1380 blocks.first()
1381 else {
1382 panic!("expected comms block");
1383 };
1384 assert_eq!(
1385 peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1386 Some("display-agent")
1387 );
1388 assert!(matches!(
1389 content.first(),
1390 Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
1391 if media_type == "image/jpeg"
1392 ));
1393 assert!(
1394 projection.context_append.is_some(),
1395 "terminal response must still apply runtime-owned context"
1396 );
1397 }
1398
1399 #[test]
1400 fn peer_input_request_serde() {
1401 let input = Input::Peer(PeerInput {
1402 header: make_header(),
1403 convention: Some(PeerConvention::Request {
1404 request_id: "req-1".into(),
1405 intent: "mob.peer_added".into(),
1406 }),
1407 body: "Agent joined".into(),
1408 payload: Some(serde_json::json!({"name": "agent-1"})),
1409 blocks: None,
1410 handling_mode: None,
1411 });
1412 let json = serde_json::to_value(&input).unwrap();
1413 let parsed: Input = serde_json::from_value(json).unwrap();
1414 if let Input::Peer(p) = parsed {
1415 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
1416 } else {
1417 panic!("Expected PeerInput");
1418 }
1419 }
1420
1421 #[test]
1422 fn peer_input_response_terminal_serde() {
1423 let input = Input::Peer(PeerInput {
1424 header: make_header(),
1425 convention: Some(PeerConvention::ResponseTerminal {
1426 request_id: "req-1".into(),
1427 status: ResponseTerminalStatus::Completed,
1428 }),
1429 body: "Done".into(),
1430 payload: Some(serde_json::json!({"ok": true})),
1431 blocks: None,
1432 handling_mode: None,
1433 });
1434 let json = serde_json::to_value(&input).unwrap();
1435 let parsed: Input = serde_json::from_value(json).unwrap();
1436 assert!(matches!(parsed, Input::Peer(_)));
1437 }
1438
1439 #[test]
1440 fn peer_input_response_progress_serde() {
1441 let input = Input::Peer(PeerInput {
1442 header: make_header(),
1443 convention: Some(PeerConvention::ResponseProgress {
1444 request_id: "req-1".into(),
1445 phase: ResponseProgressPhase::InProgress,
1446 }),
1447 body: "Working...".into(),
1448 payload: Some(serde_json::json!({"progress": "working"})),
1449 blocks: None,
1450 handling_mode: None,
1451 });
1452 let json = serde_json::to_value(&input).unwrap();
1453 let parsed: Input = serde_json::from_value(json).unwrap();
1454 assert!(matches!(parsed, Input::Peer(_)));
1455 }
1456
1457 #[test]
1458 fn flow_step_input_serde() {
1459 let input = Input::FlowStep(FlowStepInput {
1460 header: make_header(),
1461 step_id: "step-1".into(),
1462 instructions: "analyze the data".into(),
1463 blocks: Some(vec![
1464 meerkat_core::types::ContentBlock::Text {
1465 text: "analyze the data".into(),
1466 },
1467 meerkat_core::types::ContentBlock::Image {
1468 media_type: "image/png".into(),
1469 data: meerkat_core::types::ImageData::Inline {
1470 data: "abc123".into(),
1471 },
1472 },
1473 ]),
1474 turn_metadata: None,
1475 });
1476 let json = serde_json::to_value(&input).unwrap();
1477 assert_eq!(json["input_type"], "flow_step");
1478 let parsed: Input = serde_json::from_value(json).unwrap();
1479 assert!(matches!(parsed, Input::FlowStep(_)));
1480 }
1481
1482 #[test]
1483 fn external_event_input_serde() {
1484 let input = Input::ExternalEvent(ExternalEventInput {
1485 header: make_header(),
1486 event_type: "webhook.received".into(),
1487 payload: serde_json::json!({"url": "https://example.com"}),
1488 blocks: Some(vec![
1489 meerkat_core::types::ContentBlock::Text {
1490 text: "look".into(),
1491 },
1492 meerkat_core::types::ContentBlock::Image {
1493 media_type: "image/png".into(),
1494 data: meerkat_core::types::ImageData::Inline {
1495 data: "abc123".into(),
1496 },
1497 },
1498 ]),
1499 handling_mode: HandlingMode::Queue,
1500 render_metadata: None,
1501 });
1502 let json = serde_json::to_value(&input).unwrap();
1503 assert_eq!(json["input_type"], "external_event");
1504 let parsed: Input = serde_json::from_value(json).unwrap();
1505 assert!(matches!(parsed, Input::ExternalEvent(_)));
1506 }
1507
1508 #[test]
1509 fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
1510 let mut input = Input::ExternalEvent(ExternalEventInput {
1511 header: make_header(),
1512 event_type: "webhook.received".into(),
1513 payload: serde_json::json!({
1514 "body": "see image",
1515 "blocks": [
1516 { "type": "text", "text": "caption text" },
1517 { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
1518 ]
1519 }),
1520 blocks: None,
1521 handling_mode: HandlingMode::Queue,
1522 render_metadata: None,
1523 });
1524
1525 match &mut input {
1526 Input::ExternalEvent(event) => {
1527 migrate_legacy_payload_blocks(event).unwrap();
1528 assert!(event.payload.get("blocks").is_none());
1529 assert_eq!(event.payload["body"], "see image");
1530 assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
1531 }
1532 other => panic!("Expected ExternalEvent, got {other:?}"),
1533 }
1534 }
1535
1536 #[test]
1537 fn continuation_input_serde() {
1538 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1539 let json = serde_json::to_value(&input).unwrap();
1540 assert_eq!(json["input_type"], "continuation");
1541 let parsed: Input = serde_json::from_value(json).unwrap();
1542 match parsed {
1543 Input::Continuation(continuation) => {
1544 assert_eq!(continuation.handling_mode, HandlingMode::Steer);
1545 assert_eq!(continuation.reason, "detached_background_op_completed");
1546 }
1547 other => panic!("Expected Continuation, got {other:?}"),
1548 }
1549 }
1550
1551 #[test]
1552 fn continuation_input_accepts_legacy_system_generated_tag() {
1553 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1554 let mut json = serde_json::to_value(&input).unwrap();
1555 json["input_type"] = serde_json::Value::String("system_generated".into());
1556 let parsed: Input = serde_json::from_value(json).unwrap();
1557 match parsed {
1558 Input::Continuation(continuation) => {
1559 assert_eq!(continuation.reason, "detached_background_op_completed");
1560 }
1561 other => panic!("Expected Continuation, got {other:?}"),
1562 }
1563 }
1564
1565 #[test]
1566 fn operation_input_serde() {
1567 let input = Input::Operation(OperationInput {
1568 header: InputHeader {
1569 durability: InputDurability::Derived,
1570 ..make_header()
1571 },
1572 operation_id: OperationId::new(),
1573 event: OpEvent::Cancelled {
1574 id: OperationId::new(),
1575 },
1576 });
1577 let json = serde_json::to_value(&input).unwrap();
1578 assert_eq!(json["input_type"], "operation");
1579 let parsed: Input = serde_json::from_value(json).unwrap();
1580 assert!(matches!(parsed, Input::Operation(_)));
1581 }
1582
1583 #[test]
1584 fn operation_input_accepts_legacy_projected_tag() {
1585 let input = Input::Operation(OperationInput {
1586 header: InputHeader {
1587 durability: InputDurability::Derived,
1588 ..make_header()
1589 },
1590 operation_id: OperationId::new(),
1591 event: OpEvent::Cancelled {
1592 id: OperationId::new(),
1593 },
1594 });
1595 let mut json = serde_json::to_value(&input).unwrap();
1596 json["input_type"] = serde_json::Value::String("projected".into());
1597 let parsed: Input = serde_json::from_value(json).unwrap();
1598 assert!(matches!(parsed, Input::Operation(_)));
1599 }
1600
1601 #[test]
1602 fn input_kind_id() {
1603 let prompt = Input::Prompt(PromptInput {
1604 header: make_header(),
1605 text: "hi".into(),
1606 blocks: None,
1607 typed_turn_appends: Vec::new(),
1608 turn_metadata: None,
1609 });
1610 assert_eq!(prompt.kind(), InputKind::Prompt);
1611
1612 let peer_msg = Input::Peer(PeerInput {
1613 header: make_header(),
1614 convention: Some(PeerConvention::Message),
1615 body: "hi".into(),
1616 payload: None,
1617 blocks: None,
1618 handling_mode: None,
1619 });
1620 assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
1621
1622 let peer_req = Input::Peer(PeerInput {
1623 header: make_header(),
1624 convention: Some(PeerConvention::Request {
1625 request_id: "r".into(),
1626 intent: "i".into(),
1627 }),
1628 body: "hi".into(),
1629 payload: Some(serde_json::json!({"subject": "x"})),
1630 blocks: None,
1631 handling_mode: None,
1632 });
1633 assert_eq!(peer_req.kind(), InputKind::PeerRequest);
1634
1635 let continuation = Input::Continuation(ContinuationInput {
1636 header: make_header(),
1637 reason: "continue".into(),
1638 handling_mode: HandlingMode::Steer,
1639 request_id: None,
1640 });
1641 assert_eq!(continuation.kind(), InputKind::Continuation);
1642
1643 let operation = Input::Operation(OperationInput {
1644 header: make_header(),
1645 operation_id: OperationId::new(),
1646 event: OpEvent::Cancelled {
1647 id: OperationId::new(),
1648 },
1649 });
1650 assert_eq!(operation.kind(), InputKind::Operation);
1651 }
1652
1653 #[test]
1654 fn input_source_variants() {
1655 let sources = vec![
1656 InputOrigin::Operator,
1657 InputOrigin::Peer {
1658 peer_id: "p1".into(),
1659 display_identity: None,
1660 runtime_id: None,
1661 },
1662 InputOrigin::Flow {
1663 flow_id: "f1".into(),
1664 step_index: 0,
1665 },
1666 InputOrigin::System,
1667 InputOrigin::External {
1668 source_name: "webhook".into(),
1669 },
1670 ];
1671 for source in sources {
1672 let json = serde_json::to_value(&source).unwrap();
1673 let parsed: InputOrigin = serde_json::from_value(json).unwrap();
1674 assert_eq!(source, parsed);
1675 }
1676 }
1677
1678 #[test]
1679 fn input_durability_serde() {
1680 for d in [
1681 InputDurability::Durable,
1682 InputDurability::Ephemeral,
1683 InputDurability::Derived,
1684 ] {
1685 let json = serde_json::to_value(d).unwrap();
1686 let parsed: InputDurability = serde_json::from_value(json).unwrap();
1687 assert_eq!(d, parsed);
1688 }
1689 }
1690
1691 #[test]
1692 fn peer_input_without_handling_mode_deserializes_as_none() {
1693 let json = serde_json::json!({
1695 "input_type": "peer",
1696 "header": serde_json::to_value(make_header()).unwrap(),
1697 "convention": { "convention_type": "message" },
1698 "body": "hello"
1699 });
1700 let parsed: Input = serde_json::from_value(json).unwrap();
1701 match parsed {
1702 Input::Peer(p) => assert!(p.handling_mode.is_none()),
1703 other => panic!("Expected Peer, got {other:?}"),
1704 }
1705 }
1706
1707 #[test]
1708 fn peer_input_with_queue_handling_mode_roundtrips() {
1709 let input = Input::Peer(PeerInput {
1710 header: make_header(),
1711 convention: Some(PeerConvention::Message),
1712 body: "hi".into(),
1713 payload: None,
1714 blocks: None,
1715 handling_mode: Some(HandlingMode::Queue),
1716 });
1717 let json = serde_json::to_value(&input).unwrap();
1718 assert_eq!(json["handling_mode"], "queue");
1719 let parsed: Input = serde_json::from_value(json).unwrap();
1720 match parsed {
1721 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
1722 other => panic!("Expected Peer, got {other:?}"),
1723 }
1724 }
1725
1726 #[test]
1727 fn peer_response_terminal_input_owns_wire_status_mapping() {
1728 let peer_id = meerkat_core::comms::PeerId::from_uuid(
1729 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1730 );
1731 let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1732 let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1733 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1734 );
1735 let input = peer_response_terminal_input(
1736 peer_id,
1737 Some(display_name),
1738 request_id,
1739 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
1740 serde_json::json!({"ok": false}),
1741 );
1742
1743 match input {
1744 Input::Peer(PeerInput {
1745 header:
1746 InputHeader {
1747 source:
1748 InputOrigin::Peer {
1749 peer_id,
1750 display_identity,
1751 runtime_id,
1752 },
1753 durability: InputDurability::Durable,
1754 correlation_id,
1755 ..
1756 },
1757 convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
1758 payload: Some(payload),
1759 handling_mode: None,
1760 ..
1761 }) => {
1762 assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
1763 assert_eq!(display_identity.as_deref(), Some("analyst"));
1764 assert_eq!(runtime_id, None);
1765 assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
1766 assert_eq!(
1767 correlation_id,
1768 Some(CorrelationId::from_uuid(
1769 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
1770 ))
1771 );
1772 assert_eq!(status, ResponseTerminalStatus::Cancelled);
1773 assert_eq!(payload["ok"], false);
1774 }
1775 other => panic!("expected terminal peer input, got {other:?}"),
1776 }
1777 }
1778
1779 #[test]
1780 fn peer_input_with_steer_handling_mode_roundtrips() {
1781 let input = Input::Peer(PeerInput {
1782 header: make_header(),
1783 convention: Some(PeerConvention::Message),
1784 body: "hi".into(),
1785 payload: None,
1786 blocks: None,
1787 handling_mode: Some(HandlingMode::Steer),
1788 });
1789 let json = serde_json::to_value(&input).unwrap();
1790 assert_eq!(json["handling_mode"], "steer");
1791 let parsed: Input = serde_json::from_value(json).unwrap();
1792 match parsed {
1793 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
1794 other => panic!("Expected Peer, got {other:?}"),
1795 }
1796 }
1797
1798 #[test]
1799 fn peer_input_handling_mode_not_serialized_when_none() {
1800 let input = Input::Peer(PeerInput {
1801 header: make_header(),
1802 convention: Some(PeerConvention::Message),
1803 body: "hi".into(),
1804 payload: None,
1805 blocks: None,
1806 handling_mode: None,
1807 });
1808 let json = serde_json::to_value(&input).unwrap();
1809 assert!(json.get("handling_mode").is_none());
1810 }
1811}