Skip to main content

meerkat_runtime/
input.rs

//! §8 Input types — the 6 input variants accepted by the runtime layer.
//!
//! Core never sees these. The runtime's policy table resolves each Input
//! to a PolicyDecision, then the runtime translates accepted Inputs into
//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10    ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11    RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::HandlingMode;
15use meerkat_core::{
16    BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
17    PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
18    PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
19    PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
20    PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
21    PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
22};
23use serde::{Deserialize, Serialize};
24
25use crate::identifiers::{
26    CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
27};
28use meerkat_core::types::RenderMetadata;
29
30/// Common header for all input variants.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct InputHeader {
33    /// Unique ID for this input.
34    pub id: InputId,
35    /// When the input was created.
36    pub timestamp: DateTime<Utc>,
37    /// Source of the input.
38    pub source: InputOrigin,
39    /// Durability requirement.
40    pub durability: InputDurability,
41    /// Visibility controls.
42    pub visibility: InputVisibility,
43    /// Optional idempotency key for dedup.
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub idempotency_key: Option<IdempotencyKey>,
46    /// Optional supersession key.
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub supersession_key: Option<SupersessionKey>,
49    /// Optional correlation ID.
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub correlation_id: Option<CorrelationId>,
52}
53
54/// Where the input originated.
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
56#[serde(tag = "type", rename_all = "snake_case")]
57#[non_exhaustive]
58pub enum InputOrigin {
59    /// Human operator / external API caller.
60    Operator,
61    /// Peer agent (comms).
62    Peer {
63        /// Canonical comms peer id used by machine/runtime policy and schema
64        /// projection.
65        peer_id: String,
66        /// Optional display/source label admitted at peer ingress. This is
67        /// presentation metadata only; routing and trust must use typed ingress
68        /// facts before the runtime input seam or the canonical `peer_id`.
69        #[serde(default, skip_serializing_if = "Option::is_none")]
70        display_identity: Option<String>,
71        #[serde(skip_serializing_if = "Option::is_none")]
72        runtime_id: Option<LogicalRuntimeId>,
73    },
74    /// Flow engine (mob orchestration).
75    Flow { flow_id: String, step_index: usize },
76    /// System-generated (compaction, projection, etc.).
77    System,
78    /// External event source.
79    External { source_name: String },
80}
81
82/// Durability requirement for an input.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85#[non_exhaustive]
86pub enum InputDurability {
87    /// Must be persisted before acknowledgment.
88    Durable,
89    /// In-memory only, may be lost on crash.
90    Ephemeral,
91    /// Derived from other inputs (can be reconstructed).
92    Derived,
93}
94
95/// Visibility controls for an input.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
97pub struct InputVisibility {
98    /// Whether this input appears in the conversation transcript.
99    pub transcript_eligible: bool,
100    /// Whether this input is visible to operator surfaces.
101    pub operator_eligible: bool,
102}
103
104impl Default for InputVisibility {
105    fn default() -> Self {
106        Self {
107            transcript_eligible: true,
108            operator_eligible: true,
109        }
110    }
111}
112
113/// The 6 input variants accepted by the runtime layer.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(tag = "input_type", rename_all = "snake_case")]
116#[non_exhaustive]
117pub enum Input {
118    /// User/operator prompt.
119    Prompt(PromptInput),
120    /// Peer-originated input (comms).
121    Peer(PeerInput),
122    /// Flow step input (mob orchestration).
123    FlowStep(FlowStepInput),
124    /// External event input.
125    ExternalEvent(ExternalEventInput),
126    /// Explicit runtime continuation work.
127    #[serde(alias = "system_generated")]
128    Continuation(ContinuationInput),
129    /// Explicit non-content operation/lifecycle input.
130    #[serde(alias = "projected")]
131    Operation(OperationInput),
132}
133
134impl Input {
135    /// Get the input header.
136    pub fn header(&self) -> &InputHeader {
137        match self {
138            Input::Prompt(i) => &i.header,
139            Input::Peer(i) => &i.header,
140            Input::FlowStep(i) => &i.header,
141            Input::ExternalEvent(i) => &i.header,
142            Input::Continuation(i) => &i.header,
143            Input::Operation(i) => &i.header,
144        }
145    }
146
147    /// Get the input ID.
148    pub fn id(&self) -> &InputId {
149        &self.header().id
150    }
151
152    /// Typed kind for policy dispatch.
153    pub fn kind(&self) -> InputKind {
154        match self {
155            Input::Prompt(_) => InputKind::Prompt,
156            Input::Peer(p) => match &p.convention {
157                Some(PeerConvention::Message) | None => InputKind::PeerMessage,
158                Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
159                Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
160                Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
161            },
162            Input::FlowStep(_) => InputKind::FlowStep,
163            Input::ExternalEvent(_) => InputKind::ExternalEvent,
164            Input::Continuation(_) => InputKind::Continuation,
165            Input::Operation(_) => InputKind::Operation,
166        }
167    }
168
169    /// Wrapped kind identifier (for newtype-discipline call sites).
170    pub fn kind_id(&self) -> KindId {
171        KindId::new(self.kind())
172    }
173
174    /// Handling-mode hint for ordinary work admitted through the runtime.
175    pub fn handling_mode(&self) -> Option<HandlingMode> {
176        match self {
177            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
178            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
179            Input::ExternalEvent(event) => Some(event.handling_mode),
180            Input::Continuation(continuation) => Some(continuation.handling_mode),
181            Input::Peer(peer) => peer.handling_mode,
182            Input::Operation(_) => None,
183        }
184    }
185}
186
187fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
188    let Some(obj) = event.payload.as_object_mut() else {
189        return Ok(());
190    };
191    let Some(blocks_value) = obj.remove("blocks") else {
192        return Ok(());
193    };
194    if event.blocks.is_some() {
195        return Ok(());
196    }
197    let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
198        .map_err(|err| {
199            BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
200        })?;
201    event.blocks = Some(blocks);
202    Ok(())
203}
204
205pub async fn externalize_input_images(
206    blob_store: &dyn BlobStore,
207    input: &mut Input,
208) -> Result<(), BlobStoreError> {
209    match input {
210        Input::Prompt(prompt) => {
211            if let Some(blocks) = prompt.blocks.as_mut() {
212                externalize_content_blocks(blob_store, blocks).await?;
213            }
214        }
215        Input::Peer(peer) => {
216            if let Some(blocks) = peer.blocks.as_mut() {
217                externalize_content_blocks(blob_store, blocks).await?;
218            }
219        }
220        Input::FlowStep(flow_step) => {
221            if let Some(blocks) = flow_step.blocks.as_mut() {
222                externalize_content_blocks(blob_store, blocks).await?;
223            }
224        }
225        Input::ExternalEvent(event) => {
226            migrate_legacy_payload_blocks(event)?;
227            if let Some(blocks) = event.blocks.as_mut() {
228                externalize_content_blocks(blob_store, blocks).await?;
229            }
230        }
231        Input::Continuation(_) | Input::Operation(_) => {}
232    }
233    Ok(())
234}
235
236pub async fn hydrate_input_images(
237    blob_store: &dyn BlobStore,
238    input: &mut Input,
239    missing_behavior: MissingBlobBehavior,
240) -> Result<(), BlobStoreError> {
241    match input {
242        Input::Prompt(prompt) => {
243            if let Some(blocks) = prompt.blocks.as_mut() {
244                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
245            }
246        }
247        Input::Peer(peer) => {
248            if let Some(blocks) = peer.blocks.as_mut() {
249                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
250            }
251        }
252        Input::FlowStep(flow_step) => {
253            if let Some(blocks) = flow_step.blocks.as_mut() {
254                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
255            }
256        }
257        Input::ExternalEvent(event) => {
258            migrate_legacy_payload_blocks(event)?;
259            if let Some(blocks) = event.blocks.as_mut() {
260                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
261            }
262        }
263        Input::Continuation(_) | Input::Operation(_) => {}
264    }
265    Ok(())
266}
267
268/// User/operator prompt input.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct PromptInput {
271    pub header: InputHeader,
272    /// The prompt text.
273    pub text: String,
274    /// Optional multimodal content blocks. When present, `text` serves as the
275    /// text projection (backwards compat), and `blocks` carries the full content.
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    pub turn_metadata: Option<RuntimeTurnMetadata>,
280}
281
282impl PromptInput {
283    /// Create a new operator prompt with default header.
284    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
285        Self {
286            header: InputHeader {
287                id: meerkat_core::lifecycle::InputId::new(),
288                timestamp: chrono::Utc::now(),
289                source: InputOrigin::Operator,
290                durability: InputDurability::Durable,
291                visibility: InputVisibility::default(),
292                idempotency_key: None,
293                supersession_key: None,
294                correlation_id: None,
295            },
296            text: text.into(),
297            blocks: None,
298            turn_metadata,
299        }
300    }
301
302    /// Create a multimodal prompt from `ContentInput`.
303    pub fn from_content_input(
304        input: meerkat_core::types::ContentInput,
305        turn_metadata: Option<RuntimeTurnMetadata>,
306    ) -> Self {
307        let text = input.text_content();
308        let blocks = if input.has_images() {
309            Some(input.into_blocks())
310        } else {
311            None
312        };
313        Self {
314            header: InputHeader {
315                id: meerkat_core::lifecycle::InputId::new(),
316                timestamp: chrono::Utc::now(),
317                source: InputOrigin::Operator,
318                durability: InputDurability::Durable,
319                visibility: InputVisibility::default(),
320                idempotency_key: None,
321                supersession_key: None,
322                correlation_id: None,
323            },
324            text,
325            blocks,
326            turn_metadata,
327        }
328    }
329}
330
331/// Peer-originated input from comms.
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub struct PeerInput {
334    pub header: InputHeader,
335    /// The peer convention (message, request, response).
336    #[serde(skip_serializing_if = "Option::is_none")]
337    pub convention: Option<PeerConvention>,
338    /// Legacy textual body for this peer input.
339    ///
340    /// Message-style peer traffic uses this directly. Request/response prompt
341    /// projection is runtime-owned and must be reconstructed from
342    /// `convention + payload + source` rather than helper-rendered prose.
343    pub body: String,
344    /// Structured peer payload, when one exists.
345    ///
346    /// For `Request`, this is the request params. For `Response*`, this is the
347    /// response result payload. Message traffic leaves this unset.
348    #[serde(default, skip_serializing_if = "Option::is_none")]
349    pub payload: Option<serde_json::Value>,
350    /// Optional multimodal content blocks. When present, `body` serves as the
351    /// text projection (backwards compat), and `blocks` carries the full content.
352    #[serde(default, skip_serializing_if = "Option::is_none")]
353    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
354    /// Optional handling-mode override for actionable peer inputs.
355    /// When present on Message/Request/no-convention, overrides kind-based
356    /// policy defaults. Forbidden on ResponseProgress and ResponseTerminal
357    /// (enforced by [`validate_peer_handling_mode`]).
358    #[serde(default, skip_serializing_if = "Option::is_none")]
359    pub handling_mode: Option<HandlingMode>,
360}
361
362/// Peer communication conventions.
363#[derive(Debug, Clone, Serialize, Deserialize)]
364#[serde(tag = "convention_type", rename_all = "snake_case")]
365#[non_exhaustive]
366pub enum PeerConvention {
367    /// Simple peer-to-peer message.
368    Message,
369    /// Request expecting a response.
370    Request { request_id: String, intent: String },
371    /// Progress update for an ongoing response.
372    ResponseProgress {
373        request_id: String,
374        phase: ResponseProgressPhase,
375    },
376    /// Terminal response (completed or failed).
377    ResponseTerminal {
378        request_id: String,
379        status: ResponseTerminalStatus,
380    },
381}
382
383/// Phase of a response progress update. This is the core projection enum, not
384/// a runtime-local duplicate.
385pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
386
387/// Terminal status of a response. This is the core projection enum, not a
388/// runtime-local duplicate.
389pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
390
391pub fn response_terminal_status_from_wire(
392    status: meerkat_contracts::PeerResponseTerminalStatusWire,
393) -> ResponseTerminalStatus {
394    match status {
395        meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
396            PeerResponseTerminalProjectionStatus::Completed
397        }
398        meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
399            PeerResponseTerminalProjectionStatus::Failed
400        }
401        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
402            PeerResponseTerminalProjectionStatus::Cancelled
403        }
404    }
405}
406
407pub fn peer_response_terminal_input(
408    peer_id: meerkat_core::comms::PeerId,
409    display_name: Option<meerkat_core::comms::PeerName>,
410    request_id: meerkat_core::PeerCorrelationId,
411    status: meerkat_contracts::PeerResponseTerminalStatusWire,
412    result: serde_json::Value,
413) -> Input {
414    let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
415    let request_id = request_id.to_string();
416    let peer_id = peer_id.to_string();
417    let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
418
419    Input::Peer(PeerInput {
420        header: InputHeader {
421            id: InputId::new(),
422            timestamp: Utc::now(),
423            source: InputOrigin::Peer {
424                peer_id,
425                display_identity: Some(display_identity),
426                runtime_id: None,
427            },
428            durability: InputDurability::Durable,
429            visibility: InputVisibility::default(),
430            idempotency_key: None,
431            supersession_key: None,
432            correlation_id: Some(correlation_id),
433        },
434        convention: Some(PeerConvention::ResponseTerminal {
435            request_id,
436            status: response_terminal_status_from_wire(status),
437        }),
438        body: String::new(),
439        payload: Some(result),
440        blocks: None,
441        handling_mode: None,
442    })
443}
444
445/// Flow step input from mob orchestration.
446#[derive(Debug, Clone, Serialize, Deserialize)]
447pub struct FlowStepInput {
448    pub header: InputHeader,
449    /// Flow step identifier.
450    pub step_id: String,
451    /// Step instructions/prompt.
452    pub instructions: String,
453    /// Optional multimodal content blocks. When present, `instructions` serves
454    /// as the text projection (backwards compat), and `blocks` carries the
455    /// full content.
456    #[serde(default, skip_serializing_if = "Option::is_none")]
457    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
458    #[serde(default, skip_serializing_if = "Option::is_none")]
459    pub turn_metadata: Option<RuntimeTurnMetadata>,
460}
461
462/// External event input.
463#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct ExternalEventInput {
465    pub header: InputHeader,
466    /// Event type/name.
467    pub event_type: String,
468    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
469    /// payloads during coalescing and projection — not a pure pass-through.
470    /// Multimodal content does NOT live here canonically; use `blocks`.
471    pub payload: serde_json::Value,
472    /// Optional multimodal blocks carried by the external event. This is the
473    /// canonical owner for multimodal external-event content.
474    #[serde(default, skip_serializing_if = "Option::is_none")]
475    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
476    /// Runtime-owned handling hint for this external event.
477    #[serde(default)]
478    pub handling_mode: HandlingMode,
479    /// Optional normalized render metadata carried with the event.
480    #[serde(default, skip_serializing_if = "Option::is_none")]
481    pub render_metadata: Option<RenderMetadata>,
482}
483
484/// Explicit continuation request that asks the runtime to keep draining
485/// ordinary work after a boundary-local event (for example, terminal peer
486/// responses injected into session state).
487#[derive(Debug, Clone, Serialize, Deserialize)]
488pub struct ContinuationInput {
489    pub header: InputHeader,
490    /// Stable reason for the continuation request.
491    pub reason: String,
492    /// Ordinary-work handling mode for the continuation.
493    #[serde(default)]
494    pub handling_mode: HandlingMode,
495    /// Optional request/correlation handle tied to the continuation.
496    #[serde(default, skip_serializing_if = "Option::is_none")]
497    pub request_id: Option<String>,
498}
499
500impl ContinuationInput {
501    /// Build a continuation for waking an idle session after a detached
502    /// background operation reaches terminal state.
503    ///
504    /// Properties: `Derived` durability, invisible to transcript and operator,
505    /// `System` origin, `Steer` handling mode.
506    pub fn detached_background_op_completed() -> Self {
507        Self {
508            header: InputHeader {
509                id: meerkat_core::lifecycle::InputId::new(),
510                timestamp: chrono::Utc::now(),
511                source: InputOrigin::System,
512                durability: InputDurability::Derived,
513                visibility: InputVisibility {
514                    transcript_eligible: false,
515                    operator_eligible: false,
516                },
517                idempotency_key: None,
518                supersession_key: None,
519                correlation_id: None,
520            },
521            reason: "detached_background_op_completed".to_string(),
522            handling_mode: HandlingMode::Steer,
523            request_id: None,
524        }
525    }
526}
527
528/// Explicit operation/lifecycle input admitted through runtime instead of
529/// being smuggled through transcript projections or peer-only paths.
530#[derive(Debug, Clone, Serialize, Deserialize)]
531pub struct OperationInput {
532    pub header: InputHeader,
533    /// Stable operation identifier.
534    pub operation_id: OperationId,
535    /// Typed lifecycle event for the operation.
536    pub event: OpEvent,
537}
538
539/// Build the core-owned peer conversation projection for a runtime peer input.
540///
541/// Peer-response terminal context projection is deliberately excluded here:
542/// admission must not store it as pre-machine truth. Runtime-loop batch
543/// construction uses [`runtime_input_projection_for_machine_batch`] after the
544/// machine-selected input is dequeued.
545pub(crate) fn peer_projection_from_peer_input(
546    peer: &PeerInput,
547) -> Option<PeerConversationProjection> {
548    peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
549}
550
551fn peer_projection_from_peer_input_with_id(
552    peer: &PeerInput,
553    peer_id: &str,
554) -> Option<PeerConversationProjection> {
555    let peer_id = peer_id.to_string();
556
557    match &peer.convention {
558        Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
559        Some(PeerConvention::Request { request_id, intent }) => {
560            let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
561                Ok(peer_id) => peer_id,
562                Err(error) => {
563                    tracing::warn!(
564                        peer_id,
565                        error = %error,
566                        "dropping peer request projection with non-canonical peer_id"
567                    );
568                    return None;
569                }
570            };
571            Some(PeerConversationProjection::Request {
572                peer_id,
573                display_name: peer_display_label(peer),
574                request_id: request_id.clone(),
575                intent: intent.clone(),
576                payload: peer.payload.clone(),
577            })
578        }
579        Some(PeerConvention::ResponseProgress { request_id, phase }) => {
580            Some(PeerConversationProjection::ResponseProgress {
581                peer_id,
582                request_id: request_id.clone(),
583                phase: *phase,
584                payload: peer.payload.clone(),
585            })
586        }
587        Some(PeerConvention::ResponseTerminal { .. }) => None,
588        None => None,
589    }
590}
591
592pub(crate) fn peer_response_terminal_fact(
593    peer: &PeerInput,
594) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
595    let InputOrigin::Peer {
596        peer_id,
597        display_identity,
598        runtime_id,
599    } = &peer.header.source
600    else {
601        return Ok(None);
602    };
603    let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
604        return Ok(None);
605    };
606
607    let transport_identity = runtime_id
608        .as_ref()
609        .map(ToString::to_string)
610        .map(PeerResponseTerminalTransportIdentity::parse)
611        .transpose()?;
612    let source = PeerResponseTerminalSource::new(
613        transport_identity,
614        PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
615        PeerResponseTerminalDisplayIdentity::parse(
616            display_identity
617                .as_ref()
618                .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
619                .clone(),
620        )?,
621    );
622    Ok(Some(PeerResponseTerminalFact::new(
623        source,
624        PeerResponseTerminalCorrelationId::parse(request_id)?,
625        *status,
626        PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
627    )))
628}
629
630pub(crate) fn validate_peer_response_terminal_fact(
631    input: &Input,
632) -> Result<(), PeerResponseTerminalFactError> {
633    let Input::Peer(peer) = input else {
634        return Ok(());
635    };
636    peer_response_terminal_fact(peer).map(|_| ())
637}
638
/// Test helper: core peer projection of `input`, or `None` when `input` is
/// not a peer input or has no projection.
#[cfg(test)]
pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
    match input {
        Input::Peer(peer) => peer_projection_from_peer_input(peer),
        _ => None,
    }
}
648
649fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
650    let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
651        return None;
652    };
653    Some(peer_id.clone())
654}
655
656fn peer_display_label(peer: &PeerInput) -> Option<String> {
657    let InputOrigin::Peer {
658        display_identity, ..
659    } = &peer.header.source
660    else {
661        return None;
662    };
663
664    display_identity
665        .as_ref()
666        .map(|label| label.trim())
667        .filter(|label| !label.is_empty())
668        .map(ToOwned::to_owned)
669}
670
671/// Rendered prompt-text projection for a peer input.
672pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
673    peer_projection_from_peer_input(peer)
674        .map(|projection| {
675            let prompt = projection.prompt_text();
676            if prompt.is_empty() {
677                peer.body.clone()
678            } else {
679                prompt
680            }
681        })
682        .unwrap_or_else(|| peer.body.clone())
683}
684
685/// Optional block prefix for peer message inputs.
686pub(crate) fn peer_block_prefix_text(peer: &PeerInput) -> Option<String> {
687    if matches!(peer.convention, Some(PeerConvention::Message))
688        && let Some(prefix) = rendered_message_prefix(&peer.body)
689    {
690        return Some(prefix);
691    }
692
693    peer_projection_from_peer_input(peer).and_then(|projection| projection.block_prefix_text())
694}
695
/// Return the trimmed first line of `body` when it looks like a rendered
/// comms-message header, i.e. it starts with `[COMMS MESSAGE from ` and ends
/// with `]`. Any other first line, or an empty body, gives `None`.
fn rendered_message_prefix(body: &str) -> Option<String> {
    body.lines()
        .next()
        .map(str::trim)
        .filter(|line| line.starts_with("[COMMS MESSAGE from ") && line.ends_with(']'))
        .map(str::to_owned)
}
704
705fn rendered_message_body_text(body: &str, prefix: &str) -> Option<String> {
706    let text = body
707        .lines()
708        .skip_while(|line| line.trim() != prefix)
709        .skip(1)
710        .map(str::trim)
711        .filter(|line| !line.is_empty() && !is_media_projection_line(line))
712        .collect::<Vec<_>>()
713        .join("\n");
714    if text.is_empty() { None } else { Some(text) }
715}
716
/// Whether `line` is a media projection placeholder: it starts with `[image:`
/// or `[video:` and ends with `]`.
fn is_media_projection_line(line: &str) -> bool {
    const MEDIA_OPENERS: [&str; 2] = ["[image:", "[video:"];
    line.ends_with(']') && MEDIA_OPENERS.iter().any(|opener| line.starts_with(*opener))
}
720
721fn blocks_include_text_projection(
722    blocks: &[meerkat_core::types::ContentBlock],
723    expected: &str,
724) -> bool {
725    let expected = expected.trim();
726    if expected.is_empty() {
727        return true;
728    }
729    if blocks
730        .iter()
731        .any(|block| matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == expected))
732    {
733        return true;
734    }
735    let joined = blocks
736        .iter()
737        .filter_map(|block| match block {
738            meerkat_core::types::ContentBlock::Text { text } => Some(text.trim()),
739            _ => None,
740        })
741        .filter(|text| !text.is_empty())
742        .collect::<Vec<_>>()
743        .join("\n");
744    joined == expected
745}
746
747pub(crate) fn input_prompt_text(input: &Input) -> String {
748    match input {
749        Input::Prompt(p) => p.text.clone(),
750        Input::Peer(p) => peer_prompt_text(p),
751        Input::FlowStep(f) => f.instructions.clone(),
752        Input::ExternalEvent(e) => external_event_projection_text(e),
753        Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
754        Input::Operation(operation) => {
755            format!(
756                "[Operation {}] {:?}",
757                operation.operation_id, operation.event
758            )
759        }
760    }
761}
762
763fn external_event_projection_text(event: &ExternalEventInput) -> String {
764    let source_name = match &event.header.source {
765        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
766            source_name.as_str()
767        }
768        _ => event.event_type.as_str(),
769    };
770    let body = event
771        .payload
772        .get("body")
773        .and_then(serde_json::Value::as_str)
774        .map(str::trim);
775
776    meerkat_core::interaction::format_external_event_projection(source_name, body)
777}
778
779fn input_to_append(input: &Input) -> Option<ConversationAppend> {
780    if matches!(
781        input,
782        Input::Peer(PeerInput {
783            convention: Some(PeerConvention::ResponseTerminal { .. }),
784            ..
785        })
786    ) {
787        return None;
788    }
789
790    let content = match input {
791        Input::Prompt(p) if p.blocks.is_some() => CoreRenderable::Blocks {
792            blocks: p.blocks.clone().unwrap_or_default(),
793        },
794        Input::Peer(p) if p.blocks.is_some() => {
795            let raw_blocks = p.blocks.clone().unwrap_or_default();
796            if let Some(prefix) = peer_block_prefix_text(p) {
797                let mut blocks = vec![meerkat_core::types::ContentBlock::Text {
798                    text: prefix.clone(),
799                }];
800                if let Some(body_text) = rendered_message_body_text(&p.body, &prefix)
801                    && !blocks_include_text_projection(&raw_blocks, &body_text)
802                {
803                    blocks.push(meerkat_core::types::ContentBlock::Text { text: body_text });
804                }
805                blocks.extend(raw_blocks);
806                CoreRenderable::Blocks { blocks }
807            } else {
808                let body_already_in_blocks = raw_blocks.first().is_some_and(|b| {
809                    matches!(b, meerkat_core::types::ContentBlock::Text { text } if text == &p.body)
810                });
811                if p.body.is_empty() || body_already_in_blocks {
812                    CoreRenderable::Blocks { blocks: raw_blocks }
813                } else {
814                    let mut blocks = vec![meerkat_core::types::ContentBlock::Text {
815                        text: p.body.clone(),
816                    }];
817                    blocks.extend(raw_blocks);
818                    CoreRenderable::Blocks { blocks }
819                }
820            }
821        }
822        Input::FlowStep(f) if f.blocks.is_some() => CoreRenderable::Blocks {
823            blocks: f.blocks.clone().unwrap_or_default(),
824        },
825        Input::ExternalEvent(e) if e.blocks.is_some() => CoreRenderable::Blocks {
826            blocks: e.blocks.clone().unwrap_or_default(),
827        },
828        Input::Prompt(_) | Input::Peer(_) | Input::FlowStep(_) | Input::ExternalEvent(_) => {
829            CoreRenderable::Text {
830                text: input_prompt_text(input),
831            }
832        }
833        Input::Continuation(_) | Input::Operation(_) => return None,
834    };
835
836    Some(ConversationAppend {
837        role: ConversationAppendRole::User,
838        content,
839    })
840}
841
842fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
843    let projection = match input {
844        Input::Peer(peer) => peer_projection_from_peer_input(peer)?,
845        _ => return None,
846    };
847
848    Some(ConversationContextAppend {
849        key: projection.context_key()?,
850        content: CoreRenderable::Text {
851            text: projection.prompt_text(),
852        },
853    })
854}
855
856fn peer_response_terminal_context_append(
857    peer: &PeerInput,
858) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
859    let Some(fact) = peer_response_terminal_fact(peer)? else {
860        return Ok(None);
861    };
862
863    Ok(Some(ConversationContextAppend {
864        key: fact.context_key(),
865        content: CoreRenderable::Text {
866            text: fact.prompt_text(),
867        },
868    }))
869}
870
871pub(crate) fn runtime_input_projection(
872    input: &Input,
873) -> crate::ingress_types::RuntimeInputProjection {
874    crate::ingress_types::RuntimeInputProjection {
875        append: input_to_append(input),
876        context_append: input_to_context_append(input),
877    }
878}
879
880pub(crate) fn runtime_input_projection_for_machine_batch(
881    input: &Input,
882) -> crate::ingress_types::RuntimeInputProjection {
883    let mut projection = runtime_input_projection(input);
884    if let Input::Peer(peer) = input
885        && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
886    {
887        projection.context_append = Some(context_append);
888    }
889    projection
890}
891
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use chrono::Utc;
    use meerkat_core::types::{ContentBlock, ImageData};

    /// Peer UUID used by the terminal-response constructor test.
    const TERMINAL_PEER_UUID: &str = "00000000-0000-4000-8000-000000000161";
    /// Request UUID used by the terminal-response constructor test.
    const TERMINAL_REQUEST_UUID: &str = "00000000-0000-4000-8000-000000000162";

    /// Operator-sourced durable header with every optional key unset.
    fn make_header() -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Header whose source is a peer displayed as `display-agent`.
    fn header_from_peer(peer_id: &str) -> InputHeader {
        InputHeader {
            source: InputOrigin::Peer {
                peer_id: peer_id.into(),
                display_identity: Some("display-agent".into()),
                runtime_id: None,
            },
            ..make_header()
        }
    }

    /// Peer input without blocks and without an explicit handling mode.
    fn peer_input(
        convention: PeerConvention,
        body: &str,
        payload: Option<serde_json::Value>,
    ) -> Input {
        Input::Peer(PeerInput {
            header: make_header(),
            convention: Some(convention),
            body: body.into(),
            payload,
            blocks: None,
            handling_mode: None,
        })
    }

    /// Plain peer message `"hi"` carrying the given handling mode.
    fn peer_message_with_mode(handling_mode: Option<HandlingMode>) -> Input {
        Input::Peer(PeerInput {
            header: make_header(),
            convention: Some(PeerConvention::Message),
            body: "hi".into(),
            payload: None,
            blocks: None,
            handling_mode,
        })
    }

    /// A text block followed by an inline PNG image block.
    fn text_and_png(text: &str) -> Vec<ContentBlock> {
        vec![
            ContentBlock::Text { text: text.into() },
            ContentBlock::Image {
                media_type: "image/png".into(),
                data: ImageData::Inline {
                    data: "abc123".into(),
                },
            },
        ]
    }

    /// Operation input reporting a cancelled operation under `header`.
    fn cancelled_operation(header: InputHeader) -> Input {
        Input::Operation(OperationInput {
            header,
            operation_id: OperationId::new(),
            event: OpEvent::Cancelled {
                id: OperationId::new(),
            },
        })
    }

    /// Cancelled-operation input whose header is marked `Derived`.
    fn derived_cancelled_operation() -> Input {
        cancelled_operation(InputHeader {
            durability: InputDurability::Derived,
            ..make_header()
        })
    }

    /// Serializes `input` to JSON and parses that JSON back into an `Input`.
    fn round_trip(input: &Input) -> (serde_json::Value, Input) {
        let json = serde_json::to_value(input).unwrap();
        let parsed = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    /// Serializes `input`, overwrites its `input_type` tag, and parses the result.
    fn parse_with_tag(input: &Input, tag: &str) -> Input {
        let mut json = serde_json::to_value(input).unwrap();
        json["input_type"] = serde_json::Value::String(tag.into());
        serde_json::from_value(json).unwrap()
    }

    #[test]
    fn prompt_input_serde() {
        let (json, parsed) = round_trip(&Input::Prompt(PromptInput {
            header: make_header(),
            text: "hello".into(),
            blocks: None,
            turn_metadata: None,
        }));
        assert_eq!(json["input_type"], "prompt");
        assert!(matches!(parsed, Input::Prompt(_)));
    }

    #[test]
    fn peer_input_message_serde() {
        let (json, parsed) = round_trip(&peer_input(PeerConvention::Message, "hi there", None));
        assert_eq!(json["input_type"], "peer");
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn peer_message_blocks_prefix_uses_rendered_display_label_not_canonical_origin() {
        let peer = PeerInput {
            header: header_from_peer("canonical-peer-id"),
            convention: Some(PeerConvention::Message),
            body: "[COMMS MESSAGE from display-agent]\ncaption\n[image: image/png]".into(),
            payload: None,
            blocks: Some(vec![
                ContentBlock::Text {
                    text: "caption".into(),
                },
                ContentBlock::Image {
                    media_type: "image/png".into(),
                    data: "abc".into(),
                },
            ]),
            handling_mode: None,
        };

        // The peer projection itself is keyed on the canonical peer id.
        let canonical_prefix = peer_projection_from_peer_input(&peer)
            .and_then(|projection| projection.block_prefix_text());
        assert_eq!(
            canonical_prefix.as_deref(),
            Some("[COMMS MESSAGE from canonical-peer-id]")
        );

        // The runtime projection must surface the rendered display label instead.
        let append = runtime_input_projection(&Input::Peer(peer))
            .append
            .expect("conversation append");
        let blocks = match append.content {
            CoreRenderable::Blocks { blocks } => blocks,
            _ => panic!("expected multimodal blocks"),
        };
        assert_eq!(
            blocks.first(),
            Some(&ContentBlock::Text {
                text: "[COMMS MESSAGE from display-agent]".into()
            })
        );
        assert!(!meerkat_core::types::text_content(&blocks).contains("canonical-peer-id"));
    }

    #[test]
    fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
        let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");

        let peer = PeerInput {
            header: header_from_peer(route_id),
            convention: Some(PeerConvention::ResponseTerminal {
                request_id: request_id.into(),
                status: ResponseTerminalStatus::Completed,
            }),
            body: "legacy response body".into(),
            payload: Some(serde_json::json!({"answer":"ok"})),
            blocks: None,
            handling_mode: None,
        };
        assert!(
            peer_projection_from_peer_input(&peer).is_none(),
            "terminal peer response projection must not be built before machine batch selection"
        );

        let input = Input::Peer(peer);
        assert!(
            runtime_input_projection(&input).context_append.is_none(),
            "admission projection must not store terminal peer response context"
        );

        let context = runtime_input_projection_for_machine_batch(&input)
            .context_append
            .expect("context append");
        assert_eq!(context.key, expected_canonical_key);
        let text = match context.content {
            CoreRenderable::Text { text } => text,
            _ => panic!("expected text context"),
        };
        assert!(text.contains("from display-agent"));
        assert!(!text.contains(route_id));
    }

    #[test]
    fn peer_input_request_serde() {
        let request = peer_input(
            PeerConvention::Request {
                request_id: "req-1".into(),
                intent: "mob.peer_added".into(),
            },
            "Agent joined",
            Some(serde_json::json!({"name": "agent-1"})),
        );
        let (_, parsed) = round_trip(&request);
        let Input::Peer(p) = parsed else {
            panic!("Expected PeerInput");
        };
        assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
    }

    #[test]
    fn peer_input_response_terminal_serde() {
        let terminal = peer_input(
            PeerConvention::ResponseTerminal {
                request_id: "req-1".into(),
                status: ResponseTerminalStatus::Completed,
            },
            "Done",
            Some(serde_json::json!({"ok": true})),
        );
        let (_, parsed) = round_trip(&terminal);
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn peer_input_response_progress_serde() {
        let progress = peer_input(
            PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            },
            "Working...",
            Some(serde_json::json!({"progress": "working"})),
        );
        let (_, parsed) = round_trip(&progress);
        assert!(matches!(parsed, Input::Peer(_)));
    }

    #[test]
    fn flow_step_input_serde() {
        let (json, parsed) = round_trip(&Input::FlowStep(FlowStepInput {
            header: make_header(),
            step_id: "step-1".into(),
            instructions: "analyze the data".into(),
            blocks: Some(text_and_png("analyze the data")),
            turn_metadata: None,
        }));
        assert_eq!(json["input_type"], "flow_step");
        assert!(matches!(parsed, Input::FlowStep(_)));
    }

    #[test]
    fn external_event_input_serde() {
        let (json, parsed) = round_trip(&Input::ExternalEvent(ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload: serde_json::json!({"url": "https://example.com"}),
            blocks: Some(text_and_png("look")),
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        }));
        assert_eq!(json["input_type"], "external_event");
        assert!(matches!(parsed, Input::ExternalEvent(_)));
    }

    #[test]
    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
        let mut event = ExternalEventInput {
            header: make_header(),
            event_type: "webhook.received".into(),
            payload: serde_json::json!({
                "body": "see image",
                "blocks": [
                    { "type": "text", "text": "caption text" },
                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
                ]
            }),
            blocks: None,
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        };

        migrate_legacy_payload_blocks(&mut event).unwrap();

        // The blocks leave the payload and land in the dedicated field; the
        // rest of the payload is untouched.
        assert!(event.payload.get("blocks").is_none());
        assert_eq!(event.payload["body"], "see image");
        assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
    }

    #[test]
    fn continuation_input_serde() {
        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
        let (json, parsed) = round_trip(&input);
        assert_eq!(json["input_type"], "continuation");
        match parsed {
            Input::Continuation(continuation) => {
                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
                assert_eq!(continuation.reason, "detached_background_op_completed");
            }
            other => panic!("Expected Continuation, got {other:?}"),
        }
    }

    #[test]
    fn continuation_input_accepts_legacy_system_generated_tag() {
        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
        match parse_with_tag(&input, "system_generated") {
            Input::Continuation(continuation) => {
                assert_eq!(continuation.reason, "detached_background_op_completed");
            }
            other => panic!("Expected Continuation, got {other:?}"),
        }
    }

    #[test]
    fn operation_input_serde() {
        let (json, parsed) = round_trip(&derived_cancelled_operation());
        assert_eq!(json["input_type"], "operation");
        assert!(matches!(parsed, Input::Operation(_)));
    }

    #[test]
    fn operation_input_accepts_legacy_projected_tag() {
        let parsed = parse_with_tag(&derived_cancelled_operation(), "projected");
        assert!(matches!(parsed, Input::Operation(_)));
    }

    #[test]
    fn input_kind_id() {
        let prompt = Input::Prompt(PromptInput {
            header: make_header(),
            text: "hi".into(),
            blocks: None,
            turn_metadata: None,
        });
        assert_eq!(prompt.kind(), InputKind::Prompt);

        let peer_msg = peer_input(PeerConvention::Message, "hi", None);
        assert_eq!(peer_msg.kind(), InputKind::PeerMessage);

        let peer_req = peer_input(
            PeerConvention::Request {
                request_id: "r".into(),
                intent: "i".into(),
            },
            "hi",
            Some(serde_json::json!({"subject": "x"})),
        );
        assert_eq!(peer_req.kind(), InputKind::PeerRequest);

        let continuation = Input::Continuation(ContinuationInput {
            header: make_header(),
            reason: "continue".into(),
            handling_mode: HandlingMode::Steer,
            request_id: None,
        });
        assert_eq!(continuation.kind(), InputKind::Continuation);

        let operation = cancelled_operation(make_header());
        assert_eq!(operation.kind(), InputKind::Operation);
    }

    #[test]
    fn input_source_variants() {
        let sources = [
            InputOrigin::Operator,
            InputOrigin::Peer {
                peer_id: "p1".into(),
                display_identity: None,
                runtime_id: None,
            },
            InputOrigin::Flow {
                flow_id: "f1".into(),
                step_index: 0,
            },
            InputOrigin::System,
            InputOrigin::External {
                source_name: "webhook".into(),
            },
        ];
        for source in sources {
            let json = serde_json::to_value(&source).unwrap();
            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
            assert_eq!(source, parsed);
        }
    }

    #[test]
    fn input_durability_serde() {
        let durabilities = [
            InputDurability::Durable,
            InputDurability::Ephemeral,
            InputDurability::Derived,
        ];
        for d in durabilities {
            let json = serde_json::to_value(d).unwrap();
            let parsed: InputDurability = serde_json::from_value(json).unwrap();
            assert_eq!(d, parsed);
        }
    }

    #[test]
    fn peer_input_without_handling_mode_deserializes_as_none() {
        // Mimics a PeerInput serialized before the handling_mode field existed.
        let legacy_json = serde_json::json!({
            "input_type": "peer",
            "header": serde_json::to_value(make_header()).unwrap(),
            "convention": { "convention_type": "message" },
            "body": "hello"
        });
        let parsed: Input = serde_json::from_value(legacy_json).unwrap();
        match parsed {
            Input::Peer(p) => assert!(p.handling_mode.is_none()),
            other => panic!("Expected Peer, got {other:?}"),
        }
    }

    #[test]
    fn peer_input_with_queue_handling_mode_roundtrips() {
        let (json, parsed) = round_trip(&peer_message_with_mode(Some(HandlingMode::Queue)));
        assert_eq!(json["handling_mode"], "queue");
        match parsed {
            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
            other => panic!("Expected Peer, got {other:?}"),
        }
    }

    #[test]
    fn peer_response_terminal_input_owns_wire_status_mapping() {
        let sender = meerkat_core::comms::PeerId::from_uuid(
            uuid::Uuid::parse_str(TERMINAL_PEER_UUID).unwrap(),
        );
        let sender_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
        let correlation = meerkat_core::PeerCorrelationId::from_uuid(
            uuid::Uuid::parse_str(TERMINAL_REQUEST_UUID).unwrap(),
        );

        let input = peer_response_terminal_input(
            sender,
            Some(sender_name),
            correlation,
            meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
            serde_json::json!({"ok": false}),
        );

        match input {
            Input::Peer(PeerInput {
                header:
                    InputHeader {
                        source:
                            InputOrigin::Peer {
                                peer_id,
                                display_identity,
                                runtime_id,
                            },
                        durability: InputDurability::Durable,
                        correlation_id,
                        ..
                    },
                convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
                payload: Some(payload),
                handling_mode: None,
                ..
            }) => {
                assert_eq!(peer_id, TERMINAL_PEER_UUID);
                assert_eq!(display_identity.as_deref(), Some("analyst"));
                assert_eq!(runtime_id, None);
                assert_eq!(request_id, TERMINAL_REQUEST_UUID);
                assert_eq!(
                    correlation_id,
                    Some(CorrelationId::from_uuid(
                        uuid::Uuid::parse_str(TERMINAL_REQUEST_UUID).unwrap()
                    ))
                );
                assert_eq!(status, ResponseTerminalStatus::Cancelled);
                assert_eq!(payload["ok"], false);
            }
            other => panic!("expected terminal peer input, got {other:?}"),
        }
    }

    #[test]
    fn peer_input_with_steer_handling_mode_roundtrips() {
        let (json, parsed) = round_trip(&peer_message_with_mode(Some(HandlingMode::Steer)));
        assert_eq!(json["handling_mode"], "steer");
        match parsed {
            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
            other => panic!("Expected Peer, got {other:?}"),
        }
    }

    #[test]
    fn peer_input_handling_mode_not_serialized_when_none() {
        let json = serde_json::to_value(&peer_message_with_mode(None)).unwrap();
        assert!(json.get("handling_mode").is_none());
    }
}