Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::{
13    CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
14};
15
16/// Common header for all input variants.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct InputHeader {
19    /// Unique ID for this input.
20    pub id: InputId,
21    /// When the input was created.
22    pub timestamp: DateTime<Utc>,
23    /// Source of the input.
24    pub source: InputOrigin,
25    /// Durability requirement.
26    pub durability: InputDurability,
27    /// Visibility controls.
28    pub visibility: InputVisibility,
29    /// Optional idempotency key for dedup.
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub idempotency_key: Option<IdempotencyKey>,
32    /// Optional supersession key.
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub supersession_key: Option<SupersessionKey>,
35    /// Optional correlation ID.
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub correlation_id: Option<CorrelationId>,
38}
39
40/// Where the input originated.
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43#[non_exhaustive]
44pub enum InputOrigin {
45    /// Human operator / external API caller.
46    Operator,
47    /// Peer agent (comms).
48    Peer {
49        peer_id: String,
50        #[serde(skip_serializing_if = "Option::is_none")]
51        runtime_id: Option<LogicalRuntimeId>,
52    },
53    /// Flow engine (mob orchestration).
54    Flow { flow_id: String, step_index: usize },
55    /// System-generated (compaction, projection, etc.).
56    System,
57    /// External event source.
58    External { source_name: String },
59}
60
61/// Durability requirement for an input.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64#[non_exhaustive]
65pub enum InputDurability {
66    /// Must be persisted before acknowledgment.
67    Durable,
68    /// In-memory only, may be lost on crash.
69    Ephemeral,
70    /// Derived from other inputs (can be reconstructed).
71    Derived,
72}
73
74/// Visibility controls for an input.
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
76pub struct InputVisibility {
77    /// Whether this input appears in the conversation transcript.
78    pub transcript_eligible: bool,
79    /// Whether this input is visible to operator surfaces.
80    pub operator_eligible: bool,
81}
82
83impl Default for InputVisibility {
84    fn default() -> Self {
85        Self {
86            transcript_eligible: true,
87            operator_eligible: true,
88        }
89    }
90}
91
92/// The 6 input variants accepted by the runtime layer.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "input_type", rename_all = "snake_case")]
95#[non_exhaustive]
96pub enum Input {
97    /// User/operator prompt.
98    Prompt(PromptInput),
99    /// Peer-originated input (comms).
100    Peer(PeerInput),
101    /// Flow step input (mob orchestration).
102    FlowStep(FlowStepInput),
103    /// External event input.
104    ExternalEvent(ExternalEventInput),
105    /// System-generated input (compaction, etc.).
106    SystemGenerated(SystemGeneratedInput),
107    /// Projection-derived input.
108    Projected(ProjectedInput),
109}
110
111impl Input {
112    /// Get the input header.
113    pub fn header(&self) -> &InputHeader {
114        match self {
115            Input::Prompt(i) => &i.header,
116            Input::Peer(i) => &i.header,
117            Input::FlowStep(i) => &i.header,
118            Input::ExternalEvent(i) => &i.header,
119            Input::SystemGenerated(i) => &i.header,
120            Input::Projected(i) => &i.header,
121        }
122    }
123
124    /// Get the input ID.
125    pub fn id(&self) -> &InputId {
126        &self.header().id
127    }
128
129    /// Get the kind ID for policy resolution.
130    pub fn kind_id(&self) -> KindId {
131        match self {
132            Input::Prompt(_) => KindId::new("prompt"),
133            Input::Peer(p) => match &p.convention {
134                Some(PeerConvention::Message) => KindId::new("peer_message"),
135                Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
136                Some(PeerConvention::ResponseProgress { .. }) => {
137                    KindId::new("peer_response_progress")
138                }
139                Some(PeerConvention::ResponseTerminal { .. }) => {
140                    KindId::new("peer_response_terminal")
141                }
142                None => KindId::new("peer_message"),
143            },
144            Input::FlowStep(_) => KindId::new("flow_step"),
145            Input::ExternalEvent(_) => KindId::new("external_event"),
146            Input::SystemGenerated(_) => KindId::new("system_generated"),
147            Input::Projected(_) => KindId::new("projected"),
148        }
149    }
150}
151
152/// User/operator prompt input.
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PromptInput {
155    pub header: InputHeader,
156    /// The prompt text.
157    pub text: String,
158    #[serde(default, skip_serializing_if = "Option::is_none")]
159    pub turn_metadata: Option<RuntimeTurnMetadata>,
160}
161
162impl PromptInput {
163    /// Create a new operator prompt with default header.
164    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
165        Self {
166            header: InputHeader {
167                id: meerkat_core::lifecycle::InputId::new(),
168                timestamp: chrono::Utc::now(),
169                source: InputOrigin::Operator,
170                durability: InputDurability::Durable,
171                visibility: InputVisibility::default(),
172                idempotency_key: None,
173                supersession_key: None,
174                correlation_id: None,
175            },
176            text: text.into(),
177            turn_metadata,
178        }
179    }
180}
181
182/// Peer-originated input from comms.
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct PeerInput {
185    pub header: InputHeader,
186    /// The peer convention (message, request, response).
187    #[serde(skip_serializing_if = "Option::is_none")]
188    pub convention: Option<PeerConvention>,
189    /// Message body.
190    pub body: String,
191}
192
193/// Peer communication conventions.
194#[derive(Debug, Clone, Serialize, Deserialize)]
195#[serde(tag = "convention_type", rename_all = "snake_case")]
196#[non_exhaustive]
197pub enum PeerConvention {
198    /// Simple peer-to-peer message.
199    Message,
200    /// Request expecting a response.
201    Request { request_id: String, intent: String },
202    /// Progress update for an ongoing response.
203    ResponseProgress {
204        request_id: String,
205        phase: ResponseProgressPhase,
206    },
207    /// Terminal response (completed or failed).
208    ResponseTerminal {
209        request_id: String,
210        status: ResponseTerminalStatus,
211    },
212}
213
214/// Phase of a response progress update.
215#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
216#[serde(rename_all = "snake_case")]
217#[non_exhaustive]
218pub enum ResponseProgressPhase {
219    /// Request was accepted.
220    Accepted,
221    /// Work is in progress.
222    InProgress,
223    /// Partial result available.
224    PartialResult,
225}
226
227/// Terminal status of a response.
228#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229#[serde(rename_all = "snake_case")]
230#[non_exhaustive]
231pub enum ResponseTerminalStatus {
232    /// Request completed successfully.
233    Completed,
234    /// Request failed.
235    Failed,
236    /// Request was cancelled.
237    Cancelled,
238}
239
240/// Flow step input from mob orchestration.
241#[derive(Debug, Clone, Serialize, Deserialize)]
242pub struct FlowStepInput {
243    pub header: InputHeader,
244    /// Flow step identifier.
245    pub step_id: String,
246    /// Step instructions/prompt.
247    pub instructions: String,
248    #[serde(default, skip_serializing_if = "Option::is_none")]
249    pub turn_metadata: Option<RuntimeTurnMetadata>,
250}
251
252/// External event input.
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct ExternalEventInput {
255    pub header: InputHeader,
256    /// Event type/name.
257    pub event_type: String,
258    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
259    /// payloads during coalescing and projection — not a pure pass-through.
260    pub payload: serde_json::Value,
261}
262
263/// System-generated input (e.g., compaction summary).
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct SystemGeneratedInput {
266    pub header: InputHeader,
267    /// What generated this input.
268    pub generator: String,
269    /// Content.
270    pub content: String,
271}
272
273/// Projection-derived input (generated from RuntimeEvent by projection rules).
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct ProjectedInput {
276    pub header: InputHeader,
277    /// The projection rule that created this.
278    pub rule_id: String,
279    /// Source event ID.
280    pub source_event_id: String,
281    /// Projected content.
282    pub content: String,
283}
284
285#[cfg(test)]
286#[allow(clippy::unwrap_used, clippy::panic)]
287mod tests {
288    use super::*;
289    use chrono::Utc;
290
291    fn make_header() -> InputHeader {
292        InputHeader {
293            id: InputId::new(),
294            timestamp: Utc::now(),
295            source: InputOrigin::Operator,
296            durability: InputDurability::Durable,
297            visibility: InputVisibility::default(),
298            idempotency_key: None,
299            supersession_key: None,
300            correlation_id: None,
301        }
302    }
303
304    #[test]
305    fn prompt_input_serde() {
306        let input = Input::Prompt(PromptInput {
307            header: make_header(),
308            text: "hello".into(),
309            turn_metadata: None,
310        });
311        let json = serde_json::to_value(&input).unwrap();
312        assert_eq!(json["input_type"], "prompt");
313        let parsed: Input = serde_json::from_value(json).unwrap();
314        assert!(matches!(parsed, Input::Prompt(_)));
315    }
316
317    #[test]
318    fn peer_input_message_serde() {
319        let input = Input::Peer(PeerInput {
320            header: make_header(),
321            convention: Some(PeerConvention::Message),
322            body: "hi there".into(),
323        });
324        let json = serde_json::to_value(&input).unwrap();
325        assert_eq!(json["input_type"], "peer");
326        let parsed: Input = serde_json::from_value(json).unwrap();
327        assert!(matches!(parsed, Input::Peer(_)));
328    }
329
330    #[test]
331    fn peer_input_request_serde() {
332        let input = Input::Peer(PeerInput {
333            header: make_header(),
334            convention: Some(PeerConvention::Request {
335                request_id: "req-1".into(),
336                intent: "mob.peer_added".into(),
337            }),
338            body: "Agent joined".into(),
339        });
340        let json = serde_json::to_value(&input).unwrap();
341        let parsed: Input = serde_json::from_value(json).unwrap();
342        if let Input::Peer(p) = parsed {
343            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
344        } else {
345            panic!("Expected PeerInput");
346        }
347    }
348
349    #[test]
350    fn peer_input_response_terminal_serde() {
351        let input = Input::Peer(PeerInput {
352            header: make_header(),
353            convention: Some(PeerConvention::ResponseTerminal {
354                request_id: "req-1".into(),
355                status: ResponseTerminalStatus::Completed,
356            }),
357            body: "Done".into(),
358        });
359        let json = serde_json::to_value(&input).unwrap();
360        let parsed: Input = serde_json::from_value(json).unwrap();
361        assert!(matches!(parsed, Input::Peer(_)));
362    }
363
364    #[test]
365    fn peer_input_response_progress_serde() {
366        let input = Input::Peer(PeerInput {
367            header: make_header(),
368            convention: Some(PeerConvention::ResponseProgress {
369                request_id: "req-1".into(),
370                phase: ResponseProgressPhase::InProgress,
371            }),
372            body: "Working...".into(),
373        });
374        let json = serde_json::to_value(&input).unwrap();
375        let parsed: Input = serde_json::from_value(json).unwrap();
376        assert!(matches!(parsed, Input::Peer(_)));
377    }
378
379    #[test]
380    fn flow_step_input_serde() {
381        let input = Input::FlowStep(FlowStepInput {
382            header: make_header(),
383            step_id: "step-1".into(),
384            instructions: "analyze the data".into(),
385            turn_metadata: None,
386        });
387        let json = serde_json::to_value(&input).unwrap();
388        assert_eq!(json["input_type"], "flow_step");
389        let parsed: Input = serde_json::from_value(json).unwrap();
390        assert!(matches!(parsed, Input::FlowStep(_)));
391    }
392
393    #[test]
394    fn external_event_input_serde() {
395        let input = Input::ExternalEvent(ExternalEventInput {
396            header: make_header(),
397            event_type: "webhook.received".into(),
398            payload: serde_json::json!({"url": "https://example.com"}),
399        });
400        let json = serde_json::to_value(&input).unwrap();
401        assert_eq!(json["input_type"], "external_event");
402        let parsed: Input = serde_json::from_value(json).unwrap();
403        assert!(matches!(parsed, Input::ExternalEvent(_)));
404    }
405
406    #[test]
407    fn system_generated_input_serde() {
408        let input = Input::SystemGenerated(SystemGeneratedInput {
409            header: make_header(),
410            generator: "compactor".into(),
411            content: "summary text".into(),
412        });
413        let json = serde_json::to_value(&input).unwrap();
414        assert_eq!(json["input_type"], "system_generated");
415        let parsed: Input = serde_json::from_value(json).unwrap();
416        assert!(matches!(parsed, Input::SystemGenerated(_)));
417    }
418
419    #[test]
420    fn projected_input_serde() {
421        let input = Input::Projected(ProjectedInput {
422            header: InputHeader {
423                durability: InputDurability::Derived,
424                ..make_header()
425            },
426            rule_id: "rule-1".into(),
427            source_event_id: "evt-1".into(),
428            content: "projected content".into(),
429        });
430        let json = serde_json::to_value(&input).unwrap();
431        assert_eq!(json["input_type"], "projected");
432        let parsed: Input = serde_json::from_value(json).unwrap();
433        assert!(matches!(parsed, Input::Projected(_)));
434    }
435
436    #[test]
437    fn input_kind_id() {
438        let prompt = Input::Prompt(PromptInput {
439            header: make_header(),
440            text: "hi".into(),
441            turn_metadata: None,
442        });
443        assert_eq!(prompt.kind_id().0, "prompt");
444
445        let peer_msg = Input::Peer(PeerInput {
446            header: make_header(),
447            convention: Some(PeerConvention::Message),
448            body: "hi".into(),
449        });
450        assert_eq!(peer_msg.kind_id().0, "peer_message");
451
452        let peer_req = Input::Peer(PeerInput {
453            header: make_header(),
454            convention: Some(PeerConvention::Request {
455                request_id: "r".into(),
456                intent: "i".into(),
457            }),
458            body: "hi".into(),
459        });
460        assert_eq!(peer_req.kind_id().0, "peer_request");
461    }
462
463    #[test]
464    fn input_source_variants() {
465        let sources = vec![
466            InputOrigin::Operator,
467            InputOrigin::Peer {
468                peer_id: "p1".into(),
469                runtime_id: None,
470            },
471            InputOrigin::Flow {
472                flow_id: "f1".into(),
473                step_index: 0,
474            },
475            InputOrigin::System,
476            InputOrigin::External {
477                source_name: "webhook".into(),
478            },
479        ];
480        for source in sources {
481            let json = serde_json::to_value(&source).unwrap();
482            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
483            assert_eq!(source, parsed);
484        }
485    }
486
487    #[test]
488    fn input_durability_serde() {
489        for d in [
490            InputDurability::Durable,
491            InputDurability::Ephemeral,
492            InputDurability::Derived,
493        ] {
494            let json = serde_json::to_value(d).unwrap();
495            let parsed: InputDurability = serde_json::from_value(json).unwrap();
496            assert_eq!(d, parsed);
497        }
498    }
499}