use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::InputId;
use meerkat_core::lifecycle::run_primitive::{
ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
RuntimeTurnMetadata,
};
use meerkat_core::ops::{OpEvent, OperationId};
use meerkat_core::types::{
HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind, SystemNoticePeer,
};
use meerkat_core::{
BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
};
use serde::{Deserialize, Serialize};
use crate::identifiers::{
CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
};
use meerkat_core::types::RenderMetadata;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
pub id: InputId,
pub timestamp: DateTime<Utc>,
pub source: InputOrigin,
pub durability: InputDurability,
pub visibility: InputVisibility,
#[serde(skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<IdempotencyKey>,
#[serde(skip_serializing_if = "Option::is_none")]
pub supersession_key: Option<SupersessionKey>,
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<CorrelationId>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
Operator,
Peer {
peer_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
display_identity: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
runtime_id: Option<LogicalRuntimeId>,
},
Flow { flow_id: String, step_index: usize },
System,
External { source_name: String },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
Durable,
Ephemeral,
Derived,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
pub transcript_eligible: bool,
pub operator_eligible: bool,
}
impl Default for InputVisibility {
fn default() -> Self {
Self {
transcript_eligible: true,
operator_eligible: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
Prompt(PromptInput),
Peer(PeerInput),
FlowStep(FlowStepInput),
ExternalEvent(ExternalEventInput),
#[serde(alias = "system_generated")]
Continuation(ContinuationInput),
#[serde(alias = "projected")]
Operation(OperationInput),
}
impl Input {
pub fn header(&self) -> &InputHeader {
match self {
Input::Prompt(i) => &i.header,
Input::Peer(i) => &i.header,
Input::FlowStep(i) => &i.header,
Input::ExternalEvent(i) => &i.header,
Input::Continuation(i) => &i.header,
Input::Operation(i) => &i.header,
}
}
pub fn id(&self) -> &InputId {
&self.header().id
}
pub fn kind(&self) -> InputKind {
match self {
Input::Prompt(_) => InputKind::Prompt,
Input::Peer(p) => match &p.convention {
Some(PeerConvention::Message) | None => InputKind::PeerMessage,
Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
},
Input::FlowStep(_) => InputKind::FlowStep,
Input::ExternalEvent(_) => InputKind::ExternalEvent,
Input::Continuation(_) => InputKind::Continuation,
Input::Operation(_) => InputKind::Operation,
}
}
pub fn kind_id(&self) -> KindId {
KindId::new(self.kind())
}
pub fn handling_mode(&self) -> Option<HandlingMode> {
match self {
Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
Input::ExternalEvent(event) => Some(event.handling_mode),
Input::Continuation(continuation) => Some(continuation.handling_mode),
Input::Peer(peer) => peer.handling_mode,
Input::Operation(_) => None,
}
}
}
fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
let Some(obj) = event.payload.as_object_mut() else {
return Ok(());
};
let Some(blocks_value) = obj.remove("blocks") else {
return Ok(());
};
if event.blocks.is_some() {
return Ok(());
}
let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
.map_err(|err| {
BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
})?;
event.blocks = Some(blocks);
Ok(())
}
pub async fn externalize_input_images(
blob_store: &dyn BlobStore,
input: &mut Input,
) -> Result<(), BlobStoreError> {
match input {
Input::Prompt(prompt) => {
if let Some(blocks) = prompt.blocks.as_mut() {
externalize_content_blocks(blob_store, blocks).await?;
}
}
Input::Peer(peer) => {
if let Some(blocks) = peer.blocks.as_mut() {
externalize_content_blocks(blob_store, blocks).await?;
}
}
Input::FlowStep(flow_step) => {
if let Some(blocks) = flow_step.blocks.as_mut() {
externalize_content_blocks(blob_store, blocks).await?;
}
}
Input::ExternalEvent(event) => {
migrate_legacy_payload_blocks(event)?;
if let Some(blocks) = event.blocks.as_mut() {
externalize_content_blocks(blob_store, blocks).await?;
}
}
Input::Continuation(_) | Input::Operation(_) => {}
}
Ok(())
}
pub async fn hydrate_input_images(
blob_store: &dyn BlobStore,
input: &mut Input,
missing_behavior: MissingBlobBehavior,
) -> Result<(), BlobStoreError> {
match input {
Input::Prompt(prompt) => {
if let Some(blocks) = prompt.blocks.as_mut() {
hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
}
}
Input::Peer(peer) => {
if let Some(blocks) = peer.blocks.as_mut() {
hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
}
}
Input::FlowStep(flow_step) => {
if let Some(blocks) = flow_step.blocks.as_mut() {
hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
}
}
Input::ExternalEvent(event) => {
migrate_legacy_payload_blocks(event)?;
if let Some(blocks) = event.blocks.as_mut() {
hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
}
}
Input::Continuation(_) | Input::Operation(_) => {}
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
pub header: InputHeader,
pub text: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub typed_turn_appends: Vec<ConversationAppend>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_metadata: Option<RuntimeTurnMetadata>,
}
impl PromptInput {
pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
Self {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Operator,
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
text: text.into(),
blocks: None,
typed_turn_appends: Vec::new(),
turn_metadata,
}
}
pub fn from_content_input(
input: meerkat_core::types::ContentInput,
turn_metadata: Option<RuntimeTurnMetadata>,
) -> Self {
let text = input.text_content();
let blocks = if input.has_images() {
Some(input.into_blocks())
} else {
None
};
Self {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Operator,
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
text,
blocks,
typed_turn_appends: Vec::new(),
turn_metadata,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
pub header: InputHeader,
#[serde(skip_serializing_if = "Option::is_none")]
pub convention: Option<PeerConvention>,
pub body: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub handling_mode: Option<HandlingMode>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
Message,
Request { request_id: String, intent: String },
ResponseProgress {
request_id: String,
phase: ResponseProgressPhase,
},
ResponseTerminal {
request_id: String,
status: ResponseTerminalStatus,
},
}
pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
pub fn response_terminal_status_from_wire(
status: meerkat_contracts::PeerResponseTerminalStatusWire,
) -> ResponseTerminalStatus {
match status {
meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
PeerResponseTerminalProjectionStatus::Completed
}
meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
PeerResponseTerminalProjectionStatus::Failed
}
meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
PeerResponseTerminalProjectionStatus::Cancelled
}
}
}
pub fn peer_response_terminal_input(
peer_id: meerkat_core::comms::PeerId,
display_name: Option<meerkat_core::comms::PeerName>,
request_id: meerkat_core::PeerCorrelationId,
status: meerkat_contracts::PeerResponseTerminalStatusWire,
result: serde_json::Value,
) -> Input {
let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
let request_id = request_id.to_string();
let peer_id = peer_id.to_string();
let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id,
display_identity: Some(display_identity),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: Some(correlation_id),
},
convention: Some(PeerConvention::ResponseTerminal {
request_id,
status: response_terminal_status_from_wire(status),
}),
body: String::new(),
payload: Some(result),
blocks: None,
handling_mode: None,
})
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
pub header: InputHeader,
pub step_id: String,
pub instructions: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_metadata: Option<RuntimeTurnMetadata>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
pub header: InputHeader,
pub event_type: String,
pub payload: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
#[serde(default)]
pub handling_mode: HandlingMode,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub render_metadata: Option<RenderMetadata>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationInput {
pub header: InputHeader,
pub reason: String,
#[serde(default)]
pub handling_mode: HandlingMode,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
}
impl ContinuationInput {
pub fn detached_background_op_completed() -> Self {
Self {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::System,
durability: InputDurability::Derived,
visibility: InputVisibility {
transcript_eligible: false,
operator_eligible: false,
},
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
reason: "detached_background_op_completed".to_string(),
handling_mode: HandlingMode::Steer,
request_id: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationInput {
pub header: InputHeader,
pub operation_id: OperationId,
pub event: OpEvent,
}
pub(crate) fn peer_projection_from_peer_input(
peer: &PeerInput,
) -> Option<PeerConversationProjection> {
peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
}
fn peer_projection_from_peer_input_with_id(
peer: &PeerInput,
peer_id: &str,
) -> Option<PeerConversationProjection> {
let peer_id = peer_id.to_string();
match &peer.convention {
Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
Some(PeerConvention::Request { request_id, intent }) => {
let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
Ok(peer_id) => peer_id,
Err(error) => {
tracing::warn!(
peer_id,
error = %error,
"dropping peer request projection with non-canonical peer_id"
);
return None;
}
};
Some(PeerConversationProjection::Request {
peer_id,
display_name: peer_display_label(peer),
request_id: request_id.clone(),
intent: intent.clone(),
payload: peer.payload.clone(),
})
}
Some(PeerConvention::ResponseProgress { request_id, phase }) => {
Some(PeerConversationProjection::ResponseProgress {
peer_id,
request_id: request_id.clone(),
phase: *phase,
payload: peer.payload.clone(),
})
}
Some(PeerConvention::ResponseTerminal { .. }) => None,
None => None,
}
}
pub(crate) fn peer_response_terminal_fact(
peer: &PeerInput,
) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
let InputOrigin::Peer {
peer_id,
display_identity,
runtime_id,
} = &peer.header.source
else {
return Ok(None);
};
let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
return Ok(None);
};
let transport_identity = runtime_id
.as_ref()
.map(ToString::to_string)
.map(PeerResponseTerminalTransportIdentity::parse)
.transpose()?;
let source = PeerResponseTerminalSource::new(
transport_identity,
PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
PeerResponseTerminalDisplayIdentity::parse(
display_identity
.as_ref()
.ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
.clone(),
)?,
);
Ok(Some(PeerResponseTerminalFact::new(
source,
PeerResponseTerminalCorrelationId::parse(request_id)?,
*status,
PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
)))
}
pub(crate) fn validate_peer_response_terminal_fact(
input: &Input,
) -> Result<(), PeerResponseTerminalFactError> {
let Input::Peer(peer) = input else {
return Ok(());
};
peer_response_terminal_fact(peer).map(|_| ())
}
#[cfg(test)]
pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
let Input::Peer(peer) = input else {
return None;
};
peer_projection_from_peer_input(peer)
}
fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
return None;
};
Some(peer_id.clone())
}
fn peer_display_label(peer: &PeerInput) -> Option<String> {
let InputOrigin::Peer {
display_identity, ..
} = &peer.header.source
else {
return None;
};
display_identity
.as_ref()
.map(|label| label.trim())
.filter(|label| !label.is_empty())
.map(ToOwned::to_owned)
}
pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
peer_projection_from_peer_input(peer)
.map(|projection| {
let prompt = projection.prompt_text();
if prompt.is_empty() {
peer.body.clone()
} else {
prompt
}
})
.unwrap_or_else(|| peer.body.clone())
}
pub(crate) fn input_prompt_text(input: &Input) -> String {
match input {
Input::Prompt(p) => p.text.clone(),
Input::Peer(p) => peer_prompt_text(p),
Input::FlowStep(f) => f.instructions.clone(),
Input::ExternalEvent(e) => external_event_projection_text(e),
Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
Input::Operation(operation) => {
format!(
"[Operation {}] {:?}",
operation.operation_id, operation.event
)
}
}
}
fn external_event_projection_text(event: &ExternalEventInput) -> String {
let source_name = match &event.header.source {
InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
source_name.as_str()
}
_ => event.event_type.as_str(),
};
let body = event
.payload
.get("body")
.and_then(serde_json::Value::as_str)
.map(str::trim);
meerkat_core::interaction::format_external_event_projection(source_name, body)
}
fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
let (peer_id, display_name) = match &peer.header.source {
InputOrigin::Peer {
peer_id,
display_identity,
..
} => (peer_id.clone(), display_identity.clone()),
_ => return None,
};
let (kind, request_id, intent, status) = match &peer.convention {
Some(PeerConvention::Message) | None => ("message", None, None, None),
Some(PeerConvention::Request { request_id, intent }) => (
"request",
Some(request_id.clone()),
Some(intent.clone()),
None,
),
Some(PeerConvention::ResponseProgress { request_id, phase }) => (
"response_progress",
Some(request_id.clone()),
None,
Some(format!("{phase:?}")),
),
Some(PeerConvention::ResponseTerminal { request_id, status }) => (
"response_terminal",
Some(request_id.clone()),
None,
Some(format!("{status:?}")),
),
};
let summary = match kind {
"request" => intent.as_ref().map_or_else(
|| "Peer request".to_string(),
|intent| format!("Peer request: {intent}"),
),
"response_progress" => "Peer response progress".to_string(),
"response_terminal" => "Peer response terminal".to_string(),
_ => "Peer message".to_string(),
};
let content = if let Some(blocks) = peer.blocks.clone() {
let body_already_in_blocks = blocks.iter().any(|block| {
matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == peer.body.trim())
});
if peer.body.trim().is_empty() || body_already_in_blocks {
blocks
} else {
let mut content = vec![meerkat_core::types::ContentBlock::Text {
text: peer.body.clone(),
}];
content.extend(blocks);
content
}
} else if peer.body.is_empty() {
Vec::new()
} else {
vec![meerkat_core::types::ContentBlock::Text {
text: peer.body.clone(),
}]
};
Some(CoreRenderable::SystemNotice {
kind: SystemNoticeKind::Comms,
body: Some(summary.clone()),
blocks: vec![SystemNoticeBlock::Comms {
kind: kind.to_string(),
direction: SystemNoticeDirection::Incoming,
peer: Some(SystemNoticePeer {
id: peer_id,
display_name,
}),
request_id,
intent,
status,
summary: Some(summary),
payload: peer.payload.clone(),
content,
}],
})
}
fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
let source = match &event.header.source {
InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
source_name.clone()
}
_ => event.event_type.clone(),
};
let body = event
.payload
.get("body")
.and_then(serde_json::Value::as_str)
.map(str::trim)
.filter(|body| !body.is_empty())
.map(ToOwned::to_owned);
let summary = body.as_ref().map_or_else(
|| format!("External event via {source}"),
std::clone::Clone::clone,
);
CoreRenderable::SystemNotice {
kind: SystemNoticeKind::ExternalEvent,
body: Some(summary.clone()),
blocks: vec![SystemNoticeBlock::ExternalEvent {
source,
event_type: event.event_type.clone(),
summary: Some(summary),
body,
payload: Some(event.payload.clone()),
content: event.blocks.clone().unwrap_or_default(),
}],
}
}
fn input_to_append(input: &Input) -> Option<ConversationAppend> {
if matches!(
input,
Input::Peer(PeerInput {
convention: Some(PeerConvention::ResponseTerminal { .. }),
blocks: None,
..
})
) {
return None;
}
let (role, content) = match input {
Input::Prompt(p)
if !p.typed_turn_appends.is_empty()
&& p.text.trim().is_empty()
&& p.blocks.as_ref().is_none_or(Vec::is_empty) =>
{
return None;
}
Input::Prompt(p) if p.blocks.is_some() => (
ConversationAppendRole::User,
CoreRenderable::Blocks {
blocks: p.blocks.clone().unwrap_or_default(),
},
),
Input::Prompt(_) => (
ConversationAppendRole::User,
CoreRenderable::Text {
text: input_prompt_text(input),
},
),
Input::Peer(p) => peer_notice_renderable(p)
.map(|content| (ConversationAppendRole::SystemNotice, content))?,
Input::FlowStep(f) => (
ConversationAppendRole::SystemNotice,
CoreRenderable::SystemNotice {
kind: SystemNoticeKind::Generic,
body: Some(format!("Flow step {}", f.step_id)),
blocks: vec![SystemNoticeBlock::RuntimeNotice {
category: "flow_step".to_string(),
detail: Some(f.instructions.clone()),
payload: None,
}],
},
),
Input::ExternalEvent(e) => (
ConversationAppendRole::SystemNotice,
external_event_notice_renderable(e),
),
Input::Continuation(_) | Input::Operation(_) => return None,
};
Some(ConversationAppend { role, content })
}
fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
let (projection, content) = match input {
Input::Peer(peer) => {
let projection = peer_projection_from_peer_input(peer)?;
let content = peer_notice_renderable(peer)?;
(projection, content)
}
_ => return None,
};
Some(ConversationContextAppend {
key: projection.context_key()?,
content,
})
}
fn peer_response_terminal_context_append(
peer: &PeerInput,
) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
let Some(fact) = peer_response_terminal_fact(peer)? else {
return Ok(None);
};
Ok(Some(ConversationContextAppend {
key: fact.context_key(),
content: CoreRenderable::SystemNotice {
kind: SystemNoticeKind::Comms,
body: Some("Peer terminal response context".to_string()),
blocks: vec![SystemNoticeBlock::Comms {
kind: "response_terminal".to_string(),
direction: SystemNoticeDirection::Incoming,
peer: Some(SystemNoticePeer {
id: fact.source.route_identity.to_string(),
display_name: Some(fact.source.display_identity.to_string()),
}),
request_id: Some(fact.correlation_id.to_string()),
intent: None,
status: Some(
match fact.status {
PeerResponseTerminalProjectionStatus::Completed => "completed",
PeerResponseTerminalProjectionStatus::Failed => "failed",
PeerResponseTerminalProjectionStatus::Cancelled => "cancelled",
}
.to_string(),
),
summary: Some("Peer terminal response".to_string()),
payload: fact.render_payload.as_ref().cloned(),
content: Vec::new(),
}],
},
}))
}
pub(crate) fn runtime_input_projection(
input: &Input,
) -> crate::ingress_types::RuntimeInputProjection {
crate::ingress_types::RuntimeInputProjection {
append: input_to_append(input),
additional_appends: match input {
Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
_ => Vec::new(),
},
context_append: input_to_context_append(input),
}
}
pub(crate) fn runtime_input_projection_for_machine_batch(
input: &Input,
) -> crate::ingress_types::RuntimeInputProjection {
let mut projection = runtime_input_projection(input);
if let Input::Peer(peer) = input
&& let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
{
projection.context_append = Some(context_append);
}
projection
}
pub(crate) fn context_append_to_pending_system_context_append(
append: &ConversationContextAppend,
) -> meerkat_core::PendingSystemContextAppend {
let text = render_core_context_for_pending_system_context(&append.content);
meerkat_core::PendingSystemContextAppend {
text,
source: Some(append.key.clone()),
idempotency_key: Some(append.key.clone()),
accepted_at: meerkat_core::time_compat::SystemTime::now(),
}
}
pub(crate) fn projection_to_pending_system_context_appends(
input_id: &InputId,
projection: &crate::ingress_types::RuntimeInputProjection,
) -> Vec<meerkat_core::PendingSystemContextAppend> {
if let Some(append) = projection.context_append.as_ref() {
return std::iter::once(context_append_to_pending_system_context_append(append))
.filter(|append| !append.text.trim().is_empty())
.collect();
}
projection
.append
.as_ref()
.map(|append| {
let key = format!("runtime:steer:{input_id}");
meerkat_core::PendingSystemContextAppend {
text: render_core_context_for_pending_system_context(&append.content),
source: Some(key.clone()),
idempotency_key: Some(key),
accepted_at: meerkat_core::time_compat::SystemTime::now(),
}
})
.into_iter()
.filter(|append| !append.text.trim().is_empty())
.collect()
}
fn render_core_context_for_pending_system_context(content: &CoreRenderable) -> String {
match content {
CoreRenderable::Text { text } => text.clone(),
CoreRenderable::Blocks { blocks } => meerkat_core::types::text_content(blocks),
CoreRenderable::Json { value } => {
serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string())
}
CoreRenderable::Reference { uri, label } => match label {
Some(label) => format!("{label}: {uri}"),
None => uri.clone(),
},
CoreRenderable::SystemNotice { kind, body, blocks } => {
meerkat_core::types::SystemNoticeMessage::with_blocks(
*kind,
body.clone(),
blocks.clone(),
)
.model_projection_text()
}
_ => String::new(),
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use chrono::Utc;
fn make_header() -> InputHeader {
InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Operator,
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
}
}
fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
ConversationAppend {
role: ConversationAppendRole::SystemNotice,
content: CoreRenderable::SystemNotice {
kind: meerkat_core::types::SystemNoticeKind::Generic,
body: Some(detail.to_string()),
blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
category: "test".to_string(),
detail: Some(detail.to_string()),
payload: None,
}],
},
}
}
#[test]
fn prompt_input_serde() {
let input = Input::Prompt(PromptInput {
header: make_header(),
text: "hello".into(),
blocks: None,
typed_turn_appends: Vec::new(),
turn_metadata: None,
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "prompt");
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Prompt(_)));
}
#[test]
fn prompt_input_typed_turn_appends_project_without_user_text() {
let append = typed_runtime_notice_append("peer delivery");
let input = Input::Prompt(PromptInput {
header: make_header(),
text: String::new(),
blocks: None,
typed_turn_appends: vec![append.clone()],
turn_metadata: None,
});
let projection = runtime_input_projection(&input);
assert!(
projection.append.is_none(),
"empty runtime-authored prompt carrier must not synthesize a user append"
);
assert_eq!(projection.additional_appends, vec![append]);
}
#[test]
fn prompt_input_typed_turn_appends_serde_roundtrip() {
let append = typed_runtime_notice_append("typed appends persist");
let input = Input::Prompt(PromptInput {
header: make_header(),
text: String::new(),
blocks: None,
typed_turn_appends: vec![append.clone()],
turn_metadata: None,
});
let json = serde_json::to_value(&input).unwrap();
let parsed: Input = serde_json::from_value(json).unwrap();
let Input::Prompt(prompt) = parsed else {
panic!("expected prompt input");
};
assert_eq!(prompt.text, "");
assert_eq!(prompt.typed_turn_appends, vec![append]);
}
#[test]
fn peer_input_message_serde() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Message),
body: "hi there".into(),
payload: None,
blocks: None,
handling_mode: None,
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "peer");
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Peer(_)));
}
#[test]
fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
let mut header = make_header();
header.source = InputOrigin::Peer {
peer_id: "canonical-peer-id".into(),
display_identity: Some("display-agent".into()),
runtime_id: None,
};
let input = Input::Peer(PeerInput {
header,
convention: Some(PeerConvention::Message),
body: "caption".into(),
payload: None,
blocks: Some(vec![
meerkat_core::types::ContentBlock::Text {
text: "caption".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc".into(),
},
]),
handling_mode: None,
});
let Input::Peer(peer) = &input else {
panic!("expected peer input");
};
assert_eq!(
peer_projection_from_peer_input(peer)
.and_then(|projection| projection.block_prefix_text())
.as_deref(),
Some("Peer message from canonical-peer-id")
);
let projection = runtime_input_projection(&input);
let append = projection.append.expect("conversation append");
let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
panic!("expected typed system notice");
};
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
blocks.first()
else {
panic!("expected comms block");
};
assert_eq!(
peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
Some("display-agent")
);
assert_eq!(
content.first(),
Some(&meerkat_core::types::ContentBlock::Text {
text: "caption".into()
})
);
}
#[test]
fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
let mut header = make_header();
header.source = InputOrigin::Peer {
peer_id: route_id.into(),
display_identity: Some("display-agent".into()),
runtime_id: None,
};
let input = Input::Peer(PeerInput {
header,
convention: Some(PeerConvention::ResponseTerminal {
request_id: request_id.into(),
status: ResponseTerminalStatus::Completed,
}),
body: "legacy response body".into(),
payload: Some(serde_json::json!({"answer":"ok"})),
blocks: None,
handling_mode: None,
});
let Input::Peer(peer) = &input else {
panic!("expected peer input");
};
let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
assert!(
peer_projection_from_peer_input(peer).is_none(),
"terminal peer response projection must not be built before machine batch selection"
);
let projection = runtime_input_projection(&input);
assert!(
projection.context_append.is_none(),
"admission projection must not store terminal peer response context"
);
let projection = runtime_input_projection_for_machine_batch(&input);
let context = projection.context_append.expect("context append");
assert_eq!(context.key, expected_canonical_key);
let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
panic!("expected typed context");
};
let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
else {
panic!("expected comms block");
};
assert_eq!(
peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
Some("display-agent")
);
assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some(route_id));
}
#[test]
fn steer_projection_uses_context_append_as_pending_system_context() {
let input_id = InputId::new();
let projection = crate::ingress_types::RuntimeInputProjection {
append: Some(ConversationAppend {
role: ConversationAppendRole::SystemNotice,
content: CoreRenderable::Text {
text: "ordinary append must lose to context append".into(),
},
}),
additional_appends: Vec::new(),
context_append: Some(ConversationContextAppend {
key: "peer_response_terminal:peer:req".into(),
content: CoreRenderable::Text {
text: "terminal response is ready".into(),
},
}),
};
let appends = projection_to_pending_system_context_appends(&input_id, &projection);
assert_eq!(appends.len(), 1);
assert_eq!(appends[0].text, "terminal response is ready");
assert_eq!(
appends[0].source.as_deref(),
Some("peer_response_terminal:peer:req")
);
assert_eq!(
appends[0].idempotency_key.as_deref(),
Some("peer_response_terminal:peer:req")
);
}
#[test]
fn steer_projection_falls_back_to_ordinary_peer_append() {
let mut header = make_header();
header.source = InputOrigin::Peer {
peer_id: "peer-a".into(),
display_identity: Some("Peer A".into()),
runtime_id: None,
};
let input = Input::Peer(PeerInput {
header,
convention: Some(PeerConvention::Message),
body: "please look at this while you work".into(),
payload: None,
blocks: None,
handling_mode: Some(HandlingMode::Steer),
});
let input_id = input.id().clone();
let projection = runtime_input_projection(&input);
let appends = projection_to_pending_system_context_appends(&input_id, &projection);
assert_eq!(appends.len(), 1);
assert!(
appends[0]
.text
.contains("please look at this while you work"),
"peer message append should be renderable as live system context: {:?}",
appends[0].text
);
assert_eq!(
appends[0].source.as_deref(),
Some(format!("runtime:steer:{input_id}").as_str())
);
assert_eq!(
appends[0].idempotency_key.as_deref(),
Some(format!("runtime:steer:{input_id}").as_str())
);
}
#[test]
fn steer_projection_filters_empty_context_and_empty_append() {
let input_id = InputId::new();
let context_projection = crate::ingress_types::RuntimeInputProjection {
append: None,
additional_appends: Vec::new(),
context_append: Some(ConversationContextAppend {
key: "empty-context".into(),
content: CoreRenderable::Text { text: " ".into() },
}),
};
assert!(
projection_to_pending_system_context_appends(&input_id, &context_projection).is_empty()
);
let append_projection = crate::ingress_types::RuntimeInputProjection {
append: Some(ConversationAppend {
role: ConversationAppendRole::SystemNotice,
content: CoreRenderable::Text { text: "\n".into() },
}),
additional_appends: Vec::new(),
context_append: None,
};
assert!(
projection_to_pending_system_context_appends(&input_id, &append_projection).is_empty()
);
}
#[test]
fn peer_response_terminal_with_blocks_projects_append_and_context() {
let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
let mut header = make_header();
header.source = InputOrigin::Peer {
peer_id: route_id.into(),
display_identity: Some("display-agent".into()),
runtime_id: None,
};
let input = Input::Peer(PeerInput {
header,
convention: Some(PeerConvention::ResponseTerminal {
request_id: request_id.into(),
status: ResponseTerminalStatus::Completed,
}),
body: String::new(),
payload: Some(serde_json::json!({"answer":"ok"})),
blocks: Some(vec![meerkat_core::types::ContentBlock::Image {
media_type: "image/jpeg".into(),
data: "abc".into(),
}]),
handling_mode: None,
});
let projection = runtime_input_projection_for_machine_batch(&input);
let append = projection.append.expect("conversation append");
let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
panic!("expected typed append");
};
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
blocks.first()
else {
panic!("expected comms block");
};
assert_eq!(
peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
Some("display-agent")
);
assert!(matches!(
content.first(),
Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
if media_type == "image/jpeg"
));
assert!(
projection.context_append.is_some(),
"terminal response must still apply runtime-owned context"
);
}
#[test]
fn peer_input_request_serde() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Request {
request_id: "req-1".into(),
intent: "mob.peer_added".into(),
}),
body: "Agent joined".into(),
payload: Some(serde_json::json!({"name": "agent-1"})),
blocks: None,
handling_mode: None,
});
let json = serde_json::to_value(&input).unwrap();
let parsed: Input = serde_json::from_value(json).unwrap();
if let Input::Peer(p) = parsed {
assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
} else {
panic!("Expected PeerInput");
}
}
#[test]
fn peer_input_response_terminal_serde() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::ResponseTerminal {
request_id: "req-1".into(),
status: ResponseTerminalStatus::Completed,
}),
body: "Done".into(),
payload: Some(serde_json::json!({"ok": true})),
blocks: None,
handling_mode: None,
});
let json = serde_json::to_value(&input).unwrap();
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Peer(_)));
}
#[test]
fn peer_input_response_progress_serde() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::ResponseProgress {
request_id: "req-1".into(),
phase: ResponseProgressPhase::InProgress,
}),
body: "Working...".into(),
payload: Some(serde_json::json!({"progress": "working"})),
blocks: None,
handling_mode: None,
});
let json = serde_json::to_value(&input).unwrap();
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Peer(_)));
}
#[test]
fn flow_step_input_serde() {
let input = Input::FlowStep(FlowStepInput {
header: make_header(),
step_id: "step-1".into(),
instructions: "analyze the data".into(),
blocks: Some(vec![
meerkat_core::types::ContentBlock::Text {
text: "analyze the data".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: meerkat_core::types::ImageData::Inline {
data: "abc123".into(),
},
},
]),
turn_metadata: None,
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "flow_step");
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::FlowStep(_)));
}
#[test]
fn external_event_input_serde() {
let input = Input::ExternalEvent(ExternalEventInput {
header: make_header(),
event_type: "webhook.received".into(),
payload: serde_json::json!({"url": "https://example.com"}),
blocks: Some(vec![
meerkat_core::types::ContentBlock::Text {
text: "look".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: meerkat_core::types::ImageData::Inline {
data: "abc123".into(),
},
},
]),
handling_mode: HandlingMode::Queue,
render_metadata: None,
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "external_event");
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::ExternalEvent(_)));
}
#[test]
fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
let mut input = Input::ExternalEvent(ExternalEventInput {
header: make_header(),
event_type: "webhook.received".into(),
payload: serde_json::json!({
"body": "see image",
"blocks": [
{ "type": "text", "text": "caption text" },
{ "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
]
}),
blocks: None,
handling_mode: HandlingMode::Queue,
render_metadata: None,
});
match &mut input {
Input::ExternalEvent(event) => {
migrate_legacy_payload_blocks(event).unwrap();
assert!(event.payload.get("blocks").is_none());
assert_eq!(event.payload["body"], "see image");
assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
}
other => panic!("Expected ExternalEvent, got {other:?}"),
}
}
#[test]
fn continuation_input_serde() {
let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "continuation");
let parsed: Input = serde_json::from_value(json).unwrap();
match parsed {
Input::Continuation(continuation) => {
assert_eq!(continuation.handling_mode, HandlingMode::Steer);
assert_eq!(continuation.reason, "detached_background_op_completed");
}
other => panic!("Expected Continuation, got {other:?}"),
}
}
#[test]
fn continuation_input_accepts_legacy_system_generated_tag() {
let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
let mut json = serde_json::to_value(&input).unwrap();
json["input_type"] = serde_json::Value::String("system_generated".into());
let parsed: Input = serde_json::from_value(json).unwrap();
match parsed {
Input::Continuation(continuation) => {
assert_eq!(continuation.reason, "detached_background_op_completed");
}
other => panic!("Expected Continuation, got {other:?}"),
}
}
#[test]
fn operation_input_serde() {
let input = Input::Operation(OperationInput {
header: InputHeader {
durability: InputDurability::Derived,
..make_header()
},
operation_id: OperationId::new(),
event: OpEvent::Cancelled {
id: OperationId::new(),
},
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["input_type"], "operation");
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Operation(_)));
}
#[test]
fn operation_input_accepts_legacy_projected_tag() {
let input = Input::Operation(OperationInput {
header: InputHeader {
durability: InputDurability::Derived,
..make_header()
},
operation_id: OperationId::new(),
event: OpEvent::Cancelled {
id: OperationId::new(),
},
});
let mut json = serde_json::to_value(&input).unwrap();
json["input_type"] = serde_json::Value::String("projected".into());
let parsed: Input = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, Input::Operation(_)));
}
#[test]
fn input_kind_id() {
let prompt = Input::Prompt(PromptInput {
header: make_header(),
text: "hi".into(),
blocks: None,
typed_turn_appends: Vec::new(),
turn_metadata: None,
});
assert_eq!(prompt.kind(), InputKind::Prompt);
let peer_msg = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Message),
body: "hi".into(),
payload: None,
blocks: None,
handling_mode: None,
});
assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
let peer_req = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Request {
request_id: "r".into(),
intent: "i".into(),
}),
body: "hi".into(),
payload: Some(serde_json::json!({"subject": "x"})),
blocks: None,
handling_mode: None,
});
assert_eq!(peer_req.kind(), InputKind::PeerRequest);
let continuation = Input::Continuation(ContinuationInput {
header: make_header(),
reason: "continue".into(),
handling_mode: HandlingMode::Steer,
request_id: None,
});
assert_eq!(continuation.kind(), InputKind::Continuation);
let operation = Input::Operation(OperationInput {
header: make_header(),
operation_id: OperationId::new(),
event: OpEvent::Cancelled {
id: OperationId::new(),
},
});
assert_eq!(operation.kind(), InputKind::Operation);
}
#[test]
fn input_source_variants() {
let sources = vec![
InputOrigin::Operator,
InputOrigin::Peer {
peer_id: "p1".into(),
display_identity: None,
runtime_id: None,
},
InputOrigin::Flow {
flow_id: "f1".into(),
step_index: 0,
},
InputOrigin::System,
InputOrigin::External {
source_name: "webhook".into(),
},
];
for source in sources {
let json = serde_json::to_value(&source).unwrap();
let parsed: InputOrigin = serde_json::from_value(json).unwrap();
assert_eq!(source, parsed);
}
}
#[test]
fn input_durability_serde() {
for d in [
InputDurability::Durable,
InputDurability::Ephemeral,
InputDurability::Derived,
] {
let json = serde_json::to_value(d).unwrap();
let parsed: InputDurability = serde_json::from_value(json).unwrap();
assert_eq!(d, parsed);
}
}
#[test]
fn peer_input_without_handling_mode_deserializes_as_none() {
let json = serde_json::json!({
"input_type": "peer",
"header": serde_json::to_value(make_header()).unwrap(),
"convention": { "convention_type": "message" },
"body": "hello"
});
let parsed: Input = serde_json::from_value(json).unwrap();
match parsed {
Input::Peer(p) => assert!(p.handling_mode.is_none()),
other => panic!("Expected Peer, got {other:?}"),
}
}
#[test]
fn peer_input_with_queue_handling_mode_roundtrips() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Message),
body: "hi".into(),
payload: None,
blocks: None,
handling_mode: Some(HandlingMode::Queue),
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["handling_mode"], "queue");
let parsed: Input = serde_json::from_value(json).unwrap();
match parsed {
Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
other => panic!("Expected Peer, got {other:?}"),
}
}
#[test]
fn peer_response_terminal_input_owns_wire_status_mapping() {
let peer_id = meerkat_core::comms::PeerId::from_uuid(
uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
);
let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
let request_id = meerkat_core::PeerCorrelationId::from_uuid(
uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
);
let input = peer_response_terminal_input(
peer_id,
Some(display_name),
request_id,
meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
serde_json::json!({"ok": false}),
);
match input {
Input::Peer(PeerInput {
header:
InputHeader {
source:
InputOrigin::Peer {
peer_id,
display_identity,
runtime_id,
},
durability: InputDurability::Durable,
correlation_id,
..
},
convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
payload: Some(payload),
handling_mode: None,
..
}) => {
assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
assert_eq!(display_identity.as_deref(), Some("analyst"));
assert_eq!(runtime_id, None);
assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
assert_eq!(
correlation_id,
Some(CorrelationId::from_uuid(
uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
))
);
assert_eq!(status, ResponseTerminalStatus::Cancelled);
assert_eq!(payload["ok"], false);
}
other => panic!("expected terminal peer input, got {other:?}"),
}
}
#[test]
fn peer_input_with_steer_handling_mode_roundtrips() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Message),
body: "hi".into(),
payload: None,
blocks: None,
handling_mode: Some(HandlingMode::Steer),
});
let json = serde_json::to_value(&input).unwrap();
assert_eq!(json["handling_mode"], "steer");
let parsed: Input = serde_json::from_value(json).unwrap();
match parsed {
Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
other => panic!("Expected Peer, got {other:?}"),
}
}
#[test]
fn peer_input_handling_mode_not_serialized_when_none() {
let input = Input::Peer(PeerInput {
header: make_header(),
convention: Some(PeerConvention::Message),
body: "hi".into(),
payload: None,
blocks: None,
handling_mode: None,
});
let json = serde_json::to_value(&input).unwrap();
assert!(json.get("handling_mode").is_none());
}
}