Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use meerkat_core::ops::{OpEvent, OperationId};
11use meerkat_core::types::HandlingMode;
12use meerkat_core::{
13    BlobStore, BlobStoreError, MissingBlobBehavior, externalize_content_blocks,
14    hydrate_content_blocks,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::identifiers::{
19    CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
20};
21use meerkat_core::types::RenderMetadata;
22
/// Common header for all input variants.
///
/// Each input struct in this module embeds one as its `header` field, and
/// [`Input::header`] returns it regardless of variant. The three optional
/// keys are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
    /// Unique ID for this input.
    pub id: InputId,
    /// When the input was created.
    pub timestamp: DateTime<Utc>,
    /// Source of the input.
    pub source: InputOrigin,
    /// Durability requirement.
    pub durability: InputDurability,
    /// Visibility controls.
    pub visibility: InputVisibility,
    /// Optional idempotency key for dedup.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
    /// Optional supersession key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supersession_key: Option<SupersessionKey>,
    /// Optional correlation ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<CorrelationId>,
}
46
/// Where the input originated.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `"type"` key, next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
    /// Human operator / external API caller.
    Operator,
    /// Peer agent (comms).
    Peer {
        /// Identifier of the sending peer.
        peer_id: String,
        /// Logical runtime of the peer, when known; omitted from the
        /// serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_id: Option<LogicalRuntimeId>,
    },
    /// Flow engine (mob orchestration).
    Flow { flow_id: String, step_index: usize },
    /// System-generated (compaction, projection, etc.).
    System,
    /// External event source.
    External { source_name: String },
}
67
/// Durability requirement for an input.
///
/// Serialized as a bare snake_case string (`"durable"`, `"ephemeral"`,
/// `"derived"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
    /// Must be persisted before acknowledgment.
    Durable,
    /// In-memory only, may be lost on crash.
    Ephemeral,
    /// Derived from other inputs (can be reconstructed).
    Derived,
}
80
/// Visibility controls for an input.
///
/// The two flags are independent of each other. The `Default` implementation
/// sets both to `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
    /// Whether this input appears in the conversation transcript.
    pub transcript_eligible: bool,
    /// Whether this input is visible to operator surfaces.
    pub operator_eligible: bool,
}
89
90impl Default for InputVisibility {
91    fn default() -> Self {
92        Self {
93            transcript_eligible: true,
94            operator_eligible: true,
95        }
96    }
97}
98
/// The 6 input variants accepted by the runtime layer.
///
/// Serialized as an internally tagged object: the snake_case variant name is
/// stored under `"input_type"` alongside the fields of the wrapped struct.
/// Two variants additionally accept an older tag value on deserialization
/// (see the `alias` attributes below); serialization always writes the
/// current name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
    /// User/operator prompt.
    Prompt(PromptInput),
    /// Peer-originated input (comms).
    Peer(PeerInput),
    /// Flow step input (mob orchestration).
    FlowStep(FlowStepInput),
    /// External event input.
    ExternalEvent(ExternalEventInput),
    /// Explicit runtime continuation work.
    ///
    /// Also deserializes from the tag `"system_generated"`.
    #[serde(alias = "system_generated")]
    Continuation(ContinuationInput),
    /// Explicit non-content operation/lifecycle input.
    ///
    /// Also deserializes from the tag `"projected"`.
    #[serde(alias = "projected")]
    Operation(OperationInput),
}
119
120impl Input {
121    /// Get the input header.
122    pub fn header(&self) -> &InputHeader {
123        match self {
124            Input::Prompt(i) => &i.header,
125            Input::Peer(i) => &i.header,
126            Input::FlowStep(i) => &i.header,
127            Input::ExternalEvent(i) => &i.header,
128            Input::Continuation(i) => &i.header,
129            Input::Operation(i) => &i.header,
130        }
131    }
132
133    /// Get the input ID.
134    pub fn id(&self) -> &InputId {
135        &self.header().id
136    }
137
138    /// Get the kind ID for policy resolution.
139    pub fn kind_id(&self) -> KindId {
140        match self {
141            Input::Prompt(_) => KindId::new("prompt"),
142            Input::Peer(p) => match &p.convention {
143                Some(PeerConvention::Message) => KindId::new("peer_message"),
144                Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
145                Some(PeerConvention::ResponseProgress { .. }) => {
146                    KindId::new("peer_response_progress")
147                }
148                Some(PeerConvention::ResponseTerminal { .. }) => {
149                    KindId::new("peer_response_terminal")
150                }
151                None => KindId::new("peer_message"),
152            },
153            Input::FlowStep(_) => KindId::new("flow_step"),
154            Input::ExternalEvent(_) => KindId::new("external_event"),
155            Input::Continuation(_) => KindId::new("continuation"),
156            Input::Operation(_) => KindId::new("operation"),
157        }
158    }
159
160    /// Handling-mode hint for ordinary work admitted through the runtime.
161    pub fn handling_mode(&self) -> Option<HandlingMode> {
162        match self {
163            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
164            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
165            Input::ExternalEvent(event) => Some(event.handling_mode),
166            Input::Continuation(continuation) => Some(continuation.handling_mode),
167            Input::Peer(peer) => peer.handling_mode,
168            Input::Operation(_) => None,
169        }
170    }
171}
172
173fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
174    let Some(obj) = event.payload.as_object_mut() else {
175        return Ok(());
176    };
177    let Some(blocks_value) = obj.remove("blocks") else {
178        return Ok(());
179    };
180    if event.blocks.is_some() {
181        return Ok(());
182    }
183    let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
184        .map_err(|err| {
185            BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
186        })?;
187    event.blocks = Some(blocks);
188    Ok(())
189}
190
191pub async fn externalize_input_images(
192    blob_store: &dyn BlobStore,
193    input: &mut Input,
194) -> Result<(), BlobStoreError> {
195    match input {
196        Input::Prompt(prompt) => {
197            if let Some(blocks) = prompt.blocks.as_mut() {
198                externalize_content_blocks(blob_store, blocks).await?;
199            }
200        }
201        Input::Peer(peer) => {
202            if let Some(blocks) = peer.blocks.as_mut() {
203                externalize_content_blocks(blob_store, blocks).await?;
204            }
205        }
206        Input::FlowStep(flow_step) => {
207            if let Some(blocks) = flow_step.blocks.as_mut() {
208                externalize_content_blocks(blob_store, blocks).await?;
209            }
210        }
211        Input::ExternalEvent(event) => {
212            migrate_legacy_payload_blocks(event)?;
213            if let Some(blocks) = event.blocks.as_mut() {
214                externalize_content_blocks(blob_store, blocks).await?;
215            }
216        }
217        Input::Continuation(_) | Input::Operation(_) => {}
218    }
219    Ok(())
220}
221
222pub async fn hydrate_input_images(
223    blob_store: &dyn BlobStore,
224    input: &mut Input,
225    missing_behavior: MissingBlobBehavior,
226) -> Result<(), BlobStoreError> {
227    match input {
228        Input::Prompt(prompt) => {
229            if let Some(blocks) = prompt.blocks.as_mut() {
230                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
231            }
232        }
233        Input::Peer(peer) => {
234            if let Some(blocks) = peer.blocks.as_mut() {
235                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
236            }
237        }
238        Input::FlowStep(flow_step) => {
239            if let Some(blocks) = flow_step.blocks.as_mut() {
240                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
241            }
242        }
243        Input::ExternalEvent(event) => {
244            migrate_legacy_payload_blocks(event)?;
245            if let Some(blocks) = event.blocks.as_mut() {
246                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247            }
248        }
249        Input::Continuation(_) | Input::Operation(_) => {}
250    }
251    Ok(())
252}
253
/// User/operator prompt input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
    /// Common input header.
    pub header: InputHeader,
    /// The prompt text.
    pub text: String,
    /// Optional multimodal content blocks. When present, `text` serves as the
    /// text projection (backwards compat), and `blocks` carries the full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional per-turn runtime metadata. `Input::handling_mode` reads its
    /// handling-mode hint from here. Omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
267
268impl PromptInput {
269    /// Create a new operator prompt with default header.
270    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
271        Self {
272            header: InputHeader {
273                id: meerkat_core::lifecycle::InputId::new(),
274                timestamp: chrono::Utc::now(),
275                source: InputOrigin::Operator,
276                durability: InputDurability::Durable,
277                visibility: InputVisibility::default(),
278                idempotency_key: None,
279                supersession_key: None,
280                correlation_id: None,
281            },
282            text: text.into(),
283            blocks: None,
284            turn_metadata,
285        }
286    }
287
288    /// Create a multimodal prompt from `ContentInput`.
289    pub fn from_content_input(
290        input: meerkat_core::types::ContentInput,
291        turn_metadata: Option<RuntimeTurnMetadata>,
292    ) -> Self {
293        let text = input.text_content();
294        let blocks = if input.has_images() {
295            Some(input.into_blocks())
296        } else {
297            None
298        };
299        Self {
300            header: InputHeader {
301                id: meerkat_core::lifecycle::InputId::new(),
302                timestamp: chrono::Utc::now(),
303                source: InputOrigin::Operator,
304                durability: InputDurability::Durable,
305                visibility: InputVisibility::default(),
306                idempotency_key: None,
307                supersession_key: None,
308                correlation_id: None,
309            },
310            text,
311            blocks,
312            turn_metadata,
313        }
314    }
315}
316
/// Peer-originated input from comms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
    /// Common input header.
    pub header: InputHeader,
    /// The peer convention (message, request, response).
    ///
    /// `None` is treated like `Message` by `Input::kind_id` (both resolve to
    /// `peer_message`). Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub convention: Option<PeerConvention>,
    /// LLM-facing rendered text projection for this peer input.
    pub body: String,
    /// Optional multimodal content blocks. When present, `body` serves as the
    /// text projection (backwards compat), and `blocks` carries the full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional handling-mode override for actionable peer inputs.
    /// When present on Message/Request/no-convention, overrides kind-based
    /// policy defaults. Forbidden on ResponseProgress and ResponseTerminal
    /// (enforced by [`validate_peer_handling_mode`]).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handling_mode: Option<HandlingMode>,
}
337
/// Peer communication conventions.
///
/// Serialized as an internally tagged object with the snake_case variant name
/// under `"convention_type"`. `Input::kind_id` maps each variant to its own
/// policy kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
    /// Simple peer-to-peer message.
    Message,
    /// Request expecting a response.
    Request { request_id: String, intent: String },
    /// Progress update for an ongoing response.
    ResponseProgress {
        /// ID of the request this update refers to.
        request_id: String,
        /// Which stage of processing is being reported.
        phase: ResponseProgressPhase,
    },
    /// Terminal response (completed or failed).
    ResponseTerminal {
        /// ID of the request this response refers to.
        request_id: String,
        /// Final outcome of the request.
        status: ResponseTerminalStatus,
    },
}
358
/// Phase of a response progress update.
///
/// Serialized as a bare snake_case string (`"accepted"`, `"in_progress"`,
/// `"partial_result"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ResponseProgressPhase {
    /// Request was accepted.
    Accepted,
    /// Work is in progress.
    InProgress,
    /// Partial result available.
    PartialResult,
}
371
/// Terminal status of a response.
///
/// Serialized as a bare snake_case string (`"completed"`, `"failed"`,
/// `"cancelled"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ResponseTerminalStatus {
    /// Request completed successfully.
    Completed,
    /// Request failed.
    Failed,
    /// Request was cancelled.
    Cancelled,
}
384
/// Flow step input from mob orchestration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
    /// Common input header.
    pub header: InputHeader,
    /// Flow step identifier.
    pub step_id: String,
    /// Step instructions/prompt.
    pub instructions: String,
    /// Optional multimodal content blocks. When present, `instructions` serves
    /// as the text projection (backwards compat), and `blocks` carries the
    /// full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional per-turn runtime metadata. `Input::handling_mode` reads its
    /// handling-mode hint from here. Omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
401
/// External event input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
    /// Common input header.
    pub header: InputHeader,
    /// Event type/name.
    pub event_type: String,
    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
    /// payloads during coalescing and projection — not a pure pass-through.
    /// Multimodal content does NOT live here canonically; use `blocks`.
    pub payload: serde_json::Value,
    /// Optional multimodal blocks carried by the external event. This is the
    /// canonical owner for multimodal external-event content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Runtime-owned handling hint for this external event.
    ///
    /// Falls back to `HandlingMode::default()` when the field is absent from
    /// the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional normalized render metadata carried with the event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub render_metadata: Option<RenderMetadata>,
}
423
/// Explicit continuation request that asks the runtime to keep draining
/// ordinary work after a boundary-local event (for example, terminal peer
/// responses injected into session state).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Stable reason for the continuation request.
    pub reason: String,
    /// Ordinary-work handling mode for the continuation.
    ///
    /// Falls back to `HandlingMode::default()` when the field is absent from
    /// the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional request/correlation handle tied to the continuation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
439
440impl ContinuationInput {
441    /// Build a continuation for waking an idle session after a detached
442    /// background operation reaches terminal state.
443    ///
444    /// Properties: `Derived` durability, invisible to transcript and operator,
445    /// `System` origin, `Steer` handling mode.
446    pub fn detached_background_op_completed() -> Self {
447        Self {
448            header: InputHeader {
449                id: meerkat_core::lifecycle::InputId::new(),
450                timestamp: chrono::Utc::now(),
451                source: InputOrigin::System,
452                durability: InputDurability::Derived,
453                visibility: InputVisibility {
454                    transcript_eligible: false,
455                    operator_eligible: false,
456                },
457                idempotency_key: None,
458                supersession_key: None,
459                correlation_id: None,
460            },
461            reason: "detached_background_op_completed".to_string(),
462            handling_mode: HandlingMode::Steer,
463            request_id: None,
464        }
465    }
466}
467
/// Explicit operation/lifecycle input admitted through runtime instead of
/// being smuggled through transcript projections or peer-only paths.
///
/// `Input::handling_mode` always returns `None` for this variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Stable operation identifier.
    pub operation_id: OperationId,
    /// Typed lifecycle event for the operation.
    pub event: OpEvent,
}
478
479/// Classify an input's execution intent for the runtime loop.
480///
481/// `Continuation` inputs map to `ResumePending`; everything else is `ContentTurn`.
482/// Terminal peer responses are always `ContentTurn` regardless of handling_mode.
483pub(crate) fn classify_execution_kind(
484    input: &Input,
485) -> meerkat_core::lifecycle::RuntimeExecutionKind {
486    match input {
487        Input::Continuation(_) => meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending,
488        _ => meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn,
489    }
490}
491
492#[cfg(test)]
493#[allow(clippy::unwrap_used, clippy::panic)]
494mod tests {
495    use super::*;
496    use chrono::Utc;
497
    /// Test fixture: a header with a fresh ID and the current time, operator
    /// origin, durable, default visibility, and no optional keys set.
    fn make_header() -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }
510
511    #[test]
512    fn prompt_input_serde() {
513        let input = Input::Prompt(PromptInput {
514            header: make_header(),
515            text: "hello".into(),
516            blocks: None,
517            turn_metadata: None,
518        });
519        let json = serde_json::to_value(&input).unwrap();
520        assert_eq!(json["input_type"], "prompt");
521        let parsed: Input = serde_json::from_value(json).unwrap();
522        assert!(matches!(parsed, Input::Prompt(_)));
523    }
524
525    #[test]
526    fn peer_input_message_serde() {
527        let input = Input::Peer(PeerInput {
528            header: make_header(),
529            convention: Some(PeerConvention::Message),
530            body: "hi there".into(),
531            blocks: None,
532            handling_mode: None,
533        });
534        let json = serde_json::to_value(&input).unwrap();
535        assert_eq!(json["input_type"], "peer");
536        let parsed: Input = serde_json::from_value(json).unwrap();
537        assert!(matches!(parsed, Input::Peer(_)));
538    }
539
540    #[test]
541    fn peer_input_request_serde() {
542        let input = Input::Peer(PeerInput {
543            header: make_header(),
544            convention: Some(PeerConvention::Request {
545                request_id: "req-1".into(),
546                intent: "mob.peer_added".into(),
547            }),
548            body: "Agent joined".into(),
549            blocks: None,
550            handling_mode: None,
551        });
552        let json = serde_json::to_value(&input).unwrap();
553        let parsed: Input = serde_json::from_value(json).unwrap();
554        if let Input::Peer(p) = parsed {
555            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
556        } else {
557            panic!("Expected PeerInput");
558        }
559    }
560
561    #[test]
562    fn peer_input_response_terminal_serde() {
563        let input = Input::Peer(PeerInput {
564            header: make_header(),
565            convention: Some(PeerConvention::ResponseTerminal {
566                request_id: "req-1".into(),
567                status: ResponseTerminalStatus::Completed,
568            }),
569            body: "Done".into(),
570            blocks: None,
571            handling_mode: None,
572        });
573        let json = serde_json::to_value(&input).unwrap();
574        let parsed: Input = serde_json::from_value(json).unwrap();
575        assert!(matches!(parsed, Input::Peer(_)));
576    }
577
578    #[test]
579    fn peer_input_response_progress_serde() {
580        let input = Input::Peer(PeerInput {
581            header: make_header(),
582            convention: Some(PeerConvention::ResponseProgress {
583                request_id: "req-1".into(),
584                phase: ResponseProgressPhase::InProgress,
585            }),
586            body: "Working...".into(),
587            blocks: None,
588            handling_mode: None,
589        });
590        let json = serde_json::to_value(&input).unwrap();
591        let parsed: Input = serde_json::from_value(json).unwrap();
592        assert!(matches!(parsed, Input::Peer(_)));
593    }
594
595    #[test]
596    fn flow_step_input_serde() {
597        let input = Input::FlowStep(FlowStepInput {
598            header: make_header(),
599            step_id: "step-1".into(),
600            instructions: "analyze the data".into(),
601            blocks: Some(vec![
602                meerkat_core::types::ContentBlock::Text {
603                    text: "analyze the data".into(),
604                },
605                meerkat_core::types::ContentBlock::Image {
606                    media_type: "image/png".into(),
607                    data: meerkat_core::types::ImageData::Inline {
608                        data: "abc123".into(),
609                    },
610                },
611            ]),
612            turn_metadata: None,
613        });
614        let json = serde_json::to_value(&input).unwrap();
615        assert_eq!(json["input_type"], "flow_step");
616        let parsed: Input = serde_json::from_value(json).unwrap();
617        assert!(matches!(parsed, Input::FlowStep(_)));
618    }
619
620    #[test]
621    fn external_event_input_serde() {
622        let input = Input::ExternalEvent(ExternalEventInput {
623            header: make_header(),
624            event_type: "webhook.received".into(),
625            payload: serde_json::json!({"url": "https://example.com"}),
626            blocks: Some(vec![
627                meerkat_core::types::ContentBlock::Text {
628                    text: "look".into(),
629                },
630                meerkat_core::types::ContentBlock::Image {
631                    media_type: "image/png".into(),
632                    data: meerkat_core::types::ImageData::Inline {
633                        data: "abc123".into(),
634                    },
635                },
636            ]),
637            handling_mode: HandlingMode::Queue,
638            render_metadata: None,
639        });
640        let json = serde_json::to_value(&input).unwrap();
641        assert_eq!(json["input_type"], "external_event");
642        let parsed: Input = serde_json::from_value(json).unwrap();
643        assert!(matches!(parsed, Input::ExternalEvent(_)));
644    }
645
646    #[test]
647    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
648        let mut input = Input::ExternalEvent(ExternalEventInput {
649            header: make_header(),
650            event_type: "webhook.received".into(),
651            payload: serde_json::json!({
652                "body": "see image",
653                "blocks": [
654                    { "type": "text", "text": "caption text" },
655                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
656                ]
657            }),
658            blocks: None,
659            handling_mode: HandlingMode::Queue,
660            render_metadata: None,
661        });
662
663        match &mut input {
664            Input::ExternalEvent(event) => {
665                migrate_legacy_payload_blocks(event).unwrap();
666                assert!(event.payload.get("blocks").is_none());
667                assert_eq!(event.payload["body"], "see image");
668                assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
669            }
670            other => panic!("Expected ExternalEvent, got {other:?}"),
671        }
672    }
673
674    #[test]
675    fn continuation_input_serde() {
676        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
677        let json = serde_json::to_value(&input).unwrap();
678        assert_eq!(json["input_type"], "continuation");
679        let parsed: Input = serde_json::from_value(json).unwrap();
680        match parsed {
681            Input::Continuation(continuation) => {
682                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
683                assert_eq!(continuation.reason, "detached_background_op_completed");
684            }
685            other => panic!("Expected Continuation, got {other:?}"),
686        }
687    }
688
689    #[test]
690    fn continuation_input_accepts_legacy_system_generated_tag() {
691        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
692        let mut json = serde_json::to_value(&input).unwrap();
693        json["input_type"] = serde_json::Value::String("system_generated".into());
694        let parsed: Input = serde_json::from_value(json).unwrap();
695        match parsed {
696            Input::Continuation(continuation) => {
697                assert_eq!(continuation.reason, "detached_background_op_completed");
698            }
699            other => panic!("Expected Continuation, got {other:?}"),
700        }
701    }
702
703    #[test]
704    fn operation_input_serde() {
705        let input = Input::Operation(OperationInput {
706            header: InputHeader {
707                durability: InputDurability::Derived,
708                ..make_header()
709            },
710            operation_id: OperationId::new(),
711            event: OpEvent::Cancelled {
712                id: OperationId::new(),
713            },
714        });
715        let json = serde_json::to_value(&input).unwrap();
716        assert_eq!(json["input_type"], "operation");
717        let parsed: Input = serde_json::from_value(json).unwrap();
718        assert!(matches!(parsed, Input::Operation(_)));
719    }
720
721    #[test]
722    fn operation_input_accepts_legacy_projected_tag() {
723        let input = Input::Operation(OperationInput {
724            header: InputHeader {
725                durability: InputDurability::Derived,
726                ..make_header()
727            },
728            operation_id: OperationId::new(),
729            event: OpEvent::Cancelled {
730                id: OperationId::new(),
731            },
732        });
733        let mut json = serde_json::to_value(&input).unwrap();
734        json["input_type"] = serde_json::Value::String("projected".into());
735        let parsed: Input = serde_json::from_value(json).unwrap();
736        assert!(matches!(parsed, Input::Operation(_)));
737    }
738
739    #[test]
740    fn input_kind_id() {
741        let prompt = Input::Prompt(PromptInput {
742            header: make_header(),
743            text: "hi".into(),
744            blocks: None,
745            turn_metadata: None,
746        });
747        assert_eq!(prompt.kind_id().0, "prompt");
748
749        let peer_msg = Input::Peer(PeerInput {
750            header: make_header(),
751            convention: Some(PeerConvention::Message),
752            body: "hi".into(),
753            blocks: None,
754            handling_mode: None,
755        });
756        assert_eq!(peer_msg.kind_id().0, "peer_message");
757
758        let peer_req = Input::Peer(PeerInput {
759            header: make_header(),
760            convention: Some(PeerConvention::Request {
761                request_id: "r".into(),
762                intent: "i".into(),
763            }),
764            body: "hi".into(),
765            blocks: None,
766            handling_mode: None,
767        });
768        assert_eq!(peer_req.kind_id().0, "peer_request");
769
770        let continuation = Input::Continuation(ContinuationInput {
771            header: make_header(),
772            reason: "continue".into(),
773            handling_mode: HandlingMode::Steer,
774            request_id: None,
775        });
776        assert_eq!(continuation.kind_id().0, "continuation");
777
778        let operation = Input::Operation(OperationInput {
779            header: make_header(),
780            operation_id: OperationId::new(),
781            event: OpEvent::Cancelled {
782                id: OperationId::new(),
783            },
784        });
785        assert_eq!(operation.kind_id().0, "operation");
786    }
787
788    #[test]
789    fn input_source_variants() {
790        let sources = vec![
791            InputOrigin::Operator,
792            InputOrigin::Peer {
793                peer_id: "p1".into(),
794                runtime_id: None,
795            },
796            InputOrigin::Flow {
797                flow_id: "f1".into(),
798                step_index: 0,
799            },
800            InputOrigin::System,
801            InputOrigin::External {
802                source_name: "webhook".into(),
803            },
804        ];
805        for source in sources {
806            let json = serde_json::to_value(&source).unwrap();
807            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
808            assert_eq!(source, parsed);
809        }
810    }
811
812    #[test]
813    fn input_durability_serde() {
814        for d in [
815            InputDurability::Durable,
816            InputDurability::Ephemeral,
817            InputDurability::Derived,
818        ] {
819            let json = serde_json::to_value(d).unwrap();
820            let parsed: InputDurability = serde_json::from_value(json).unwrap();
821            assert_eq!(d, parsed);
822        }
823    }
824
825    #[test]
826    fn peer_input_without_handling_mode_deserializes_as_none() {
827        // Simulate old serialized PeerInput without the handling_mode field.
828        let json = serde_json::json!({
829            "input_type": "peer",
830            "header": serde_json::to_value(make_header()).unwrap(),
831            "convention": { "convention_type": "message" },
832            "body": "hello"
833        });
834        let parsed: Input = serde_json::from_value(json).unwrap();
835        match parsed {
836            Input::Peer(p) => assert!(p.handling_mode.is_none()),
837            other => panic!("Expected Peer, got {other:?}"),
838        }
839    }
840
841    #[test]
842    fn peer_input_with_queue_handling_mode_roundtrips() {
843        let input = Input::Peer(PeerInput {
844            header: make_header(),
845            convention: Some(PeerConvention::Message),
846            body: "hi".into(),
847            blocks: None,
848            handling_mode: Some(HandlingMode::Queue),
849        });
850        let json = serde_json::to_value(&input).unwrap();
851        assert_eq!(json["handling_mode"], "queue");
852        let parsed: Input = serde_json::from_value(json).unwrap();
853        match parsed {
854            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
855            other => panic!("Expected Peer, got {other:?}"),
856        }
857    }
858
859    #[test]
860    fn peer_input_with_steer_handling_mode_roundtrips() {
861        let input = Input::Peer(PeerInput {
862            header: make_header(),
863            convention: Some(PeerConvention::Message),
864            body: "hi".into(),
865            blocks: None,
866            handling_mode: Some(HandlingMode::Steer),
867        });
868        let json = serde_json::to_value(&input).unwrap();
869        assert_eq!(json["handling_mode"], "steer");
870        let parsed: Input = serde_json::from_value(json).unwrap();
871        match parsed {
872            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
873            other => panic!("Expected Peer, got {other:?}"),
874        }
875    }
876
877    #[test]
878    fn peer_input_handling_mode_not_serialized_when_none() {
879        let input = Input::Peer(PeerInput {
880            header: make_header(),
881            convention: Some(PeerConvention::Message),
882            body: "hi".into(),
883            blocks: None,
884            handling_mode: None,
885        });
886        let json = serde_json::to_value(&input).unwrap();
887        assert!(json.get("handling_mode").is_none());
888    }
889
890    // --- classify_execution_kind tests ---
891
892    #[test]
893    fn classify_prompt_is_content_turn() {
894        let input = Input::Prompt(PromptInput {
895            header: make_header(),
896            text: "hello".into(),
897            blocks: None,
898            turn_metadata: None,
899        });
900        assert_eq!(
901            classify_execution_kind(&input),
902            meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
903        );
904    }
905
906    #[test]
907    fn classify_peer_terminal_is_content_turn() {
908        let input = Input::Peer(PeerInput {
909            header: make_header(),
910            convention: Some(PeerConvention::ResponseTerminal {
911                request_id: "r".into(),
912                status: ResponseTerminalStatus::Completed,
913            }),
914            body: "done".into(),
915            blocks: None,
916            handling_mode: None,
917        });
918        assert_eq!(
919            classify_execution_kind(&input),
920            meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
921        );
922    }
923
924    #[test]
925    fn classify_peer_terminal_with_steer_is_content_turn() {
926        let input = Input::Peer(PeerInput {
927            header: make_header(),
928            convention: Some(PeerConvention::ResponseTerminal {
929                request_id: "r".into(),
930                status: ResponseTerminalStatus::Completed,
931            }),
932            body: "done".into(),
933            blocks: None,
934            handling_mode: Some(HandlingMode::Steer),
935        });
936        assert_eq!(
937            classify_execution_kind(&input),
938            meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
939        );
940    }
941
942    #[test]
943    fn classify_continuation_is_resume_pending() {
944        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
945        assert_eq!(
946            classify_execution_kind(&input),
947            meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending
948        );
949    }
950
951    #[test]
952    fn classify_peer_message_is_content_turn() {
953        let input = Input::Peer(PeerInput {
954            header: make_header(),
955            convention: Some(PeerConvention::Message),
956            body: "hi".into(),
957            blocks: None,
958            handling_mode: None,
959        });
960        assert_eq!(
961            classify_execution_kind(&input),
962            meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
963        );
964    }
965
966    #[test]
967    fn classify_operation_is_content_turn() {
968        let input = Input::Operation(OperationInput {
969            header: make_header(),
970            operation_id: OperationId::new(),
971            event: OpEvent::Started {
972                id: OperationId::new(),
973                kind: meerkat_core::ops::WorkKind::ToolCall,
974            },
975        });
976        assert_eq!(
977            classify_execution_kind(&input),
978            meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
979        );
980    }
981}