Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::{
13    CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
14};
15
16/// Common header for all input variants.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct InputHeader {
19    /// Unique ID for this input.
20    pub id: InputId,
21    /// When the input was created.
22    pub timestamp: DateTime<Utc>,
23    /// Source of the input.
24    pub source: InputOrigin,
25    /// Durability requirement.
26    pub durability: InputDurability,
27    /// Visibility controls.
28    pub visibility: InputVisibility,
29    /// Optional idempotency key for dedup.
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub idempotency_key: Option<IdempotencyKey>,
32    /// Optional supersession key.
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub supersession_key: Option<SupersessionKey>,
35    /// Optional correlation ID.
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub correlation_id: Option<CorrelationId>,
38}
39
40/// Where the input originated.
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43#[non_exhaustive]
44pub enum InputOrigin {
45    /// Human operator / external API caller.
46    Operator,
47    /// Peer agent (comms).
48    Peer {
49        peer_id: String,
50        #[serde(skip_serializing_if = "Option::is_none")]
51        runtime_id: Option<LogicalRuntimeId>,
52    },
53    /// Flow engine (mob orchestration).
54    Flow { flow_id: String, step_index: usize },
55    /// System-generated (compaction, projection, etc.).
56    System,
57    /// External event source.
58    External { source_name: String },
59}
60
61/// Durability requirement for an input.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64#[non_exhaustive]
65pub enum InputDurability {
66    /// Must be persisted before acknowledgment.
67    Durable,
68    /// In-memory only, may be lost on crash.
69    Ephemeral,
70    /// Derived from other inputs (can be reconstructed).
71    Derived,
72}
73
74/// Visibility controls for an input.
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
76pub struct InputVisibility {
77    /// Whether this input appears in the conversation transcript.
78    pub transcript_eligible: bool,
79    /// Whether this input is visible to operator surfaces.
80    pub operator_eligible: bool,
81}
82
83impl Default for InputVisibility {
84    fn default() -> Self {
85        Self {
86            transcript_eligible: true,
87            operator_eligible: true,
88        }
89    }
90}
91
92/// The 6 input variants accepted by the runtime layer.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "input_type", rename_all = "snake_case")]
95#[non_exhaustive]
96pub enum Input {
97    /// User/operator prompt.
98    Prompt(PromptInput),
99    /// Peer-originated input (comms).
100    Peer(PeerInput),
101    /// Flow step input (mob orchestration).
102    FlowStep(FlowStepInput),
103    /// External event input.
104    ExternalEvent(ExternalEventInput),
105    /// System-generated input (compaction, etc.).
106    SystemGenerated(SystemGeneratedInput),
107    /// Projection-derived input.
108    Projected(ProjectedInput),
109}
110
111impl Input {
112    /// Get the input header.
113    pub fn header(&self) -> &InputHeader {
114        match self {
115            Input::Prompt(i) => &i.header,
116            Input::Peer(i) => &i.header,
117            Input::FlowStep(i) => &i.header,
118            Input::ExternalEvent(i) => &i.header,
119            Input::SystemGenerated(i) => &i.header,
120            Input::Projected(i) => &i.header,
121        }
122    }
123
124    /// Get the input ID.
125    pub fn id(&self) -> &InputId {
126        &self.header().id
127    }
128
129    /// Get the kind ID for policy resolution.
130    pub fn kind_id(&self) -> KindId {
131        match self {
132            Input::Prompt(_) => KindId::new("prompt"),
133            Input::Peer(p) => match &p.convention {
134                Some(PeerConvention::Message) => KindId::new("peer_message"),
135                Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
136                Some(PeerConvention::ResponseProgress { .. }) => {
137                    KindId::new("peer_response_progress")
138                }
139                Some(PeerConvention::ResponseTerminal { .. }) => {
140                    KindId::new("peer_response_terminal")
141                }
142                None => KindId::new("peer_message"),
143            },
144            Input::FlowStep(_) => KindId::new("flow_step"),
145            Input::ExternalEvent(_) => KindId::new("external_event"),
146            Input::SystemGenerated(_) => KindId::new("system_generated"),
147            Input::Projected(_) => KindId::new("projected"),
148        }
149    }
150}
151
152/// User/operator prompt input.
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PromptInput {
155    pub header: InputHeader,
156    /// The prompt text.
157    pub text: String,
158    /// Optional multimodal content blocks. When present, `text` serves as the
159    /// text projection (backwards compat), and `blocks` carries the full content.
160    #[serde(default, skip_serializing_if = "Option::is_none")]
161    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
162    #[serde(default, skip_serializing_if = "Option::is_none")]
163    pub turn_metadata: Option<RuntimeTurnMetadata>,
164}
165
166impl PromptInput {
167    /// Create a new operator prompt with default header.
168    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
169        Self {
170            header: InputHeader {
171                id: meerkat_core::lifecycle::InputId::new(),
172                timestamp: chrono::Utc::now(),
173                source: InputOrigin::Operator,
174                durability: InputDurability::Durable,
175                visibility: InputVisibility::default(),
176                idempotency_key: None,
177                supersession_key: None,
178                correlation_id: None,
179            },
180            text: text.into(),
181            blocks: None,
182            turn_metadata,
183        }
184    }
185
186    /// Create a multimodal prompt from `ContentInput`.
187    pub fn from_content_input(
188        input: meerkat_core::types::ContentInput,
189        turn_metadata: Option<RuntimeTurnMetadata>,
190    ) -> Self {
191        let text = input.text_content();
192        let blocks = if input.has_images() {
193            Some(input.into_blocks())
194        } else {
195            None
196        };
197        Self {
198            header: InputHeader {
199                id: meerkat_core::lifecycle::InputId::new(),
200                timestamp: chrono::Utc::now(),
201                source: InputOrigin::Operator,
202                durability: InputDurability::Durable,
203                visibility: InputVisibility::default(),
204                idempotency_key: None,
205                supersession_key: None,
206                correlation_id: None,
207            },
208            text,
209            blocks,
210            turn_metadata,
211        }
212    }
213}
214
215/// Peer-originated input from comms.
216#[derive(Debug, Clone, Serialize, Deserialize)]
217pub struct PeerInput {
218    pub header: InputHeader,
219    /// The peer convention (message, request, response).
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub convention: Option<PeerConvention>,
222    /// Message body.
223    pub body: String,
224    /// Optional multimodal content blocks. When present, `body` serves as the
225    /// text projection (backwards compat), and `blocks` carries the full content.
226    #[serde(default, skip_serializing_if = "Option::is_none")]
227    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
228}
229
230/// Peer communication conventions.
231#[derive(Debug, Clone, Serialize, Deserialize)]
232#[serde(tag = "convention_type", rename_all = "snake_case")]
233#[non_exhaustive]
234pub enum PeerConvention {
235    /// Simple peer-to-peer message.
236    Message,
237    /// Request expecting a response.
238    Request { request_id: String, intent: String },
239    /// Progress update for an ongoing response.
240    ResponseProgress {
241        request_id: String,
242        phase: ResponseProgressPhase,
243    },
244    /// Terminal response (completed or failed).
245    ResponseTerminal {
246        request_id: String,
247        status: ResponseTerminalStatus,
248    },
249}
250
251/// Phase of a response progress update.
252#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254#[non_exhaustive]
255pub enum ResponseProgressPhase {
256    /// Request was accepted.
257    Accepted,
258    /// Work is in progress.
259    InProgress,
260    /// Partial result available.
261    PartialResult,
262}
263
264/// Terminal status of a response.
265#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
266#[serde(rename_all = "snake_case")]
267#[non_exhaustive]
268pub enum ResponseTerminalStatus {
269    /// Request completed successfully.
270    Completed,
271    /// Request failed.
272    Failed,
273    /// Request was cancelled.
274    Cancelled,
275}
276
277/// Flow step input from mob orchestration.
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct FlowStepInput {
280    pub header: InputHeader,
281    /// Flow step identifier.
282    pub step_id: String,
283    /// Step instructions/prompt.
284    pub instructions: String,
285    #[serde(default, skip_serializing_if = "Option::is_none")]
286    pub turn_metadata: Option<RuntimeTurnMetadata>,
287}
288
289/// External event input.
290#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct ExternalEventInput {
292    pub header: InputHeader,
293    /// Event type/name.
294    pub event_type: String,
295    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
296    /// payloads during coalescing and projection — not a pure pass-through.
297    pub payload: serde_json::Value,
298}
299
300/// System-generated input (e.g., compaction summary).
301#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct SystemGeneratedInput {
303    pub header: InputHeader,
304    /// What generated this input.
305    pub generator: String,
306    /// Content.
307    pub content: String,
308}
309
310/// Projection-derived input (generated from RuntimeEvent by projection rules).
311#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct ProjectedInput {
313    pub header: InputHeader,
314    /// The projection rule that created this.
315    pub rule_id: String,
316    /// Source event ID.
317    pub source_event_id: String,
318    /// Projected content.
319    pub content: String,
320}
321
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Header fixture: operator origin, durable, default visibility, no
    /// optional keys.
    fn make_header() -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Peer input fixture with the given convention and body, no blocks.
    fn make_peer(convention: Option<PeerConvention>, body: &str) -> Input {
        Input::Peer(PeerInput {
            header: make_header(),
            convention,
            body: body.into(),
            blocks: None,
        })
    }

    /// Serialize `input` to JSON and parse it back, returning both forms.
    fn round_trip(input: &Input) -> (serde_json::Value, Input) {
        let json = serde_json::to_value(input).unwrap();
        let parsed: Input = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    /// Round-trip a peer input and return the convention that came back.
    fn round_trip_convention(input: &Input) -> Option<PeerConvention> {
        match round_trip(input).1 {
            Input::Peer(peer) => peer.convention,
            other => panic!("Expected PeerInput, got {:?}", other),
        }
    }

    #[test]
    fn prompt_input_serde() {
        let input = Input::Prompt(PromptInput {
            header: make_header(),
            text: "hello".into(),
            blocks: None,
            turn_metadata: None,
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "prompt");
        assert!(matches!(parsed, Input::Prompt(_)));
    }

    #[test]
    fn peer_input_message_serde() {
        let input = make_peer(Some(PeerConvention::Message), "hi there");
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "peer");
        match parsed {
            Input::Peer(peer) => {
                assert!(matches!(peer.convention, Some(PeerConvention::Message)));
                assert_eq!(peer.body, "hi there");
            }
            other => panic!("Expected PeerInput, got {:?}", other),
        }
    }

    #[test]
    fn peer_input_without_convention_serde() {
        // A `None` convention is skipped on serialize and must come back as
        // `None` when the key is absent.
        let input = make_peer(None, "no convention");
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "peer");
        assert!(json.get("convention").is_none());
        match parsed {
            Input::Peer(peer) => assert!(peer.convention.is_none()),
            other => panic!("Expected PeerInput, got {:?}", other),
        }
    }

    #[test]
    fn peer_input_request_serde() {
        let input = make_peer(
            Some(PeerConvention::Request {
                request_id: "req-1".into(),
                intent: "mob.peer_added".into(),
            }),
            "Agent joined",
        );
        match round_trip_convention(&input) {
            Some(PeerConvention::Request { request_id, intent }) => {
                assert_eq!(request_id, "req-1");
                assert_eq!(intent, "mob.peer_added");
            }
            other => panic!("Expected Request convention, got {:?}", other),
        }
    }

    #[test]
    fn peer_input_response_terminal_serde() {
        let input = make_peer(
            Some(PeerConvention::ResponseTerminal {
                request_id: "req-1".into(),
                status: ResponseTerminalStatus::Completed,
            }),
            "Done",
        );
        match round_trip_convention(&input) {
            Some(PeerConvention::ResponseTerminal { request_id, status }) => {
                assert_eq!(request_id, "req-1");
                assert_eq!(status, ResponseTerminalStatus::Completed);
            }
            other => panic!("Expected ResponseTerminal convention, got {:?}", other),
        }
    }

    #[test]
    fn peer_input_response_progress_serde() {
        let input = make_peer(
            Some(PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            }),
            "Working...",
        );
        match round_trip_convention(&input) {
            Some(PeerConvention::ResponseProgress { request_id, phase }) => {
                assert_eq!(request_id, "req-1");
                assert_eq!(phase, ResponseProgressPhase::InProgress);
            }
            other => panic!("Expected ResponseProgress convention, got {:?}", other),
        }
    }

    #[test]
    fn flow_step_input_serde() {
        let input = Input::FlowStep(FlowStepInput {
            header: make_header(),
            step_id: "step-1".into(),
            instructions: "analyze the data".into(),
            turn_metadata: None,
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "flow_step");
        assert!(matches!(parsed, Input::FlowStep(_)));
    }

    #[test]
    fn external_event_input_serde() {
        let input = Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload: serde_json::json!({"url": "https://example.com"}),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "external_event");
        assert!(matches!(parsed, Input::ExternalEvent(_)));
    }

    #[test]
    fn system_generated_input_serde() {
        let input = Input::SystemGenerated(SystemGeneratedInput {
            header: make_header(),
            generator: "compactor".into(),
            content: "summary text".into(),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "system_generated");
        assert!(matches!(parsed, Input::SystemGenerated(_)));
    }

    #[test]
    fn projected_input_serde() {
        let input = Input::Projected(ProjectedInput {
            header: InputHeader {
                durability: InputDurability::Derived,
                ..make_header()
            },
            rule_id: "rule-1".into(),
            source_event_id: "evt-1".into(),
            content: "projected content".into(),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "projected");
        assert!(matches!(parsed, Input::Projected(_)));
    }

    #[test]
    fn input_kind_id() {
        // One case per arm of `Input::kind_id`, including the `None`
        // convention fallback.
        let cases: Vec<(Input, &'static str)> = vec![
            (
                Input::Prompt(PromptInput {
                    header: make_header(),
                    text: "hi".into(),
                    blocks: None,
                    turn_metadata: None,
                }),
                "prompt",
            ),
            (make_peer(Some(PeerConvention::Message), "hi"), "peer_message"),
            (make_peer(None, "hi"), "peer_message"),
            (
                make_peer(
                    Some(PeerConvention::Request {
                        request_id: "r".into(),
                        intent: "i".into(),
                    }),
                    "hi",
                ),
                "peer_request",
            ),
            (
                make_peer(
                    Some(PeerConvention::ResponseProgress {
                        request_id: "r".into(),
                        phase: ResponseProgressPhase::Accepted,
                    }),
                    "hi",
                ),
                "peer_response_progress",
            ),
            (
                make_peer(
                    Some(PeerConvention::ResponseTerminal {
                        request_id: "r".into(),
                        status: ResponseTerminalStatus::Failed,
                    }),
                    "hi",
                ),
                "peer_response_terminal",
            ),
            (
                Input::FlowStep(FlowStepInput {
                    header: make_header(),
                    step_id: "s".into(),
                    instructions: "do".into(),
                    turn_metadata: None,
                }),
                "flow_step",
            ),
            (
                Input::ExternalEvent(ExternalEventInput {
                    header: make_header(),
                    event_type: "e".into(),
                    payload: serde_json::Value::Null,
                }),
                "external_event",
            ),
            (
                Input::SystemGenerated(SystemGeneratedInput {
                    header: make_header(),
                    generator: "g".into(),
                    content: "c".into(),
                }),
                "system_generated",
            ),
            (
                Input::Projected(ProjectedInput {
                    header: make_header(),
                    rule_id: "r".into(),
                    source_event_id: "e".into(),
                    content: "c".into(),
                }),
                "projected",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(input.kind_id().0, expected);
        }
    }

    #[test]
    fn input_source_variants() {
        let sources = vec![
            InputOrigin::Operator,
            InputOrigin::Peer {
                peer_id: "p1".into(),
                runtime_id: None,
            },
            InputOrigin::Flow {
                flow_id: "f1".into(),
                step_index: 0,
            },
            InputOrigin::System,
            InputOrigin::External {
                source_name: "webhook".into(),
            },
        ];
        for source in sources {
            let json = serde_json::to_value(&source).unwrap();
            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
            assert_eq!(source, parsed);
        }
    }

    #[test]
    fn input_durability_serde() {
        for d in [
            InputDurability::Durable,
            InputDurability::Ephemeral,
            InputDurability::Derived,
        ] {
            let json = serde_json::to_value(d).unwrap();
            let parsed: InputDurability = serde_json::from_value(json).unwrap();
            assert_eq!(d, parsed);
        }
    }
}