1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use meerkat_core::ops::{OpEvent, OperationId};
11use meerkat_core::types::HandlingMode;
12use meerkat_core::{
13 BlobStore, BlobStoreError, MissingBlobBehavior, externalize_content_blocks,
14 hydrate_content_blocks,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::identifiers::{
19 CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
20};
21use meerkat_core::types::RenderMetadata;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct InputHeader {
26 pub id: InputId,
28 pub timestamp: DateTime<Utc>,
30 pub source: InputOrigin,
32 pub durability: InputDurability,
34 pub visibility: InputVisibility,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub idempotency_key: Option<IdempotencyKey>,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub supersession_key: Option<SupersessionKey>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub correlation_id: Option<CorrelationId>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(tag = "type", rename_all = "snake_case")]
50#[non_exhaustive]
51pub enum InputOrigin {
52 Operator,
54 Peer {
56 peer_id: String,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 runtime_id: Option<LogicalRuntimeId>,
59 },
60 Flow { flow_id: String, step_index: usize },
62 System,
64 External { source_name: String },
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71#[non_exhaustive]
72pub enum InputDurability {
73 Durable,
75 Ephemeral,
77 Derived,
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub struct InputVisibility {
84 pub transcript_eligible: bool,
86 pub operator_eligible: bool,
88}
89
90impl Default for InputVisibility {
91 fn default() -> Self {
92 Self {
93 transcript_eligible: true,
94 operator_eligible: true,
95 }
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(tag = "input_type", rename_all = "snake_case")]
102#[non_exhaustive]
103pub enum Input {
104 Prompt(PromptInput),
106 Peer(PeerInput),
108 FlowStep(FlowStepInput),
110 ExternalEvent(ExternalEventInput),
112 #[serde(alias = "system_generated")]
114 Continuation(ContinuationInput),
115 #[serde(alias = "projected")]
117 Operation(OperationInput),
118}
119
120impl Input {
121 pub fn header(&self) -> &InputHeader {
123 match self {
124 Input::Prompt(i) => &i.header,
125 Input::Peer(i) => &i.header,
126 Input::FlowStep(i) => &i.header,
127 Input::ExternalEvent(i) => &i.header,
128 Input::Continuation(i) => &i.header,
129 Input::Operation(i) => &i.header,
130 }
131 }
132
133 pub fn id(&self) -> &InputId {
135 &self.header().id
136 }
137
138 pub fn kind_id(&self) -> KindId {
140 match self {
141 Input::Prompt(_) => KindId::new("prompt"),
142 Input::Peer(p) => match &p.convention {
143 Some(PeerConvention::Message) => KindId::new("peer_message"),
144 Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
145 Some(PeerConvention::ResponseProgress { .. }) => {
146 KindId::new("peer_response_progress")
147 }
148 Some(PeerConvention::ResponseTerminal { .. }) => {
149 KindId::new("peer_response_terminal")
150 }
151 None => KindId::new("peer_message"),
152 },
153 Input::FlowStep(_) => KindId::new("flow_step"),
154 Input::ExternalEvent(_) => KindId::new("external_event"),
155 Input::Continuation(_) => KindId::new("continuation"),
156 Input::Operation(_) => KindId::new("operation"),
157 }
158 }
159
160 pub fn handling_mode(&self) -> Option<HandlingMode> {
162 match self {
163 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
164 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
165 Input::ExternalEvent(event) => Some(event.handling_mode),
166 Input::Continuation(continuation) => Some(continuation.handling_mode),
167 _ => None,
168 }
169 }
170}
171
172fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
173 let Some(obj) = event.payload.as_object_mut() else {
174 return Ok(());
175 };
176 let Some(blocks_value) = obj.remove("blocks") else {
177 return Ok(());
178 };
179 if event.blocks.is_some() {
180 return Ok(());
181 }
182 let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
183 .map_err(|err| {
184 BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
185 })?;
186 event.blocks = Some(blocks);
187 Ok(())
188}
189
190pub async fn externalize_input_images(
191 blob_store: &dyn BlobStore,
192 input: &mut Input,
193) -> Result<(), BlobStoreError> {
194 match input {
195 Input::Prompt(prompt) => {
196 if let Some(blocks) = prompt.blocks.as_mut() {
197 externalize_content_blocks(blob_store, blocks).await?;
198 }
199 }
200 Input::Peer(peer) => {
201 if let Some(blocks) = peer.blocks.as_mut() {
202 externalize_content_blocks(blob_store, blocks).await?;
203 }
204 }
205 Input::FlowStep(flow_step) => {
206 if let Some(blocks) = flow_step.blocks.as_mut() {
207 externalize_content_blocks(blob_store, blocks).await?;
208 }
209 }
210 Input::ExternalEvent(event) => {
211 migrate_legacy_payload_blocks(event)?;
212 if let Some(blocks) = event.blocks.as_mut() {
213 externalize_content_blocks(blob_store, blocks).await?;
214 }
215 }
216 Input::Continuation(_) | Input::Operation(_) => {}
217 }
218 Ok(())
219}
220
221pub async fn hydrate_input_images(
222 blob_store: &dyn BlobStore,
223 input: &mut Input,
224 missing_behavior: MissingBlobBehavior,
225) -> Result<(), BlobStoreError> {
226 match input {
227 Input::Prompt(prompt) => {
228 if let Some(blocks) = prompt.blocks.as_mut() {
229 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
230 }
231 }
232 Input::Peer(peer) => {
233 if let Some(blocks) = peer.blocks.as_mut() {
234 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
235 }
236 }
237 Input::FlowStep(flow_step) => {
238 if let Some(blocks) = flow_step.blocks.as_mut() {
239 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
240 }
241 }
242 Input::ExternalEvent(event) => {
243 migrate_legacy_payload_blocks(event)?;
244 if let Some(blocks) = event.blocks.as_mut() {
245 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
246 }
247 }
248 Input::Continuation(_) | Input::Operation(_) => {}
249 }
250 Ok(())
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct PromptInput {
256 pub header: InputHeader,
257 pub text: String,
259 #[serde(default, skip_serializing_if = "Option::is_none")]
262 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
263 #[serde(default, skip_serializing_if = "Option::is_none")]
264 pub turn_metadata: Option<RuntimeTurnMetadata>,
265}
266
267impl PromptInput {
268 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
270 Self {
271 header: InputHeader {
272 id: meerkat_core::lifecycle::InputId::new(),
273 timestamp: chrono::Utc::now(),
274 source: InputOrigin::Operator,
275 durability: InputDurability::Durable,
276 visibility: InputVisibility::default(),
277 idempotency_key: None,
278 supersession_key: None,
279 correlation_id: None,
280 },
281 text: text.into(),
282 blocks: None,
283 turn_metadata,
284 }
285 }
286
287 pub fn from_content_input(
289 input: meerkat_core::types::ContentInput,
290 turn_metadata: Option<RuntimeTurnMetadata>,
291 ) -> Self {
292 let text = input.text_content();
293 let blocks = if input.has_images() {
294 Some(input.into_blocks())
295 } else {
296 None
297 };
298 Self {
299 header: InputHeader {
300 id: meerkat_core::lifecycle::InputId::new(),
301 timestamp: chrono::Utc::now(),
302 source: InputOrigin::Operator,
303 durability: InputDurability::Durable,
304 visibility: InputVisibility::default(),
305 idempotency_key: None,
306 supersession_key: None,
307 correlation_id: None,
308 },
309 text,
310 blocks,
311 turn_metadata,
312 }
313 }
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct PeerInput {
319 pub header: InputHeader,
320 #[serde(skip_serializing_if = "Option::is_none")]
322 pub convention: Option<PeerConvention>,
323 pub body: String,
325 #[serde(default, skip_serializing_if = "Option::is_none")]
328 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
333#[serde(tag = "convention_type", rename_all = "snake_case")]
334#[non_exhaustive]
335pub enum PeerConvention {
336 Message,
338 Request { request_id: String, intent: String },
340 ResponseProgress {
342 request_id: String,
343 phase: ResponseProgressPhase,
344 },
345 ResponseTerminal {
347 request_id: String,
348 status: ResponseTerminalStatus,
349 },
350}
351
352#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
354#[serde(rename_all = "snake_case")]
355#[non_exhaustive]
356pub enum ResponseProgressPhase {
357 Accepted,
359 InProgress,
361 PartialResult,
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
367#[serde(rename_all = "snake_case")]
368#[non_exhaustive]
369pub enum ResponseTerminalStatus {
370 Completed,
372 Failed,
374 Cancelled,
376}
377
378#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct FlowStepInput {
381 pub header: InputHeader,
382 pub step_id: String,
384 pub instructions: String,
386 #[serde(default, skip_serializing_if = "Option::is_none")]
390 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
392 pub turn_metadata: Option<RuntimeTurnMetadata>,
393}
394
395#[derive(Debug, Clone, Serialize, Deserialize)]
397pub struct ExternalEventInput {
398 pub header: InputHeader,
399 pub event_type: String,
401 pub payload: serde_json::Value,
405 #[serde(default, skip_serializing_if = "Option::is_none")]
408 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
409 #[serde(default)]
411 pub handling_mode: HandlingMode,
412 #[serde(default, skip_serializing_if = "Option::is_none")]
414 pub render_metadata: Option<RenderMetadata>,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct ContinuationInput {
422 pub header: InputHeader,
423 pub reason: String,
425 #[serde(default)]
427 pub handling_mode: HandlingMode,
428 #[serde(default, skip_serializing_if = "Option::is_none")]
430 pub request_id: Option<String>,
431}
432
433impl ContinuationInput {
434 pub fn terminal_peer_response(reason: impl Into<String>) -> Self {
437 Self::terminal_peer_response_for_request(reason, None)
438 }
439
440 pub fn terminal_peer_response_for_request(
443 reason: impl Into<String>,
444 request_id: Option<String>,
445 ) -> Self {
446 Self {
447 header: InputHeader {
448 id: meerkat_core::lifecycle::InputId::new(),
449 timestamp: chrono::Utc::now(),
450 source: InputOrigin::System,
451 durability: InputDurability::Ephemeral,
452 visibility: InputVisibility {
453 transcript_eligible: false,
454 operator_eligible: false,
455 },
456 idempotency_key: None,
457 supersession_key: None,
458 correlation_id: None,
459 },
460 reason: reason.into(),
461 handling_mode: HandlingMode::Steer,
462 request_id,
463 }
464 }
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
470pub struct OperationInput {
471 pub header: InputHeader,
472 pub operation_id: OperationId,
474 pub event: OpEvent,
476}
477
/// Serde round-trip, legacy-tag, payload-migration, and kind-id tests.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Operator-sourced, durable header with default visibility and no keys.
    fn make_header() -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Serializes `input` to JSON and parses it back, returning both forms.
    fn round_trip(input: &Input) -> (serde_json::Value, Input) {
        let json = serde_json::to_value(input).unwrap();
        let parsed: Input = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    /// Peer input with the given convention and body and no blocks.
    fn peer_input(convention: PeerConvention, body: &'static str) -> Input {
        Input::Peer(PeerInput {
            header: make_header(),
            convention: Some(convention),
            body: body.into(),
            blocks: None,
        })
    }

    /// One text block followed by one inline PNG image block.
    fn text_and_image_blocks(text: &'static str) -> Vec<meerkat_core::types::ContentBlock> {
        vec![
            meerkat_core::types::ContentBlock::Text { text: text.into() },
            meerkat_core::types::ContentBlock::Image {
                media_type: "image/png".into(),
                data: meerkat_core::types::ImageData::Inline {
                    data: "abc123".into(),
                },
            },
        ]
    }

    /// Operation input wrapping a `Cancelled` event under the given header.
    fn cancelled_operation(header: InputHeader) -> Input {
        Input::Operation(OperationInput {
            header,
            operation_id: OperationId::new(),
            event: OpEvent::Cancelled {
                id: OperationId::new(),
            },
        })
    }

    /// Same as `make_header`, but with `Derived` durability.
    fn derived_header() -> InputHeader {
        InputHeader {
            durability: InputDurability::Derived,
            ..make_header()
        }
    }

    #[test]
    fn prompt_input_serde() {
        let (json, parsed) = round_trip(&Input::Prompt(PromptInput {
            header: make_header(),
            text: "hello".into(),
            blocks: None,
            turn_metadata: None,
        }));
        assert_eq!(json["input_type"], "prompt");
        assert!(matches!(parsed, Input::Prompt(_)));
    }

    #[test]
    fn peer_input_message_serde() {
        let (json, parsed) = round_trip(&peer_input(PeerConvention::Message, "hi there"));
        assert_eq!(json["input_type"], "peer");
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn peer_input_request_serde() {
        let request = PeerConvention::Request {
            request_id: "req-1".into(),
            intent: "mob.peer_added".into(),
        };
        let (_, parsed) = round_trip(&peer_input(request, "Agent joined"));
        let Input::Peer(peer) = parsed else {
            panic!("Expected PeerInput");
        };
        assert!(matches!(
            peer.convention,
            Some(PeerConvention::Request { .. })
        ));
    }

    #[test]
    fn peer_input_response_terminal_serde() {
        let terminal = PeerConvention::ResponseTerminal {
            request_id: "req-1".into(),
            status: ResponseTerminalStatus::Completed,
        };
        let (_, parsed) = round_trip(&peer_input(terminal, "Done"));
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn peer_input_response_progress_serde() {
        let progress = PeerConvention::ResponseProgress {
            request_id: "req-1".into(),
            phase: ResponseProgressPhase::InProgress,
        };
        let (_, parsed) = round_trip(&peer_input(progress, "Working..."));
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn flow_step_input_serde() {
        let (json, parsed) = round_trip(&Input::FlowStep(FlowStepInput {
            header: make_header(),
            step_id: "step-1".into(),
            instructions: "analyze the data".into(),
            blocks: Some(text_and_image_blocks("analyze the data")),
            turn_metadata: None,
        }));
        assert_eq!(json["input_type"], "flow_step");
        assert!(matches!(parsed, Input::FlowStep(_)));
    }

    #[test]
    fn external_event_input_serde() {
        let (json, parsed) = round_trip(&Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload: serde_json::json!({"url": "https://example.com"}),
            blocks: Some(text_and_image_blocks("look")),
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        }));
        assert_eq!(json["input_type"], "external_event");
        assert!(matches!(parsed, Input::ExternalEvent(_)));
    }

    #[test]
    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
        let payload = serde_json::json!({
            "body": "see image",
            "blocks": [
                { "type": "text", "text": "caption text" },
                { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
            ]
        });
        let mut input = Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload,
            blocks: None,
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        });

        match &mut input {
            Input::ExternalEvent(event) => {
                migrate_legacy_payload_blocks(event).unwrap();
                // The legacy key is gone, the rest of the payload is intact,
                // and both blocks now live on the event itself.
                assert!(event.payload.get("blocks").is_none());
                assert_eq!(event.payload["body"], "see image");
                assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
            }
            other => panic!("Expected ExternalEvent, got {other:?}"),
        }
    }

    #[test]
    fn continuation_input_serde() {
        let continuation = ContinuationInput::terminal_peer_response_for_request(
            "terminal peer response",
            Some("req-1".into()),
        );
        let (json, parsed) = round_trip(&Input::Continuation(continuation));
        assert_eq!(json["input_type"], "continuation");
        assert_eq!(json["request_id"], "req-1");
        match parsed {
            Input::Continuation(continuation) => {
                assert_eq!(continuation.request_id.as_deref(), Some("req-1"));
                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
            }
            other => panic!("Expected Continuation, got {other:?}"),
        }
    }

    #[test]
    fn continuation_input_accepts_legacy_system_generated_tag() {
        let continuation = ContinuationInput::terminal_peer_response_for_request(
            "legacy system generated",
            Some("req-legacy".into()),
        );
        let mut json = serde_json::to_value(Input::Continuation(continuation)).unwrap();
        // Swap in the legacy tag before parsing.
        json["input_type"] = serde_json::Value::String("system_generated".into());
        let parsed: Input = serde_json::from_value(json).unwrap();
        match parsed {
            Input::Continuation(continuation) => {
                assert_eq!(continuation.request_id.as_deref(), Some("req-legacy"));
            }
            other => panic!("Expected Continuation, got {other:?}"),
        }
    }

    #[test]
    fn operation_input_serde() {
        let (json, parsed) = round_trip(&cancelled_operation(derived_header()));
        assert_eq!(json["input_type"], "operation");
        assert!(matches!(parsed, Input::Operation(_)));
    }

    #[test]
    fn operation_input_accepts_legacy_projected_tag() {
        let mut json = serde_json::to_value(cancelled_operation(derived_header())).unwrap();
        // Swap in the legacy tag before parsing.
        json["input_type"] = serde_json::Value::String("projected".into());
        let parsed: Input = serde_json::from_value(json).unwrap();
        assert!(matches!(parsed, Input::Operation(_)));
    }

    #[test]
    fn input_kind_id() {
        let cases: Vec<(Input, &'static str)> = vec![
            (
                Input::Prompt(PromptInput {
                    header: make_header(),
                    text: "hi".into(),
                    blocks: None,
                    turn_metadata: None,
                }),
                "prompt",
            ),
            (peer_input(PeerConvention::Message, "hi"), "peer_message"),
            (
                peer_input(
                    PeerConvention::Request {
                        request_id: "r".into(),
                        intent: "i".into(),
                    },
                    "hi",
                ),
                "peer_request",
            ),
            (
                Input::Continuation(ContinuationInput {
                    header: make_header(),
                    reason: "continue".into(),
                    handling_mode: HandlingMode::Steer,
                    request_id: None,
                }),
                "continuation",
            ),
            (cancelled_operation(make_header()), "operation"),
        ];
        for (input, expected) in cases {
            assert_eq!(input.kind_id().0, expected);
        }
    }

    #[test]
    fn input_source_variants() {
        let origins = [
            InputOrigin::Operator,
            InputOrigin::Peer {
                peer_id: "p1".into(),
                runtime_id: None,
            },
            InputOrigin::Flow {
                flow_id: "f1".into(),
                step_index: 0,
            },
            InputOrigin::System,
            InputOrigin::External {
                source_name: "webhook".into(),
            },
        ];
        for source in origins {
            let json = serde_json::to_value(&source).unwrap();
            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
            assert_eq!(source, parsed);
        }
    }

    #[test]
    fn input_durability_serde() {
        let classes = [
            InputDurability::Durable,
            InputDurability::Ephemeral,
            InputDurability::Derived,
        ];
        for durability in classes {
            let json = serde_json::to_value(durability).unwrap();
            let parsed: InputDurability = serde_json::from_value(json).unwrap();
            assert_eq!(durability, parsed);
        }
    }
}