use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::InputId;
use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
use serde::{Deserialize, Serialize};
use crate::identifiers::{
CorrelationId, IdempotencyKey, KindId, LogicalRuntimeId, SupersessionKey,
};
/// Metadata shared by every input variant.
///
/// Each `*Input` payload struct in this module embeds one of these as its
/// `header` field, and [`Input::header`] returns a reference to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
/// Identifier of this input; also exposed through [`Input::id`].
pub id: InputId,
/// UTC timestamp attached to the input (`PromptInput::new` uses the current time).
pub timestamp: DateTime<Utc>,
/// Origin of the input (see [`InputOrigin`]).
pub source: InputOrigin,
/// Durability class of the input (see [`InputDurability`]).
pub durability: InputDurability,
/// Eligibility flags for the input (see [`InputVisibility`]).
pub visibility: InputVisibility,
/// Optional idempotency key; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<IdempotencyKey>,
/// Optional supersession key; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub supersession_key: Option<SupersessionKey>,
/// Optional correlation id; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<CorrelationId>,
}
/// Where an input came from.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
/// The origin `PromptInput::new` assigns to prompts it creates.
Operator,
/// A peer, named by `peer_id`.
Peer {
/// Identifier of the peer.
peer_id: String,
/// Optional logical runtime id; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
runtime_id: Option<LogicalRuntimeId>,
},
/// A flow, named by `flow_id`, together with a step index.
Flow { flow_id: String, step_index: usize },
/// NOTE(review): presumably inputs produced by the runtime itself — confirm.
System,
/// An external source, named by `source_name`.
External { source_name: String },
}
/// Durability class recorded in an [`InputHeader`].
///
/// Serialized as a bare snake_case string (`"durable"`, `"ephemeral"`,
/// `"derived"`).
///
/// NOTE(review): what each class means for storage or replay is decided by the
/// consumers of this type, not by this file — confirm before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
/// The class `PromptInput::new` assigns to prompts it creates.
Durable,
Ephemeral,
Derived,
}
/// Pair of eligibility flags carried in an [`InputHeader`].
///
/// The `Default` implementation in this file turns both flags on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
/// NOTE(review): presumably whether the input may appear in the transcript — confirm.
pub transcript_eligible: bool,
/// NOTE(review): presumably whether the input may be shown to the operator — confirm.
pub operator_eligible: bool,
}
impl Default for InputVisibility {
fn default() -> Self {
Self {
transcript_eligible: true,
operator_eligible: true,
}
}
}
/// An input of any kind, wrapping one payload struct per variant.
///
/// Serialized as an internally tagged object: the `input_type` field carries
/// the snake_case variant name and the payload's fields sit beside it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
/// A text prompt (see [`PromptInput`]).
Prompt(PromptInput),
/// A peer input (see [`PeerInput`]).
Peer(PeerInput),
/// A flow step (see [`FlowStepInput`]).
FlowStep(FlowStepInput),
/// An external event (see [`ExternalEventInput`]).
ExternalEvent(ExternalEventInput),
/// A system-generated input (see [`SystemGeneratedInput`]).
SystemGenerated(SystemGeneratedInput),
/// A projected input (see [`ProjectedInput`]).
Projected(ProjectedInput),
}
impl Input {
pub fn header(&self) -> &InputHeader {
match self {
Input::Prompt(i) => &i.header,
Input::Peer(i) => &i.header,
Input::FlowStep(i) => &i.header,
Input::ExternalEvent(i) => &i.header,
Input::SystemGenerated(i) => &i.header,
Input::Projected(i) => &i.header,
}
}
pub fn id(&self) -> &InputId {
&self.header().id
}
pub fn kind_id(&self) -> KindId {
match self {
Input::Prompt(_) => KindId::new("prompt"),
Input::Peer(p) => match &p.convention {
Some(PeerConvention::Message) => KindId::new("peer_message"),
Some(PeerConvention::Request { .. }) => KindId::new("peer_request"),
Some(PeerConvention::ResponseProgress { .. }) => {
KindId::new("peer_response_progress")
}
Some(PeerConvention::ResponseTerminal { .. }) => {
KindId::new("peer_response_terminal")
}
None => KindId::new("peer_message"),
},
Input::FlowStep(_) => KindId::new("flow_step"),
Input::ExternalEvent(_) => KindId::new("external_event"),
Input::SystemGenerated(_) => KindId::new("system_generated"),
Input::Projected(_) => KindId::new("projected"),
}
}
}
/// Payload of [`Input::Prompt`]: a piece of prompt text plus optional turn metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
/// Header metadata for this input.
pub header: InputHeader,
/// The prompt text.
pub text: String,
/// Optional turn metadata; defaults to `None` when absent on deserialization
/// and is left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_metadata: Option<RuntimeTurnMetadata>,
}
impl PromptInput {
pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
Self {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Operator,
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
text: text.into(),
turn_metadata,
}
}
}
/// Payload of [`Input::Peer`]: a message body with an optional convention.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
/// Header metadata for this input.
pub header: InputHeader,
/// Optional convention; left out of the serialized form when `None`.
/// `Input::kind_id` treats `None` the same as `PeerConvention::Message`.
#[serde(skip_serializing_if = "Option::is_none")]
pub convention: Option<PeerConvention>,
/// The message body.
pub body: String,
}
/// Convention attached to a [`PeerInput`]; it selects the kind id that
/// `Input::kind_id` reports for the input.
///
/// Serialized as an internally tagged object: the `convention_type` field
/// carries the snake_case variant name next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
/// A plain message with no extra fields (kind id `peer_message`).
Message,
/// A request carrying a request id and an intent string (kind id `peer_request`).
Request { request_id: String, intent: String },
/// A progress update for the request named by `request_id`
/// (kind id `peer_response_progress`).
ResponseProgress {
request_id: String,
phase: ResponseProgressPhase,
},
/// A terminal response for the request named by `request_id`
/// (kind id `peer_response_terminal`).
ResponseTerminal {
request_id: String,
status: ResponseTerminalStatus,
},
}
/// Phase reported by [`PeerConvention::ResponseProgress`].
///
/// Serialized as a bare snake_case string (`"accepted"`, `"in_progress"`,
/// `"partial_result"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ResponseProgressPhase {
Accepted,
InProgress,
PartialResult,
}
/// Final status reported by [`PeerConvention::ResponseTerminal`].
///
/// Serialized as a bare snake_case string (`"completed"`, `"failed"`,
/// `"cancelled"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ResponseTerminalStatus {
Completed,
Failed,
Cancelled,
}
/// Payload of [`Input::FlowStep`]: a step id with its instructions and
/// optional turn metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
/// Header metadata for this input.
pub header: InputHeader,
/// Identifier of the step.
pub step_id: String,
/// Instruction text for the step.
pub instructions: String,
/// Optional turn metadata; defaults to `None` when absent on deserialization
/// and is left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_metadata: Option<RuntimeTurnMetadata>,
}
/// Payload of [`Input::ExternalEvent`]: an event type string and an arbitrary
/// JSON payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
/// Header metadata for this input.
pub header: InputHeader,
/// Event type string (the tests in this file use `"webhook.received"`).
pub event_type: String,
/// Event payload as an untyped JSON value; no schema is imposed here.
pub payload: serde_json::Value,
}
/// Payload of [`Input::SystemGenerated`]: a generator name and the content it produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemGeneratedInput {
/// Header metadata for this input.
pub header: InputHeader,
/// Name of the generator (the tests in this file use `"compactor"`).
pub generator: String,
/// The generated content.
pub content: String,
}
/// Payload of [`Input::Projected`]: content tied to a rule id and a source event id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectedInput {
/// Header metadata for this input.
pub header: InputHeader,
/// Identifier of the rule.
/// NOTE(review): presumably the projection rule that produced this input — confirm.
pub rule_id: String,
/// Identifier of the source event.
pub source_event_id: String,
/// The projected content.
pub content: String,
}
#[cfg(test)]
// Tests unwrap and panic on purpose: any failure should abort the test loudly.
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a minimal header: operator origin, durable, default visibility,
    /// and no optional keys.
    fn make_header() -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Wraps `convention` and `body` in a peer input with a fresh header.
    fn make_peer(convention: Option<PeerConvention>, body: &str) -> Input {
        Input::Peer(PeerInput {
            header: make_header(),
            convention,
            body: body.into(),
        })
    }

    /// Serializes `input` to JSON and parses it back, returning both forms so a
    /// test can check the wire shape as well as the round-tripped value.
    fn round_trip(input: &Input) -> (serde_json::Value, Input) {
        let json = serde_json::to_value(input).unwrap();
        let parsed: Input = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    #[test]
    fn prompt_input_serde() {
        let input = Input::Prompt(PromptInput {
            header: make_header(),
            text: "hello".into(),
            turn_metadata: None,
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "prompt");
        assert_eq!(json["text"], "hello");
        // `None` optionals must be omitted, not emitted as `null`.
        assert!(json.get("turn_metadata").is_none());
        assert!(json["header"].get("idempotency_key").is_none());
        assert!(json["header"].get("supersession_key").is_none());
        assert!(json["header"].get("correlation_id").is_none());
        if let Input::Prompt(p) = parsed {
            assert_eq!(p.text, "hello");
            assert!(p.turn_metadata.is_none());
            assert!(p.header.idempotency_key.is_none());
            assert!(p.header.supersession_key.is_none());
            assert!(p.header.correlation_id.is_none());
        } else {
            panic!("Expected PromptInput");
        }
    }

    #[test]
    fn prompt_input_new_defaults() {
        let prompt = PromptInput::new("hello", None);
        assert_eq!(prompt.text, "hello");
        assert!(prompt.turn_metadata.is_none());
        assert_eq!(prompt.header.source, InputOrigin::Operator);
        assert_eq!(prompt.header.durability, InputDurability::Durable);
        assert_eq!(prompt.header.visibility, InputVisibility::default());
        assert!(prompt.header.idempotency_key.is_none());
        assert!(prompt.header.supersession_key.is_none());
        assert!(prompt.header.correlation_id.is_none());
    }

    #[test]
    fn input_visibility_default_is_fully_eligible() {
        let visibility = InputVisibility::default();
        assert!(visibility.transcript_eligible);
        assert!(visibility.operator_eligible);
    }

    #[test]
    fn input_id_borrows_from_header() {
        let input = Input::Prompt(PromptInput {
            header: make_header(),
            text: "hi".into(),
            turn_metadata: None,
        });
        // Pointer identity avoids requiring `PartialEq` on `InputId`.
        assert!(std::ptr::eq(input.id(), &input.header().id));
    }

    #[test]
    fn peer_input_message_serde() {
        let input = make_peer(Some(PeerConvention::Message), "hi there");
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "peer");
        assert_eq!(json["body"], "hi there");
        assert_eq!(json["convention"]["convention_type"], "message");
        if let Input::Peer(p) = parsed {
            assert!(matches!(p.convention, Some(PeerConvention::Message)));
            assert_eq!(p.body, "hi there");
        } else {
            panic!("Expected PeerInput");
        }
    }

    #[test]
    fn peer_input_without_convention_serde() {
        let input = make_peer(None, "plain");
        // The fallback arm of `kind_id` treats a missing convention as a message.
        assert_eq!(input.kind_id().0, "peer_message");
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "peer");
        assert!(json.get("convention").is_none());
        if let Input::Peer(p) = parsed {
            assert!(p.convention.is_none());
            assert_eq!(p.body, "plain");
        } else {
            panic!("Expected PeerInput");
        }
    }

    #[test]
    fn peer_input_request_serde() {
        let input = make_peer(
            Some(PeerConvention::Request {
                request_id: "req-1".into(),
                intent: "mob.peer_added".into(),
            }),
            "Agent joined",
        );
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["convention"]["convention_type"], "request");
        if let Input::Peer(p) = parsed {
            match p.convention {
                Some(PeerConvention::Request { request_id, intent }) => {
                    assert_eq!(request_id, "req-1");
                    assert_eq!(intent, "mob.peer_added");
                }
                other => panic!("Expected Request convention, got {:?}", other),
            }
        } else {
            panic!("Expected PeerInput");
        }
    }

    #[test]
    fn peer_input_response_terminal_serde() {
        let input = make_peer(
            Some(PeerConvention::ResponseTerminal {
                request_id: "req-1".into(),
                status: ResponseTerminalStatus::Completed,
            }),
            "Done",
        );
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["convention"]["convention_type"], "response_terminal");
        assert_eq!(json["convention"]["status"], "completed");
        if let Input::Peer(p) = parsed {
            match p.convention {
                Some(PeerConvention::ResponseTerminal { request_id, status }) => {
                    assert_eq!(request_id, "req-1");
                    assert_eq!(status, ResponseTerminalStatus::Completed);
                }
                other => panic!("Expected ResponseTerminal convention, got {:?}", other),
            }
        } else {
            panic!("Expected PeerInput");
        }
    }

    #[test]
    fn peer_input_response_progress_serde() {
        let input = make_peer(
            Some(PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            }),
            "Working...",
        );
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["convention"]["convention_type"], "response_progress");
        assert_eq!(json["convention"]["phase"], "in_progress");
        if let Input::Peer(p) = parsed {
            match p.convention {
                Some(PeerConvention::ResponseProgress { request_id, phase }) => {
                    assert_eq!(request_id, "req-1");
                    assert_eq!(phase, ResponseProgressPhase::InProgress);
                }
                other => panic!("Expected ResponseProgress convention, got {:?}", other),
            }
        } else {
            panic!("Expected PeerInput");
        }
    }

    #[test]
    fn flow_step_input_serde() {
        let input = Input::FlowStep(FlowStepInput {
            header: make_header(),
            step_id: "step-1".into(),
            instructions: "analyze the data".into(),
            turn_metadata: None,
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "flow_step");
        assert!(json.get("turn_metadata").is_none());
        if let Input::FlowStep(step) = parsed {
            assert_eq!(step.step_id, "step-1");
            assert_eq!(step.instructions, "analyze the data");
            assert!(step.turn_metadata.is_none());
        } else {
            panic!("Expected FlowStepInput");
        }
    }

    #[test]
    fn external_event_input_serde() {
        let payload = serde_json::json!({"url": "https://example.com"});
        let input = Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload: payload.clone(),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "external_event");
        if let Input::ExternalEvent(event) = parsed {
            assert_eq!(event.event_type, "webhook.received");
            assert_eq!(event.payload, payload);
        } else {
            panic!("Expected ExternalEventInput");
        }
    }

    #[test]
    fn system_generated_input_serde() {
        let input = Input::SystemGenerated(SystemGeneratedInput {
            header: make_header(),
            generator: "compactor".into(),
            content: "summary text".into(),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "system_generated");
        if let Input::SystemGenerated(generated) = parsed {
            assert_eq!(generated.generator, "compactor");
            assert_eq!(generated.content, "summary text");
        } else {
            panic!("Expected SystemGeneratedInput");
        }
    }

    #[test]
    fn projected_input_serde() {
        let input = Input::Projected(ProjectedInput {
            header: InputHeader {
                durability: InputDurability::Derived,
                ..make_header()
            },
            rule_id: "rule-1".into(),
            source_event_id: "evt-1".into(),
            content: "projected content".into(),
        });
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "projected");
        assert_eq!(json["header"]["durability"], "derived");
        if let Input::Projected(projected) = parsed {
            assert_eq!(projected.header.durability, InputDurability::Derived);
            assert_eq!(projected.rule_id, "rule-1");
            assert_eq!(projected.source_event_id, "evt-1");
            assert_eq!(projected.content, "projected content");
        } else {
            panic!("Expected ProjectedInput");
        }
    }

    #[test]
    fn input_kind_id() {
        let prompt = Input::Prompt(PromptInput {
            header: make_header(),
            text: "hi".into(),
            turn_metadata: None,
        });
        assert_eq!(prompt.kind_id().0, "prompt");

        let peer_msg = make_peer(Some(PeerConvention::Message), "hi");
        assert_eq!(peer_msg.kind_id().0, "peer_message");

        let peer_req = make_peer(
            Some(PeerConvention::Request {
                request_id: "r".into(),
                intent: "i".into(),
            }),
            "hi",
        );
        assert_eq!(peer_req.kind_id().0, "peer_request");

        let peer_progress = make_peer(
            Some(PeerConvention::ResponseProgress {
                request_id: "r".into(),
                phase: ResponseProgressPhase::Accepted,
            }),
            "hi",
        );
        assert_eq!(peer_progress.kind_id().0, "peer_response_progress");

        let peer_terminal = make_peer(
            Some(PeerConvention::ResponseTerminal {
                request_id: "r".into(),
                status: ResponseTerminalStatus::Failed,
            }),
            "hi",
        );
        assert_eq!(peer_terminal.kind_id().0, "peer_response_terminal");

        let flow_step = Input::FlowStep(FlowStepInput {
            header: make_header(),
            step_id: "s".into(),
            instructions: "i".into(),
            turn_metadata: None,
        });
        assert_eq!(flow_step.kind_id().0, "flow_step");

        let external = Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "e".into(),
            payload: serde_json::Value::Null,
        });
        assert_eq!(external.kind_id().0, "external_event");

        let generated = Input::SystemGenerated(SystemGeneratedInput {
            header: make_header(),
            generator: "g".into(),
            content: "c".into(),
        });
        assert_eq!(generated.kind_id().0, "system_generated");

        let projected = Input::Projected(ProjectedInput {
            header: make_header(),
            rule_id: "r".into(),
            source_event_id: "e".into(),
            content: "c".into(),
        });
        assert_eq!(projected.kind_id().0, "projected");
    }

    #[test]
    fn input_source_variants() {
        // Each origin is paired with the `type` tag it must serialize under.
        let sources = vec![
            (InputOrigin::Operator, "operator"),
            (
                InputOrigin::Peer {
                    peer_id: "p1".into(),
                    runtime_id: None,
                },
                "peer",
            ),
            (
                InputOrigin::Flow {
                    flow_id: "f1".into(),
                    step_index: 0,
                },
                "flow",
            ),
            (InputOrigin::System, "system"),
            (
                InputOrigin::External {
                    source_name: "webhook".into(),
                },
                "external",
            ),
        ];
        for (source, tag) in sources {
            let json = serde_json::to_value(&source).unwrap();
            assert_eq!(json["type"], tag);
            // An absent optional must not leak into the wire format.
            assert!(json.get("runtime_id").is_none());
            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
            assert_eq!(source, parsed);
        }
    }

    #[test]
    fn input_durability_serde() {
        for (d, expected) in [
            (InputDurability::Durable, "durable"),
            (InputDurability::Ephemeral, "ephemeral"),
            (InputDurability::Derived, "derived"),
        ] {
            let json = serde_json::to_value(d).unwrap();
            assert_eq!(json, expected);
            let parsed: InputDurability = serde_json::from_value(json).unwrap();
            assert_eq!(d, parsed);
        }
    }
}