1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10 ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11 RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::HandlingMode;
15use meerkat_core::{
16 BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
17 PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
18 PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
19 PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
20 PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
21 PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
22};
23use serde::{Deserialize, Serialize};
24
25use crate::identifiers::{
26 CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
27};
28use meerkat_core::types::RenderMetadata;
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct InputHeader {
33 pub id: InputId,
35 pub timestamp: DateTime<Utc>,
37 pub source: InputOrigin,
39 pub durability: InputDurability,
41 pub visibility: InputVisibility,
43 #[serde(skip_serializing_if = "Option::is_none")]
45 pub idempotency_key: Option<IdempotencyKey>,
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub supersession_key: Option<SupersessionKey>,
49 #[serde(skip_serializing_if = "Option::is_none")]
51 pub correlation_id: Option<CorrelationId>,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
56#[serde(tag = "type", rename_all = "snake_case")]
57#[non_exhaustive]
58pub enum InputOrigin {
59 Operator,
61 Peer {
63 peer_id: String,
66 #[serde(default, skip_serializing_if = "Option::is_none")]
70 display_identity: Option<String>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 runtime_id: Option<LogicalRuntimeId>,
73 },
74 Flow { flow_id: String, step_index: usize },
76 System,
78 External { source_name: String },
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85#[non_exhaustive]
86pub enum InputDurability {
87 Durable,
89 Ephemeral,
91 Derived,
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
97pub struct InputVisibility {
98 pub transcript_eligible: bool,
100 pub operator_eligible: bool,
102}
103
104impl Default for InputVisibility {
105 fn default() -> Self {
106 Self {
107 transcript_eligible: true,
108 operator_eligible: true,
109 }
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(tag = "input_type", rename_all = "snake_case")]
116#[non_exhaustive]
117pub enum Input {
118 Prompt(PromptInput),
120 Peer(PeerInput),
122 FlowStep(FlowStepInput),
124 ExternalEvent(ExternalEventInput),
126 #[serde(alias = "system_generated")]
128 Continuation(ContinuationInput),
129 #[serde(alias = "projected")]
131 Operation(OperationInput),
132}
133
134impl Input {
135 pub fn header(&self) -> &InputHeader {
137 match self {
138 Input::Prompt(i) => &i.header,
139 Input::Peer(i) => &i.header,
140 Input::FlowStep(i) => &i.header,
141 Input::ExternalEvent(i) => &i.header,
142 Input::Continuation(i) => &i.header,
143 Input::Operation(i) => &i.header,
144 }
145 }
146
147 pub fn id(&self) -> &InputId {
149 &self.header().id
150 }
151
152 pub fn kind(&self) -> InputKind {
154 match self {
155 Input::Prompt(_) => InputKind::Prompt,
156 Input::Peer(p) => match &p.convention {
157 Some(PeerConvention::Message) | None => InputKind::PeerMessage,
158 Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
159 Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
160 Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
161 },
162 Input::FlowStep(_) => InputKind::FlowStep,
163 Input::ExternalEvent(_) => InputKind::ExternalEvent,
164 Input::Continuation(_) => InputKind::Continuation,
165 Input::Operation(_) => InputKind::Operation,
166 }
167 }
168
169 pub fn kind_id(&self) -> KindId {
171 KindId::new(self.kind())
172 }
173
174 pub fn handling_mode(&self) -> Option<HandlingMode> {
176 match self {
177 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
178 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
179 Input::ExternalEvent(event) => Some(event.handling_mode),
180 Input::Continuation(continuation) => Some(continuation.handling_mode),
181 Input::Peer(peer) => peer.handling_mode,
182 Input::Operation(_) => None,
183 }
184 }
185}
186
187fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
188 let Some(obj) = event.payload.as_object_mut() else {
189 return Ok(());
190 };
191 let Some(blocks_value) = obj.remove("blocks") else {
192 return Ok(());
193 };
194 if event.blocks.is_some() {
195 return Ok(());
196 }
197 let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
198 .map_err(|err| {
199 BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
200 })?;
201 event.blocks = Some(blocks);
202 Ok(())
203}
204
205pub async fn externalize_input_images(
206 blob_store: &dyn BlobStore,
207 input: &mut Input,
208) -> Result<(), BlobStoreError> {
209 match input {
210 Input::Prompt(prompt) => {
211 if let Some(blocks) = prompt.blocks.as_mut() {
212 externalize_content_blocks(blob_store, blocks).await?;
213 }
214 }
215 Input::Peer(peer) => {
216 if let Some(blocks) = peer.blocks.as_mut() {
217 externalize_content_blocks(blob_store, blocks).await?;
218 }
219 }
220 Input::FlowStep(flow_step) => {
221 if let Some(blocks) = flow_step.blocks.as_mut() {
222 externalize_content_blocks(blob_store, blocks).await?;
223 }
224 }
225 Input::ExternalEvent(event) => {
226 migrate_legacy_payload_blocks(event)?;
227 if let Some(blocks) = event.blocks.as_mut() {
228 externalize_content_blocks(blob_store, blocks).await?;
229 }
230 }
231 Input::Continuation(_) | Input::Operation(_) => {}
232 }
233 Ok(())
234}
235
236pub async fn hydrate_input_images(
237 blob_store: &dyn BlobStore,
238 input: &mut Input,
239 missing_behavior: MissingBlobBehavior,
240) -> Result<(), BlobStoreError> {
241 match input {
242 Input::Prompt(prompt) => {
243 if let Some(blocks) = prompt.blocks.as_mut() {
244 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
245 }
246 }
247 Input::Peer(peer) => {
248 if let Some(blocks) = peer.blocks.as_mut() {
249 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
250 }
251 }
252 Input::FlowStep(flow_step) => {
253 if let Some(blocks) = flow_step.blocks.as_mut() {
254 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
255 }
256 }
257 Input::ExternalEvent(event) => {
258 migrate_legacy_payload_blocks(event)?;
259 if let Some(blocks) = event.blocks.as_mut() {
260 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
261 }
262 }
263 Input::Continuation(_) | Input::Operation(_) => {}
264 }
265 Ok(())
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct PromptInput {
271 pub header: InputHeader,
272 pub text: String,
274 #[serde(default, skip_serializing_if = "Option::is_none")]
277 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
278 #[serde(default, skip_serializing_if = "Option::is_none")]
279 pub turn_metadata: Option<RuntimeTurnMetadata>,
280}
281
282impl PromptInput {
283 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
285 Self {
286 header: InputHeader {
287 id: meerkat_core::lifecycle::InputId::new(),
288 timestamp: chrono::Utc::now(),
289 source: InputOrigin::Operator,
290 durability: InputDurability::Durable,
291 visibility: InputVisibility::default(),
292 idempotency_key: None,
293 supersession_key: None,
294 correlation_id: None,
295 },
296 text: text.into(),
297 blocks: None,
298 turn_metadata,
299 }
300 }
301
302 pub fn from_content_input(
304 input: meerkat_core::types::ContentInput,
305 turn_metadata: Option<RuntimeTurnMetadata>,
306 ) -> Self {
307 let text = input.text_content();
308 let blocks = if input.has_images() {
309 Some(input.into_blocks())
310 } else {
311 None
312 };
313 Self {
314 header: InputHeader {
315 id: meerkat_core::lifecycle::InputId::new(),
316 timestamp: chrono::Utc::now(),
317 source: InputOrigin::Operator,
318 durability: InputDurability::Durable,
319 visibility: InputVisibility::default(),
320 idempotency_key: None,
321 supersession_key: None,
322 correlation_id: None,
323 },
324 text,
325 blocks,
326 turn_metadata,
327 }
328 }
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
333pub struct PeerInput {
334 pub header: InputHeader,
335 #[serde(skip_serializing_if = "Option::is_none")]
337 pub convention: Option<PeerConvention>,
338 pub body: String,
344 #[serde(default, skip_serializing_if = "Option::is_none")]
349 pub payload: Option<serde_json::Value>,
350 #[serde(default, skip_serializing_if = "Option::is_none")]
353 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
354 #[serde(default, skip_serializing_if = "Option::is_none")]
359 pub handling_mode: Option<HandlingMode>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
364#[serde(tag = "convention_type", rename_all = "snake_case")]
365#[non_exhaustive]
366pub enum PeerConvention {
367 Message,
369 Request { request_id: String, intent: String },
371 ResponseProgress {
373 request_id: String,
374 phase: ResponseProgressPhase,
375 },
376 ResponseTerminal {
378 request_id: String,
379 status: ResponseTerminalStatus,
380 },
381}
382
383pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
386
387pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
390
391pub fn response_terminal_status_from_wire(
392 status: meerkat_contracts::PeerResponseTerminalStatusWire,
393) -> ResponseTerminalStatus {
394 match status {
395 meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
396 PeerResponseTerminalProjectionStatus::Completed
397 }
398 meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
399 PeerResponseTerminalProjectionStatus::Failed
400 }
401 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
402 PeerResponseTerminalProjectionStatus::Cancelled
403 }
404 }
405}
406
407pub fn peer_response_terminal_input(
408 peer_id: meerkat_core::comms::PeerId,
409 display_name: Option<meerkat_core::comms::PeerName>,
410 request_id: meerkat_core::PeerCorrelationId,
411 status: meerkat_contracts::PeerResponseTerminalStatusWire,
412 result: serde_json::Value,
413) -> Input {
414 let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
415 let request_id = request_id.to_string();
416 let peer_id = peer_id.to_string();
417 let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
418
419 Input::Peer(PeerInput {
420 header: InputHeader {
421 id: InputId::new(),
422 timestamp: Utc::now(),
423 source: InputOrigin::Peer {
424 peer_id,
425 display_identity: Some(display_identity),
426 runtime_id: None,
427 },
428 durability: InputDurability::Durable,
429 visibility: InputVisibility::default(),
430 idempotency_key: None,
431 supersession_key: None,
432 correlation_id: Some(correlation_id),
433 },
434 convention: Some(PeerConvention::ResponseTerminal {
435 request_id,
436 status: response_terminal_status_from_wire(status),
437 }),
438 body: String::new(),
439 payload: Some(result),
440 blocks: None,
441 handling_mode: None,
442 })
443}
444
445#[derive(Debug, Clone, Serialize, Deserialize)]
447pub struct FlowStepInput {
448 pub header: InputHeader,
449 pub step_id: String,
451 pub instructions: String,
453 #[serde(default, skip_serializing_if = "Option::is_none")]
457 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
458 #[serde(default, skip_serializing_if = "Option::is_none")]
459 pub turn_metadata: Option<RuntimeTurnMetadata>,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct ExternalEventInput {
465 pub header: InputHeader,
466 pub event_type: String,
468 pub payload: serde_json::Value,
472 #[serde(default, skip_serializing_if = "Option::is_none")]
475 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
476 #[serde(default)]
478 pub handling_mode: HandlingMode,
479 #[serde(default, skip_serializing_if = "Option::is_none")]
481 pub render_metadata: Option<RenderMetadata>,
482}
483
484#[derive(Debug, Clone, Serialize, Deserialize)]
488pub struct ContinuationInput {
489 pub header: InputHeader,
490 pub reason: String,
492 #[serde(default)]
494 pub handling_mode: HandlingMode,
495 #[serde(default, skip_serializing_if = "Option::is_none")]
497 pub request_id: Option<String>,
498}
499
500impl ContinuationInput {
501 pub fn detached_background_op_completed() -> Self {
507 Self {
508 header: InputHeader {
509 id: meerkat_core::lifecycle::InputId::new(),
510 timestamp: chrono::Utc::now(),
511 source: InputOrigin::System,
512 durability: InputDurability::Derived,
513 visibility: InputVisibility {
514 transcript_eligible: false,
515 operator_eligible: false,
516 },
517 idempotency_key: None,
518 supersession_key: None,
519 correlation_id: None,
520 },
521 reason: "detached_background_op_completed".to_string(),
522 handling_mode: HandlingMode::Steer,
523 request_id: None,
524 }
525 }
526}
527
528#[derive(Debug, Clone, Serialize, Deserialize)]
531pub struct OperationInput {
532 pub header: InputHeader,
533 pub operation_id: OperationId,
535 pub event: OpEvent,
537}
538
539pub(crate) fn peer_projection_from_peer_input(
546 peer: &PeerInput,
547) -> Option<PeerConversationProjection> {
548 peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
549}
550
551fn peer_projection_from_peer_input_with_id(
552 peer: &PeerInput,
553 peer_id: &str,
554) -> Option<PeerConversationProjection> {
555 let peer_id = peer_id.to_string();
556
557 match &peer.convention {
558 Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
559 Some(PeerConvention::Request { request_id, intent }) => {
560 let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
561 Ok(peer_id) => peer_id,
562 Err(error) => {
563 tracing::warn!(
564 peer_id,
565 error = %error,
566 "dropping peer request projection with non-canonical peer_id"
567 );
568 return None;
569 }
570 };
571 Some(PeerConversationProjection::Request {
572 peer_id,
573 display_name: peer_display_label(peer),
574 request_id: request_id.clone(),
575 intent: intent.clone(),
576 payload: peer.payload.clone(),
577 })
578 }
579 Some(PeerConvention::ResponseProgress { request_id, phase }) => {
580 Some(PeerConversationProjection::ResponseProgress {
581 peer_id,
582 request_id: request_id.clone(),
583 phase: *phase,
584 payload: peer.payload.clone(),
585 })
586 }
587 Some(PeerConvention::ResponseTerminal { .. }) => None,
588 None => None,
589 }
590}
591
592pub(crate) fn peer_response_terminal_fact(
593 peer: &PeerInput,
594) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
595 let InputOrigin::Peer {
596 peer_id,
597 display_identity,
598 runtime_id,
599 } = &peer.header.source
600 else {
601 return Ok(None);
602 };
603 let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
604 return Ok(None);
605 };
606
607 let transport_identity = runtime_id
608 .as_ref()
609 .map(ToString::to_string)
610 .map(PeerResponseTerminalTransportIdentity::parse)
611 .transpose()?;
612 let source = PeerResponseTerminalSource::new(
613 transport_identity,
614 PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
615 PeerResponseTerminalDisplayIdentity::parse(
616 display_identity
617 .as_ref()
618 .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
619 .clone(),
620 )?,
621 );
622 Ok(Some(PeerResponseTerminalFact::new(
623 source,
624 PeerResponseTerminalCorrelationId::parse(request_id)?,
625 *status,
626 PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
627 )))
628}
629
630pub(crate) fn validate_peer_response_terminal_fact(
631 input: &Input,
632) -> Result<(), PeerResponseTerminalFactError> {
633 let Input::Peer(peer) = input else {
634 return Ok(());
635 };
636 peer_response_terminal_fact(peer).map(|_| ())
637}
638
639#[cfg(test)]
642pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
643 let Input::Peer(peer) = input else {
644 return None;
645 };
646 peer_projection_from_peer_input(peer)
647}
648
649fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
650 let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
651 return None;
652 };
653 Some(peer_id.clone())
654}
655
656fn peer_display_label(peer: &PeerInput) -> Option<String> {
657 let InputOrigin::Peer {
658 display_identity, ..
659 } = &peer.header.source
660 else {
661 return None;
662 };
663
664 display_identity
665 .as_ref()
666 .map(|label| label.trim())
667 .filter(|label| !label.is_empty())
668 .map(ToOwned::to_owned)
669}
670
671pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
673 peer_projection_from_peer_input(peer)
674 .map(|projection| {
675 let prompt = projection.prompt_text();
676 if prompt.is_empty() {
677 peer.body.clone()
678 } else {
679 prompt
680 }
681 })
682 .unwrap_or_else(|| peer.body.clone())
683}
684
685pub(crate) fn peer_block_prefix_text(peer: &PeerInput) -> Option<String> {
687 if matches!(peer.convention, Some(PeerConvention::Message))
688 && let Some(prefix) = rendered_message_prefix(&peer.body)
689 {
690 return Some(prefix);
691 }
692
693 peer_projection_from_peer_input(peer).and_then(|projection| projection.block_prefix_text())
694}
695
696fn rendered_message_prefix(body: &str) -> Option<String> {
697 let prefix = body.lines().next()?.trim();
698 if prefix.starts_with("[COMMS MESSAGE from ") && prefix.ends_with(']') {
699 Some(prefix.to_string())
700 } else {
701 None
702 }
703}
704
705fn rendered_message_body_text(body: &str, prefix: &str) -> Option<String> {
706 let text = body
707 .lines()
708 .skip_while(|line| line.trim() != prefix)
709 .skip(1)
710 .map(str::trim)
711 .filter(|line| !line.is_empty() && !is_media_projection_line(line))
712 .collect::<Vec<_>>()
713 .join("\n");
714 if text.is_empty() { None } else { Some(text) }
715}
716
717fn is_media_projection_line(line: &str) -> bool {
718 (line.starts_with("[image:") || line.starts_with("[video:")) && line.ends_with(']')
719}
720
721fn blocks_include_text_projection(
722 blocks: &[meerkat_core::types::ContentBlock],
723 expected: &str,
724) -> bool {
725 let expected = expected.trim();
726 if expected.is_empty() {
727 return true;
728 }
729 if blocks
730 .iter()
731 .any(|block| matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == expected))
732 {
733 return true;
734 }
735 let joined = blocks
736 .iter()
737 .filter_map(|block| match block {
738 meerkat_core::types::ContentBlock::Text { text } => Some(text.trim()),
739 _ => None,
740 })
741 .filter(|text| !text.is_empty())
742 .collect::<Vec<_>>()
743 .join("\n");
744 joined == expected
745}
746
747pub(crate) fn input_prompt_text(input: &Input) -> String {
748 match input {
749 Input::Prompt(p) => p.text.clone(),
750 Input::Peer(p) => peer_prompt_text(p),
751 Input::FlowStep(f) => f.instructions.clone(),
752 Input::ExternalEvent(e) => external_event_projection_text(e),
753 Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
754 Input::Operation(operation) => {
755 format!(
756 "[Operation {}] {:?}",
757 operation.operation_id, operation.event
758 )
759 }
760 }
761}
762
763fn external_event_projection_text(event: &ExternalEventInput) -> String {
764 let source_name = match &event.header.source {
765 InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
766 source_name.as_str()
767 }
768 _ => event.event_type.as_str(),
769 };
770 let body = event
771 .payload
772 .get("body")
773 .and_then(serde_json::Value::as_str)
774 .map(str::trim);
775
776 meerkat_core::interaction::format_external_event_projection(source_name, body)
777}
778
779fn input_to_append(input: &Input) -> Option<ConversationAppend> {
780 if matches!(
781 input,
782 Input::Peer(PeerInput {
783 convention: Some(PeerConvention::ResponseTerminal { .. }),
784 ..
785 })
786 ) {
787 return None;
788 }
789
790 let content = match input {
791 Input::Prompt(p) if p.blocks.is_some() => CoreRenderable::Blocks {
792 blocks: p.blocks.clone().unwrap_or_default(),
793 },
794 Input::Peer(p) if p.blocks.is_some() => {
795 let raw_blocks = p.blocks.clone().unwrap_or_default();
796 if let Some(prefix) = peer_block_prefix_text(p) {
797 let mut blocks = vec![meerkat_core::types::ContentBlock::Text {
798 text: prefix.clone(),
799 }];
800 if let Some(body_text) = rendered_message_body_text(&p.body, &prefix)
801 && !blocks_include_text_projection(&raw_blocks, &body_text)
802 {
803 blocks.push(meerkat_core::types::ContentBlock::Text { text: body_text });
804 }
805 blocks.extend(raw_blocks);
806 CoreRenderable::Blocks { blocks }
807 } else {
808 let body_already_in_blocks = raw_blocks.first().is_some_and(|b| {
809 matches!(b, meerkat_core::types::ContentBlock::Text { text } if text == &p.body)
810 });
811 if p.body.is_empty() || body_already_in_blocks {
812 CoreRenderable::Blocks { blocks: raw_blocks }
813 } else {
814 let mut blocks = vec![meerkat_core::types::ContentBlock::Text {
815 text: p.body.clone(),
816 }];
817 blocks.extend(raw_blocks);
818 CoreRenderable::Blocks { blocks }
819 }
820 }
821 }
822 Input::FlowStep(f) if f.blocks.is_some() => CoreRenderable::Blocks {
823 blocks: f.blocks.clone().unwrap_or_default(),
824 },
825 Input::ExternalEvent(e) if e.blocks.is_some() => CoreRenderable::Blocks {
826 blocks: e.blocks.clone().unwrap_or_default(),
827 },
828 Input::Prompt(_) | Input::Peer(_) | Input::FlowStep(_) | Input::ExternalEvent(_) => {
829 CoreRenderable::Text {
830 text: input_prompt_text(input),
831 }
832 }
833 Input::Continuation(_) | Input::Operation(_) => return None,
834 };
835
836 Some(ConversationAppend {
837 role: ConversationAppendRole::User,
838 content,
839 })
840}
841
842fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
843 let projection = match input {
844 Input::Peer(peer) => peer_projection_from_peer_input(peer)?,
845 _ => return None,
846 };
847
848 Some(ConversationContextAppend {
849 key: projection.context_key()?,
850 content: CoreRenderable::Text {
851 text: projection.prompt_text(),
852 },
853 })
854}
855
856fn peer_response_terminal_context_append(
857 peer: &PeerInput,
858) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
859 let Some(fact) = peer_response_terminal_fact(peer)? else {
860 return Ok(None);
861 };
862
863 Ok(Some(ConversationContextAppend {
864 key: fact.context_key(),
865 content: CoreRenderable::Text {
866 text: fact.prompt_text(),
867 },
868 }))
869}
870
871pub(crate) fn runtime_input_projection(
872 input: &Input,
873) -> crate::ingress_types::RuntimeInputProjection {
874 crate::ingress_types::RuntimeInputProjection {
875 append: input_to_append(input),
876 context_append: input_to_context_append(input),
877 }
878}
879
880pub(crate) fn runtime_input_projection_for_machine_batch(
881 input: &Input,
882) -> crate::ingress_types::RuntimeInputProjection {
883 let mut projection = runtime_input_projection(input);
884 if let Input::Peer(peer) = input
885 && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
886 {
887 projection.context_append = Some(context_append);
888 }
889 projection
890}
891
892#[cfg(test)]
893#[allow(clippy::unwrap_used, clippy::panic)]
894mod tests {
895 use super::*;
896 use chrono::Utc;
897
898 fn make_header() -> InputHeader {
899 InputHeader {
900 id: InputId::new(),
901 timestamp: Utc::now(),
902 source: InputOrigin::Operator,
903 durability: InputDurability::Durable,
904 visibility: InputVisibility::default(),
905 idempotency_key: None,
906 supersession_key: None,
907 correlation_id: None,
908 }
909 }
910
911 #[test]
912 fn prompt_input_serde() {
913 let input = Input::Prompt(PromptInput {
914 header: make_header(),
915 text: "hello".into(),
916 blocks: None,
917 turn_metadata: None,
918 });
919 let json = serde_json::to_value(&input).unwrap();
920 assert_eq!(json["input_type"], "prompt");
921 let parsed: Input = serde_json::from_value(json).unwrap();
922 assert!(matches!(parsed, Input::Prompt(_)));
923 }
924
925 #[test]
926 fn peer_input_message_serde() {
927 let input = Input::Peer(PeerInput {
928 header: make_header(),
929 convention: Some(PeerConvention::Message),
930 body: "hi there".into(),
931 payload: None,
932 blocks: None,
933 handling_mode: None,
934 });
935 let json = serde_json::to_value(&input).unwrap();
936 assert_eq!(json["input_type"], "peer");
937 let parsed: Input = serde_json::from_value(json).unwrap();
938 assert!(matches!(parsed, Input::Peer(_)));
939 }
940
941 #[test]
942 fn peer_message_blocks_prefix_uses_rendered_display_label_not_canonical_origin() {
943 let mut header = make_header();
944 header.source = InputOrigin::Peer {
945 peer_id: "canonical-peer-id".into(),
946 display_identity: Some("display-agent".into()),
947 runtime_id: None,
948 };
949 let input = Input::Peer(PeerInput {
950 header,
951 convention: Some(PeerConvention::Message),
952 body: "[COMMS MESSAGE from display-agent]\ncaption\n[image: image/png]".into(),
953 payload: None,
954 blocks: Some(vec![
955 meerkat_core::types::ContentBlock::Text {
956 text: "caption".into(),
957 },
958 meerkat_core::types::ContentBlock::Image {
959 media_type: "image/png".into(),
960 data: "abc".into(),
961 },
962 ]),
963 handling_mode: None,
964 });
965
966 let Input::Peer(peer) = &input else {
967 panic!("expected peer input");
968 };
969 assert_eq!(
970 peer_projection_from_peer_input(peer)
971 .and_then(|projection| projection.block_prefix_text())
972 .as_deref(),
973 Some("[COMMS MESSAGE from canonical-peer-id]")
974 );
975
976 let projection = runtime_input_projection(&input);
977 let append = projection.append.expect("conversation append");
978 let CoreRenderable::Blocks { blocks } = append.content else {
979 panic!("expected multimodal blocks");
980 };
981 assert_eq!(
982 blocks.first(),
983 Some(&meerkat_core::types::ContentBlock::Text {
984 text: "[COMMS MESSAGE from display-agent]".into()
985 })
986 );
987 assert!(!meerkat_core::types::text_content(&blocks).contains("canonical-peer-id"));
988 }
989
990 #[test]
991 fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
992 let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
993 let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
994 let mut header = make_header();
995 header.source = InputOrigin::Peer {
996 peer_id: route_id.into(),
997 display_identity: Some("display-agent".into()),
998 runtime_id: None,
999 };
1000 let input = Input::Peer(PeerInput {
1001 header,
1002 convention: Some(PeerConvention::ResponseTerminal {
1003 request_id: request_id.into(),
1004 status: ResponseTerminalStatus::Completed,
1005 }),
1006 body: "legacy response body".into(),
1007 payload: Some(serde_json::json!({"answer":"ok"})),
1008 blocks: None,
1009 handling_mode: None,
1010 });
1011
1012 let Input::Peer(peer) = &input else {
1013 panic!("expected peer input");
1014 };
1015 let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
1016 assert!(
1017 peer_projection_from_peer_input(peer).is_none(),
1018 "terminal peer response projection must not be built before machine batch selection"
1019 );
1020
1021 let projection = runtime_input_projection(&input);
1022 assert!(
1023 projection.context_append.is_none(),
1024 "admission projection must not store terminal peer response context"
1025 );
1026 let projection = runtime_input_projection_for_machine_batch(&input);
1027 let context = projection.context_append.expect("context append");
1028 assert_eq!(context.key, expected_canonical_key);
1029 let CoreRenderable::Text { text } = context.content else {
1030 panic!("expected text context");
1031 };
1032 assert!(text.contains("from display-agent"));
1033 assert!(!text.contains(route_id));
1034 }
1035
1036 #[test]
1037 fn peer_input_request_serde() {
1038 let input = Input::Peer(PeerInput {
1039 header: make_header(),
1040 convention: Some(PeerConvention::Request {
1041 request_id: "req-1".into(),
1042 intent: "mob.peer_added".into(),
1043 }),
1044 body: "Agent joined".into(),
1045 payload: Some(serde_json::json!({"name": "agent-1"})),
1046 blocks: None,
1047 handling_mode: None,
1048 });
1049 let json = serde_json::to_value(&input).unwrap();
1050 let parsed: Input = serde_json::from_value(json).unwrap();
1051 if let Input::Peer(p) = parsed {
1052 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
1053 } else {
1054 panic!("Expected PeerInput");
1055 }
1056 }
1057
1058 #[test]
1059 fn peer_input_response_terminal_serde() {
1060 let input = Input::Peer(PeerInput {
1061 header: make_header(),
1062 convention: Some(PeerConvention::ResponseTerminal {
1063 request_id: "req-1".into(),
1064 status: ResponseTerminalStatus::Completed,
1065 }),
1066 body: "Done".into(),
1067 payload: Some(serde_json::json!({"ok": true})),
1068 blocks: None,
1069 handling_mode: None,
1070 });
1071 let json = serde_json::to_value(&input).unwrap();
1072 let parsed: Input = serde_json::from_value(json).unwrap();
1073 assert!(matches!(parsed, Input::Peer(_)));
1074 }
1075
1076 #[test]
1077 fn peer_input_response_progress_serde() {
1078 let input = Input::Peer(PeerInput {
1079 header: make_header(),
1080 convention: Some(PeerConvention::ResponseProgress {
1081 request_id: "req-1".into(),
1082 phase: ResponseProgressPhase::InProgress,
1083 }),
1084 body: "Working...".into(),
1085 payload: Some(serde_json::json!({"progress": "working"})),
1086 blocks: None,
1087 handling_mode: None,
1088 });
1089 let json = serde_json::to_value(&input).unwrap();
1090 let parsed: Input = serde_json::from_value(json).unwrap();
1091 assert!(matches!(parsed, Input::Peer(_)));
1092 }
1093
1094 #[test]
1095 fn flow_step_input_serde() {
1096 let input = Input::FlowStep(FlowStepInput {
1097 header: make_header(),
1098 step_id: "step-1".into(),
1099 instructions: "analyze the data".into(),
1100 blocks: Some(vec![
1101 meerkat_core::types::ContentBlock::Text {
1102 text: "analyze the data".into(),
1103 },
1104 meerkat_core::types::ContentBlock::Image {
1105 media_type: "image/png".into(),
1106 data: meerkat_core::types::ImageData::Inline {
1107 data: "abc123".into(),
1108 },
1109 },
1110 ]),
1111 turn_metadata: None,
1112 });
1113 let json = serde_json::to_value(&input).unwrap();
1114 assert_eq!(json["input_type"], "flow_step");
1115 let parsed: Input = serde_json::from_value(json).unwrap();
1116 assert!(matches!(parsed, Input::FlowStep(_)));
1117 }
1118
1119 #[test]
1120 fn external_event_input_serde() {
1121 let input = Input::ExternalEvent(ExternalEventInput {
1122 header: make_header(),
1123 event_type: "webhook.received".into(),
1124 payload: serde_json::json!({"url": "https://example.com"}),
1125 blocks: Some(vec![
1126 meerkat_core::types::ContentBlock::Text {
1127 text: "look".into(),
1128 },
1129 meerkat_core::types::ContentBlock::Image {
1130 media_type: "image/png".into(),
1131 data: meerkat_core::types::ImageData::Inline {
1132 data: "abc123".into(),
1133 },
1134 },
1135 ]),
1136 handling_mode: HandlingMode::Queue,
1137 render_metadata: None,
1138 });
1139 let json = serde_json::to_value(&input).unwrap();
1140 assert_eq!(json["input_type"], "external_event");
1141 let parsed: Input = serde_json::from_value(json).unwrap();
1142 assert!(matches!(parsed, Input::ExternalEvent(_)));
1143 }
1144
1145 #[test]
1146 fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
1147 let mut input = Input::ExternalEvent(ExternalEventInput {
1148 header: make_header(),
1149 event_type: "webhook.received".into(),
1150 payload: serde_json::json!({
1151 "body": "see image",
1152 "blocks": [
1153 { "type": "text", "text": "caption text" },
1154 { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
1155 ]
1156 }),
1157 blocks: None,
1158 handling_mode: HandlingMode::Queue,
1159 render_metadata: None,
1160 });
1161
1162 match &mut input {
1163 Input::ExternalEvent(event) => {
1164 migrate_legacy_payload_blocks(event).unwrap();
1165 assert!(event.payload.get("blocks").is_none());
1166 assert_eq!(event.payload["body"], "see image");
1167 assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
1168 }
1169 other => panic!("Expected ExternalEvent, got {other:?}"),
1170 }
1171 }
1172
1173 #[test]
1174 fn continuation_input_serde() {
1175 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1176 let json = serde_json::to_value(&input).unwrap();
1177 assert_eq!(json["input_type"], "continuation");
1178 let parsed: Input = serde_json::from_value(json).unwrap();
1179 match parsed {
1180 Input::Continuation(continuation) => {
1181 assert_eq!(continuation.handling_mode, HandlingMode::Steer);
1182 assert_eq!(continuation.reason, "detached_background_op_completed");
1183 }
1184 other => panic!("Expected Continuation, got {other:?}"),
1185 }
1186 }
1187
1188 #[test]
1189 fn continuation_input_accepts_legacy_system_generated_tag() {
1190 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1191 let mut json = serde_json::to_value(&input).unwrap();
1192 json["input_type"] = serde_json::Value::String("system_generated".into());
1193 let parsed: Input = serde_json::from_value(json).unwrap();
1194 match parsed {
1195 Input::Continuation(continuation) => {
1196 assert_eq!(continuation.reason, "detached_background_op_completed");
1197 }
1198 other => panic!("Expected Continuation, got {other:?}"),
1199 }
1200 }
1201
1202 #[test]
1203 fn operation_input_serde() {
1204 let input = Input::Operation(OperationInput {
1205 header: InputHeader {
1206 durability: InputDurability::Derived,
1207 ..make_header()
1208 },
1209 operation_id: OperationId::new(),
1210 event: OpEvent::Cancelled {
1211 id: OperationId::new(),
1212 },
1213 });
1214 let json = serde_json::to_value(&input).unwrap();
1215 assert_eq!(json["input_type"], "operation");
1216 let parsed: Input = serde_json::from_value(json).unwrap();
1217 assert!(matches!(parsed, Input::Operation(_)));
1218 }
1219
1220 #[test]
1221 fn operation_input_accepts_legacy_projected_tag() {
1222 let input = Input::Operation(OperationInput {
1223 header: InputHeader {
1224 durability: InputDurability::Derived,
1225 ..make_header()
1226 },
1227 operation_id: OperationId::new(),
1228 event: OpEvent::Cancelled {
1229 id: OperationId::new(),
1230 },
1231 });
1232 let mut json = serde_json::to_value(&input).unwrap();
1233 json["input_type"] = serde_json::Value::String("projected".into());
1234 let parsed: Input = serde_json::from_value(json).unwrap();
1235 assert!(matches!(parsed, Input::Operation(_)));
1236 }
1237
1238 #[test]
1239 fn input_kind_id() {
1240 let prompt = Input::Prompt(PromptInput {
1241 header: make_header(),
1242 text: "hi".into(),
1243 blocks: None,
1244 turn_metadata: None,
1245 });
1246 assert_eq!(prompt.kind(), InputKind::Prompt);
1247
1248 let peer_msg = Input::Peer(PeerInput {
1249 header: make_header(),
1250 convention: Some(PeerConvention::Message),
1251 body: "hi".into(),
1252 payload: None,
1253 blocks: None,
1254 handling_mode: None,
1255 });
1256 assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
1257
1258 let peer_req = Input::Peer(PeerInput {
1259 header: make_header(),
1260 convention: Some(PeerConvention::Request {
1261 request_id: "r".into(),
1262 intent: "i".into(),
1263 }),
1264 body: "hi".into(),
1265 payload: Some(serde_json::json!({"subject": "x"})),
1266 blocks: None,
1267 handling_mode: None,
1268 });
1269 assert_eq!(peer_req.kind(), InputKind::PeerRequest);
1270
1271 let continuation = Input::Continuation(ContinuationInput {
1272 header: make_header(),
1273 reason: "continue".into(),
1274 handling_mode: HandlingMode::Steer,
1275 request_id: None,
1276 });
1277 assert_eq!(continuation.kind(), InputKind::Continuation);
1278
1279 let operation = Input::Operation(OperationInput {
1280 header: make_header(),
1281 operation_id: OperationId::new(),
1282 event: OpEvent::Cancelled {
1283 id: OperationId::new(),
1284 },
1285 });
1286 assert_eq!(operation.kind(), InputKind::Operation);
1287 }
1288
1289 #[test]
1290 fn input_source_variants() {
1291 let sources = vec![
1292 InputOrigin::Operator,
1293 InputOrigin::Peer {
1294 peer_id: "p1".into(),
1295 display_identity: None,
1296 runtime_id: None,
1297 },
1298 InputOrigin::Flow {
1299 flow_id: "f1".into(),
1300 step_index: 0,
1301 },
1302 InputOrigin::System,
1303 InputOrigin::External {
1304 source_name: "webhook".into(),
1305 },
1306 ];
1307 for source in sources {
1308 let json = serde_json::to_value(&source).unwrap();
1309 let parsed: InputOrigin = serde_json::from_value(json).unwrap();
1310 assert_eq!(source, parsed);
1311 }
1312 }
1313
1314 #[test]
1315 fn input_durability_serde() {
1316 for d in [
1317 InputDurability::Durable,
1318 InputDurability::Ephemeral,
1319 InputDurability::Derived,
1320 ] {
1321 let json = serde_json::to_value(d).unwrap();
1322 let parsed: InputDurability = serde_json::from_value(json).unwrap();
1323 assert_eq!(d, parsed);
1324 }
1325 }
1326
1327 #[test]
1328 fn peer_input_without_handling_mode_deserializes_as_none() {
1329 let json = serde_json::json!({
1331 "input_type": "peer",
1332 "header": serde_json::to_value(make_header()).unwrap(),
1333 "convention": { "convention_type": "message" },
1334 "body": "hello"
1335 });
1336 let parsed: Input = serde_json::from_value(json).unwrap();
1337 match parsed {
1338 Input::Peer(p) => assert!(p.handling_mode.is_none()),
1339 other => panic!("Expected Peer, got {other:?}"),
1340 }
1341 }
1342
1343 #[test]
1344 fn peer_input_with_queue_handling_mode_roundtrips() {
1345 let input = Input::Peer(PeerInput {
1346 header: make_header(),
1347 convention: Some(PeerConvention::Message),
1348 body: "hi".into(),
1349 payload: None,
1350 blocks: None,
1351 handling_mode: Some(HandlingMode::Queue),
1352 });
1353 let json = serde_json::to_value(&input).unwrap();
1354 assert_eq!(json["handling_mode"], "queue");
1355 let parsed: Input = serde_json::from_value(json).unwrap();
1356 match parsed {
1357 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
1358 other => panic!("Expected Peer, got {other:?}"),
1359 }
1360 }
1361
1362 #[test]
1363 fn peer_response_terminal_input_owns_wire_status_mapping() {
1364 let peer_id = meerkat_core::comms::PeerId::from_uuid(
1365 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1366 );
1367 let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1368 let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1369 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1370 );
1371 let input = peer_response_terminal_input(
1372 peer_id,
1373 Some(display_name),
1374 request_id,
1375 meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
1376 serde_json::json!({"ok": false}),
1377 );
1378
1379 match input {
1380 Input::Peer(PeerInput {
1381 header:
1382 InputHeader {
1383 source:
1384 InputOrigin::Peer {
1385 peer_id,
1386 display_identity,
1387 runtime_id,
1388 },
1389 durability: InputDurability::Durable,
1390 correlation_id,
1391 ..
1392 },
1393 convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
1394 payload: Some(payload),
1395 handling_mode: None,
1396 ..
1397 }) => {
1398 assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
1399 assert_eq!(display_identity.as_deref(), Some("analyst"));
1400 assert_eq!(runtime_id, None);
1401 assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
1402 assert_eq!(
1403 correlation_id,
1404 Some(CorrelationId::from_uuid(
1405 uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
1406 ))
1407 );
1408 assert_eq!(status, ResponseTerminalStatus::Cancelled);
1409 assert_eq!(payload["ok"], false);
1410 }
1411 other => panic!("expected terminal peer input, got {other:?}"),
1412 }
1413 }
1414
1415 #[test]
1416 fn peer_input_with_steer_handling_mode_roundtrips() {
1417 let input = Input::Peer(PeerInput {
1418 header: make_header(),
1419 convention: Some(PeerConvention::Message),
1420 body: "hi".into(),
1421 payload: None,
1422 blocks: None,
1423 handling_mode: Some(HandlingMode::Steer),
1424 });
1425 let json = serde_json::to_value(&input).unwrap();
1426 assert_eq!(json["handling_mode"], "steer");
1427 let parsed: Input = serde_json::from_value(json).unwrap();
1428 match parsed {
1429 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
1430 other => panic!("Expected Peer, got {other:?}"),
1431 }
1432 }
1433
1434 #[test]
1435 fn peer_input_handling_mode_not_serialized_when_none() {
1436 let input = Input::Peer(PeerInput {
1437 header: make_header(),
1438 convention: Some(PeerConvention::Message),
1439 body: "hi".into(),
1440 payload: None,
1441 blocks: None,
1442 handling_mode: None,
1443 });
1444 let json = serde_json::to_value(&input).unwrap();
1445 assert!(json.get("handling_mode").is_none());
1446 }
1447}