1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10 ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11 RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::service::TurnToolOverlay;
15use meerkat_core::types::{
16 ContentInput, HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind,
17 SystemNoticePeer,
18};
19use meerkat_core::{
20 BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
21 PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
22 PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
23 PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
24 PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
25 PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
26};
27use serde::{Deserialize, Serialize};
28
29use crate::identifiers::{
30 CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
31};
32use meerkat_core::types::RenderMetadata;
33
/// Metadata header embedded in every input payload struct in this module
/// (each `*Input` struct carries it as its `header` field).
///
/// The three optional keys are omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
    /// Identifier of this input.
    pub id: InputId,
    /// UTC timestamp; the constructors in this file set it to the current time.
    pub timestamp: DateTime<Utc>,
    /// Where the input originated.
    pub source: InputOrigin,
    /// Durability classification of the input.
    pub durability: InputDurability,
    /// Transcript/operator eligibility flags.
    pub visibility: InputVisibility,
    /// Optional idempotency key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
    /// Optional supersession key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supersession_key: Option<SupersessionKey>,
    /// Optional correlation id; `peer_response_terminal_input` derives it from
    /// the request id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<CorrelationId>,
}
57
/// Origin of an input.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `type` key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
    /// The operator; used by the `PromptInput` constructors.
    Operator,
    /// Another peer.
    Peer {
        /// Peer identifier string; several helpers in this file parse it as a
        /// `meerkat_core::comms::PeerId`.
        peer_id: String,
        /// Optional display label; defaults to `None` when absent from the
        /// serialized form and is omitted on output when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        display_identity: Option<String>,
        /// Optional logical runtime id; omitted on output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_id: Option<LogicalRuntimeId>,
    },
    /// A flow, identified by flow id and step index.
    Flow { flow_id: String, step_index: usize },
    /// The system itself; used by
    /// `ContinuationInput::detached_background_op_completed`.
    System,
    /// A named external source.
    External { source_name: String },
}
85
/// Durability classification carried in `InputHeader::durability`.
///
/// Serialized as a snake_case string. The precise persistence semantics of
/// each level are defined by consumers not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
    /// Used by the operator prompt and peer terminal-response constructors.
    Durable,
    /// Not constructed anywhere in the visible code.
    Ephemeral,
    /// Used by `ContinuationInput::detached_background_op_completed`.
    Derived,
}
98
/// Visibility flags carried in `InputHeader::visibility`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
    /// Whether the input is eligible for the transcript.
    pub transcript_eligible: bool,
    /// Whether the input is eligible to be shown to the operator.
    pub operator_eligible: bool,
}
107
108impl Default for InputVisibility {
109 fn default() -> Self {
110 Self {
111 transcript_eligible: true,
112 operator_eligible: true,
113 }
114 }
115}
116
/// A runtime input, one variant per payload struct.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `input_type` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
    /// Prompt content (see `PromptInput`).
    Prompt(PromptInput),
    /// Input from a peer (see `PeerInput`).
    Peer(PeerInput),
    /// Flow step content (see `FlowStepInput`).
    FlowStep(FlowStepInput),
    /// Externally sourced event (see `ExternalEventInput`).
    ExternalEvent(ExternalEventInput),
    /// Continuation request (see `ContinuationInput`).
    Continuation(ContinuationInput),
    /// Operation event (see `OperationInput`).
    Operation(OperationInput),
}
135
136impl Input {
137 pub fn header(&self) -> &InputHeader {
139 match self {
140 Input::Prompt(i) => &i.header,
141 Input::Peer(i) => &i.header,
142 Input::FlowStep(i) => &i.header,
143 Input::ExternalEvent(i) => &i.header,
144 Input::Continuation(i) => &i.header,
145 Input::Operation(i) => &i.header,
146 }
147 }
148
149 pub fn id(&self) -> &InputId {
151 &self.header().id
152 }
153
154 pub fn kind(&self) -> InputKind {
156 match self {
157 Input::Prompt(_) => InputKind::Prompt,
158 Input::Peer(p) => match &p.convention {
159 Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160 Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161 Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162 Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163 },
164 Input::FlowStep(_) => InputKind::FlowStep,
165 Input::ExternalEvent(_) => InputKind::ExternalEvent,
166 Input::Continuation(_) => InputKind::Continuation,
167 Input::Operation(_) => InputKind::Operation,
168 }
169 }
170
171 pub fn kind_id(&self) -> KindId {
173 KindId::new(self.kind())
174 }
175
176 pub fn handling_mode(&self) -> Option<HandlingMode> {
178 match self {
179 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181 Input::ExternalEvent(event) => Some(event.handling_mode),
182 Input::Continuation(continuation) => Some(continuation.handling_mode),
183 Input::Peer(peer) => peer.handling_mode,
184 Input::Operation(_) => None,
185 }
186 }
187
188 pub fn continuation_kind(&self) -> ContinuationKind {
194 match self {
195 Input::Continuation(continuation) => continuation.continuation_kind,
196 _ => ContinuationKind::Ordinary,
197 }
198 }
199}
200
201fn reject_legacy_payload_blocks(event: &ExternalEventInput) -> Result<(), BlobStoreError> {
206 if event
207 .payload
208 .as_object()
209 .is_some_and(|obj| obj.contains_key("blocks"))
210 {
211 return Err(BlobStoreError::Internal(format!(
212 "external-event payload for event_type `{}` carries the retired payload-level \
213 `blocks` key; multimodal content must use the typed `ExternalEventInput.blocks` owner",
214 event.event_type
215 )));
216 }
217 Ok(())
218}
219
220pub async fn externalize_input_images(
221 blob_store: &dyn BlobStore,
222 input: &mut Input,
223) -> Result<(), BlobStoreError> {
224 match input {
225 Input::Prompt(prompt) => {
226 if let ContentInput::Blocks(blocks) = &mut prompt.content {
227 externalize_content_blocks(blob_store, blocks).await?;
228 }
229 }
230 Input::Peer(peer) => {
231 if let ContentInput::Blocks(blocks) = &mut peer.content {
232 externalize_content_blocks(blob_store, blocks).await?;
233 }
234 }
235 Input::FlowStep(flow_step) => {
236 if let ContentInput::Blocks(blocks) = &mut flow_step.content {
237 externalize_content_blocks(blob_store, blocks).await?;
238 }
239 }
240 Input::ExternalEvent(event) => {
241 reject_legacy_payload_blocks(event)?;
242 if let Some(blocks) = event.blocks.as_mut() {
243 externalize_content_blocks(blob_store, blocks).await?;
244 }
245 }
246 Input::Continuation(_) | Input::Operation(_) => {}
247 }
248 Ok(())
249}
250
251pub async fn hydrate_input_images(
252 blob_store: &dyn BlobStore,
253 input: &mut Input,
254 missing_behavior: MissingBlobBehavior,
255) -> Result<(), BlobStoreError> {
256 match input {
257 Input::Prompt(prompt) => {
258 if let ContentInput::Blocks(blocks) = &mut prompt.content {
259 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
260 }
261 }
262 Input::Peer(peer) => {
263 if let ContentInput::Blocks(blocks) = &mut peer.content {
264 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
265 }
266 }
267 Input::FlowStep(flow_step) => {
268 if let ContentInput::Blocks(blocks) = &mut flow_step.content {
269 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
270 }
271 }
272 Input::ExternalEvent(event) => {
273 reject_legacy_payload_blocks(event)?;
274 if let Some(blocks) = event.blocks.as_mut() {
275 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
276 }
277 }
278 Input::Continuation(_) | Input::Operation(_) => {}
279 }
280 Ok(())
281}
282
/// Payload of `Input::Prompt`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
    /// Common input header.
    pub header: InputHeader,
    /// Prompt content (text or blocks).
    pub content: ContentInput,
    /// Extra typed appends; `runtime_input_projection` surfaces them as
    /// `additional_appends`. Defaults to empty and is omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub typed_turn_appends: Vec<ConversationAppend>,
    /// Optional turn metadata; `Input::handling_mode` reads its
    /// `handling_mode`. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
301
302impl PromptInput {
303 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
305 Self {
306 header: InputHeader {
307 id: meerkat_core::lifecycle::InputId::new(),
308 timestamp: chrono::Utc::now(),
309 source: InputOrigin::Operator,
310 durability: InputDurability::Durable,
311 visibility: InputVisibility::default(),
312 idempotency_key: None,
313 supersession_key: None,
314 correlation_id: None,
315 },
316 content: ContentInput::Text(text.into()),
317 typed_turn_appends: Vec::new(),
318 turn_metadata,
319 }
320 }
321
322 pub fn from_content_input(
324 input: ContentInput,
325 turn_metadata: Option<RuntimeTurnMetadata>,
326 ) -> Self {
327 Self {
328 header: InputHeader {
329 id: meerkat_core::lifecycle::InputId::new(),
330 timestamp: chrono::Utc::now(),
331 source: InputOrigin::Operator,
332 durability: InputDurability::Durable,
333 visibility: InputVisibility::default(),
334 idempotency_key: None,
335 supersession_key: None,
336 correlation_id: None,
337 },
338 content: input,
339 typed_turn_appends: Vec::new(),
340 turn_metadata,
341 }
342 }
343}
344
/// Payload of `Input::Peer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
    /// Common input header.
    pub header: InputHeader,
    /// Peer convention; `Input::kind` treats `None` like
    /// `PeerConvention::Message`. Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub convention: Option<PeerConvention>,
    /// Message content (text or blocks).
    pub content: ContentInput,
    /// Optional JSON payload; forwarded into projections and notices built in
    /// this file. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<serde_json::Value>,
    /// Optional handling mode, returned as-is by `Input::handling_mode`.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handling_mode: Option<HandlingMode>,
}
373
/// Convention carried by a peer input.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `convention_type` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
    /// Plain message.
    Message,
    /// Request with an id and an intent string.
    Request { request_id: String, intent: String },
    /// Progress update for a request.
    ResponseProgress {
        /// Id of the request this progress belongs to.
        request_id: String,
        /// Progress phase.
        phase: ResponseProgressPhase,
    },
    /// Terminal response for a request.
    ResponseTerminal {
        /// Id of the request this response terminates;
        /// `peer_response_terminal_fact` parses it as a correlation id.
        request_id: String,
        /// Terminal status.
        status: ResponseTerminalStatus,
    },
}
394
/// Alias for the core `PeerResponseProgressProjectionPhase` type, used as the
/// `phase` of `PeerConvention::ResponseProgress`.
pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
398
/// Alias for the core `PeerResponseTerminalProjectionStatus` type, used as the
/// `status` of `PeerConvention::ResponseTerminal`.
pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
402
403pub fn response_terminal_status_from_wire(
404 status: meerkat_contracts::PeerResponseTerminalStatusWire,
405) -> ResponseTerminalStatus {
406 match status {
407 meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
408 PeerResponseTerminalProjectionStatus::Completed
409 }
410 meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
411 PeerResponseTerminalProjectionStatus::Failed
412 }
413 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
414 PeerResponseTerminalProjectionStatus::Cancelled
415 }
416 }
417}
418
419pub fn peer_response_terminal_input(
420 peer_id: meerkat_core::comms::PeerId,
421 display_name: Option<meerkat_core::comms::PeerName>,
422 request_id: meerkat_core::PeerCorrelationId,
423 status: meerkat_contracts::PeerResponseTerminalStatusWire,
424 result: serde_json::Value,
425) -> Input {
426 let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
427 let request_id = request_id.to_string();
428 let peer_id = peer_id.to_string();
429 let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
430
431 Input::Peer(PeerInput {
432 header: InputHeader {
433 id: InputId::new(),
434 timestamp: Utc::now(),
435 source: InputOrigin::Peer {
436 peer_id,
437 display_identity: Some(display_identity),
438 runtime_id: None,
439 },
440 durability: InputDurability::Durable,
441 visibility: InputVisibility::default(),
442 idempotency_key: None,
443 supersession_key: None,
444 correlation_id: Some(correlation_id),
445 },
446 convention: Some(PeerConvention::ResponseTerminal {
447 request_id,
448 status: response_terminal_status_from_wire(status),
449 }),
450 content: ContentInput::Text(String::new()),
451 payload: Some(result),
452 handling_mode: None,
453 })
454}
455
/// Payload of `Input::FlowStep`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
    /// Common input header.
    pub header: InputHeader,
    /// Identifier of the flow step; rendered as `Flow step {step_id}` by
    /// `input_to_append`.
    pub step_id: String,
    /// Step content (text or blocks).
    pub content: ContentInput,
    /// Optional turn metadata; `Input::handling_mode` reads its
    /// `handling_mode`. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
470
/// Payload of `Input::ExternalEvent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
    /// Common input header.
    pub header: InputHeader,
    /// Event type; used as the source label when the header origin is not a
    /// non-blank `InputOrigin::External` name.
    pub event_type: String,
    /// JSON payload. Its string `body` field, when present, is used for
    /// projection text. A top-level `blocks` key is rejected by
    /// `reject_legacy_payload_blocks`.
    pub payload: serde_json::Value,
    /// Typed content blocks; processed by the externalize/hydrate helpers.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Handling mode; falls back to `HandlingMode::default()` when absent from
    /// the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional render metadata. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub render_metadata: Option<RenderMetadata>,
}
492
/// Kind of continuation, serialized as a snake_case string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContinuationKind {
    /// Default kind; also reported by `Input::continuation_kind` for every
    /// non-continuation input.
    #[default]
    Ordinary,
    /// Workgraph-attention continuation (semantics defined by consumers not
    /// visible in this file).
    WorkgraphAttention,
}
511
/// Payload of `Input::Continuation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Reason string; rendered as `[Continuation] {reason}` by
    /// `input_prompt_text`.
    pub reason: String,
    /// Continuation kind; defaults to `ContinuationKind::Ordinary` when absent
    /// from the serialized form.
    #[serde(default)]
    pub continuation_kind: ContinuationKind,
    /// Handling mode; falls back to `HandlingMode::default()` when absent from
    /// the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional request id. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Optional tool overlay. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub flow_tool_overlay: Option<TurnToolOverlay>,
    /// Optional context append, returned verbatim by
    /// `input_to_context_append`. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub context_append: Option<ConversationContextAppend>,
    /// Optional turn append, returned verbatim by `input_to_append`. Omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_append: Option<ConversationAppend>,
}
541
542impl ContinuationInput {
543 pub fn detached_background_op_completed() -> Self {
549 Self {
550 header: InputHeader {
551 id: meerkat_core::lifecycle::InputId::new(),
552 timestamp: chrono::Utc::now(),
553 source: InputOrigin::System,
554 durability: InputDurability::Derived,
555 visibility: InputVisibility {
556 transcript_eligible: false,
557 operator_eligible: false,
558 },
559 idempotency_key: None,
560 supersession_key: None,
561 correlation_id: None,
562 },
563 reason: "detached_background_op_completed".to_string(),
564 continuation_kind: ContinuationKind::Ordinary,
565 handling_mode: HandlingMode::Steer,
566 request_id: None,
567 flow_tool_overlay: None,
568 context_append: None,
569 turn_append: None,
570 }
571 }
572}
573
/// Payload of `Input::Operation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Identifier of the operation the event belongs to.
    pub operation_id: OperationId,
    /// The operation event; rendered with `Debug` formatting by
    /// `input_prompt_text`.
    pub event: OpEvent,
}
584
585pub(crate) fn peer_projection_from_peer_input(
592 peer: &PeerInput,
593) -> Option<PeerConversationProjection> {
594 peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
595}
596
597fn peer_projection_from_peer_input_with_id(
598 peer: &PeerInput,
599 peer_id: &str,
600) -> Option<PeerConversationProjection> {
601 let peer_id = peer_id.to_string();
602
603 match &peer.convention {
604 Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
605 Some(PeerConvention::Request { request_id, intent }) => {
606 let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
607 Ok(peer_id) => peer_id,
608 Err(error) => {
609 tracing::warn!(
610 peer_id,
611 error = %error,
612 "dropping peer request projection with non-canonical peer_id"
613 );
614 return None;
615 }
616 };
617 Some(PeerConversationProjection::Request {
618 peer_id,
619 display_name: peer_display_label(peer),
620 request_id: request_id.clone(),
621 intent: intent.clone(),
622 payload: peer.payload.clone(),
623 })
624 }
625 Some(PeerConvention::ResponseProgress { request_id, phase }) => {
626 Some(PeerConversationProjection::ResponseProgress {
627 peer_id,
628 request_id: request_id.clone(),
629 phase: *phase,
630 payload: peer.payload.clone(),
631 })
632 }
633 Some(PeerConvention::ResponseTerminal { .. }) => None,
634 None => None,
635 }
636}
637
638pub(crate) fn peer_response_terminal_fact(
639 peer: &PeerInput,
640) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
641 let InputOrigin::Peer {
642 peer_id,
643 display_identity,
644 runtime_id,
645 } = &peer.header.source
646 else {
647 return Ok(None);
648 };
649 let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
650 return Ok(None);
651 };
652
653 let transport_identity = runtime_id
654 .as_ref()
655 .map(ToString::to_string)
656 .map(PeerResponseTerminalTransportIdentity::parse)
657 .transpose()?;
658 let source = PeerResponseTerminalSource::new(
659 transport_identity,
660 PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
661 PeerResponseTerminalDisplayIdentity::parse(
662 display_identity
663 .as_ref()
664 .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
665 .clone(),
666 )?,
667 );
668 Ok(Some(PeerResponseTerminalFact::new(
669 source,
670 PeerResponseTerminalCorrelationId::parse(request_id)?,
671 *status,
672 PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
673 )))
674}
675
676pub(crate) fn validate_peer_response_terminal_fact(
677 input: &Input,
678) -> Result<(), PeerResponseTerminalFactError> {
679 let Input::Peer(peer) = input else {
680 return Ok(());
681 };
682 peer_response_terminal_fact(peer).map(|_| ())
683}
684
/// Test-only shortcut: projects `input` when it is a peer input, otherwise
/// returns `None`.
#[cfg(test)]
pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
    match input {
        Input::Peer(peer) => peer_projection_from_peer_input(peer),
        _ => None,
    }
}
694
695fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
696 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
697 return None;
698 };
699 Some(peer_id.clone())
700}
701
702fn peer_display_label(peer: &PeerInput) -> Option<String> {
703 let InputOrigin::Peer {
704 display_identity, ..
705 } = &peer.header.source
706 else {
707 return None;
708 };
709
710 display_identity
711 .as_ref()
712 .map(|label| label.trim())
713 .filter(|label| !label.is_empty())
714 .map(ToOwned::to_owned)
715}
716
717pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
719 peer_projection_from_peer_input(peer)
720 .map(|projection| {
721 let prompt = projection.prompt_text();
722 if prompt.is_empty() {
723 peer.content.text_content()
724 } else {
725 prompt
726 }
727 })
728 .unwrap_or_else(|| peer.content.text_content())
729}
730
731pub(crate) fn input_prompt_text(input: &Input) -> String {
732 match input {
733 Input::Prompt(p) => p.content.text_content(),
734 Input::Peer(p) => peer_prompt_text(p),
735 Input::FlowStep(f) => f.content.text_content(),
736 Input::ExternalEvent(e) => external_event_projection_text(e),
737 Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
738 Input::Operation(operation) => {
739 format!(
740 "[Operation {}] {:?}",
741 operation.operation_id, operation.event
742 )
743 }
744 }
745}
746
747fn external_event_projection_text(event: &ExternalEventInput) -> String {
748 let source_name = match &event.header.source {
749 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
750 source_name.as_str()
751 }
752 _ => event.event_type.as_str(),
753 };
754 let body = event
755 .payload
756 .get("body")
757 .and_then(serde_json::Value::as_str)
758 .map(str::trim);
759
760 meerkat_core::interaction::format_external_event_projection(source_name, body)
761}
762
763fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
764 let (peer_id, display_name) = match &peer.header.source {
765 InputOrigin::Peer {
766 peer_id,
767 display_identity,
768 ..
769 } => (peer_id.clone(), display_identity.clone()),
770 _ => return None,
771 };
772 use meerkat_core::types::CommsNoticeKind;
773 let (kind, request_id, intent, status) = match &peer.convention {
774 Some(PeerConvention::Message) | None => (CommsNoticeKind::Message, None, None, None),
775 Some(PeerConvention::Request { request_id, intent }) => (
776 CommsNoticeKind::Request,
777 Some(request_id.clone()),
778 Some(intent.clone()),
779 None,
780 ),
781 Some(PeerConvention::ResponseProgress { request_id, phase }) => (
782 CommsNoticeKind::ResponseProgress,
783 Some(request_id.clone()),
784 None,
785 Some(format!("{phase:?}")),
786 ),
787 Some(PeerConvention::ResponseTerminal { request_id, status }) => (
788 CommsNoticeKind::ResponseTerminal,
789 Some(request_id.clone()),
790 None,
791 Some(format!("{status:?}")),
792 ),
793 };
794 let summary = match kind {
795 CommsNoticeKind::Request => intent.as_ref().map_or_else(
796 || "Peer request".to_string(),
797 |intent| format!("Peer request: {intent}"),
798 ),
799 CommsNoticeKind::ResponseProgress => "Peer response progress".to_string(),
800 CommsNoticeKind::ResponseTerminal => "Peer response terminal".to_string(),
801 CommsNoticeKind::Message | CommsNoticeKind::Other(_) => "Peer message".to_string(),
802 };
803 let content = match &peer.content {
804 ContentInput::Text(body) if body.is_empty() => Vec::new(),
805 ContentInput::Text(body) => {
806 vec![meerkat_core::types::ContentBlock::Text { text: body.clone() }]
807 }
808 ContentInput::Blocks(blocks) => blocks.clone(),
809 };
810 let notice_peer = meerkat_core::comms::PeerId::parse(&peer_id)
817 .ok()
818 .map(|id| SystemNoticePeer { id, display_name });
819 Some(CoreRenderable::SystemNotice {
820 kind: SystemNoticeKind::Comms,
821 body: Some(summary.clone()),
822 blocks: vec![SystemNoticeBlock::Comms {
823 kind,
824 direction: SystemNoticeDirection::Incoming,
825 peer: notice_peer,
826 request_id,
827 intent,
828 status,
829 summary: Some(summary),
830 payload: peer.payload.clone(),
831 content,
832 }],
833 })
834}
835
836fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
837 let source = match &event.header.source {
838 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
839 source_name.clone()
840 }
841 _ => event.event_type.clone(),
842 };
843 let body = event
844 .payload
845 .get("body")
846 .and_then(serde_json::Value::as_str)
847 .map(str::trim)
848 .filter(|body| !body.is_empty())
849 .map(ToOwned::to_owned);
850 let summary = body.as_ref().map_or_else(
851 || format!("External event via {source}"),
852 std::clone::Clone::clone,
853 );
854 CoreRenderable::SystemNotice {
855 kind: SystemNoticeKind::ExternalEvent,
856 body: Some(summary.clone()),
857 blocks: vec![SystemNoticeBlock::ExternalEvent {
858 source,
859 event_type: event.event_type.clone(),
860 summary: Some(summary),
861 body,
862 payload: Some(event.payload.clone()),
863 content: event.blocks.clone().unwrap_or_default(),
864 }],
865 }
866}
867
868fn input_to_append(input: &Input) -> Option<ConversationAppend> {
869 let (role, content) = match input {
877 Input::Prompt(p)
878 if !p.typed_turn_appends.is_empty()
879 && match &p.content {
880 ContentInput::Text(text) => text.trim().is_empty(),
881 ContentInput::Blocks(blocks) => blocks.is_empty(),
882 } =>
883 {
884 return None;
885 }
886 Input::Prompt(p) => match &p.content {
887 ContentInput::Blocks(blocks) => (
888 ConversationAppendRole::User,
889 CoreRenderable::Blocks {
890 blocks: blocks.clone(),
891 },
892 ),
893 ContentInput::Text(_) => (
894 ConversationAppendRole::User,
895 CoreRenderable::Text {
896 text: input_prompt_text(input),
897 },
898 ),
899 },
900 Input::Peer(p) => peer_notice_renderable(p)
901 .map(|content| (ConversationAppendRole::SystemNotice, content))?,
902 Input::FlowStep(f) => (
903 ConversationAppendRole::SystemNotice,
904 CoreRenderable::SystemNotice {
905 kind: SystemNoticeKind::Generic,
906 body: Some(format!("Flow step {}", f.step_id)),
907 blocks: vec![SystemNoticeBlock::RuntimeNotice {
908 category: "flow_step".to_string(),
909 detail: Some(f.content.text_content()),
910 payload: None,
911 }],
912 },
913 ),
914 Input::ExternalEvent(e) => (
915 ConversationAppendRole::SystemNotice,
916 external_event_notice_renderable(e),
917 ),
918 Input::Continuation(continuation) => return continuation.turn_append.clone(),
919 Input::Operation(_) => return None,
920 };
921
922 Some(ConversationAppend { role, content })
923}
924
925fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
926 let (projection, content) = match input {
927 Input::Continuation(continuation) => {
928 return continuation.context_append.clone();
929 }
930 Input::Peer(peer) => {
931 let projection = peer_projection_from_peer_input(peer)?;
932 let content = peer_notice_renderable(peer)?;
933 (projection, content)
934 }
935 _ => return None,
936 };
937
938 Some(ConversationContextAppend {
939 key: projection.context_key()?,
940 content,
941 })
942}
943
944fn peer_response_terminal_context_append(
945 peer: &PeerInput,
946) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
947 let Some(fact) = peer_response_terminal_fact(peer)? else {
948 return Ok(None);
949 };
950
951 Ok(Some(ConversationContextAppend {
952 key: fact.context_key(),
953 content: CoreRenderable::SystemNotice {
954 kind: SystemNoticeKind::Comms,
955 body: Some("Peer terminal response context".to_string()),
956 blocks: vec![SystemNoticeBlock::Comms {
957 kind: meerkat_core::types::CommsNoticeKind::ResponseTerminal,
958 direction: SystemNoticeDirection::Incoming,
959 peer: Some(SystemNoticePeer {
960 id: fact.source.route_identity.peer_id(),
961 display_name: Some(fact.source.display_identity.to_string()),
962 }),
963 request_id: Some(fact.correlation_id.to_string()),
964 intent: None,
965 status: Some(fact.status.label().to_string()),
966 summary: Some("Peer terminal response".to_string()),
967 payload: fact.render_payload.as_ref().cloned(),
968 content: Vec::new(),
969 }],
970 },
971 }))
972}
973
974pub(crate) fn runtime_input_projection(
975 input: &Input,
976) -> crate::ingress_types::RuntimeInputProjection {
977 crate::ingress_types::RuntimeInputProjection {
978 append: input_to_append(input),
979 additional_appends: match input {
980 Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
981 _ => Vec::new(),
982 },
983 context_append: input_to_context_append(input),
984 peer_response_terminal: None,
985 }
986}
987
988pub(crate) fn runtime_input_projection_for_machine_batch(
989 input: &Input,
990) -> crate::ingress_types::RuntimeInputProjection {
991 let mut projection = runtime_input_projection(input);
992 if let Input::Peer(peer) = input
993 && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
994 {
995 projection.context_append = Some(context_append);
996 if let Ok(fact) = peer_response_terminal_fact(peer) {
1000 projection.peer_response_terminal = fact;
1001 }
1002 }
1003 projection
1004}
1005
1006pub(crate) fn context_append_to_pending_system_context_append(
1007 append: &ConversationContextAppend,
1008 peer_response_terminal: Option<&meerkat_core::PeerResponseTerminalFact>,
1009) -> meerkat_core::PendingSystemContextAppend {
1010 meerkat_core::PendingSystemContextAppend {
1011 content: append.content.clone(),
1012 source: Some(append.key.clone()),
1013 idempotency_key: Some(append.key.clone()),
1014 source_kind: meerkat_core::session::SystemContextSource::Normal,
1016 peer_response_terminal: peer_response_terminal.cloned(),
1021 accepted_at: meerkat_core::time_compat::SystemTime::now(),
1022 }
1023}
1024
1025pub(crate) fn projection_to_pending_system_context_appends(
1026 input_id: &InputId,
1027 projection: &crate::ingress_types::RuntimeInputProjection,
1028) -> Vec<meerkat_core::PendingSystemContextAppend> {
1029 if let Some(append) = projection.context_append.as_ref() {
1030 return std::iter::once(context_append_to_pending_system_context_append(
1031 append,
1032 projection.peer_response_terminal.as_ref(),
1033 ))
1034 .filter(|append| !append.content.render_text().trim().is_empty())
1035 .collect();
1036 }
1037
1038 projection
1039 .append
1040 .as_ref()
1041 .map(|append| {
1042 let key = format!("runtime:steer:{input_id}");
1048 meerkat_core::PendingSystemContextAppend {
1049 content: append.content.clone(),
1050 source: Some(key.clone()),
1051 idempotency_key: Some(key),
1052 source_kind: meerkat_core::session::SystemContextSource::RuntimeSteer,
1053 peer_response_terminal: None,
1055 accepted_at: meerkat_core::time_compat::SystemTime::now(),
1056 }
1057 })
1058 .into_iter()
1059 .filter(|append| !append.content.render_text().trim().is_empty())
1060 .collect()
1061}
1062
1063#[cfg(test)]
1064#[allow(clippy::unwrap_used, clippy::panic)]
1065mod tests {
1066 use super::*;
1067 use chrono::Utc;
1068
1069 fn make_header() -> InputHeader {
1070 InputHeader {
1071 id: InputId::new(),
1072 timestamp: Utc::now(),
1073 source: InputOrigin::Operator,
1074 durability: InputDurability::Durable,
1075 visibility: InputVisibility::default(),
1076 idempotency_key: None,
1077 supersession_key: None,
1078 correlation_id: None,
1079 }
1080 }
1081
1082 fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
1083 ConversationAppend {
1084 role: ConversationAppendRole::SystemNotice,
1085 content: CoreRenderable::SystemNotice {
1086 kind: meerkat_core::types::SystemNoticeKind::Generic,
1087 body: Some(detail.to_string()),
1088 blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
1089 category: "test".to_string(),
1090 detail: Some(detail.to_string()),
1091 payload: None,
1092 }],
1093 },
1094 }
1095 }
1096
1097 #[test]
1098 fn prompt_input_serde() {
1099 let input = Input::Prompt(PromptInput {
1100 header: make_header(),
1101 content: "hello".into(),
1102 typed_turn_appends: Vec::new(),
1103 turn_metadata: None,
1104 });
1105 let json = serde_json::to_value(&input).unwrap();
1106 assert_eq!(json["input_type"], "prompt");
1107 let parsed: Input = serde_json::from_value(json).unwrap();
1108 assert!(matches!(parsed, Input::Prompt(_)));
1109 }
1110
1111 #[test]
1112 fn prompt_input_typed_turn_appends_project_without_user_text() {
1113 let append = typed_runtime_notice_append("peer delivery");
1114 let input = Input::Prompt(PromptInput {
1115 header: make_header(),
1116 content: ContentInput::Text(String::new()),
1117 typed_turn_appends: vec![append.clone()],
1118 turn_metadata: None,
1119 });
1120
1121 let projection = runtime_input_projection(&input);
1122 assert!(
1123 projection.append.is_none(),
1124 "empty runtime-authored prompt carrier must not synthesize a user append"
1125 );
1126 assert_eq!(projection.additional_appends, vec![append]);
1127 }
1128
1129 #[test]
1130 fn prompt_input_typed_turn_appends_serde_roundtrip() {
1131 let append = typed_runtime_notice_append("typed appends persist");
1132 let input = Input::Prompt(PromptInput {
1133 header: make_header(),
1134 content: ContentInput::Text(String::new()),
1135 typed_turn_appends: vec![append.clone()],
1136 turn_metadata: None,
1137 });
1138
1139 let json = serde_json::to_value(&input).unwrap();
1140 let parsed: Input = serde_json::from_value(json).unwrap();
1141 let Input::Prompt(prompt) = parsed else {
1142 panic!("expected prompt input");
1143 };
1144 assert_eq!(prompt.content.text_content(), "");
1145 assert_eq!(prompt.typed_turn_appends, vec![append]);
1146 }
1147
1148 #[test]
1149 fn peer_input_message_serde() {
1150 let input = Input::Peer(PeerInput {
1151 header: make_header(),
1152 convention: Some(PeerConvention::Message),
1153 content: "hi there".into(),
1154 payload: None,
1155 handling_mode: None,
1156 });
1157 let json = serde_json::to_value(&input).unwrap();
1158 assert_eq!(json["input_type"], "peer");
1159 let parsed: Input = serde_json::from_value(json).unwrap();
1160 assert!(matches!(parsed, Input::Peer(_)));
1161 }
1162
/// A multimodal peer message keeps its content blocks untouched inside the
/// typed comms notice: the first block is still the original caption, while
/// the "Peer message from ..." prefix is only available from the peer
/// projection itself.
#[test]
fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
    let peer_id = "018f6f79-7a82-7c4e-a552-a3b86f963005";
    let header = InputHeader {
        source: InputOrigin::Peer {
            peer_id: peer_id.into(),
            display_identity: Some("display-agent".into()),
            runtime_id: None,
        },
        ..make_header()
    };
    let message_blocks = vec![
        meerkat_core::types::ContentBlock::Text {
            text: "caption".into(),
        },
        meerkat_core::types::ContentBlock::Image {
            media_type: "image/png".into(),
            data: "abc".into(),
        },
    ];
    let input = Input::Peer(PeerInput {
        header,
        convention: Some(PeerConvention::Message),
        content: ContentInput::Blocks(message_blocks),
        payload: None,
        handling_mode: None,
    });

    let peer = match &input {
        Input::Peer(peer) => peer,
        _ => panic!("expected peer input"),
    };
    // The projection still knows how to render the textual prefix on demand.
    let prefix = peer_projection_from_peer_input(peer)
        .and_then(|projection| projection.block_prefix_text());
    let expected_prefix = format!("Peer message from {peer_id}");
    assert_eq!(prefix.as_deref(), Some(expected_prefix.as_str()));

    let projected = runtime_input_projection(&input);
    let conversation_append = projected.append.expect("conversation append");
    let CoreRenderable::SystemNotice { blocks, .. } = conversation_append.content else {
        panic!("expected typed system notice");
    };
    let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
        blocks.first()
    else {
        panic!("expected comms block");
    };

    let display_name = peer.as_ref().and_then(|peer| peer.display_name.as_deref());
    assert_eq!(display_name, Some("display-agent"));

    // The caption must be the first block, with no prefix text block ahead of it.
    let expected_caption = meerkat_core::types::ContentBlock::Text {
        text: "caption".into(),
    };
    assert_eq!(content.first(), Some(&expected_caption));
}
1219
/// A terminal peer response yields no peer projection and no context append at
/// admission time; the context append (keyed by route and request id) only
/// shows up in the machine-batch projection.
#[test]
fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
    let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
    let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
    let header = InputHeader {
        source: InputOrigin::Peer {
            peer_id: route_id.into(),
            display_identity: Some("display-agent".into()),
            runtime_id: None,
        },
        ..make_header()
    };
    let input = Input::Peer(PeerInput {
        header,
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: request_id.into(),
            status: ResponseTerminalStatus::Completed,
        }),
        content: "response body".into(),
        payload: Some(serde_json::json!({"answer":"ok"})),
        handling_mode: None,
    });

    let peer = match &input {
        Input::Peer(peer) => peer,
        _ => panic!("expected peer input"),
    };
    let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
    assert!(
        peer_projection_from_peer_input(peer).is_none(),
        "terminal peer response projection must not be built before machine batch selection"
    );

    // Admission-time projection: nothing terminal-related is stored yet.
    let admission = runtime_input_projection(&input);
    assert!(
        admission.context_append.is_none(),
        "admission projection must not store terminal peer response context"
    );

    // Machine-batch projection: the typed context appears under the canonical key.
    let machine_batch = runtime_input_projection_for_machine_batch(&input);
    let context = machine_batch.context_append.expect("context append");
    assert_eq!(context.key, expected_canonical_key);
    let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
        panic!("expected typed context");
    };
    let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
    else {
        panic!("expected comms block");
    };

    let display_name = peer.as_ref().and_then(|peer| peer.display_name.as_deref());
    assert_eq!(display_name, Some("display-agent"));

    let projected_peer_id = peer.as_ref().map(|peer| peer.id);
    let expected_peer_id =
        meerkat_core::comms::PeerId::parse(route_id).expect("valid route id");
    assert_eq!(projected_peer_id, Some(expected_peer_id));
}
1274
/// When a projection has both an ordinary append and a context append, the
/// pending system-context conversion emits exactly one entry built from the
/// context append, using its key as both source and idempotency key.
#[test]
fn steer_projection_uses_context_append_as_pending_system_context() {
    let context_key = "peer_response_terminal:peer:req";
    let ordinary_append = ConversationAppend {
        role: ConversationAppendRole::SystemNotice,
        content: CoreRenderable::Text {
            text: "ordinary append must lose to context append".into(),
        },
    };
    let context_append = ConversationContextAppend {
        key: context_key.into(),
        content: CoreRenderable::Text {
            text: "terminal response is ready".into(),
        },
    };
    let input_id = InputId::new();
    let projection = crate::ingress_types::RuntimeInputProjection {
        append: Some(ordinary_append),
        additional_appends: Vec::new(),
        context_append: Some(context_append),
        peer_response_terminal: None,
    };

    let appends = projection_to_pending_system_context_appends(&input_id, &projection);

    assert_eq!(appends.len(), 1);
    let only = &appends[0];
    assert_eq!(only.content.render_text(), "terminal response is ready");
    assert_eq!(only.source.as_deref(), Some(context_key));
    assert_eq!(only.idempotency_key.as_deref(), Some(context_key));
}
1311
/// A workgraph-attention continuation carries its `context_append` through
/// the machine-batch projection into the pending system-context appends, and
/// its tool overlay into the metadata produced by `runtime_loop::for_input`.
#[test]
fn continuation_projection_can_carry_runtime_context_append() {
    let context_key = "workgraph_attention:binding-1:2:5";
    let overlay = TurnToolOverlay {
        allowed_tools: Some(vec!["workgraph_add_evidence".into()]),
        blocked_tools: None,
        dispatch_context: Default::default(),
    };
    let input = Input::Continuation(ContinuationInput {
        header: make_header(),
        reason: "workgraph_attention".into(),
        continuation_kind: ContinuationKind::WorkgraphAttention,
        handling_mode: HandlingMode::Steer,
        request_id: Some("binding-1".into()),
        flow_tool_overlay: Some(overlay),
        context_append: Some(ConversationContextAppend {
            key: context_key.into(),
            content: CoreRenderable::Text {
                text: "WorkGraph attention projection".into(),
            },
        }),
        turn_append: None,
    });

    let projection = runtime_input_projection_for_machine_batch(&input);
    let appends = projection_to_pending_system_context_appends(input.id(), &projection);

    assert_eq!(appends.len(), 1);
    let only = &appends[0];
    assert_eq!(only.content.render_text(), "WorkGraph attention projection");
    assert_eq!(only.source.as_deref(), Some(context_key));

    // The overlay's allow-list must also reach the turn metadata.
    let semantics = crate::ingress_types::RuntimeInputSemantics {
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary::RunStart,
        execution_kind: meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn,
        execution_handling_mode: None,
        peer_response_terminal_apply_intent: None,
        live_interrupt_required: false,
    };
    let metadata = crate::runtime_loop::for_input(&input, semantics);
    let allowed_tools = metadata
        .flow_tool_overlay
        .and_then(|overlay| overlay.allowed_tools);
    assert_eq!(allowed_tools, Some(vec!["workgraph_add_evidence".into()]));
}
1362
/// With no context append available, a steer-mode peer message is converted
/// into a single pending system-context entry that renders the message text
/// and is keyed by `runtime:steer:<input id>`.
#[test]
fn steer_projection_falls_back_to_ordinary_peer_append() {
    let header = InputHeader {
        source: InputOrigin::Peer {
            peer_id: "peer-a".into(),
            display_identity: Some("Peer A".into()),
            runtime_id: None,
        },
        ..make_header()
    };
    let input = Input::Peer(PeerInput {
        header,
        convention: Some(PeerConvention::Message),
        content: "please look at this while you work".into(),
        payload: None,
        handling_mode: Some(HandlingMode::Steer),
    });
    let input_id = input.id().clone();
    let expected_key = format!("runtime:steer:{input_id}");

    let projection = runtime_input_projection(&input);
    let appends = projection_to_pending_system_context_appends(&input_id, &projection);

    assert_eq!(appends.len(), 1);
    let only = &appends[0];
    let rendered = only.content.render_text();
    assert!(
        rendered.contains("please look at this while you work"),
        "peer message append should be renderable as live system context: {rendered:?}"
    );
    assert_eq!(only.source.as_deref(), Some(expected_key.as_str()));
    assert_eq!(only.idempotency_key.as_deref(), Some(expected_key.as_str()));
}
1398
/// Whitespace-only text produces no pending system-context entries, whether
/// it arrives as a context append or as an ordinary append.
#[test]
fn steer_projection_filters_empty_context_and_empty_append() {
    let input_id = InputId::new();

    // Case 1: the context append is only spaces.
    let blank_context = crate::ingress_types::RuntimeInputProjection {
        append: None,
        additional_appends: Vec::new(),
        context_append: Some(ConversationContextAppend {
            key: "empty-context".into(),
            content: CoreRenderable::Text { text: "  ".into() },
        }),
        peer_response_terminal: None,
    };
    let from_context = projection_to_pending_system_context_appends(&input_id, &blank_context);
    assert!(from_context.is_empty());

    // Case 2: the ordinary append is only a newline.
    let blank_append = crate::ingress_types::RuntimeInputProjection {
        append: Some(ConversationAppend {
            role: ConversationAppendRole::SystemNotice,
            content: CoreRenderable::Text { text: "\n".into() },
        }),
        additional_appends: Vec::new(),
        context_append: None,
        peer_response_terminal: None,
    };
    let from_append = projection_to_pending_system_context_appends(&input_id, &blank_append);
    assert!(from_append.is_empty());
}
1428
/// A terminal peer response whose content is an image block projects, at
/// machine-batch time, both a typed conversation append that keeps the image
/// and a context append.
#[test]
fn peer_response_terminal_with_blocks_projects_append_and_context() {
    let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
    let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
    let header = InputHeader {
        source: InputOrigin::Peer {
            peer_id: route_id.into(),
            display_identity: Some("display-agent".into()),
            runtime_id: None,
        },
        ..make_header()
    };
    let image_only = vec![meerkat_core::types::ContentBlock::Image {
        media_type: "image/jpeg".into(),
        data: "abc".into(),
    }];
    let input = Input::Peer(PeerInput {
        header,
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: request_id.into(),
            status: ResponseTerminalStatus::Completed,
        }),
        content: ContentInput::Blocks(image_only),
        payload: Some(serde_json::json!({"answer":"ok"})),
        handling_mode: None,
    });

    let projection = runtime_input_projection_for_machine_batch(&input);
    let conversation_append = projection.append.expect("conversation append");
    let CoreRenderable::SystemNotice { blocks, .. } = conversation_append.content else {
        panic!("expected typed append");
    };
    let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
        blocks.first()
    else {
        panic!("expected comms block");
    };

    let display_name = peer.as_ref().and_then(|peer| peer.display_name.as_deref());
    assert_eq!(display_name, Some("display-agent"));

    let first_is_jpeg = matches!(
        content.first(),
        Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
            if media_type == "image/jpeg"
    );
    assert!(first_is_jpeg);
    assert!(
        projection.context_append.is_some(),
        "terminal response must still apply runtime-owned context"
    );
}
1477
/// A peer request with a payload round-trips through JSON and is still a
/// peer input carrying the `Request` convention.
#[test]
fn peer_input_request_serde() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Request {
            request_id: "req-1".into(),
            intent: "mob.peer_added".into(),
        }),
        content: "Agent joined".into(),
        payload: Some(serde_json::json!({"name": "agent-1"})),
        handling_mode: None,
    });

    let encoded = serde_json::to_value(&original).unwrap();
    let decoded = serde_json::from_value::<Input>(encoded).unwrap();

    let Input::Peer(peer) = decoded else {
        panic!("Expected PeerInput");
    };
    assert!(matches!(
        peer.convention,
        Some(PeerConvention::Request { .. })
    ));
}
1498
/// A terminal peer response round-trips through JSON with its convention
/// (request id and status) and payload preserved.
///
/// Checking only the outer `Input::Peer` variant would let a regression that
/// drops or mis-tags the convention slip through, so the decoded fields are
/// asserted explicitly, in line with the request-convention serde test.
#[test]
fn peer_input_response_terminal_serde() {
    let input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::ResponseTerminal {
            request_id: "req-1".into(),
            status: ResponseTerminalStatus::Completed,
        }),
        content: "Done".into(),
        payload: Some(serde_json::json!({"ok": true})),
        handling_mode: None,
    });
    let json = serde_json::to_value(&input).unwrap();
    let parsed: Input = serde_json::from_value(json).unwrap();

    let Input::Peer(peer) = parsed else {
        panic!("Expected PeerInput");
    };
    assert!(
        matches!(
            &peer.convention,
            Some(PeerConvention::ResponseTerminal {
                request_id,
                status: ResponseTerminalStatus::Completed,
            }) if *request_id == "req-1"
        ),
        "terminal convention must round-trip with its request id and status"
    );
    assert_eq!(peer.payload, Some(serde_json::json!({"ok": true})));
}
1515
/// A progress peer response round-trips through JSON with its convention
/// (request id and phase) and payload preserved.
///
/// Checking only the outer `Input::Peer` variant would let a regression that
/// drops or mis-tags the convention slip through, so the decoded fields are
/// asserted explicitly, in line with the request-convention serde test.
#[test]
fn peer_input_response_progress_serde() {
    let input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::ResponseProgress {
            request_id: "req-1".into(),
            phase: ResponseProgressPhase::InProgress,
        }),
        content: "Working...".into(),
        payload: Some(serde_json::json!({"progress": "working"})),
        handling_mode: None,
    });
    let json = serde_json::to_value(&input).unwrap();
    let parsed: Input = serde_json::from_value(json).unwrap();

    let Input::Peer(peer) = parsed else {
        panic!("Expected PeerInput");
    };
    assert!(
        matches!(
            &peer.convention,
            Some(PeerConvention::ResponseProgress {
                request_id,
                phase: ResponseProgressPhase::InProgress,
            }) if *request_id == "req-1"
        ),
        "progress convention must round-trip with its request id and phase"
    );
    assert_eq!(
        peer.payload,
        Some(serde_json::json!({"progress": "working"}))
    );
}
1532
/// A flow-step input with text and inline-image blocks serializes under the
/// `flow_step` tag and deserializes back into `Input::FlowStep`.
#[test]
fn flow_step_input_serde() {
    let step_blocks = vec![
        meerkat_core::types::ContentBlock::Text {
            text: "analyze the data".into(),
        },
        meerkat_core::types::ContentBlock::Image {
            media_type: "image/png".into(),
            data: meerkat_core::types::ImageData::Inline {
                data: "abc123".into(),
            },
        },
    ];
    let original = Input::FlowStep(FlowStepInput {
        header: make_header(),
        step_id: "step-1".into(),
        content: ContentInput::Blocks(step_blocks),
        turn_metadata: None,
    });

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["input_type"], "flow_step");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    assert!(matches!(decoded, Input::FlowStep(_)));
}
1556
/// An external event with a JSON payload and typed blocks serializes under
/// the `external_event` tag and deserializes back into `Input::ExternalEvent`.
#[test]
fn external_event_input_serde() {
    let typed_blocks = vec![
        meerkat_core::types::ContentBlock::Text {
            text: "look".into(),
        },
        meerkat_core::types::ContentBlock::Image {
            media_type: "image/png".into(),
            data: meerkat_core::types::ImageData::Inline {
                data: "abc123".into(),
            },
        },
    ];
    let original = Input::ExternalEvent(ExternalEventInput {
        header: make_header(),
        event_type: "webhook.received".into(),
        payload: serde_json::json!({"url": "https://example.com"}),
        blocks: Some(typed_blocks),
        handling_mode: HandlingMode::Queue,
        render_metadata: None,
    });

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["input_type"], "external_event");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    assert!(matches!(decoded, Input::ExternalEvent(_)));
}
1582
/// An external event that smuggles a `blocks` array inside its JSON payload
/// is rejected with an internal blob-store error, and the event itself is
/// left as it was (payload still has `blocks`, typed `blocks` still `None`).
#[test]
fn legacy_external_event_payload_blocks_are_rejected() {
    let legacy_payload = serde_json::json!({
        "body": "see image",
        "blocks": [
            { "type": "text", "text": "caption text" },
            { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
        ]
    });
    let event = ExternalEventInput {
        header: make_header(),
        event_type: "webhook.received".into(),
        payload: legacy_payload,
        blocks: None,
        handling_mode: HandlingMode::Queue,
        render_metadata: None,
    };

    let rejection = reject_legacy_payload_blocks(&event)
        .expect_err("payload-level blocks must fail closed");

    assert!(matches!(rejection, BlobStoreError::Internal(_)));
    assert!(event.payload.get("blocks").is_some());
    assert!(event.blocks.is_none());
}
1609
/// An external event whose payload has no `blocks` key passes the legacy
/// rejection gate even though it carries typed blocks.
#[test]
fn external_event_payload_without_blocks_key_passes_rejection_gate() {
    let typed_blocks = vec![meerkat_core::types::ContentBlock::Text {
        text: "typed owner content".into(),
    }];
    let event = ExternalEventInput {
        header: make_header(),
        event_type: "webhook.received".into(),
        payload: serde_json::json!({ "body": "plain payload" }),
        blocks: Some(typed_blocks),
        handling_mode: HandlingMode::Queue,
        render_metadata: None,
    };

    let verdict = reject_legacy_payload_blocks(&event);
    verdict.expect("payload without a legacy blocks key must pass");
}
1626
/// The detached-background-op continuation serializes under the
/// `continuation` tag and comes back with steer handling and its reason.
#[test]
fn continuation_input_serde() {
    let original = Input::Continuation(ContinuationInput::detached_background_op_completed());

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["input_type"], "continuation");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    let continuation = match decoded {
        Input::Continuation(continuation) => continuation,
        other => panic!("Expected Continuation, got {other:?}"),
    };
    assert_eq!(continuation.handling_mode, HandlingMode::Steer);
    assert_eq!(continuation.reason, "detached_background_op_completed");
}
1641
/// Rewriting a serialized continuation's tag to the legacy
/// `system_generated` value must make deserialization fail.
#[test]
fn continuation_input_rejects_legacy_system_generated_tag() {
    let continuation = Input::Continuation(ContinuationInput::detached_background_op_completed());
    let mut tampered = serde_json::to_value(&continuation).unwrap();
    tampered["input_type"] = serde_json::Value::from("system_generated");

    let outcome = serde_json::from_value::<Input>(tampered);
    outcome.expect_err("legacy system_generated input_type tag must be rejected");
}
1652
/// An operation input with derived durability serializes under the
/// `operation` tag and deserializes back into `Input::Operation`.
#[test]
fn operation_input_serde() {
    let derived_header = InputHeader {
        durability: InputDurability::Derived,
        ..make_header()
    };
    let original = Input::Operation(OperationInput {
        header: derived_header,
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    });

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["input_type"], "operation");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    assert!(matches!(decoded, Input::Operation(_)));
}
1670
/// Rewriting a serialized operation input's tag to the legacy `projected`
/// value must make deserialization fail.
#[test]
fn operation_input_rejects_legacy_projected_tag() {
    let derived_header = InputHeader {
        durability: InputDurability::Derived,
        ..make_header()
    };
    let operation = Input::Operation(OperationInput {
        header: derived_header,
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    });
    let mut tampered = serde_json::to_value(&operation).unwrap();
    tampered["input_type"] = serde_json::Value::from("projected");

    let outcome = serde_json::from_value::<Input>(tampered);
    outcome.expect_err("legacy projected input_type tag must be rejected");
}
1690
/// JSON shapes that use the old field names (`text`/`blocks`, `body`,
/// `instructions`) instead of `content` must fail to deserialize for prompt,
/// peer and flow-step inputs.
#[test]
fn legacy_dual_carrier_input_shapes_are_rejected() {
    let header = serde_json::to_value(make_header()).unwrap();

    // Each entry pairs a legacy shape with the message used if it is accepted.
    let legacy_shapes = [
        (
            serde_json::json!({
                "input_type": "prompt",
                "header": header.clone(),
                "text": "hello",
                "blocks": null
            }),
            "legacy prompt text+blocks shape must be rejected",
        ),
        (
            serde_json::json!({
                "input_type": "peer",
                "header": header.clone(),
                "convention": { "convention_type": "message" },
                "body": "hi there"
            }),
            "legacy peer body+blocks shape must be rejected",
        ),
        (
            serde_json::json!({
                "input_type": "flow_step",
                "header": header,
                "step_id": "step-1",
                "instructions": "analyze the data"
            }),
            "legacy flow-step instructions+blocks shape must be rejected",
        ),
    ];

    for (legacy_shape, why_rejected) in legacy_shapes {
        serde_json::from_value::<Input>(legacy_shape).expect_err(why_rejected);
    }
}
1726
/// `Input::kind()` reports the matching `InputKind` for a prompt, a peer
/// message, a peer request, a continuation and an operation.
#[test]
fn input_kind_id() {
    // Prompt.
    let prompt_input = Input::Prompt(PromptInput {
        header: make_header(),
        content: "hi".into(),
        typed_turn_appends: Vec::new(),
        turn_metadata: None,
    });
    assert_eq!(prompt_input.kind(), InputKind::Prompt);

    // Peer with the plain message convention.
    let message_input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        content: "hi".into(),
        payload: None,
        handling_mode: None,
    });
    assert_eq!(message_input.kind(), InputKind::PeerMessage);

    // Peer with the request convention.
    let request_convention = PeerConvention::Request {
        request_id: "r".into(),
        intent: "i".into(),
    };
    let request_input = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(request_convention),
        content: "hi".into(),
        payload: Some(serde_json::json!({"subject": "x"})),
        handling_mode: None,
    });
    assert_eq!(request_input.kind(), InputKind::PeerRequest);

    // Continuation.
    let continuation_input = Input::Continuation(ContinuationInput {
        header: make_header(),
        reason: "continue".into(),
        continuation_kind: ContinuationKind::Ordinary,
        handling_mode: HandlingMode::Steer,
        request_id: None,
        flow_tool_overlay: None,
        context_append: None,
        turn_append: None,
    });
    assert_eq!(continuation_input.kind(), InputKind::Continuation);

    // Operation.
    let operation_input = Input::Operation(OperationInput {
        header: make_header(),
        operation_id: OperationId::new(),
        event: OpEvent::Cancelled {
            id: OperationId::new(),
        },
    });
    assert_eq!(operation_input.kind(), InputKind::Operation);
}
1779
/// Each listed `InputOrigin` variant is equal to itself after a JSON round trip.
#[test]
fn input_source_variants() {
    let origins = [
        InputOrigin::Operator,
        InputOrigin::Peer {
            peer_id: "p1".into(),
            display_identity: None,
            runtime_id: None,
        },
        InputOrigin::Flow {
            flow_id: "f1".into(),
            step_index: 0,
        },
        InputOrigin::System,
        InputOrigin::External {
            source_name: "webhook".into(),
        },
    ];

    for origin in origins {
        let encoded = serde_json::to_value(&origin).unwrap();
        let decoded = serde_json::from_value::<InputOrigin>(encoded).unwrap();
        assert_eq!(origin, decoded);
    }
}
1804
/// Each `InputDurability` value listed here is equal to itself after a JSON
/// round trip.
#[test]
fn input_durability_serde() {
    let durabilities = [
        InputDurability::Durable,
        InputDurability::Ephemeral,
        InputDurability::Derived,
    ];

    for durability in durabilities {
        let encoded = serde_json::to_value(durability).unwrap();
        let decoded = serde_json::from_value::<InputDurability>(encoded).unwrap();
        assert_eq!(durability, decoded);
    }
}
1817
/// Peer JSON that omits `handling_mode` deserializes with the field set to
/// `None`.
#[test]
fn peer_input_without_handling_mode_deserializes_as_none() {
    let header = serde_json::to_value(make_header()).unwrap();
    let wire = serde_json::json!({
        "input_type": "peer",
        "header": header,
        "convention": { "convention_type": "message" },
        "content": "hello"
    });

    let decoded = serde_json::from_value::<Input>(wire).unwrap();
    let peer = match decoded {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert!(peer.handling_mode.is_none());
}
1833
/// A peer input with queue handling serializes the mode as `"queue"` and
/// reads it back as `HandlingMode::Queue`.
#[test]
fn peer_input_with_queue_handling_mode_roundtrips() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        content: "hi".into(),
        payload: None,
        handling_mode: Some(HandlingMode::Queue),
    });

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["handling_mode"], "queue");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    let peer = match decoded {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert_eq!(peer.handling_mode, Some(HandlingMode::Queue));
}
1851
/// `peer_response_terminal_input` builds a durable peer input whose origin,
/// correlation id, convention (request id and status) and payload all mirror
/// the arguments, with the wire `Completed` status mapped to
/// `ResponseTerminalStatus::Completed` and no handling mode set.
#[test]
fn peer_response_terminal_input_owns_wire_status_mapping() {
    let peer_uuid = "00000000-0000-4000-8000-000000000161";
    let request_uuid = "00000000-0000-4000-8000-000000000162";

    let input = peer_response_terminal_input(
        meerkat_core::comms::PeerId::from_uuid(uuid::Uuid::parse_str(peer_uuid).unwrap()),
        Some(meerkat_core::comms::PeerName::new("analyst").unwrap()),
        meerkat_core::PeerCorrelationId::from_uuid(
            uuid::Uuid::parse_str(request_uuid).unwrap(),
        ),
        meerkat_contracts::PeerResponseTerminalStatusWire::Completed,
        serde_json::json!({"ok": true}),
    );

    // The pattern itself pins durability, the terminal convention, a present
    // payload and an absent handling mode; the arm body checks the values.
    match input {
        Input::Peer(PeerInput {
            header:
                InputHeader {
                    source:
                        InputOrigin::Peer {
                            peer_id: origin_peer_id,
                            display_identity,
                            runtime_id,
                        },
                    durability: InputDurability::Durable,
                    correlation_id,
                    ..
                },
            convention:
                Some(PeerConvention::ResponseTerminal {
                    request_id: terminal_request_id,
                    status,
                }),
            payload: Some(payload),
            handling_mode: None,
            ..
        }) => {
            assert_eq!(origin_peer_id, peer_uuid);
            assert_eq!(display_identity.as_deref(), Some("analyst"));
            assert_eq!(runtime_id, None);
            assert_eq!(terminal_request_id, request_uuid);
            let expected_correlation =
                CorrelationId::from_uuid(uuid::Uuid::parse_str(request_uuid).unwrap());
            assert_eq!(correlation_id, Some(expected_correlation));
            assert_eq!(status, ResponseTerminalStatus::Completed);
            assert_eq!(payload["ok"], true);
        }
        other => panic!("expected terminal peer input, got {other:?}"),
    }
}
1904
/// A terminal peer input built with the wire `Cancelled` status still passes
/// `validate_peer_response_terminal_fact`.
#[test]
fn peer_response_terminal_validation_is_structural_only() {
    let peer_uuid = uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap();
    let request_uuid = uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap();

    let input = peer_response_terminal_input(
        meerkat_core::comms::PeerId::from_uuid(peer_uuid),
        Some(meerkat_core::comms::PeerName::new("analyst").unwrap()),
        meerkat_core::PeerCorrelationId::from_uuid(request_uuid),
        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
        serde_json::json!({"ok": false}),
    );

    let verdict = validate_peer_response_terminal_fact(&input);
    verdict.expect(
        "status support is generated admission authority, structural fact validation should pass",
    );
}
1925
/// A peer input with steer handling serializes the mode as `"steer"` and
/// reads it back as `HandlingMode::Steer`.
#[test]
fn peer_input_with_steer_handling_mode_roundtrips() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        content: "hi".into(),
        payload: None,
        handling_mode: Some(HandlingMode::Steer),
    });

    let encoded = serde_json::to_value(&original).unwrap();
    assert_eq!(encoded["handling_mode"], "steer");

    let decoded = serde_json::from_value::<Input>(encoded).unwrap();
    let peer = match decoded {
        Input::Peer(peer) => peer,
        other => panic!("Expected Peer, got {other:?}"),
    };
    assert_eq!(peer.handling_mode, Some(HandlingMode::Steer));
}
1943
/// When a peer input has no handling mode, the serialized JSON has no
/// `handling_mode` key at all.
#[test]
fn peer_input_handling_mode_not_serialized_when_none() {
    let original = Input::Peer(PeerInput {
        header: make_header(),
        convention: Some(PeerConvention::Message),
        content: "hi".into(),
        payload: None,
        handling_mode: None,
    });

    let encoded = serde_json::to_value(&original).unwrap();
    let handling_mode_field = encoded.get("handling_mode");
    assert!(handling_mode_field.is_none());
}
1956}