1use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::{
13 CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
14};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct InputHeader {
19 pub id: InputId,
21 pub timestamp: DateTime<Utc>,
23 pub source: InputOrigin,
25 pub durability: InputDurability,
27 pub visibility: InputVisibility,
29 #[serde(skip_serializing_if = "Option::is_none")]
31 pub idempotency_key: Option<IdempotencyKey>,
32 #[serde(skip_serializing_if = "Option::is_none")]
34 pub supersession_key: Option<SupersessionKey>,
35 #[serde(skip_serializing_if = "Option::is_none")]
37 pub correlation_id: Option<CorrelationId>,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43#[non_exhaustive]
44pub enum InputOrigin {
45 Operator,
47 Peer {
49 peer_id: String,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 runtime_id: Option<LogicalRuntimeId>,
52 },
53 Flow { flow_id: String, step_index: usize },
55 System,
57 External { source_name: String },
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64#[non_exhaustive]
65pub enum InputDurability {
66 Durable,
68 Ephemeral,
70 Derived,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
76pub struct InputVisibility {
77 pub transcript_eligible: bool,
79 pub operator_eligible: bool,
81}
82
83impl Default for InputVisibility {
84 fn default() -> Self {
85 Self {
86 transcript_eligible: true,
87 operator_eligible: true,
88 }
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "input_type", rename_all = "snake_case")]
95#[non_exhaustive]
96pub enum Input {
97 Prompt(PromptInput),
99 Peer(PeerInput),
101 FlowStep(FlowStepInput),
103 ExternalEvent(ExternalEventInput),
105 SystemGenerated(SystemGeneratedInput),
107 Projected(ProjectedInput),
109}
110
111impl Input {
112 pub fn header(&self) -> &InputHeader {
114 match self {
115 Input::Prompt(i) => &i.header,
116 Input::Peer(i) => &i.header,
117 Input::FlowStep(i) => &i.header,
118 Input::ExternalEvent(i) => &i.header,
119 Input::SystemGenerated(i) => &i.header,
120 Input::Projected(i) => &i.header,
121 }
122 }
123
124 pub fn id(&self) -> &InputId {
126 &self.header().id
127 }
128
129 pub fn kind_id(&self) -> KindId {
131 match self {
132 Input::Prompt(_) => KindId::new("prompt"),
133 Input::Peer(p) => match &p.convention {
134 Some(PeerConvention::Message) => KindId::new("peer_message"),
135 Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
136 Some(PeerConvention::ResponseProgress { .. }) => {
137 KindId::new("peer_response_progress")
138 }
139 Some(PeerConvention::ResponseTerminal { .. }) => {
140 KindId::new("peer_response_terminal")
141 }
142 None => KindId::new("peer_message"),
143 },
144 Input::FlowStep(_) => KindId::new("flow_step"),
145 Input::ExternalEvent(_) => KindId::new("external_event"),
146 Input::SystemGenerated(_) => KindId::new("system_generated"),
147 Input::Projected(_) => KindId::new("projected"),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PromptInput {
155 pub header: InputHeader,
156 pub text: String,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
161 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
162 #[serde(default, skip_serializing_if = "Option::is_none")]
163 pub turn_metadata: Option<RuntimeTurnMetadata>,
164}
165
166impl PromptInput {
167 pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
169 Self {
170 header: InputHeader {
171 id: meerkat_core::lifecycle::InputId::new(),
172 timestamp: chrono::Utc::now(),
173 source: InputOrigin::Operator,
174 durability: InputDurability::Durable,
175 visibility: InputVisibility::default(),
176 idempotency_key: None,
177 supersession_key: None,
178 correlation_id: None,
179 },
180 text: text.into(),
181 blocks: None,
182 turn_metadata,
183 }
184 }
185
186 pub fn from_content_input(
188 input: meerkat_core::types::ContentInput,
189 turn_metadata: Option<RuntimeTurnMetadata>,
190 ) -> Self {
191 let text = input.text_content();
192 let blocks = if input.has_images() {
193 Some(input.into_blocks())
194 } else {
195 None
196 };
197 Self {
198 header: InputHeader {
199 id: meerkat_core::lifecycle::InputId::new(),
200 timestamp: chrono::Utc::now(),
201 source: InputOrigin::Operator,
202 durability: InputDurability::Durable,
203 visibility: InputVisibility::default(),
204 idempotency_key: None,
205 supersession_key: None,
206 correlation_id: None,
207 },
208 text,
209 blocks,
210 turn_metadata,
211 }
212 }
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
217pub struct PeerInput {
218 pub header: InputHeader,
219 #[serde(skip_serializing_if = "Option::is_none")]
221 pub convention: Option<PeerConvention>,
222 pub body: String,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
227 pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize)]
232#[serde(tag = "convention_type", rename_all = "snake_case")]
233#[non_exhaustive]
234pub enum PeerConvention {
235 Message,
237 Request { request_id: String, intent: String },
239 ResponseProgress {
241 request_id: String,
242 phase: ResponseProgressPhase,
243 },
244 ResponseTerminal {
246 request_id: String,
247 status: ResponseTerminalStatus,
248 },
249}
250
251#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254#[non_exhaustive]
255pub enum ResponseProgressPhase {
256 Accepted,
258 InProgress,
260 PartialResult,
262}
263
264#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
266#[serde(rename_all = "snake_case")]
267#[non_exhaustive]
268pub enum ResponseTerminalStatus {
269 Completed,
271 Failed,
273 Cancelled,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct FlowStepInput {
280 pub header: InputHeader,
281 pub step_id: String,
283 pub instructions: String,
285 #[serde(default, skip_serializing_if = "Option::is_none")]
286 pub turn_metadata: Option<RuntimeTurnMetadata>,
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct ExternalEventInput {
292 pub header: InputHeader,
293 pub event_type: String,
295 pub payload: serde_json::Value,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct SystemGeneratedInput {
303 pub header: InputHeader,
304 pub generator: String,
306 pub content: String,
308}
309
310#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct ProjectedInput {
313 pub header: InputHeader,
314 pub rule_id: String,
316 pub source_event_id: String,
318 pub content: String,
320}
321
322#[cfg(test)]
323#[allow(clippy::unwrap_used, clippy::panic)]
324mod tests {
325 use super::*;
326 use chrono::Utc;
327
328 fn make_header() -> InputHeader {
329 InputHeader {
330 id: InputId::new(),
331 timestamp: Utc::now(),
332 source: InputOrigin::Operator,
333 durability: InputDurability::Durable,
334 visibility: InputVisibility::default(),
335 idempotency_key: None,
336 supersession_key: None,
337 correlation_id: None,
338 }
339 }
340
341 #[test]
342 fn prompt_input_serde() {
343 let input = Input::Prompt(PromptInput {
344 header: make_header(),
345 text: "hello".into(),
346 blocks: None,
347 turn_metadata: None,
348 });
349 let json = serde_json::to_value(&input).unwrap();
350 assert_eq!(json["input_type"], "prompt");
351 let parsed: Input = serde_json::from_value(json).unwrap();
352 assert!(matches!(parsed, Input::Prompt(_)));
353 }
354
355 #[test]
356 fn peer_input_message_serde() {
357 let input = Input::Peer(PeerInput {
358 header: make_header(),
359 convention: Some(PeerConvention::Message),
360 body: "hi there".into(),
361 blocks: None,
362 });
363 let json = serde_json::to_value(&input).unwrap();
364 assert_eq!(json["input_type"], "peer");
365 let parsed: Input = serde_json::from_value(json).unwrap();
366 assert!(matches!(parsed, Input::Peer(_)));
367 }
368
369 #[test]
370 fn peer_input_request_serde() {
371 let input = Input::Peer(PeerInput {
372 header: make_header(),
373 convention: Some(PeerConvention::Request {
374 request_id: "req-1".into(),
375 intent: "mob.peer_added".into(),
376 }),
377 body: "Agent joined".into(),
378 blocks: None,
379 });
380 let json = serde_json::to_value(&input).unwrap();
381 let parsed: Input = serde_json::from_value(json).unwrap();
382 if let Input::Peer(p) = parsed {
383 assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
384 } else {
385 panic!("Expected PeerInput");
386 }
387 }
388
389 #[test]
390 fn peer_input_response_terminal_serde() {
391 let input = Input::Peer(PeerInput {
392 header: make_header(),
393 convention: Some(PeerConvention::ResponseTerminal {
394 request_id: "req-1".into(),
395 status: ResponseTerminalStatus::Completed,
396 }),
397 body: "Done".into(),
398 blocks: None,
399 });
400 let json = serde_json::to_value(&input).unwrap();
401 let parsed: Input = serde_json::from_value(json).unwrap();
402 assert!(matches!(parsed, Input::Peer(_)));
403 }
404
405 #[test]
406 fn peer_input_response_progress_serde() {
407 let input = Input::Peer(PeerInput {
408 header: make_header(),
409 convention: Some(PeerConvention::ResponseProgress {
410 request_id: "req-1".into(),
411 phase: ResponseProgressPhase::InProgress,
412 }),
413 body: "Working...".into(),
414 blocks: None,
415 });
416 let json = serde_json::to_value(&input).unwrap();
417 let parsed: Input = serde_json::from_value(json).unwrap();
418 assert!(matches!(parsed, Input::Peer(_)));
419 }
420
421 #[test]
422 fn flow_step_input_serde() {
423 let input = Input::FlowStep(FlowStepInput {
424 header: make_header(),
425 step_id: "step-1".into(),
426 instructions: "analyze the data".into(),
427 turn_metadata: None,
428 });
429 let json = serde_json::to_value(&input).unwrap();
430 assert_eq!(json["input_type"], "flow_step");
431 let parsed: Input = serde_json::from_value(json).unwrap();
432 assert!(matches!(parsed, Input::FlowStep(_)));
433 }
434
435 #[test]
436 fn external_event_input_serde() {
437 let input = Input::ExternalEvent(ExternalEventInput {
438 header: make_header(),
439 event_type: "webhook.received".into(),
440 payload: serde_json::json!({"url": "https://example.com"}),
441 });
442 let json = serde_json::to_value(&input).unwrap();
443 assert_eq!(json["input_type"], "external_event");
444 let parsed: Input = serde_json::from_value(json).unwrap();
445 assert!(matches!(parsed, Input::ExternalEvent(_)));
446 }
447
448 #[test]
449 fn system_generated_input_serde() {
450 let input = Input::SystemGenerated(SystemGeneratedInput {
451 header: make_header(),
452 generator: "compactor".into(),
453 content: "summary text".into(),
454 });
455 let json = serde_json::to_value(&input).unwrap();
456 assert_eq!(json["input_type"], "system_generated");
457 let parsed: Input = serde_json::from_value(json).unwrap();
458 assert!(matches!(parsed, Input::SystemGenerated(_)));
459 }
460
461 #[test]
462 fn projected_input_serde() {
463 let input = Input::Projected(ProjectedInput {
464 header: InputHeader {
465 durability: InputDurability::Derived,
466 ..make_header()
467 },
468 rule_id: "rule-1".into(),
469 source_event_id: "evt-1".into(),
470 content: "projected content".into(),
471 });
472 let json = serde_json::to_value(&input).unwrap();
473 assert_eq!(json["input_type"], "projected");
474 let parsed: Input = serde_json::from_value(json).unwrap();
475 assert!(matches!(parsed, Input::Projected(_)));
476 }
477
478 #[test]
479 fn input_kind_id() {
480 let prompt = Input::Prompt(PromptInput {
481 header: make_header(),
482 text: "hi".into(),
483 blocks: None,
484 turn_metadata: None,
485 });
486 assert_eq!(prompt.kind_id().0, "prompt");
487
488 let peer_msg = Input::Peer(PeerInput {
489 header: make_header(),
490 convention: Some(PeerConvention::Message),
491 body: "hi".into(),
492 blocks: None,
493 });
494 assert_eq!(peer_msg.kind_id().0, "peer_message");
495
496 let peer_req = Input::Peer(PeerInput {
497 header: make_header(),
498 convention: Some(PeerConvention::Request {
499 request_id: "r".into(),
500 intent: "i".into(),
501 }),
502 body: "hi".into(),
503 blocks: None,
504 });
505 assert_eq!(peer_req.kind_id().0, "peer_request");
506 }
507
508 #[test]
509 fn input_source_variants() {
510 let sources = vec![
511 InputOrigin::Operator,
512 InputOrigin::Peer {
513 peer_id: "p1".into(),
514 runtime_id: None,
515 },
516 InputOrigin::Flow {
517 flow_id: "f1".into(),
518 step_index: 0,
519 },
520 InputOrigin::System,
521 InputOrigin::External {
522 source_name: "webhook".into(),
523 },
524 ];
525 for source in sources {
526 let json = serde_json::to_value(&source).unwrap();
527 let parsed: InputOrigin = serde_json::from_value(json).unwrap();
528 assert_eq!(source, parsed);
529 }
530 }
531
532 #[test]
533 fn input_durability_serde() {
534 for d in [
535 InputDurability::Durable,
536 InputDurability::Ephemeral,
537 InputDurability::Derived,
538 ] {
539 let json = serde_json::to_value(d).unwrap();
540 let parsed: InputDurability = serde_json::from_value(json).unwrap();
541 assert_eq!(d, parsed);
542 }
543 }
544}