1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::{
13 CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
14};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct InputHeader {
19 pub id: InputId,
21 pub timestamp: DateTime<Utc>,
23 pub source: InputOrigin,
25 pub durability: InputDurability,
27 pub visibility: InputVisibility,
29 #[serde(skip_serializing_if = "Option::is_none")]
31 pub idempotency_key: Option<IdempotencyKey>,
32 #[serde(skip_serializing_if = "Option::is_none")]
34 pub supersession_key: Option<SupersessionKey>,
35 #[serde(skip_serializing_if = "Option::is_none")]
37 pub correlation_id: Option<CorrelationId>,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43#[non_exhaustive]
44pub enum InputOrigin {
45 Operator,
47 Peer {
49 peer_id: String,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 runtime_id: Option<LogicalRuntimeId>,
52 },
53 Flow { flow_id: String, step_index: usize },
55 System,
57 External { source_name: String },
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64#[non_exhaustive]
65pub enum InputDurability {
66 Durable,
68 Ephemeral,
70 Derived,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
76pub struct InputVisibility {
77 pub transcript_eligible: bool,
79 pub operator_eligible: bool,
81}
82
83impl Default for InputVisibility {
84 fn default() -> Self {
85 Self {
86 transcript_eligible: true,
87 operator_eligible: true,
88 }
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "input_type", rename_all = "snake_case")]
95#[non_exhaustive]
96pub enum Input {
97 Prompt(PromptInput),
99 Peer(PeerInput),
101 FlowStep(FlowStepInput),
103 ExternalEvent(ExternalEventInput),
105 SystemGenerated(SystemGeneratedInput),
107 Projected(ProjectedInput),
109}
110
111impl Input {
112 pub fn header(&self) -> &InputHeader {
114 match self {
115 Input::Prompt(i) => &i.header,
116 Input::Peer(i) => &i.header,
117 Input::FlowStep(i) => &i.header,
118 Input::ExternalEvent(i) => &i.header,
119 Input::SystemGenerated(i) => &i.header,
120 Input::Projected(i) => &i.header,
121 }
122 }
123
124 pub fn id(&self) -> &InputId {
126 &self.header().id
127 }
128
129 pub fn kind_id(&self) -> KindId {
131 match self {
132 Input::Prompt(_) => KindId::new("prompt"),
133 Input::Peer(p) => match &p.convention {
134 Some(PeerConvention::Message) => KindId::new("peer_message"),
135 Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
136 Some(PeerConvention::ResponseProgress { .. }) => {
137 KindId::new("peer_response_progress")
138 }
139 Some(PeerConvention::ResponseTerminal { .. }) => {
140 KindId::new("peer_response_terminal")
141 }
142 None => KindId::new("peer_message"),
143 },
144 Input::FlowStep(_) => KindId::new("flow_step"),
145 Input::ExternalEvent(_) => KindId::new("external_event"),
146 Input::SystemGenerated(_) => KindId::new("system_generated"),
147 Input::Projected(_) => KindId::new("projected"),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PromptInput {
155 pub header: InputHeader,
156 pub text: String,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
159 pub turn_metadata: Option<RuntimeTurnMetadata>,
160}
161
162impl PromptInput {
163 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
165 Self {
166 header: InputHeader {
167 id: meerkat_core::lifecycle::InputId::new(),
168 timestamp: chrono::Utc::now(),
169 source: InputOrigin::Operator,
170 durability: InputDurability::Durable,
171 visibility: InputVisibility::default(),
172 idempotency_key: None,
173 supersession_key: None,
174 correlation_id: None,
175 },
176 text: text.into(),
177 turn_metadata,
178 }
179 }
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct PeerInput {
185 pub header: InputHeader,
186 #[serde(skip_serializing_if = "Option::is_none")]
188 pub convention: Option<PeerConvention>,
189 pub body: String,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195#[serde(tag = "convention_type", rename_all = "snake_case")]
196#[non_exhaustive]
197pub enum PeerConvention {
198 Message,
200 Request { request_id: String, intent: String },
202 ResponseProgress {
204 request_id: String,
205 phase: ResponseProgressPhase,
206 },
207 ResponseTerminal {
209 request_id: String,
210 status: ResponseTerminalStatus,
211 },
212}
213
214#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
216#[serde(rename_all = "snake_case")]
217#[non_exhaustive]
218pub enum ResponseProgressPhase {
219 Accepted,
221 InProgress,
223 PartialResult,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229#[serde(rename_all = "snake_case")]
230#[non_exhaustive]
231pub enum ResponseTerminalStatus {
232 Completed,
234 Failed,
236 Cancelled,
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
242pub struct FlowStepInput {
243 pub header: InputHeader,
244 pub step_id: String,
246 pub instructions: String,
248 #[serde(default, skip_serializing_if = "Option::is_none")]
249 pub turn_metadata: Option<RuntimeTurnMetadata>,
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct ExternalEventInput {
255 pub header: InputHeader,
256 pub event_type: String,
258 pub payload: serde_json::Value,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct SystemGeneratedInput {
266 pub header: InputHeader,
267 pub generator: String,
269 pub content: String,
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct ProjectedInput {
276 pub header: InputHeader,
277 pub rule_id: String,
279 pub source_event_id: String,
281 pub content: String,
283}
284
285#[cfg(test)]
286#[allow(clippy::unwrap_used, clippy::panic)]
287mod tests {
288 use super::*;
289 use chrono::Utc;
290
291 fn make_header() -> InputHeader {
292 InputHeader {
293 id: InputId::new(),
294 timestamp: Utc::now(),
295 source: InputOrigin::Operator,
296 durability: InputDurability::Durable,
297 visibility: InputVisibility::default(),
298 idempotency_key: None,
299 supersession_key: None,
300 correlation_id: None,
301 }
302 }
303
304 #[test]
305 fn prompt_input_serde() {
306 let input = Input::Prompt(PromptInput {
307 header: make_header(),
308 text: "hello".into(),
309 turn_metadata: None,
310 });
311 let json = serde_json::to_value(&input).unwrap();
312 assert_eq!(json["input_type"], "prompt");
313 let parsed: Input = serde_json::from_value(json).unwrap();
314 assert!(matches!(parsed, Input::Prompt(_)));
315 }
316
317 #[test]
318 fn peer_input_message_serde() {
319 let input = Input::Peer(PeerInput {
320 header: make_header(),
321 convention: Some(PeerConvention::Message),
322 body: "hi there".into(),
323 });
324 let json = serde_json::to_value(&input).unwrap();
325 assert_eq!(json["input_type"], "peer");
326 let parsed: Input = serde_json::from_value(json).unwrap();
327 assert!(matches!(parsed, Input::Peer(_)));
328 }
329
330 #[test]
331 fn peer_input_request_serde() {
332 let input = Input::Peer(PeerInput {
333 header: make_header(),
334 convention: Some(PeerConvention::Request {
335 request_id: "req-1".into(),
336 intent: "mob.peer_added".into(),
337 }),
338 body: "Agent joined".into(),
339 });
340 let json = serde_json::to_value(&input).unwrap();
341 let parsed: Input = serde_json::from_value(json).unwrap();
342 if let Input::Peer(p) = parsed {
343 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
344 } else {
345 panic!("Expected PeerInput");
346 }
347 }
348
349 #[test]
350 fn peer_input_response_terminal_serde() {
351 let input = Input::Peer(PeerInput {
352 header: make_header(),
353 convention: Some(PeerConvention::ResponseTerminal {
354 request_id: "req-1".into(),
355 status: ResponseTerminalStatus::Completed,
356 }),
357 body: "Done".into(),
358 });
359 let json = serde_json::to_value(&input).unwrap();
360 let parsed: Input = serde_json::from_value(json).unwrap();
361 assert!(matches!(parsed, Input::Peer(_)));
362 }
363
364 #[test]
365 fn peer_input_response_progress_serde() {
366 let input = Input::Peer(PeerInput {
367 header: make_header(),
368 convention: Some(PeerConvention::ResponseProgress {
369 request_id: "req-1".into(),
370 phase: ResponseProgressPhase::InProgress,
371 }),
372 body: "Working...".into(),
373 });
374 let json = serde_json::to_value(&input).unwrap();
375 let parsed: Input = serde_json::from_value(json).unwrap();
376 assert!(matches!(parsed, Input::Peer(_)));
377 }
378
379 #[test]
380 fn flow_step_input_serde() {
381 let input = Input::FlowStep(FlowStepInput {
382 header: make_header(),
383 step_id: "step-1".into(),
384 instructions: "analyze the data".into(),
385 turn_metadata: None,
386 });
387 let json = serde_json::to_value(&input).unwrap();
388 assert_eq!(json["input_type"], "flow_step");
389 let parsed: Input = serde_json::from_value(json).unwrap();
390 assert!(matches!(parsed, Input::FlowStep(_)));
391 }
392
393 #[test]
394 fn external_event_input_serde() {
395 let input = Input::ExternalEvent(ExternalEventInput {
396 header: make_header(),
397 event_type: "webhook.received".into(),
398 payload: serde_json::json!({"url": "https://example.com"}),
399 });
400 let json = serde_json::to_value(&input).unwrap();
401 assert_eq!(json["input_type"], "external_event");
402 let parsed: Input = serde_json::from_value(json).unwrap();
403 assert!(matches!(parsed, Input::ExternalEvent(_)));
404 }
405
406 #[test]
407 fn system_generated_input_serde() {
408 let input = Input::SystemGenerated(SystemGeneratedInput {
409 header: make_header(),
410 generator: "compactor".into(),
411 content: "summary text".into(),
412 });
413 let json = serde_json::to_value(&input).unwrap();
414 assert_eq!(json["input_type"], "system_generated");
415 let parsed: Input = serde_json::from_value(json).unwrap();
416 assert!(matches!(parsed, Input::SystemGenerated(_)));
417 }
418
419 #[test]
420 fn projected_input_serde() {
421 let input = Input::Projected(ProjectedInput {
422 header: InputHeader {
423 durability: InputDurability::Derived,
424 ..make_header()
425 },
426 rule_id: "rule-1".into(),
427 source_event_id: "evt-1".into(),
428 content: "projected content".into(),
429 });
430 let json = serde_json::to_value(&input).unwrap();
431 assert_eq!(json["input_type"], "projected");
432 let parsed: Input = serde_json::from_value(json).unwrap();
433 assert!(matches!(parsed, Input::Projected(_)));
434 }
435
436 #[test]
437 fn input_kind_id() {
438 let prompt = Input::Prompt(PromptInput {
439 header: make_header(),
440 text: "hi".into(),
441 turn_metadata: None,
442 });
443 assert_eq!(prompt.kind_id().0, "prompt");
444
445 let peer_msg = Input::Peer(PeerInput {
446 header: make_header(),
447 convention: Some(PeerConvention::Message),
448 body: "hi".into(),
449 });
450 assert_eq!(peer_msg.kind_id().0, "peer_message");
451
452 let peer_req = Input::Peer(PeerInput {
453 header: make_header(),
454 convention: Some(PeerConvention::Request {
455 request_id: "r".into(),
456 intent: "i".into(),
457 }),
458 body: "hi".into(),
459 });
460 assert_eq!(peer_req.kind_id().0, "peer_request");
461 }
462
463 #[test]
464 fn input_source_variants() {
465 let sources = vec![
466 InputOrigin::Operator,
467 InputOrigin::Peer {
468 peer_id: "p1".into(),
469 runtime_id: None,
470 },
471 InputOrigin::Flow {
472 flow_id: "f1".into(),
473 step_index: 0,
474 },
475 InputOrigin::System,
476 InputOrigin::External {
477 source_name: "webhook".into(),
478 },
479 ];
480 for source in sources {
481 let json = serde_json::to_value(&source).unwrap();
482 let parsed: InputOrigin = serde_json::from_value(json).unwrap();
483 assert_eq!(source, parsed);
484 }
485 }
486
487 #[test]
488 fn input_durability_serde() {
489 for d in [
490 InputDurability::Durable,
491 InputDurability::Ephemeral,
492 InputDurability::Derived,
493 ] {
494 let json = serde_json::to_value(d).unwrap();
495 let parsed: InputDurability = serde_json::from_value(json).unwrap();
496 assert_eq!(d, parsed);
497 }
498 }
499}