Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use meerkat_core::ops::{OpEvent, OperationId};
11use meerkat_core::types::HandlingMode;
12use meerkat_core::{
13    BlobStore, BlobStoreError, MissingBlobBehavior, externalize_content_blocks,
14    hydrate_content_blocks,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::identifiers::{
19    CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
20};
21use meerkat_core::types::RenderMetadata;
22
23/// Common header for all input variants.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct InputHeader {
26    /// Unique ID for this input.
27    pub id: InputId,
28    /// When the input was created.
29    pub timestamp: DateTime<Utc>,
30    /// Source of the input.
31    pub source: InputOrigin,
32    /// Durability requirement.
33    pub durability: InputDurability,
34    /// Visibility controls.
35    pub visibility: InputVisibility,
36    /// Optional idempotency key for dedup.
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub idempotency_key: Option<IdempotencyKey>,
39    /// Optional supersession key.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub supersession_key: Option<SupersessionKey>,
42    /// Optional correlation ID.
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub correlation_id: Option<CorrelationId>,
45}
46
47/// Where the input originated.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(tag = "type", rename_all = "snake_case")]
50#[non_exhaustive]
51pub enum InputOrigin {
52    /// Human operator / external API caller.
53    Operator,
54    /// Peer agent (comms).
55    Peer {
56        peer_id: String,
57        #[serde(skip_serializing_if = "Option::is_none")]
58        runtime_id: Option<LogicalRuntimeId>,
59    },
60    /// Flow engine (mob orchestration).
61    Flow { flow_id: String, step_index: usize },
62    /// System-generated (compaction, projection, etc.).
63    System,
64    /// External event source.
65    External { source_name: String },
66}
67
68/// Durability requirement for an input.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71#[non_exhaustive]
72pub enum InputDurability {
73    /// Must be persisted before acknowledgment.
74    Durable,
75    /// In-memory only, may be lost on crash.
76    Ephemeral,
77    /// Derived from other inputs (can be reconstructed).
78    Derived,
79}
80
81/// Visibility controls for an input.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub struct InputVisibility {
84    /// Whether this input appears in the conversation transcript.
85    pub transcript_eligible: bool,
86    /// Whether this input is visible to operator surfaces.
87    pub operator_eligible: bool,
88}
89
90impl Default for InputVisibility {
91    fn default() -> Self {
92        Self {
93            transcript_eligible: true,
94            operator_eligible: true,
95        }
96    }
97}
98
99/// The 6 input variants accepted by the runtime layer.
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(tag = "input_type", rename_all = "snake_case")]
102#[non_exhaustive]
103pub enum Input {
104    /// User/operator prompt.
105    Prompt(PromptInput),
106    /// Peer-originated input (comms).
107    Peer(PeerInput),
108    /// Flow step input (mob orchestration).
109    FlowStep(FlowStepInput),
110    /// External event input.
111    ExternalEvent(ExternalEventInput),
112    /// Explicit runtime continuation work.
113    #[serde(alias = "system_generated")]
114    Continuation(ContinuationInput),
115    /// Explicit non-content operation/lifecycle input.
116    #[serde(alias = "projected")]
117    Operation(OperationInput),
118}
119
120impl Input {
121    /// Get the input header.
122    pub fn header(&self) -> &InputHeader {
123        match self {
124            Input::Prompt(i) => &i.header,
125            Input::Peer(i) => &i.header,
126            Input::FlowStep(i) => &i.header,
127            Input::ExternalEvent(i) => &i.header,
128            Input::Continuation(i) => &i.header,
129            Input::Operation(i) => &i.header,
130        }
131    }
132
133    /// Get the input ID.
134    pub fn id(&self) -> &InputId {
135        &self.header().id
136    }
137
138    /// Get the kind ID for policy resolution.
139    pub fn kind_id(&self) -> KindId {
140        match self {
141            Input::Prompt(_) => KindId::new("prompt"),
142            Input::Peer(p) => match &p.convention {
143                Some(PeerConvention::Message) => KindId::new("peer_message"),
144                Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
145                Some(PeerConvention::ResponseProgress { .. }) => {
146                    KindId::new("peer_response_progress")
147                }
148                Some(PeerConvention::ResponseTerminal { .. }) => {
149                    KindId::new("peer_response_terminal")
150                }
151                None => KindId::new("peer_message"),
152            },
153            Input::FlowStep(_) => KindId::new("flow_step"),
154            Input::ExternalEvent(_) => KindId::new("external_event"),
155            Input::Continuation(_) => KindId::new("continuation"),
156            Input::Operation(_) => KindId::new("operation"),
157        }
158    }
159
160    /// Handling-mode hint for ordinary work admitted through the runtime.
161    pub fn handling_mode(&self) -> Option<HandlingMode> {
162        match self {
163            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
164            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
165            Input::ExternalEvent(event) => Some(event.handling_mode),
166            Input::Continuation(continuation) => Some(continuation.handling_mode),
167            _ => None,
168        }
169    }
170}
171
172fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
173    let Some(obj) = event.payload.as_object_mut() else {
174        return Ok(());
175    };
176    let Some(blocks_value) = obj.remove("blocks") else {
177        return Ok(());
178    };
179    if event.blocks.is_some() {
180        return Ok(());
181    }
182    let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
183        .map_err(|err| {
184            BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
185        })?;
186    event.blocks = Some(blocks);
187    Ok(())
188}
189
190pub async fn externalize_input_images(
191    blob_store: &dyn BlobStore,
192    input: &mut Input,
193) -> Result<(), BlobStoreError> {
194    match input {
195        Input::Prompt(prompt) => {
196            if let Some(blocks) = prompt.blocks.as_mut() {
197                externalize_content_blocks(blob_store, blocks).await?;
198            }
199        }
200        Input::Peer(peer) => {
201            if let Some(blocks) = peer.blocks.as_mut() {
202                externalize_content_blocks(blob_store, blocks).await?;
203            }
204        }
205        Input::FlowStep(flow_step) => {
206            if let Some(blocks) = flow_step.blocks.as_mut() {
207                externalize_content_blocks(blob_store, blocks).await?;
208            }
209        }
210        Input::ExternalEvent(event) => {
211            migrate_legacy_payload_blocks(event)?;
212            if let Some(blocks) = event.blocks.as_mut() {
213                externalize_content_blocks(blob_store, blocks).await?;
214            }
215        }
216        Input::Continuation(_) | Input::Operation(_) => {}
217    }
218    Ok(())
219}
220
221pub async fn hydrate_input_images(
222    blob_store: &dyn BlobStore,
223    input: &mut Input,
224    missing_behavior: MissingBlobBehavior,
225) -> Result<(), BlobStoreError> {
226    match input {
227        Input::Prompt(prompt) => {
228            if let Some(blocks) = prompt.blocks.as_mut() {
229                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
230            }
231        }
232        Input::Peer(peer) => {
233            if let Some(blocks) = peer.blocks.as_mut() {
234                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
235            }
236        }
237        Input::FlowStep(flow_step) => {
238            if let Some(blocks) = flow_step.blocks.as_mut() {
239                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
240            }
241        }
242        Input::ExternalEvent(event) => {
243            migrate_legacy_payload_blocks(event)?;
244            if let Some(blocks) = event.blocks.as_mut() {
245                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
246            }
247        }
248        Input::Continuation(_) | Input::Operation(_) => {}
249    }
250    Ok(())
251}
252
253/// User/operator prompt input.
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct PromptInput {
256    pub header: InputHeader,
257    /// The prompt text.
258    pub text: String,
259    /// Optional multimodal content blocks. When present, `text` serves as the
260    /// text projection (backwards compat), and `blocks` carries the full content.
261    #[serde(default, skip_serializing_if = "Option::is_none")]
262    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
263    #[serde(default, skip_serializing_if = "Option::is_none")]
264    pub turn_metadata: Option<RuntimeTurnMetadata>,
265}
266
267impl PromptInput {
268    /// Create a new operator prompt with default header.
269    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
270        Self {
271            header: InputHeader {
272                id: meerkat_core::lifecycle::InputId::new(),
273                timestamp: chrono::Utc::now(),
274                source: InputOrigin::Operator,
275                durability: InputDurability::Durable,
276                visibility: InputVisibility::default(),
277                idempotency_key: None,
278                supersession_key: None,
279                correlation_id: None,
280            },
281            text: text.into(),
282            blocks: None,
283            turn_metadata,
284        }
285    }
286
287    /// Create a multimodal prompt from `ContentInput`.
288    pub fn from_content_input(
289        input: meerkat_core::types::ContentInput,
290        turn_metadata: Option<RuntimeTurnMetadata>,
291    ) -> Self {
292        let text = input.text_content();
293        let blocks = if input.has_images() {
294            Some(input.into_blocks())
295        } else {
296            None
297        };
298        Self {
299            header: InputHeader {
300                id: meerkat_core::lifecycle::InputId::new(),
301                timestamp: chrono::Utc::now(),
302                source: InputOrigin::Operator,
303                durability: InputDurability::Durable,
304                visibility: InputVisibility::default(),
305                idempotency_key: None,
306                supersession_key: None,
307                correlation_id: None,
308            },
309            text,
310            blocks,
311            turn_metadata,
312        }
313    }
314}
315
316/// Peer-originated input from comms.
317#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct PeerInput {
319    pub header: InputHeader,
320    /// The peer convention (message, request, response).
321    #[serde(skip_serializing_if = "Option::is_none")]
322    pub convention: Option<PeerConvention>,
323    /// LLM-facing rendered text projection for this peer input.
324    pub body: String,
325    /// Optional multimodal content blocks. When present, `body` serves as the
326    /// text projection (backwards compat), and `blocks` carries the full content.
327    #[serde(default, skip_serializing_if = "Option::is_none")]
328    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
329}
330
331/// Peer communication conventions.
332#[derive(Debug, Clone, Serialize, Deserialize)]
333#[serde(tag = "convention_type", rename_all = "snake_case")]
334#[non_exhaustive]
335pub enum PeerConvention {
336    /// Simple peer-to-peer message.
337    Message,
338    /// Request expecting a response.
339    Request { request_id: String, intent: String },
340    /// Progress update for an ongoing response.
341    ResponseProgress {
342        request_id: String,
343        phase: ResponseProgressPhase,
344    },
345    /// Terminal response (completed or failed).
346    ResponseTerminal {
347        request_id: String,
348        status: ResponseTerminalStatus,
349    },
350}
351
352/// Phase of a response progress update.
353#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
354#[serde(rename_all = "snake_case")]
355#[non_exhaustive]
356pub enum ResponseProgressPhase {
357    /// Request was accepted.
358    Accepted,
359    /// Work is in progress.
360    InProgress,
361    /// Partial result available.
362    PartialResult,
363}
364
365/// Terminal status of a response.
366#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
367#[serde(rename_all = "snake_case")]
368#[non_exhaustive]
369pub enum ResponseTerminalStatus {
370    /// Request completed successfully.
371    Completed,
372    /// Request failed.
373    Failed,
374    /// Request was cancelled.
375    Cancelled,
376}
377
378/// Flow step input from mob orchestration.
379#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct FlowStepInput {
381    pub header: InputHeader,
382    /// Flow step identifier.
383    pub step_id: String,
384    /// Step instructions/prompt.
385    pub instructions: String,
386    /// Optional multimodal content blocks. When present, `instructions` serves
387    /// as the text projection (backwards compat), and `blocks` carries the
388    /// full content.
389    #[serde(default, skip_serializing_if = "Option::is_none")]
390    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
391    #[serde(default, skip_serializing_if = "Option::is_none")]
392    pub turn_metadata: Option<RuntimeTurnMetadata>,
393}
394
395/// External event input.
396#[derive(Debug, Clone, Serialize, Deserialize)]
397pub struct ExternalEventInput {
398    pub header: InputHeader,
399    /// Event type/name.
400    pub event_type: String,
401    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
402    /// payloads during coalescing and projection — not a pure pass-through.
403    /// Multimodal content does NOT live here canonically; use `blocks`.
404    pub payload: serde_json::Value,
405    /// Optional multimodal blocks carried by the external event. This is the
406    /// canonical owner for multimodal external-event content.
407    #[serde(default, skip_serializing_if = "Option::is_none")]
408    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
409    /// Runtime-owned handling hint for this external event.
410    #[serde(default)]
411    pub handling_mode: HandlingMode,
412    /// Optional normalized render metadata carried with the event.
413    #[serde(default, skip_serializing_if = "Option::is_none")]
414    pub render_metadata: Option<RenderMetadata>,
415}
416
417/// Explicit continuation request that asks the runtime to keep draining
418/// ordinary work after a boundary-local event (for example, terminal peer
419/// responses injected into session state).
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct ContinuationInput {
422    pub header: InputHeader,
423    /// Stable reason for the continuation request.
424    pub reason: String,
425    /// Ordinary-work handling mode for the continuation.
426    #[serde(default)]
427    pub handling_mode: HandlingMode,
428    /// Optional request/correlation handle tied to the continuation.
429    #[serde(default, skip_serializing_if = "Option::is_none")]
430    pub request_id: Option<String>,
431}
432
433impl ContinuationInput {
434    /// Build the common runtime-owned continuation used after terminal peer
435    /// response injection.
436    pub fn terminal_peer_response(reason: impl Into<String>) -> Self {
437        Self::terminal_peer_response_for_request(reason, None)
438    }
439
440    /// Build the common runtime-owned continuation used after terminal peer
441    /// response injection, preserving the correlated request when known.
442    pub fn terminal_peer_response_for_request(
443        reason: impl Into<String>,
444        request_id: Option<String>,
445    ) -> Self {
446        Self {
447            header: InputHeader {
448                id: meerkat_core::lifecycle::InputId::new(),
449                timestamp: chrono::Utc::now(),
450                source: InputOrigin::System,
451                durability: InputDurability::Ephemeral,
452                visibility: InputVisibility {
453                    transcript_eligible: false,
454                    operator_eligible: false,
455                },
456                idempotency_key: None,
457                supersession_key: None,
458                correlation_id: None,
459            },
460            reason: reason.into(),
461            handling_mode: HandlingMode::Steer,
462            request_id,
463        }
464    }
465}
466
467/// Explicit operation/lifecycle input admitted through runtime instead of
468/// being smuggled through transcript projections or peer-only paths.
469#[derive(Debug, Clone, Serialize, Deserialize)]
470pub struct OperationInput {
471    pub header: InputHeader,
472    /// Stable operation identifier.
473    pub operation_id: OperationId,
474    /// Typed lifecycle event for the operation.
475    pub event: OpEvent,
476}
477
478#[cfg(test)]
479#[allow(clippy::unwrap_used, clippy::panic)]
480mod tests {
481    use super::*;
482    use chrono::Utc;
483
484    fn make_header() -> InputHeader {
485        InputHeader {
486            id: InputId::new(),
487            timestamp: Utc::now(),
488            source: InputOrigin::Operator,
489            durability: InputDurability::Durable,
490            visibility: InputVisibility::default(),
491            idempotency_key: None,
492            supersession_key: None,
493            correlation_id: None,
494        }
495    }
496
497    #[test]
498    fn prompt_input_serde() {
499        let input = Input::Prompt(PromptInput {
500            header: make_header(),
501            text: "hello".into(),
502            blocks: None,
503            turn_metadata: None,
504        });
505        let json = serde_json::to_value(&input).unwrap();
506        assert_eq!(json["input_type"], "prompt");
507        let parsed: Input = serde_json::from_value(json).unwrap();
508        assert!(matches!(parsed, Input::Prompt(_)));
509    }
510
511    #[test]
512    fn peer_input_message_serde() {
513        let input = Input::Peer(PeerInput {
514            header: make_header(),
515            convention: Some(PeerConvention::Message),
516            body: "hi there".into(),
517            blocks: None,
518        });
519        let json = serde_json::to_value(&input).unwrap();
520        assert_eq!(json["input_type"], "peer");
521        let parsed: Input = serde_json::from_value(json).unwrap();
522        assert!(matches!(parsed, Input::Peer(_)));
523    }
524
525    #[test]
526    fn peer_input_request_serde() {
527        let input = Input::Peer(PeerInput {
528            header: make_header(),
529            convention: Some(PeerConvention::Request {
530                request_id: "req-1".into(),
531                intent: "mob.peer_added".into(),
532            }),
533            body: "Agent joined".into(),
534            blocks: None,
535        });
536        let json = serde_json::to_value(&input).unwrap();
537        let parsed: Input = serde_json::from_value(json).unwrap();
538        if let Input::Peer(p) = parsed {
539            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
540        } else {
541            panic!("Expected PeerInput");
542        }
543    }
544
545    #[test]
546    fn peer_input_response_terminal_serde() {
547        let input = Input::Peer(PeerInput {
548            header: make_header(),
549            convention: Some(PeerConvention::ResponseTerminal {
550                request_id: "req-1".into(),
551                status: ResponseTerminalStatus::Completed,
552            }),
553            body: "Done".into(),
554            blocks: None,
555        });
556        let json = serde_json::to_value(&input).unwrap();
557        let parsed: Input = serde_json::from_value(json).unwrap();
558        assert!(matches!(parsed, Input::Peer(_)));
559    }
560
561    #[test]
562    fn peer_input_response_progress_serde() {
563        let input = Input::Peer(PeerInput {
564            header: make_header(),
565            convention: Some(PeerConvention::ResponseProgress {
566                request_id: "req-1".into(),
567                phase: ResponseProgressPhase::InProgress,
568            }),
569            body: "Working...".into(),
570            blocks: None,
571        });
572        let json = serde_json::to_value(&input).unwrap();
573        let parsed: Input = serde_json::from_value(json).unwrap();
574        assert!(matches!(parsed, Input::Peer(_)));
575    }
576
577    #[test]
578    fn flow_step_input_serde() {
579        let input = Input::FlowStep(FlowStepInput {
580            header: make_header(),
581            step_id: "step-1".into(),
582            instructions: "analyze the data".into(),
583            blocks: Some(vec![
584                meerkat_core::types::ContentBlock::Text {
585                    text: "analyze the data".into(),
586                },
587                meerkat_core::types::ContentBlock::Image {
588                    media_type: "image/png".into(),
589                    data: meerkat_core::types::ImageData::Inline {
590                        data: "abc123".into(),
591                    },
592                },
593            ]),
594            turn_metadata: None,
595        });
596        let json = serde_json::to_value(&input).unwrap();
597        assert_eq!(json["input_type"], "flow_step");
598        let parsed: Input = serde_json::from_value(json).unwrap();
599        assert!(matches!(parsed, Input::FlowStep(_)));
600    }
601
602    #[test]
603    fn external_event_input_serde() {
604        let input = Input::ExternalEvent(ExternalEventInput {
605            header: make_header(),
606            event_type: "webhook.received".into(),
607            payload: serde_json::json!({"url": "https://example.com"}),
608            blocks: Some(vec![
609                meerkat_core::types::ContentBlock::Text {
610                    text: "look".into(),
611                },
612                meerkat_core::types::ContentBlock::Image {
613                    media_type: "image/png".into(),
614                    data: meerkat_core::types::ImageData::Inline {
615                        data: "abc123".into(),
616                    },
617                },
618            ]),
619            handling_mode: HandlingMode::Queue,
620            render_metadata: None,
621        });
622        let json = serde_json::to_value(&input).unwrap();
623        assert_eq!(json["input_type"], "external_event");
624        let parsed: Input = serde_json::from_value(json).unwrap();
625        assert!(matches!(parsed, Input::ExternalEvent(_)));
626    }
627
628    #[test]
629    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
630        let mut input = Input::ExternalEvent(ExternalEventInput {
631            header: make_header(),
632            event_type: "webhook.received".into(),
633            payload: serde_json::json!({
634                "body": "see image",
635                "blocks": [
636                    { "type": "text", "text": "caption text" },
637                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
638                ]
639            }),
640            blocks: None,
641            handling_mode: HandlingMode::Queue,
642            render_metadata: None,
643        });
644
645        match &mut input {
646            Input::ExternalEvent(event) => {
647                migrate_legacy_payload_blocks(event).unwrap();
648                assert!(event.payload.get("blocks").is_none());
649                assert_eq!(event.payload["body"], "see image");
650                assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
651            }
652            other => panic!("Expected ExternalEvent, got {other:?}"),
653        }
654    }
655
656    #[test]
657    fn continuation_input_serde() {
658        let input = Input::Continuation(ContinuationInput::terminal_peer_response_for_request(
659            "terminal peer response",
660            Some("req-1".into()),
661        ));
662        let json = serde_json::to_value(&input).unwrap();
663        assert_eq!(json["input_type"], "continuation");
664        assert_eq!(json["request_id"], "req-1");
665        let parsed: Input = serde_json::from_value(json).unwrap();
666        match parsed {
667            Input::Continuation(continuation) => {
668                assert_eq!(continuation.request_id.as_deref(), Some("req-1"));
669                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
670            }
671            other => panic!("Expected Continuation, got {other:?}"),
672        }
673    }
674
675    #[test]
676    fn continuation_input_accepts_legacy_system_generated_tag() {
677        let input = Input::Continuation(ContinuationInput::terminal_peer_response_for_request(
678            "legacy system generated",
679            Some("req-legacy".into()),
680        ));
681        let mut json = serde_json::to_value(&input).unwrap();
682        json["input_type"] = serde_json::Value::String("system_generated".into());
683        let parsed: Input = serde_json::from_value(json).unwrap();
684        match parsed {
685            Input::Continuation(continuation) => {
686                assert_eq!(continuation.request_id.as_deref(), Some("req-legacy"));
687            }
688            other => panic!("Expected Continuation, got {other:?}"),
689        }
690    }
691
692    #[test]
693    fn operation_input_serde() {
694        let input = Input::Operation(OperationInput {
695            header: InputHeader {
696                durability: InputDurability::Derived,
697                ..make_header()
698            },
699            operation_id: OperationId::new(),
700            event: OpEvent::Cancelled {
701                id: OperationId::new(),
702            },
703        });
704        let json = serde_json::to_value(&input).unwrap();
705        assert_eq!(json["input_type"], "operation");
706        let parsed: Input = serde_json::from_value(json).unwrap();
707        assert!(matches!(parsed, Input::Operation(_)));
708    }
709
710    #[test]
711    fn operation_input_accepts_legacy_projected_tag() {
712        let input = Input::Operation(OperationInput {
713            header: InputHeader {
714                durability: InputDurability::Derived,
715                ..make_header()
716            },
717            operation_id: OperationId::new(),
718            event: OpEvent::Cancelled {
719                id: OperationId::new(),
720            },
721        });
722        let mut json = serde_json::to_value(&input).unwrap();
723        json["input_type"] = serde_json::Value::String("projected".into());
724        let parsed: Input = serde_json::from_value(json).unwrap();
725        assert!(matches!(parsed, Input::Operation(_)));
726    }
727
728    #[test]
729    fn input_kind_id() {
730        let prompt = Input::Prompt(PromptInput {
731            header: make_header(),
732            text: "hi".into(),
733            blocks: None,
734            turn_metadata: None,
735        });
736        assert_eq!(prompt.kind_id().0, "prompt");
737
738        let peer_msg = Input::Peer(PeerInput {
739            header: make_header(),
740            convention: Some(PeerConvention::Message),
741            body: "hi".into(),
742            blocks: None,
743        });
744        assert_eq!(peer_msg.kind_id().0, "peer_message");
745
746        let peer_req = Input::Peer(PeerInput {
747            header: make_header(),
748            convention: Some(PeerConvention::Request {
749                request_id: "r".into(),
750                intent: "i".into(),
751            }),
752            body: "hi".into(),
753            blocks: None,
754        });
755        assert_eq!(peer_req.kind_id().0, "peer_request");
756
757        let continuation = Input::Continuation(ContinuationInput {
758            header: make_header(),
759            reason: "continue".into(),
760            handling_mode: HandlingMode::Steer,
761            request_id: None,
762        });
763        assert_eq!(continuation.kind_id().0, "continuation");
764
765        let operation = Input::Operation(OperationInput {
766            header: make_header(),
767            operation_id: OperationId::new(),
768            event: OpEvent::Cancelled {
769                id: OperationId::new(),
770            },
771        });
772        assert_eq!(operation.kind_id().0, "operation");
773    }
774
775    #[test]
776    fn input_source_variants() {
777        let sources = vec![
778            InputOrigin::Operator,
779            InputOrigin::Peer {
780                peer_id: "p1".into(),
781                runtime_id: None,
782            },
783            InputOrigin::Flow {
784                flow_id: "f1".into(),
785                step_index: 0,
786            },
787            InputOrigin::System,
788            InputOrigin::External {
789                source_name: "webhook".into(),
790            },
791        ];
792        for source in sources {
793            let json = serde_json::to_value(&source).unwrap();
794            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
795            assert_eq!(source, parsed);
796        }
797    }
798
799    #[test]
800    fn input_durability_serde() {
801        for d in [
802            InputDurability::Durable,
803            InputDurability::Ephemeral,
804            InputDurability::Derived,
805        ] {
806            let json = serde_json::to_value(d).unwrap();
807            let parsed: InputDurability = serde_json::from_value(json).unwrap();
808            assert_eq!(d, parsed);
809        }
810    }
811}