1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use meerkat_core::ops::{OpEvent, OperationId};
11use meerkat_core::types::HandlingMode;
12use meerkat_core::{
13 BlobStore, BlobStoreError, MissingBlobBehavior, externalize_content_blocks,
14 hydrate_content_blocks,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::identifiers::{
19 CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
20};
21use meerkat_core::types::RenderMetadata;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct InputHeader {
26 pub id: InputId,
28 pub timestamp: DateTime<Utc>,
30 pub source: InputOrigin,
32 pub durability: InputDurability,
34 pub visibility: InputVisibility,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub idempotency_key: Option<IdempotencyKey>,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub supersession_key: Option<SupersessionKey>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub correlation_id: Option<CorrelationId>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(tag = "type", rename_all = "snake_case")]
50#[non_exhaustive]
51pub enum InputOrigin {
52 Operator,
54 Peer {
56 peer_id: String,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 runtime_id: Option<LogicalRuntimeId>,
59 },
60 Flow { flow_id: String, step_index: usize },
62 System,
64 External { source_name: String },
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71#[non_exhaustive]
72pub enum InputDurability {
73 Durable,
75 Ephemeral,
77 Derived,
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub struct InputVisibility {
84 pub transcript_eligible: bool,
86 pub operator_eligible: bool,
88}
89
90impl Default for InputVisibility {
91 fn default() -> Self {
92 Self {
93 transcript_eligible: true,
94 operator_eligible: true,
95 }
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(tag = "input_type", rename_all = "snake_case")]
102#[non_exhaustive]
103pub enum Input {
104 Prompt(PromptInput),
106 Peer(PeerInput),
108 FlowStep(FlowStepInput),
110 ExternalEvent(ExternalEventInput),
112 #[serde(alias = "system_generated")]
114 Continuation(ContinuationInput),
115 #[serde(alias = "projected")]
117 Operation(OperationInput),
118}
119
120impl Input {
121 pub fn header(&self) -> &InputHeader {
123 match self {
124 Input::Prompt(i) => &i.header,
125 Input::Peer(i) => &i.header,
126 Input::FlowStep(i) => &i.header,
127 Input::ExternalEvent(i) => &i.header,
128 Input::Continuation(i) => &i.header,
129 Input::Operation(i) => &i.header,
130 }
131 }
132
133 pub fn id(&self) -> &InputId {
135 &self.header().id
136 }
137
138 pub fn kind_id(&self) -> KindId {
140 match self {
141 Input::Prompt(_) => KindId::new("prompt"),
142 Input::Peer(p) => match &p.convention {
143 Some(PeerConvention::Message) => KindId::new("peer_message"),
144 Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
145 Some(PeerConvention::ResponseProgress { .. }) => {
146 KindId::new("peer_response_progress")
147 }
148 Some(PeerConvention::ResponseTerminal { .. }) => {
149 KindId::new("peer_response_terminal")
150 }
151 None => KindId::new("peer_message"),
152 },
153 Input::FlowStep(_) => KindId::new("flow_step"),
154 Input::ExternalEvent(_) => KindId::new("external_event"),
155 Input::Continuation(_) => KindId::new("continuation"),
156 Input::Operation(_) => KindId::new("operation"),
157 }
158 }
159
160 pub fn handling_mode(&self) -> Option<HandlingMode> {
162 match self {
163 Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
164 Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
165 Input::ExternalEvent(event) => Some(event.handling_mode),
166 Input::Continuation(continuation) => Some(continuation.handling_mode),
167 Input::Peer(peer) => peer.handling_mode,
168 Input::Operation(_) => None,
169 }
170 }
171}
172
173fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
174 let Some(obj) = event.payload.as_object_mut() else {
175 return Ok(());
176 };
177 let Some(blocks_value) = obj.remove("blocks") else {
178 return Ok(());
179 };
180 if event.blocks.is_some() {
181 return Ok(());
182 }
183 let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
184 .map_err(|err| {
185 BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
186 })?;
187 event.blocks = Some(blocks);
188 Ok(())
189}
190
191pub async fn externalize_input_images(
192 blob_store: &dyn BlobStore,
193 input: &mut Input,
194) -> Result<(), BlobStoreError> {
195 match input {
196 Input::Prompt(prompt) => {
197 if let Some(blocks) = prompt.blocks.as_mut() {
198 externalize_content_blocks(blob_store, blocks).await?;
199 }
200 }
201 Input::Peer(peer) => {
202 if let Some(blocks) = peer.blocks.as_mut() {
203 externalize_content_blocks(blob_store, blocks).await?;
204 }
205 }
206 Input::FlowStep(flow_step) => {
207 if let Some(blocks) = flow_step.blocks.as_mut() {
208 externalize_content_blocks(blob_store, blocks).await?;
209 }
210 }
211 Input::ExternalEvent(event) => {
212 migrate_legacy_payload_blocks(event)?;
213 if let Some(blocks) = event.blocks.as_mut() {
214 externalize_content_blocks(blob_store, blocks).await?;
215 }
216 }
217 Input::Continuation(_) | Input::Operation(_) => {}
218 }
219 Ok(())
220}
221
222pub async fn hydrate_input_images(
223 blob_store: &dyn BlobStore,
224 input: &mut Input,
225 missing_behavior: MissingBlobBehavior,
226) -> Result<(), BlobStoreError> {
227 match input {
228 Input::Prompt(prompt) => {
229 if let Some(blocks) = prompt.blocks.as_mut() {
230 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
231 }
232 }
233 Input::Peer(peer) => {
234 if let Some(blocks) = peer.blocks.as_mut() {
235 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
236 }
237 }
238 Input::FlowStep(flow_step) => {
239 if let Some(blocks) = flow_step.blocks.as_mut() {
240 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
241 }
242 }
243 Input::ExternalEvent(event) => {
244 migrate_legacy_payload_blocks(event)?;
245 if let Some(blocks) = event.blocks.as_mut() {
246 hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247 }
248 }
249 Input::Continuation(_) | Input::Operation(_) => {}
250 }
251 Ok(())
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct PromptInput {
257 pub header: InputHeader,
258 pub text: String,
260 #[serde(default, skip_serializing_if = "Option::is_none")]
263 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
264 #[serde(default, skip_serializing_if = "Option::is_none")]
265 pub turn_metadata: Option<RuntimeTurnMetadata>,
266}
267
268impl PromptInput {
269 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
271 Self {
272 header: InputHeader {
273 id: meerkat_core::lifecycle::InputId::new(),
274 timestamp: chrono::Utc::now(),
275 source: InputOrigin::Operator,
276 durability: InputDurability::Durable,
277 visibility: InputVisibility::default(),
278 idempotency_key: None,
279 supersession_key: None,
280 correlation_id: None,
281 },
282 text: text.into(),
283 blocks: None,
284 turn_metadata,
285 }
286 }
287
288 pub fn from_content_input(
290 input: meerkat_core::types::ContentInput,
291 turn_metadata: Option<RuntimeTurnMetadata>,
292 ) -> Self {
293 let text = input.text_content();
294 let blocks = if input.has_images() {
295 Some(input.into_blocks())
296 } else {
297 None
298 };
299 Self {
300 header: InputHeader {
301 id: meerkat_core::lifecycle::InputId::new(),
302 timestamp: chrono::Utc::now(),
303 source: InputOrigin::Operator,
304 durability: InputDurability::Durable,
305 visibility: InputVisibility::default(),
306 idempotency_key: None,
307 supersession_key: None,
308 correlation_id: None,
309 },
310 text,
311 blocks,
312 turn_metadata,
313 }
314 }
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct PeerInput {
320 pub header: InputHeader,
321 #[serde(skip_serializing_if = "Option::is_none")]
323 pub convention: Option<PeerConvention>,
324 pub body: String,
326 #[serde(default, skip_serializing_if = "Option::is_none")]
329 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
330 #[serde(default, skip_serializing_if = "Option::is_none")]
335 pub handling_mode: Option<HandlingMode>,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
340#[serde(tag = "convention_type", rename_all = "snake_case")]
341#[non_exhaustive]
342pub enum PeerConvention {
343 Message,
345 Request { request_id: String, intent: String },
347 ResponseProgress {
349 request_id: String,
350 phase: ResponseProgressPhase,
351 },
352 ResponseTerminal {
354 request_id: String,
355 status: ResponseTerminalStatus,
356 },
357}
358
359#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
361#[serde(rename_all = "snake_case")]
362#[non_exhaustive]
363pub enum ResponseProgressPhase {
364 Accepted,
366 InProgress,
368 PartialResult,
370}
371
372#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
374#[serde(rename_all = "snake_case")]
375#[non_exhaustive]
376pub enum ResponseTerminalStatus {
377 Completed,
379 Failed,
381 Cancelled,
383}
384
385#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct FlowStepInput {
388 pub header: InputHeader,
389 pub step_id: String,
391 pub instructions: String,
393 #[serde(default, skip_serializing_if = "Option::is_none")]
397 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
398 #[serde(default, skip_serializing_if = "Option::is_none")]
399 pub turn_metadata: Option<RuntimeTurnMetadata>,
400}
401
402#[derive(Debug, Clone, Serialize, Deserialize)]
404pub struct ExternalEventInput {
405 pub header: InputHeader,
406 pub event_type: String,
408 pub payload: serde_json::Value,
412 #[serde(default, skip_serializing_if = "Option::is_none")]
415 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
416 #[serde(default)]
418 pub handling_mode: HandlingMode,
419 #[serde(default, skip_serializing_if = "Option::is_none")]
421 pub render_metadata: Option<RenderMetadata>,
422}
423
424#[derive(Debug, Clone, Serialize, Deserialize)]
428pub struct ContinuationInput {
429 pub header: InputHeader,
430 pub reason: String,
432 #[serde(default)]
434 pub handling_mode: HandlingMode,
435 #[serde(default, skip_serializing_if = "Option::is_none")]
437 pub request_id: Option<String>,
438}
439
440impl ContinuationInput {
441 pub fn detached_background_op_completed() -> Self {
447 Self {
448 header: InputHeader {
449 id: meerkat_core::lifecycle::InputId::new(),
450 timestamp: chrono::Utc::now(),
451 source: InputOrigin::System,
452 durability: InputDurability::Derived,
453 visibility: InputVisibility {
454 transcript_eligible: false,
455 operator_eligible: false,
456 },
457 idempotency_key: None,
458 supersession_key: None,
459 correlation_id: None,
460 },
461 reason: "detached_background_op_completed".to_string(),
462 handling_mode: HandlingMode::Steer,
463 request_id: None,
464 }
465 }
466}
467
468#[derive(Debug, Clone, Serialize, Deserialize)]
471pub struct OperationInput {
472 pub header: InputHeader,
473 pub operation_id: OperationId,
475 pub event: OpEvent,
477}
478
479pub(crate) fn classify_execution_kind(
484 input: &Input,
485) -> meerkat_core::lifecycle::RuntimeExecutionKind {
486 match input {
487 Input::Continuation(_) => meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending,
488 _ => meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn,
489 }
490}
491
492#[cfg(test)]
493#[allow(clippy::unwrap_used, clippy::panic)]
494mod tests {
495 use super::*;
496 use chrono::Utc;
497
498 fn make_header() -> InputHeader {
499 InputHeader {
500 id: InputId::new(),
501 timestamp: Utc::now(),
502 source: InputOrigin::Operator,
503 durability: InputDurability::Durable,
504 visibility: InputVisibility::default(),
505 idempotency_key: None,
506 supersession_key: None,
507 correlation_id: None,
508 }
509 }
510
511 #[test]
512 fn prompt_input_serde() {
513 let input = Input::Prompt(PromptInput {
514 header: make_header(),
515 text: "hello".into(),
516 blocks: None,
517 turn_metadata: None,
518 });
519 let json = serde_json::to_value(&input).unwrap();
520 assert_eq!(json["input_type"], "prompt");
521 let parsed: Input = serde_json::from_value(json).unwrap();
522 assert!(matches!(parsed, Input::Prompt(_)));
523 }
524
525 #[test]
526 fn peer_input_message_serde() {
527 let input = Input::Peer(PeerInput {
528 header: make_header(),
529 convention: Some(PeerConvention::Message),
530 body: "hi there".into(),
531 blocks: None,
532 handling_mode: None,
533 });
534 let json = serde_json::to_value(&input).unwrap();
535 assert_eq!(json["input_type"], "peer");
536 let parsed: Input = serde_json::from_value(json).unwrap();
537 assert!(matches!(parsed, Input::Peer(_)));
538 }
539
540 #[test]
541 fn peer_input_request_serde() {
542 let input = Input::Peer(PeerInput {
543 header: make_header(),
544 convention: Some(PeerConvention::Request {
545 request_id: "req-1".into(),
546 intent: "mob.peer_added".into(),
547 }),
548 body: "Agent joined".into(),
549 blocks: None,
550 handling_mode: None,
551 });
552 let json = serde_json::to_value(&input).unwrap();
553 let parsed: Input = serde_json::from_value(json).unwrap();
554 if let Input::Peer(p) = parsed {
555 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
556 } else {
557 panic!("Expected PeerInput");
558 }
559 }
560
561 #[test]
562 fn peer_input_response_terminal_serde() {
563 let input = Input::Peer(PeerInput {
564 header: make_header(),
565 convention: Some(PeerConvention::ResponseTerminal {
566 request_id: "req-1".into(),
567 status: ResponseTerminalStatus::Completed,
568 }),
569 body: "Done".into(),
570 blocks: None,
571 handling_mode: None,
572 });
573 let json = serde_json::to_value(&input).unwrap();
574 let parsed: Input = serde_json::from_value(json).unwrap();
575 assert!(matches!(parsed, Input::Peer(_)));
576 }
577
578 #[test]
579 fn peer_input_response_progress_serde() {
580 let input = Input::Peer(PeerInput {
581 header: make_header(),
582 convention: Some(PeerConvention::ResponseProgress {
583 request_id: "req-1".into(),
584 phase: ResponseProgressPhase::InProgress,
585 }),
586 body: "Working...".into(),
587 blocks: None,
588 handling_mode: None,
589 });
590 let json = serde_json::to_value(&input).unwrap();
591 let parsed: Input = serde_json::from_value(json).unwrap();
592 assert!(matches!(parsed, Input::Peer(_)));
593 }
594
595 #[test]
596 fn flow_step_input_serde() {
597 let input = Input::FlowStep(FlowStepInput {
598 header: make_header(),
599 step_id: "step-1".into(),
600 instructions: "analyze the data".into(),
601 blocks: Some(vec![
602 meerkat_core::types::ContentBlock::Text {
603 text: "analyze the data".into(),
604 },
605 meerkat_core::types::ContentBlock::Image {
606 media_type: "image/png".into(),
607 data: meerkat_core::types::ImageData::Inline {
608 data: "abc123".into(),
609 },
610 },
611 ]),
612 turn_metadata: None,
613 });
614 let json = serde_json::to_value(&input).unwrap();
615 assert_eq!(json["input_type"], "flow_step");
616 let parsed: Input = serde_json::from_value(json).unwrap();
617 assert!(matches!(parsed, Input::FlowStep(_)));
618 }
619
620 #[test]
621 fn external_event_input_serde() {
622 let input = Input::ExternalEvent(ExternalEventInput {
623 header: make_header(),
624 event_type: "webhook.received".into(),
625 payload: serde_json::json!({"url": "https://example.com"}),
626 blocks: Some(vec![
627 meerkat_core::types::ContentBlock::Text {
628 text: "look".into(),
629 },
630 meerkat_core::types::ContentBlock::Image {
631 media_type: "image/png".into(),
632 data: meerkat_core::types::ImageData::Inline {
633 data: "abc123".into(),
634 },
635 },
636 ]),
637 handling_mode: HandlingMode::Queue,
638 render_metadata: None,
639 });
640 let json = serde_json::to_value(&input).unwrap();
641 assert_eq!(json["input_type"], "external_event");
642 let parsed: Input = serde_json::from_value(json).unwrap();
643 assert!(matches!(parsed, Input::ExternalEvent(_)));
644 }
645
646 #[test]
647 fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
648 let mut input = Input::ExternalEvent(ExternalEventInput {
649 header: make_header(),
650 event_type: "webhook.received".into(),
651 payload: serde_json::json!({
652 "body": "see image",
653 "blocks": [
654 { "type": "text", "text": "caption text" },
655 { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
656 ]
657 }),
658 blocks: None,
659 handling_mode: HandlingMode::Queue,
660 render_metadata: None,
661 });
662
663 match &mut input {
664 Input::ExternalEvent(event) => {
665 migrate_legacy_payload_blocks(event).unwrap();
666 assert!(event.payload.get("blocks").is_none());
667 assert_eq!(event.payload["body"], "see image");
668 assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
669 }
670 other => panic!("Expected ExternalEvent, got {other:?}"),
671 }
672 }
673
674 #[test]
675 fn continuation_input_serde() {
676 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
677 let json = serde_json::to_value(&input).unwrap();
678 assert_eq!(json["input_type"], "continuation");
679 let parsed: Input = serde_json::from_value(json).unwrap();
680 match parsed {
681 Input::Continuation(continuation) => {
682 assert_eq!(continuation.handling_mode, HandlingMode::Steer);
683 assert_eq!(continuation.reason, "detached_background_op_completed");
684 }
685 other => panic!("Expected Continuation, got {other:?}"),
686 }
687 }
688
689 #[test]
690 fn continuation_input_accepts_legacy_system_generated_tag() {
691 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
692 let mut json = serde_json::to_value(&input).unwrap();
693 json["input_type"] = serde_json::Value::String("system_generated".into());
694 let parsed: Input = serde_json::from_value(json).unwrap();
695 match parsed {
696 Input::Continuation(continuation) => {
697 assert_eq!(continuation.reason, "detached_background_op_completed");
698 }
699 other => panic!("Expected Continuation, got {other:?}"),
700 }
701 }
702
703 #[test]
704 fn operation_input_serde() {
705 let input = Input::Operation(OperationInput {
706 header: InputHeader {
707 durability: InputDurability::Derived,
708 ..make_header()
709 },
710 operation_id: OperationId::new(),
711 event: OpEvent::Cancelled {
712 id: OperationId::new(),
713 },
714 });
715 let json = serde_json::to_value(&input).unwrap();
716 assert_eq!(json["input_type"], "operation");
717 let parsed: Input = serde_json::from_value(json).unwrap();
718 assert!(matches!(parsed, Input::Operation(_)));
719 }
720
721 #[test]
722 fn operation_input_accepts_legacy_projected_tag() {
723 let input = Input::Operation(OperationInput {
724 header: InputHeader {
725 durability: InputDurability::Derived,
726 ..make_header()
727 },
728 operation_id: OperationId::new(),
729 event: OpEvent::Cancelled {
730 id: OperationId::new(),
731 },
732 });
733 let mut json = serde_json::to_value(&input).unwrap();
734 json["input_type"] = serde_json::Value::String("projected".into());
735 let parsed: Input = serde_json::from_value(json).unwrap();
736 assert!(matches!(parsed, Input::Operation(_)));
737 }
738
739 #[test]
740 fn input_kind_id() {
741 let prompt = Input::Prompt(PromptInput {
742 header: make_header(),
743 text: "hi".into(),
744 blocks: None,
745 turn_metadata: None,
746 });
747 assert_eq!(prompt.kind_id().0, "prompt");
748
749 let peer_msg = Input::Peer(PeerInput {
750 header: make_header(),
751 convention: Some(PeerConvention::Message),
752 body: "hi".into(),
753 blocks: None,
754 handling_mode: None,
755 });
756 assert_eq!(peer_msg.kind_id().0, "peer_message");
757
758 let peer_req = Input::Peer(PeerInput {
759 header: make_header(),
760 convention: Some(PeerConvention::Request {
761 request_id: "r".into(),
762 intent: "i".into(),
763 }),
764 body: "hi".into(),
765 blocks: None,
766 handling_mode: None,
767 });
768 assert_eq!(peer_req.kind_id().0, "peer_request");
769
770 let continuation = Input::Continuation(ContinuationInput {
771 header: make_header(),
772 reason: "continue".into(),
773 handling_mode: HandlingMode::Steer,
774 request_id: None,
775 });
776 assert_eq!(continuation.kind_id().0, "continuation");
777
778 let operation = Input::Operation(OperationInput {
779 header: make_header(),
780 operation_id: OperationId::new(),
781 event: OpEvent::Cancelled {
782 id: OperationId::new(),
783 },
784 });
785 assert_eq!(operation.kind_id().0, "operation");
786 }
787
788 #[test]
789 fn input_source_variants() {
790 let sources = vec![
791 InputOrigin::Operator,
792 InputOrigin::Peer {
793 peer_id: "p1".into(),
794 runtime_id: None,
795 },
796 InputOrigin::Flow {
797 flow_id: "f1".into(),
798 step_index: 0,
799 },
800 InputOrigin::System,
801 InputOrigin::External {
802 source_name: "webhook".into(),
803 },
804 ];
805 for source in sources {
806 let json = serde_json::to_value(&source).unwrap();
807 let parsed: InputOrigin = serde_json::from_value(json).unwrap();
808 assert_eq!(source, parsed);
809 }
810 }
811
812 #[test]
813 fn input_durability_serde() {
814 for d in [
815 InputDurability::Durable,
816 InputDurability::Ephemeral,
817 InputDurability::Derived,
818 ] {
819 let json = serde_json::to_value(d).unwrap();
820 let parsed: InputDurability = serde_json::from_value(json).unwrap();
821 assert_eq!(d, parsed);
822 }
823 }
824
825 #[test]
826 fn peer_input_without_handling_mode_deserializes_as_none() {
827 let json = serde_json::json!({
829 "input_type": "peer",
830 "header": serde_json::to_value(make_header()).unwrap(),
831 "convention": { "convention_type": "message" },
832 "body": "hello"
833 });
834 let parsed: Input = serde_json::from_value(json).unwrap();
835 match parsed {
836 Input::Peer(p) => assert!(p.handling_mode.is_none()),
837 other => panic!("Expected Peer, got {other:?}"),
838 }
839 }
840
841 #[test]
842 fn peer_input_with_queue_handling_mode_roundtrips() {
843 let input = Input::Peer(PeerInput {
844 header: make_header(),
845 convention: Some(PeerConvention::Message),
846 body: "hi".into(),
847 blocks: None,
848 handling_mode: Some(HandlingMode::Queue),
849 });
850 let json = serde_json::to_value(&input).unwrap();
851 assert_eq!(json["handling_mode"], "queue");
852 let parsed: Input = serde_json::from_value(json).unwrap();
853 match parsed {
854 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
855 other => panic!("Expected Peer, got {other:?}"),
856 }
857 }
858
859 #[test]
860 fn peer_input_with_steer_handling_mode_roundtrips() {
861 let input = Input::Peer(PeerInput {
862 header: make_header(),
863 convention: Some(PeerConvention::Message),
864 body: "hi".into(),
865 blocks: None,
866 handling_mode: Some(HandlingMode::Steer),
867 });
868 let json = serde_json::to_value(&input).unwrap();
869 assert_eq!(json["handling_mode"], "steer");
870 let parsed: Input = serde_json::from_value(json).unwrap();
871 match parsed {
872 Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
873 other => panic!("Expected Peer, got {other:?}"),
874 }
875 }
876
877 #[test]
878 fn peer_input_handling_mode_not_serialized_when_none() {
879 let input = Input::Peer(PeerInput {
880 header: make_header(),
881 convention: Some(PeerConvention::Message),
882 body: "hi".into(),
883 blocks: None,
884 handling_mode: None,
885 });
886 let json = serde_json::to_value(&input).unwrap();
887 assert!(json.get("handling_mode").is_none());
888 }
889
890 #[test]
893 fn classify_prompt_is_content_turn() {
894 let input = Input::Prompt(PromptInput {
895 header: make_header(),
896 text: "hello".into(),
897 blocks: None,
898 turn_metadata: None,
899 });
900 assert_eq!(
901 classify_execution_kind(&input),
902 meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
903 );
904 }
905
906 #[test]
907 fn classify_peer_terminal_is_content_turn() {
908 let input = Input::Peer(PeerInput {
909 header: make_header(),
910 convention: Some(PeerConvention::ResponseTerminal {
911 request_id: "r".into(),
912 status: ResponseTerminalStatus::Completed,
913 }),
914 body: "done".into(),
915 blocks: None,
916 handling_mode: None,
917 });
918 assert_eq!(
919 classify_execution_kind(&input),
920 meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
921 );
922 }
923
924 #[test]
925 fn classify_peer_terminal_with_steer_is_content_turn() {
926 let input = Input::Peer(PeerInput {
927 header: make_header(),
928 convention: Some(PeerConvention::ResponseTerminal {
929 request_id: "r".into(),
930 status: ResponseTerminalStatus::Completed,
931 }),
932 body: "done".into(),
933 blocks: None,
934 handling_mode: Some(HandlingMode::Steer),
935 });
936 assert_eq!(
937 classify_execution_kind(&input),
938 meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
939 );
940 }
941
942 #[test]
943 fn classify_continuation_is_resume_pending() {
944 let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
945 assert_eq!(
946 classify_execution_kind(&input),
947 meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending
948 );
949 }
950
951 #[test]
952 fn classify_peer_message_is_content_turn() {
953 let input = Input::Peer(PeerInput {
954 header: make_header(),
955 convention: Some(PeerConvention::Message),
956 body: "hi".into(),
957 blocks: None,
958 handling_mode: None,
959 });
960 assert_eq!(
961 classify_execution_kind(&input),
962 meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
963 );
964 }
965
966 #[test]
967 fn classify_operation_is_content_turn() {
968 let input = Input::Operation(OperationInput {
969 header: make_header(),
970 operation_id: OperationId::new(),
971 event: OpEvent::Started {
972 id: OperationId::new(),
973 kind: meerkat_core::ops::WorkKind::ToolCall,
974 },
975 });
976 assert_eq!(
977 classify_execution_kind(&input),
978 meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
979 );
980 }
981}