Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10    ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11    RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::{
15    HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind, SystemNoticePeer,
16};
17use meerkat_core::{
18    BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
19    PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
20    PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
21    PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
22    PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
23    PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
24};
25use serde::{Deserialize, Serialize};
26
27use crate::identifiers::{
28    CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
29};
30use meerkat_core::types::RenderMetadata;
31
32/// Common header for all input variants.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct InputHeader {
35    /// Unique ID for this input.
36    pub id: InputId,
37    /// When the input was created.
38    pub timestamp: DateTime<Utc>,
39    /// Source of the input.
40    pub source: InputOrigin,
41    /// Durability requirement.
42    pub durability: InputDurability,
43    /// Visibility controls.
44    pub visibility: InputVisibility,
45    /// Optional idempotency key for dedup.
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub idempotency_key: Option<IdempotencyKey>,
48    /// Optional supersession key.
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub supersession_key: Option<SupersessionKey>,
51    /// Optional correlation ID.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub correlation_id: Option<CorrelationId>,
54}
55
56/// Where the input originated.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(tag = "type", rename_all = "snake_case")]
59#[non_exhaustive]
60pub enum InputOrigin {
61    /// Human operator / external API caller.
62    Operator,
63    /// Peer agent (comms).
64    Peer {
65        /// Canonical comms peer id used by machine/runtime policy and schema
66        /// projection.
67        peer_id: String,
68        /// Optional display/source label admitted at peer ingress. This is
69        /// presentation metadata only; routing and trust must use typed ingress
70        /// facts before the runtime input seam or the canonical `peer_id`.
71        #[serde(default, skip_serializing_if = "Option::is_none")]
72        display_identity: Option<String>,
73        #[serde(skip_serializing_if = "Option::is_none")]
74        runtime_id: Option<LogicalRuntimeId>,
75    },
76    /// Flow engine (mob orchestration).
77    Flow { flow_id: String, step_index: usize },
78    /// System-generated (compaction, projection, etc.).
79    System,
80    /// External event source.
81    External { source_name: String },
82}
83
84/// Durability requirement for an input.
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87#[non_exhaustive]
88pub enum InputDurability {
89    /// Must be persisted before acknowledgment.
90    Durable,
91    /// In-memory only, may be lost on crash.
92    Ephemeral,
93    /// Derived from other inputs (can be reconstructed).
94    Derived,
95}
96
97/// Visibility controls for an input.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99pub struct InputVisibility {
100    /// Whether this input appears in the conversation transcript.
101    pub transcript_eligible: bool,
102    /// Whether this input is visible to operator surfaces.
103    pub operator_eligible: bool,
104}
105
106impl Default for InputVisibility {
107    fn default() -> Self {
108        Self {
109            transcript_eligible: true,
110            operator_eligible: true,
111        }
112    }
113}
114
115/// The 6 input variants accepted by the runtime layer.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "input_type", rename_all = "snake_case")]
118#[non_exhaustive]
119pub enum Input {
120    /// User/operator prompt.
121    Prompt(PromptInput),
122    /// Peer-originated input (comms).
123    Peer(PeerInput),
124    /// Flow step input (mob orchestration).
125    FlowStep(FlowStepInput),
126    /// External event input.
127    ExternalEvent(ExternalEventInput),
128    /// Explicit runtime continuation work.
129    #[serde(alias = "system_generated")]
130    Continuation(ContinuationInput),
131    /// Explicit non-content operation/lifecycle input.
132    #[serde(alias = "projected")]
133    Operation(OperationInput),
134}
135
136impl Input {
137    /// Get the input header.
138    pub fn header(&self) -> &InputHeader {
139        match self {
140            Input::Prompt(i) => &i.header,
141            Input::Peer(i) => &i.header,
142            Input::FlowStep(i) => &i.header,
143            Input::ExternalEvent(i) => &i.header,
144            Input::Continuation(i) => &i.header,
145            Input::Operation(i) => &i.header,
146        }
147    }
148
149    /// Get the input ID.
150    pub fn id(&self) -> &InputId {
151        &self.header().id
152    }
153
154    /// Typed kind for policy dispatch.
155    pub fn kind(&self) -> InputKind {
156        match self {
157            Input::Prompt(_) => InputKind::Prompt,
158            Input::Peer(p) => match &p.convention {
159                Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160                Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161                Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162                Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163            },
164            Input::FlowStep(_) => InputKind::FlowStep,
165            Input::ExternalEvent(_) => InputKind::ExternalEvent,
166            Input::Continuation(_) => InputKind::Continuation,
167            Input::Operation(_) => InputKind::Operation,
168        }
169    }
170
171    /// Wrapped kind identifier (for newtype-discipline call sites).
172    pub fn kind_id(&self) -> KindId {
173        KindId::new(self.kind())
174    }
175
176    /// Handling-mode hint for ordinary work admitted through the runtime.
177    pub fn handling_mode(&self) -> Option<HandlingMode> {
178        match self {
179            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181            Input::ExternalEvent(event) => Some(event.handling_mode),
182            Input::Continuation(continuation) => Some(continuation.handling_mode),
183            Input::Peer(peer) => peer.handling_mode,
184            Input::Operation(_) => None,
185        }
186    }
187}
188
189fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
190    let Some(obj) = event.payload.as_object_mut() else {
191        return Ok(());
192    };
193    let Some(blocks_value) = obj.remove("blocks") else {
194        return Ok(());
195    };
196    if event.blocks.is_some() {
197        return Ok(());
198    }
199    let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
200        .map_err(|err| {
201            BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
202        })?;
203    event.blocks = Some(blocks);
204    Ok(())
205}
206
207pub async fn externalize_input_images(
208    blob_store: &dyn BlobStore,
209    input: &mut Input,
210) -> Result<(), BlobStoreError> {
211    match input {
212        Input::Prompt(prompt) => {
213            if let Some(blocks) = prompt.blocks.as_mut() {
214                externalize_content_blocks(blob_store, blocks).await?;
215            }
216        }
217        Input::Peer(peer) => {
218            if let Some(blocks) = peer.blocks.as_mut() {
219                externalize_content_blocks(blob_store, blocks).await?;
220            }
221        }
222        Input::FlowStep(flow_step) => {
223            if let Some(blocks) = flow_step.blocks.as_mut() {
224                externalize_content_blocks(blob_store, blocks).await?;
225            }
226        }
227        Input::ExternalEvent(event) => {
228            migrate_legacy_payload_blocks(event)?;
229            if let Some(blocks) = event.blocks.as_mut() {
230                externalize_content_blocks(blob_store, blocks).await?;
231            }
232        }
233        Input::Continuation(_) | Input::Operation(_) => {}
234    }
235    Ok(())
236}
237
238pub async fn hydrate_input_images(
239    blob_store: &dyn BlobStore,
240    input: &mut Input,
241    missing_behavior: MissingBlobBehavior,
242) -> Result<(), BlobStoreError> {
243    match input {
244        Input::Prompt(prompt) => {
245            if let Some(blocks) = prompt.blocks.as_mut() {
246                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247            }
248        }
249        Input::Peer(peer) => {
250            if let Some(blocks) = peer.blocks.as_mut() {
251                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
252            }
253        }
254        Input::FlowStep(flow_step) => {
255            if let Some(blocks) = flow_step.blocks.as_mut() {
256                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
257            }
258        }
259        Input::ExternalEvent(event) => {
260            migrate_legacy_payload_blocks(event)?;
261            if let Some(blocks) = event.blocks.as_mut() {
262                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
263            }
264        }
265        Input::Continuation(_) | Input::Operation(_) => {}
266    }
267    Ok(())
268}
269
270/// User/operator prompt input.
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct PromptInput {
273    pub header: InputHeader,
274    /// The prompt text.
275    pub text: String,
276    /// Optional multimodal content blocks. When present, `text` serves as the
277    /// text projection (backwards compat), and `blocks` carries the full content.
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
280    /// Runtime-authored typed transcript appends that travel with this turn.
281    ///
282    /// These are not operator-authored prompt content. The runtime projects
283    /// them into model-facing text only when building the provider request.
284    #[serde(default, skip_serializing_if = "Vec::is_empty")]
285    pub typed_turn_appends: Vec<ConversationAppend>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    pub turn_metadata: Option<RuntimeTurnMetadata>,
288}
289
290impl PromptInput {
291    /// Create a new operator prompt with default header.
292    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
293        Self {
294            header: InputHeader {
295                id: meerkat_core::lifecycle::InputId::new(),
296                timestamp: chrono::Utc::now(),
297                source: InputOrigin::Operator,
298                durability: InputDurability::Durable,
299                visibility: InputVisibility::default(),
300                idempotency_key: None,
301                supersession_key: None,
302                correlation_id: None,
303            },
304            text: text.into(),
305            blocks: None,
306            typed_turn_appends: Vec::new(),
307            turn_metadata,
308        }
309    }
310
311    /// Create a multimodal prompt from `ContentInput`.
312    pub fn from_content_input(
313        input: meerkat_core::types::ContentInput,
314        turn_metadata: Option<RuntimeTurnMetadata>,
315    ) -> Self {
316        let text = input.text_content();
317        let blocks = if input.has_images() {
318            Some(input.into_blocks())
319        } else {
320            None
321        };
322        Self {
323            header: InputHeader {
324                id: meerkat_core::lifecycle::InputId::new(),
325                timestamp: chrono::Utc::now(),
326                source: InputOrigin::Operator,
327                durability: InputDurability::Durable,
328                visibility: InputVisibility::default(),
329                idempotency_key: None,
330                supersession_key: None,
331                correlation_id: None,
332            },
333            text,
334            blocks,
335            typed_turn_appends: Vec::new(),
336            turn_metadata,
337        }
338    }
339}
340
341/// Peer-originated input from comms.
342#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct PeerInput {
344    pub header: InputHeader,
345    /// The peer convention (message, request, response).
346    #[serde(skip_serializing_if = "Option::is_none")]
347    pub convention: Option<PeerConvention>,
348    /// Legacy textual body for this peer input.
349    ///
350    /// Message-style peer traffic uses this directly. Request/response prompt
351    /// projection is runtime-owned and must be reconstructed from
352    /// `convention + payload + source` rather than helper-rendered prose.
353    pub body: String,
354    /// Structured peer payload, when one exists.
355    ///
356    /// For `Request`, this is the request params. For `Response*`, this is the
357    /// response result payload. Message traffic leaves this unset.
358    #[serde(default, skip_serializing_if = "Option::is_none")]
359    pub payload: Option<serde_json::Value>,
360    /// Optional multimodal content blocks. When present, `body` serves as the
361    /// text projection (backwards compat), and `blocks` carries the full content.
362    #[serde(default, skip_serializing_if = "Option::is_none")]
363    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
364    /// Optional handling-mode override for actionable peer inputs.
365    /// When present on Message/Request/no-convention, overrides kind-based
366    /// policy defaults. Forbidden on ResponseProgress; ResponseTerminal may
367    /// carry a typed override for requester reaction urgency (enforced by
368    /// [`validate_peer_handling_mode`]).
369    #[serde(default, skip_serializing_if = "Option::is_none")]
370    pub handling_mode: Option<HandlingMode>,
371}
372
373/// Peer communication conventions.
374#[derive(Debug, Clone, Serialize, Deserialize)]
375#[serde(tag = "convention_type", rename_all = "snake_case")]
376#[non_exhaustive]
377pub enum PeerConvention {
378    /// Simple peer-to-peer message.
379    Message,
380    /// Request expecting a response.
381    Request { request_id: String, intent: String },
382    /// Progress update for an ongoing response.
383    ResponseProgress {
384        request_id: String,
385        phase: ResponseProgressPhase,
386    },
387    /// Terminal response (completed or failed).
388    ResponseTerminal {
389        request_id: String,
390        status: ResponseTerminalStatus,
391    },
392}
393
394/// Phase of a response progress update. This is the core projection enum, not
395/// a runtime-local duplicate.
396pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
397
398/// Terminal status of a response. This is the core projection enum, not a
399/// runtime-local duplicate.
400pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
401
402pub fn response_terminal_status_from_wire(
403    status: meerkat_contracts::PeerResponseTerminalStatusWire,
404) -> ResponseTerminalStatus {
405    match status {
406        meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
407            PeerResponseTerminalProjectionStatus::Completed
408        }
409        meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
410            PeerResponseTerminalProjectionStatus::Failed
411        }
412        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
413            PeerResponseTerminalProjectionStatus::Cancelled
414        }
415    }
416}
417
418pub fn peer_response_terminal_input(
419    peer_id: meerkat_core::comms::PeerId,
420    display_name: Option<meerkat_core::comms::PeerName>,
421    request_id: meerkat_core::PeerCorrelationId,
422    status: meerkat_contracts::PeerResponseTerminalStatusWire,
423    result: serde_json::Value,
424) -> Input {
425    let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
426    let request_id = request_id.to_string();
427    let peer_id = peer_id.to_string();
428    let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
429
430    Input::Peer(PeerInput {
431        header: InputHeader {
432            id: InputId::new(),
433            timestamp: Utc::now(),
434            source: InputOrigin::Peer {
435                peer_id,
436                display_identity: Some(display_identity),
437                runtime_id: None,
438            },
439            durability: InputDurability::Durable,
440            visibility: InputVisibility::default(),
441            idempotency_key: None,
442            supersession_key: None,
443            correlation_id: Some(correlation_id),
444        },
445        convention: Some(PeerConvention::ResponseTerminal {
446            request_id,
447            status: response_terminal_status_from_wire(status),
448        }),
449        body: String::new(),
450        payload: Some(result),
451        blocks: None,
452        handling_mode: None,
453    })
454}
455
456/// Flow step input from mob orchestration.
457#[derive(Debug, Clone, Serialize, Deserialize)]
458pub struct FlowStepInput {
459    pub header: InputHeader,
460    /// Flow step identifier.
461    pub step_id: String,
462    /// Step instructions/prompt.
463    pub instructions: String,
464    /// Optional multimodal content blocks. When present, `instructions` serves
465    /// as the text projection (backwards compat), and `blocks` carries the
466    /// full content.
467    #[serde(default, skip_serializing_if = "Option::is_none")]
468    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
469    #[serde(default, skip_serializing_if = "Option::is_none")]
470    pub turn_metadata: Option<RuntimeTurnMetadata>,
471}
472
473/// External event input.
474#[derive(Debug, Clone, Serialize, Deserialize)]
475pub struct ExternalEventInput {
476    pub header: InputHeader,
477    /// Event type/name.
478    pub event_type: String,
479    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
480    /// payloads during coalescing and projection — not a pure pass-through.
481    /// Multimodal content does NOT live here canonically; use `blocks`.
482    pub payload: serde_json::Value,
483    /// Optional multimodal blocks carried by the external event. This is the
484    /// canonical owner for multimodal external-event content.
485    #[serde(default, skip_serializing_if = "Option::is_none")]
486    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
487    /// Runtime-owned handling hint for this external event.
488    #[serde(default)]
489    pub handling_mode: HandlingMode,
490    /// Optional normalized render metadata carried with the event.
491    #[serde(default, skip_serializing_if = "Option::is_none")]
492    pub render_metadata: Option<RenderMetadata>,
493}
494
495/// Explicit continuation request that asks the runtime to keep draining
496/// ordinary work after a boundary-local event (for example, terminal peer
497/// responses injected into session state).
498#[derive(Debug, Clone, Serialize, Deserialize)]
499pub struct ContinuationInput {
500    pub header: InputHeader,
501    /// Stable reason for the continuation request.
502    pub reason: String,
503    /// Ordinary-work handling mode for the continuation.
504    #[serde(default)]
505    pub handling_mode: HandlingMode,
506    /// Optional request/correlation handle tied to the continuation.
507    #[serde(default, skip_serializing_if = "Option::is_none")]
508    pub request_id: Option<String>,
509}
510
511impl ContinuationInput {
512    /// Build a continuation for waking an idle session after a detached
513    /// background operation reaches terminal state.
514    ///
515    /// Properties: `Derived` durability, invisible to transcript and operator,
516    /// `System` origin, `Steer` handling mode.
517    pub fn detached_background_op_completed() -> Self {
518        Self {
519            header: InputHeader {
520                id: meerkat_core::lifecycle::InputId::new(),
521                timestamp: chrono::Utc::now(),
522                source: InputOrigin::System,
523                durability: InputDurability::Derived,
524                visibility: InputVisibility {
525                    transcript_eligible: false,
526                    operator_eligible: false,
527                },
528                idempotency_key: None,
529                supersession_key: None,
530                correlation_id: None,
531            },
532            reason: "detached_background_op_completed".to_string(),
533            handling_mode: HandlingMode::Steer,
534            request_id: None,
535        }
536    }
537}
538
539/// Explicit operation/lifecycle input admitted through runtime instead of
540/// being smuggled through transcript projections or peer-only paths.
541#[derive(Debug, Clone, Serialize, Deserialize)]
542pub struct OperationInput {
543    pub header: InputHeader,
544    /// Stable operation identifier.
545    pub operation_id: OperationId,
546    /// Typed lifecycle event for the operation.
547    pub event: OpEvent,
548}
549
550/// Build the core-owned peer conversation projection for a runtime peer input.
551///
552/// Peer-response terminal context projection is deliberately excluded here:
553/// admission must not store it as pre-machine truth. Runtime-loop batch
554/// construction uses [`runtime_input_projection_for_machine_batch`] after the
555/// machine-selected input is dequeued.
556pub(crate) fn peer_projection_from_peer_input(
557    peer: &PeerInput,
558) -> Option<PeerConversationProjection> {
559    peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
560}
561
562fn peer_projection_from_peer_input_with_id(
563    peer: &PeerInput,
564    peer_id: &str,
565) -> Option<PeerConversationProjection> {
566    let peer_id = peer_id.to_string();
567
568    match &peer.convention {
569        Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
570        Some(PeerConvention::Request { request_id, intent }) => {
571            let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
572                Ok(peer_id) => peer_id,
573                Err(error) => {
574                    tracing::warn!(
575                        peer_id,
576                        error = %error,
577                        "dropping peer request projection with non-canonical peer_id"
578                    );
579                    return None;
580                }
581            };
582            Some(PeerConversationProjection::Request {
583                peer_id,
584                display_name: peer_display_label(peer),
585                request_id: request_id.clone(),
586                intent: intent.clone(),
587                payload: peer.payload.clone(),
588            })
589        }
590        Some(PeerConvention::ResponseProgress { request_id, phase }) => {
591            Some(PeerConversationProjection::ResponseProgress {
592                peer_id,
593                request_id: request_id.clone(),
594                phase: *phase,
595                payload: peer.payload.clone(),
596            })
597        }
598        Some(PeerConvention::ResponseTerminal { .. }) => None,
599        None => None,
600    }
601}
602
603pub(crate) fn peer_response_terminal_fact(
604    peer: &PeerInput,
605) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
606    let InputOrigin::Peer {
607        peer_id,
608        display_identity,
609        runtime_id,
610    } = &peer.header.source
611    else {
612        return Ok(None);
613    };
614    let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
615        return Ok(None);
616    };
617
618    let transport_identity = runtime_id
619        .as_ref()
620        .map(ToString::to_string)
621        .map(PeerResponseTerminalTransportIdentity::parse)
622        .transpose()?;
623    let source = PeerResponseTerminalSource::new(
624        transport_identity,
625        PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
626        PeerResponseTerminalDisplayIdentity::parse(
627            display_identity
628                .as_ref()
629                .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
630                .clone(),
631        )?,
632    );
633    Ok(Some(PeerResponseTerminalFact::new(
634        source,
635        PeerResponseTerminalCorrelationId::parse(request_id)?,
636        *status,
637        PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
638    )))
639}
640
641pub(crate) fn validate_peer_response_terminal_fact(
642    input: &Input,
643) -> Result<(), PeerResponseTerminalFactError> {
644    let Input::Peer(peer) = input else {
645        return Ok(());
646    };
647    peer_response_terminal_fact(peer).map(|_| ())
648}
649
650/// Lift an [`Input`] to its core peer projection when it is a peer input with a
651/// peer-origin header.
652#[cfg(test)]
653pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
654    let Input::Peer(peer) = input else {
655        return None;
656    };
657    peer_projection_from_peer_input(peer)
658}
659
660fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
661    let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
662        return None;
663    };
664    Some(peer_id.clone())
665}
666
667fn peer_display_label(peer: &PeerInput) -> Option<String> {
668    let InputOrigin::Peer {
669        display_identity, ..
670    } = &peer.header.source
671    else {
672        return None;
673    };
674
675    display_identity
676        .as_ref()
677        .map(|label| label.trim())
678        .filter(|label| !label.is_empty())
679        .map(ToOwned::to_owned)
680}
681
682/// Rendered prompt-text projection for a peer input.
683pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
684    peer_projection_from_peer_input(peer)
685        .map(|projection| {
686            let prompt = projection.prompt_text();
687            if prompt.is_empty() {
688                peer.body.clone()
689            } else {
690                prompt
691            }
692        })
693        .unwrap_or_else(|| peer.body.clone())
694}
695
696pub(crate) fn input_prompt_text(input: &Input) -> String {
697    match input {
698        Input::Prompt(p) => p.text.clone(),
699        Input::Peer(p) => peer_prompt_text(p),
700        Input::FlowStep(f) => f.instructions.clone(),
701        Input::ExternalEvent(e) => external_event_projection_text(e),
702        Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
703        Input::Operation(operation) => {
704            format!(
705                "[Operation {}] {:?}",
706                operation.operation_id, operation.event
707            )
708        }
709    }
710}
711
712fn external_event_projection_text(event: &ExternalEventInput) -> String {
713    let source_name = match &event.header.source {
714        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
715            source_name.as_str()
716        }
717        _ => event.event_type.as_str(),
718    };
719    let body = event
720        .payload
721        .get("body")
722        .and_then(serde_json::Value::as_str)
723        .map(str::trim);
724
725    meerkat_core::interaction::format_external_event_projection(source_name, body)
726}
727
728fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
729    let (peer_id, display_name) = match &peer.header.source {
730        InputOrigin::Peer {
731            peer_id,
732            display_identity,
733            ..
734        } => (peer_id.clone(), display_identity.clone()),
735        _ => return None,
736    };
737    let (kind, request_id, intent, status) = match &peer.convention {
738        Some(PeerConvention::Message) | None => ("message", None, None, None),
739        Some(PeerConvention::Request { request_id, intent }) => (
740            "request",
741            Some(request_id.clone()),
742            Some(intent.clone()),
743            None,
744        ),
745        Some(PeerConvention::ResponseProgress { request_id, phase }) => (
746            "response_progress",
747            Some(request_id.clone()),
748            None,
749            Some(format!("{phase:?}")),
750        ),
751        Some(PeerConvention::ResponseTerminal { request_id, status }) => (
752            "response_terminal",
753            Some(request_id.clone()),
754            None,
755            Some(format!("{status:?}")),
756        ),
757    };
758    let summary = match kind {
759        "request" => intent.as_ref().map_or_else(
760            || "Peer request".to_string(),
761            |intent| format!("Peer request: {intent}"),
762        ),
763        "response_progress" => "Peer response progress".to_string(),
764        "response_terminal" => "Peer response terminal".to_string(),
765        _ => "Peer message".to_string(),
766    };
767    let content = if let Some(blocks) = peer.blocks.clone() {
768        let body_already_in_blocks = blocks.iter().any(|block| {
769            matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == peer.body.trim())
770        });
771        if peer.body.trim().is_empty() || body_already_in_blocks {
772            blocks
773        } else {
774            let mut content = vec![meerkat_core::types::ContentBlock::Text {
775                text: peer.body.clone(),
776            }];
777            content.extend(blocks);
778            content
779        }
780    } else if peer.body.is_empty() {
781        Vec::new()
782    } else {
783        vec![meerkat_core::types::ContentBlock::Text {
784            text: peer.body.clone(),
785        }]
786    };
787    Some(CoreRenderable::SystemNotice {
788        kind: SystemNoticeKind::Comms,
789        body: Some(summary.clone()),
790        blocks: vec![SystemNoticeBlock::Comms {
791            kind: kind.to_string(),
792            direction: SystemNoticeDirection::Incoming,
793            peer: Some(SystemNoticePeer {
794                id: peer_id,
795                display_name,
796            }),
797            request_id,
798            intent,
799            status,
800            summary: Some(summary),
801            payload: peer.payload.clone(),
802            content,
803        }],
804    })
805}
806
807fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
808    let source = match &event.header.source {
809        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
810            source_name.clone()
811        }
812        _ => event.event_type.clone(),
813    };
814    let body = event
815        .payload
816        .get("body")
817        .and_then(serde_json::Value::as_str)
818        .map(str::trim)
819        .filter(|body| !body.is_empty())
820        .map(ToOwned::to_owned);
821    let summary = body.as_ref().map_or_else(
822        || format!("External event via {source}"),
823        std::clone::Clone::clone,
824    );
825    CoreRenderable::SystemNotice {
826        kind: SystemNoticeKind::ExternalEvent,
827        body: Some(summary.clone()),
828        blocks: vec![SystemNoticeBlock::ExternalEvent {
829            source,
830            event_type: event.event_type.clone(),
831            summary: Some(summary),
832            body,
833            payload: Some(event.payload.clone()),
834            content: event.blocks.clone().unwrap_or_default(),
835        }],
836    }
837}
838
839fn input_to_append(input: &Input) -> Option<ConversationAppend> {
840    if matches!(
841        input,
842        Input::Peer(PeerInput {
843            convention: Some(PeerConvention::ResponseTerminal { .. }),
844            blocks: None,
845            ..
846        })
847    ) {
848        return None;
849    }
850
851    let (role, content) = match input {
852        Input::Prompt(p)
853            if !p.typed_turn_appends.is_empty()
854                && p.text.trim().is_empty()
855                && p.blocks.as_ref().is_none_or(Vec::is_empty) =>
856        {
857            return None;
858        }
859        Input::Prompt(p) if p.blocks.is_some() => (
860            ConversationAppendRole::User,
861            CoreRenderable::Blocks {
862                blocks: p.blocks.clone().unwrap_or_default(),
863            },
864        ),
865        Input::Prompt(_) => (
866            ConversationAppendRole::User,
867            CoreRenderable::Text {
868                text: input_prompt_text(input),
869            },
870        ),
871        Input::Peer(p) => peer_notice_renderable(p)
872            .map(|content| (ConversationAppendRole::SystemNotice, content))?,
873        Input::FlowStep(f) => (
874            ConversationAppendRole::SystemNotice,
875            CoreRenderable::SystemNotice {
876                kind: SystemNoticeKind::Generic,
877                body: Some(format!("Flow step {}", f.step_id)),
878                blocks: vec![SystemNoticeBlock::RuntimeNotice {
879                    category: "flow_step".to_string(),
880                    detail: Some(f.instructions.clone()),
881                    payload: None,
882                }],
883            },
884        ),
885        Input::ExternalEvent(e) => (
886            ConversationAppendRole::SystemNotice,
887            external_event_notice_renderable(e),
888        ),
889        Input::Continuation(_) | Input::Operation(_) => return None,
890    };
891
892    Some(ConversationAppend { role, content })
893}
894
895fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
896    let (projection, content) = match input {
897        Input::Peer(peer) => {
898            let projection = peer_projection_from_peer_input(peer)?;
899            let content = peer_notice_renderable(peer)?;
900            (projection, content)
901        }
902        _ => return None,
903    };
904
905    Some(ConversationContextAppend {
906        key: projection.context_key()?,
907        content,
908    })
909}
910
911fn peer_response_terminal_context_append(
912    peer: &PeerInput,
913) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
914    let Some(fact) = peer_response_terminal_fact(peer)? else {
915        return Ok(None);
916    };
917
918    Ok(Some(ConversationContextAppend {
919        key: fact.context_key(),
920        content: CoreRenderable::SystemNotice {
921            kind: SystemNoticeKind::Comms,
922            body: Some("Peer terminal response context".to_string()),
923            blocks: vec![SystemNoticeBlock::Comms {
924                kind: "response_terminal".to_string(),
925                direction: SystemNoticeDirection::Incoming,
926                peer: Some(SystemNoticePeer {
927                    id: fact.source.route_identity.to_string(),
928                    display_name: Some(fact.source.display_identity.to_string()),
929                }),
930                request_id: Some(fact.correlation_id.to_string()),
931                intent: None,
932                status: Some(
933                    match fact.status {
934                        PeerResponseTerminalProjectionStatus::Completed => "completed",
935                        PeerResponseTerminalProjectionStatus::Failed => "failed",
936                        PeerResponseTerminalProjectionStatus::Cancelled => "cancelled",
937                    }
938                    .to_string(),
939                ),
940                summary: Some("Peer terminal response".to_string()),
941                payload: fact.render_payload.as_ref().cloned(),
942                content: Vec::new(),
943            }],
944        },
945    }))
946}
947
948pub(crate) fn runtime_input_projection(
949    input: &Input,
950) -> crate::ingress_types::RuntimeInputProjection {
951    crate::ingress_types::RuntimeInputProjection {
952        append: input_to_append(input),
953        additional_appends: match input {
954            Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
955            _ => Vec::new(),
956        },
957        context_append: input_to_context_append(input),
958    }
959}
960
961pub(crate) fn runtime_input_projection_for_machine_batch(
962    input: &Input,
963) -> crate::ingress_types::RuntimeInputProjection {
964    let mut projection = runtime_input_projection(input);
965    if let Input::Peer(peer) = input
966        && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
967    {
968        projection.context_append = Some(context_append);
969    }
970    projection
971}
972
973#[cfg(test)]
974#[allow(clippy::unwrap_used, clippy::panic)]
975mod tests {
976    use super::*;
977    use chrono::Utc;
978
979    fn make_header() -> InputHeader {
980        InputHeader {
981            id: InputId::new(),
982            timestamp: Utc::now(),
983            source: InputOrigin::Operator,
984            durability: InputDurability::Durable,
985            visibility: InputVisibility::default(),
986            idempotency_key: None,
987            supersession_key: None,
988            correlation_id: None,
989        }
990    }
991
992    fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
993        ConversationAppend {
994            role: ConversationAppendRole::SystemNotice,
995            content: CoreRenderable::SystemNotice {
996                kind: meerkat_core::types::SystemNoticeKind::Generic,
997                body: Some(detail.to_string()),
998                blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
999                    category: "test".to_string(),
1000                    detail: Some(detail.to_string()),
1001                    payload: None,
1002                }],
1003            },
1004        }
1005    }
1006
1007    #[test]
1008    fn prompt_input_serde() {
1009        let input = Input::Prompt(PromptInput {
1010            header: make_header(),
1011            text: "hello".into(),
1012            blocks: None,
1013            typed_turn_appends: Vec::new(),
1014            turn_metadata: None,
1015        });
1016        let json = serde_json::to_value(&input).unwrap();
1017        assert_eq!(json["input_type"], "prompt");
1018        let parsed: Input = serde_json::from_value(json).unwrap();
1019        assert!(matches!(parsed, Input::Prompt(_)));
1020    }
1021
1022    #[test]
1023    fn prompt_input_typed_turn_appends_project_without_user_text() {
1024        let append = typed_runtime_notice_append("peer delivery");
1025        let input = Input::Prompt(PromptInput {
1026            header: make_header(),
1027            text: String::new(),
1028            blocks: None,
1029            typed_turn_appends: vec![append.clone()],
1030            turn_metadata: None,
1031        });
1032
1033        let projection = runtime_input_projection(&input);
1034        assert!(
1035            projection.append.is_none(),
1036            "empty runtime-authored prompt carrier must not synthesize a user append"
1037        );
1038        assert_eq!(projection.additional_appends, vec![append]);
1039    }
1040
1041    #[test]
1042    fn prompt_input_typed_turn_appends_serde_roundtrip() {
1043        let append = typed_runtime_notice_append("typed appends persist");
1044        let input = Input::Prompt(PromptInput {
1045            header: make_header(),
1046            text: String::new(),
1047            blocks: None,
1048            typed_turn_appends: vec![append.clone()],
1049            turn_metadata: None,
1050        });
1051
1052        let json = serde_json::to_value(&input).unwrap();
1053        let parsed: Input = serde_json::from_value(json).unwrap();
1054        let Input::Prompt(prompt) = parsed else {
1055            panic!("expected prompt input");
1056        };
1057        assert_eq!(prompt.text, "");
1058        assert_eq!(prompt.typed_turn_appends, vec![append]);
1059    }
1060
1061    #[test]
1062    fn peer_input_message_serde() {
1063        let input = Input::Peer(PeerInput {
1064            header: make_header(),
1065            convention: Some(PeerConvention::Message),
1066            body: "hi there".into(),
1067            payload: None,
1068            blocks: None,
1069            handling_mode: None,
1070        });
1071        let json = serde_json::to_value(&input).unwrap();
1072        assert_eq!(json["input_type"], "peer");
1073        let parsed: Input = serde_json::from_value(json).unwrap();
1074        assert!(matches!(parsed, Input::Peer(_)));
1075    }
1076
1077    #[test]
1078    fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
1079        let mut header = make_header();
1080        header.source = InputOrigin::Peer {
1081            peer_id: "canonical-peer-id".into(),
1082            display_identity: Some("display-agent".into()),
1083            runtime_id: None,
1084        };
1085        let input = Input::Peer(PeerInput {
1086            header,
1087            convention: Some(PeerConvention::Message),
1088            body: "caption".into(),
1089            payload: None,
1090            blocks: Some(vec![
1091                meerkat_core::types::ContentBlock::Text {
1092                    text: "caption".into(),
1093                },
1094                meerkat_core::types::ContentBlock::Image {
1095                    media_type: "image/png".into(),
1096                    data: "abc".into(),
1097                },
1098            ]),
1099            handling_mode: None,
1100        });
1101
1102        let Input::Peer(peer) = &input else {
1103            panic!("expected peer input");
1104        };
1105        assert_eq!(
1106            peer_projection_from_peer_input(peer)
1107                .and_then(|projection| projection.block_prefix_text())
1108                .as_deref(),
1109            Some("Peer message from canonical-peer-id")
1110        );
1111
1112        let projection = runtime_input_projection(&input);
1113        let append = projection.append.expect("conversation append");
1114        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1115            panic!("expected typed system notice");
1116        };
1117        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1118            blocks.first()
1119        else {
1120            panic!("expected comms block");
1121        };
1122        assert_eq!(
1123            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1124            Some("display-agent")
1125        );
1126        assert_eq!(
1127            content.first(),
1128            Some(&meerkat_core::types::ContentBlock::Text {
1129                text: "caption".into()
1130            })
1131        );
1132    }
1133
1134    #[test]
1135    fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
1136        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1137        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1138        let mut header = make_header();
1139        header.source = InputOrigin::Peer {
1140            peer_id: route_id.into(),
1141            display_identity: Some("display-agent".into()),
1142            runtime_id: None,
1143        };
1144        let input = Input::Peer(PeerInput {
1145            header,
1146            convention: Some(PeerConvention::ResponseTerminal {
1147                request_id: request_id.into(),
1148                status: ResponseTerminalStatus::Completed,
1149            }),
1150            body: "legacy response body".into(),
1151            payload: Some(serde_json::json!({"answer":"ok"})),
1152            blocks: None,
1153            handling_mode: None,
1154        });
1155
1156        let Input::Peer(peer) = &input else {
1157            panic!("expected peer input");
1158        };
1159        let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
1160        assert!(
1161            peer_projection_from_peer_input(peer).is_none(),
1162            "terminal peer response projection must not be built before machine batch selection"
1163        );
1164
1165        let projection = runtime_input_projection(&input);
1166        assert!(
1167            projection.context_append.is_none(),
1168            "admission projection must not store terminal peer response context"
1169        );
1170        let projection = runtime_input_projection_for_machine_batch(&input);
1171        let context = projection.context_append.expect("context append");
1172        assert_eq!(context.key, expected_canonical_key);
1173        let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
1174            panic!("expected typed context");
1175        };
1176        let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
1177        else {
1178            panic!("expected comms block");
1179        };
1180        assert_eq!(
1181            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1182            Some("display-agent")
1183        );
1184        assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some(route_id));
1185    }
1186
1187    #[test]
1188    fn peer_response_terminal_with_blocks_projects_append_and_context() {
1189        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1190        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1191        let mut header = make_header();
1192        header.source = InputOrigin::Peer {
1193            peer_id: route_id.into(),
1194            display_identity: Some("display-agent".into()),
1195            runtime_id: None,
1196        };
1197        let input = Input::Peer(PeerInput {
1198            header,
1199            convention: Some(PeerConvention::ResponseTerminal {
1200                request_id: request_id.into(),
1201                status: ResponseTerminalStatus::Completed,
1202            }),
1203            body: String::new(),
1204            payload: Some(serde_json::json!({"answer":"ok"})),
1205            blocks: Some(vec![meerkat_core::types::ContentBlock::Image {
1206                media_type: "image/jpeg".into(),
1207                data: "abc".into(),
1208            }]),
1209            handling_mode: None,
1210        });
1211
1212        let projection = runtime_input_projection_for_machine_batch(&input);
1213        let append = projection.append.expect("conversation append");
1214        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1215            panic!("expected typed append");
1216        };
1217        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1218            blocks.first()
1219        else {
1220            panic!("expected comms block");
1221        };
1222        assert_eq!(
1223            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1224            Some("display-agent")
1225        );
1226        assert!(matches!(
1227            content.first(),
1228            Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
1229                if media_type == "image/jpeg"
1230        ));
1231        assert!(
1232            projection.context_append.is_some(),
1233            "terminal response must still apply runtime-owned context"
1234        );
1235    }
1236
1237    #[test]
1238    fn peer_input_request_serde() {
1239        let input = Input::Peer(PeerInput {
1240            header: make_header(),
1241            convention: Some(PeerConvention::Request {
1242                request_id: "req-1".into(),
1243                intent: "mob.peer_added".into(),
1244            }),
1245            body: "Agent joined".into(),
1246            payload: Some(serde_json::json!({"name": "agent-1"})),
1247            blocks: None,
1248            handling_mode: None,
1249        });
1250        let json = serde_json::to_value(&input).unwrap();
1251        let parsed: Input = serde_json::from_value(json).unwrap();
1252        if let Input::Peer(p) = parsed {
1253            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
1254        } else {
1255            panic!("Expected PeerInput");
1256        }
1257    }
1258
1259    #[test]
1260    fn peer_input_response_terminal_serde() {
1261        let input = Input::Peer(PeerInput {
1262            header: make_header(),
1263            convention: Some(PeerConvention::ResponseTerminal {
1264                request_id: "req-1".into(),
1265                status: ResponseTerminalStatus::Completed,
1266            }),
1267            body: "Done".into(),
1268            payload: Some(serde_json::json!({"ok": true})),
1269            blocks: None,
1270            handling_mode: None,
1271        });
1272        let json = serde_json::to_value(&input).unwrap();
1273        let parsed: Input = serde_json::from_value(json).unwrap();
1274        assert!(matches!(parsed, Input::Peer(_)));
1275    }
1276
1277    #[test]
1278    fn peer_input_response_progress_serde() {
1279        let input = Input::Peer(PeerInput {
1280            header: make_header(),
1281            convention: Some(PeerConvention::ResponseProgress {
1282                request_id: "req-1".into(),
1283                phase: ResponseProgressPhase::InProgress,
1284            }),
1285            body: "Working...".into(),
1286            payload: Some(serde_json::json!({"progress": "working"})),
1287            blocks: None,
1288            handling_mode: None,
1289        });
1290        let json = serde_json::to_value(&input).unwrap();
1291        let parsed: Input = serde_json::from_value(json).unwrap();
1292        assert!(matches!(parsed, Input::Peer(_)));
1293    }
1294
1295    #[test]
1296    fn flow_step_input_serde() {
1297        let input = Input::FlowStep(FlowStepInput {
1298            header: make_header(),
1299            step_id: "step-1".into(),
1300            instructions: "analyze the data".into(),
1301            blocks: Some(vec![
1302                meerkat_core::types::ContentBlock::Text {
1303                    text: "analyze the data".into(),
1304                },
1305                meerkat_core::types::ContentBlock::Image {
1306                    media_type: "image/png".into(),
1307                    data: meerkat_core::types::ImageData::Inline {
1308                        data: "abc123".into(),
1309                    },
1310                },
1311            ]),
1312            turn_metadata: None,
1313        });
1314        let json = serde_json::to_value(&input).unwrap();
1315        assert_eq!(json["input_type"], "flow_step");
1316        let parsed: Input = serde_json::from_value(json).unwrap();
1317        assert!(matches!(parsed, Input::FlowStep(_)));
1318    }
1319
1320    #[test]
1321    fn external_event_input_serde() {
1322        let input = Input::ExternalEvent(ExternalEventInput {
1323            header: make_header(),
1324            event_type: "webhook.received".into(),
1325            payload: serde_json::json!({"url": "https://example.com"}),
1326            blocks: Some(vec![
1327                meerkat_core::types::ContentBlock::Text {
1328                    text: "look".into(),
1329                },
1330                meerkat_core::types::ContentBlock::Image {
1331                    media_type: "image/png".into(),
1332                    data: meerkat_core::types::ImageData::Inline {
1333                        data: "abc123".into(),
1334                    },
1335                },
1336            ]),
1337            handling_mode: HandlingMode::Queue,
1338            render_metadata: None,
1339        });
1340        let json = serde_json::to_value(&input).unwrap();
1341        assert_eq!(json["input_type"], "external_event");
1342        let parsed: Input = serde_json::from_value(json).unwrap();
1343        assert!(matches!(parsed, Input::ExternalEvent(_)));
1344    }
1345
1346    #[test]
1347    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
1348        let mut input = Input::ExternalEvent(ExternalEventInput {
1349            header: make_header(),
1350            event_type: "webhook.received".into(),
1351            payload: serde_json::json!({
1352                "body": "see image",
1353                "blocks": [
1354                    { "type": "text", "text": "caption text" },
1355                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
1356                ]
1357            }),
1358            blocks: None,
1359            handling_mode: HandlingMode::Queue,
1360            render_metadata: None,
1361        });
1362
1363        match &mut input {
1364            Input::ExternalEvent(event) => {
1365                migrate_legacy_payload_blocks(event).unwrap();
1366                assert!(event.payload.get("blocks").is_none());
1367                assert_eq!(event.payload["body"], "see image");
1368                assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
1369            }
1370            other => panic!("Expected ExternalEvent, got {other:?}"),
1371        }
1372    }
1373
1374    #[test]
1375    fn continuation_input_serde() {
1376        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1377        let json = serde_json::to_value(&input).unwrap();
1378        assert_eq!(json["input_type"], "continuation");
1379        let parsed: Input = serde_json::from_value(json).unwrap();
1380        match parsed {
1381            Input::Continuation(continuation) => {
1382                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
1383                assert_eq!(continuation.reason, "detached_background_op_completed");
1384            }
1385            other => panic!("Expected Continuation, got {other:?}"),
1386        }
1387    }
1388
1389    #[test]
1390    fn continuation_input_accepts_legacy_system_generated_tag() {
1391        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1392        let mut json = serde_json::to_value(&input).unwrap();
1393        json["input_type"] = serde_json::Value::String("system_generated".into());
1394        let parsed: Input = serde_json::from_value(json).unwrap();
1395        match parsed {
1396            Input::Continuation(continuation) => {
1397                assert_eq!(continuation.reason, "detached_background_op_completed");
1398            }
1399            other => panic!("Expected Continuation, got {other:?}"),
1400        }
1401    }
1402
1403    #[test]
1404    fn operation_input_serde() {
1405        let input = Input::Operation(OperationInput {
1406            header: InputHeader {
1407                durability: InputDurability::Derived,
1408                ..make_header()
1409            },
1410            operation_id: OperationId::new(),
1411            event: OpEvent::Cancelled {
1412                id: OperationId::new(),
1413            },
1414        });
1415        let json = serde_json::to_value(&input).unwrap();
1416        assert_eq!(json["input_type"], "operation");
1417        let parsed: Input = serde_json::from_value(json).unwrap();
1418        assert!(matches!(parsed, Input::Operation(_)));
1419    }
1420
1421    #[test]
1422    fn operation_input_accepts_legacy_projected_tag() {
1423        let input = Input::Operation(OperationInput {
1424            header: InputHeader {
1425                durability: InputDurability::Derived,
1426                ..make_header()
1427            },
1428            operation_id: OperationId::new(),
1429            event: OpEvent::Cancelled {
1430                id: OperationId::new(),
1431            },
1432        });
1433        let mut json = serde_json::to_value(&input).unwrap();
1434        json["input_type"] = serde_json::Value::String("projected".into());
1435        let parsed: Input = serde_json::from_value(json).unwrap();
1436        assert!(matches!(parsed, Input::Operation(_)));
1437    }
1438
1439    #[test]
1440    fn input_kind_id() {
1441        let prompt = Input::Prompt(PromptInput {
1442            header: make_header(),
1443            text: "hi".into(),
1444            blocks: None,
1445            typed_turn_appends: Vec::new(),
1446            turn_metadata: None,
1447        });
1448        assert_eq!(prompt.kind(), InputKind::Prompt);
1449
1450        let peer_msg = Input::Peer(PeerInput {
1451            header: make_header(),
1452            convention: Some(PeerConvention::Message),
1453            body: "hi".into(),
1454            payload: None,
1455            blocks: None,
1456            handling_mode: None,
1457        });
1458        assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
1459
1460        let peer_req = Input::Peer(PeerInput {
1461            header: make_header(),
1462            convention: Some(PeerConvention::Request {
1463                request_id: "r".into(),
1464                intent: "i".into(),
1465            }),
1466            body: "hi".into(),
1467            payload: Some(serde_json::json!({"subject": "x"})),
1468            blocks: None,
1469            handling_mode: None,
1470        });
1471        assert_eq!(peer_req.kind(), InputKind::PeerRequest);
1472
1473        let continuation = Input::Continuation(ContinuationInput {
1474            header: make_header(),
1475            reason: "continue".into(),
1476            handling_mode: HandlingMode::Steer,
1477            request_id: None,
1478        });
1479        assert_eq!(continuation.kind(), InputKind::Continuation);
1480
1481        let operation = Input::Operation(OperationInput {
1482            header: make_header(),
1483            operation_id: OperationId::new(),
1484            event: OpEvent::Cancelled {
1485                id: OperationId::new(),
1486            },
1487        });
1488        assert_eq!(operation.kind(), InputKind::Operation);
1489    }
1490
1491    #[test]
1492    fn input_source_variants() {
1493        let sources = vec![
1494            InputOrigin::Operator,
1495            InputOrigin::Peer {
1496                peer_id: "p1".into(),
1497                display_identity: None,
1498                runtime_id: None,
1499            },
1500            InputOrigin::Flow {
1501                flow_id: "f1".into(),
1502                step_index: 0,
1503            },
1504            InputOrigin::System,
1505            InputOrigin::External {
1506                source_name: "webhook".into(),
1507            },
1508        ];
1509        for source in sources {
1510            let json = serde_json::to_value(&source).unwrap();
1511            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
1512            assert_eq!(source, parsed);
1513        }
1514    }
1515
1516    #[test]
1517    fn input_durability_serde() {
1518        for d in [
1519            InputDurability::Durable,
1520            InputDurability::Ephemeral,
1521            InputDurability::Derived,
1522        ] {
1523            let json = serde_json::to_value(d).unwrap();
1524            let parsed: InputDurability = serde_json::from_value(json).unwrap();
1525            assert_eq!(d, parsed);
1526        }
1527    }
1528
1529    #[test]
1530    fn peer_input_without_handling_mode_deserializes_as_none() {
1531        // Simulate old serialized PeerInput without the handling_mode field.
1532        let json = serde_json::json!({
1533            "input_type": "peer",
1534            "header": serde_json::to_value(make_header()).unwrap(),
1535            "convention": { "convention_type": "message" },
1536            "body": "hello"
1537        });
1538        let parsed: Input = serde_json::from_value(json).unwrap();
1539        match parsed {
1540            Input::Peer(p) => assert!(p.handling_mode.is_none()),
1541            other => panic!("Expected Peer, got {other:?}"),
1542        }
1543    }
1544
1545    #[test]
1546    fn peer_input_with_queue_handling_mode_roundtrips() {
1547        let input = Input::Peer(PeerInput {
1548            header: make_header(),
1549            convention: Some(PeerConvention::Message),
1550            body: "hi".into(),
1551            payload: None,
1552            blocks: None,
1553            handling_mode: Some(HandlingMode::Queue),
1554        });
1555        let json = serde_json::to_value(&input).unwrap();
1556        assert_eq!(json["handling_mode"], "queue");
1557        let parsed: Input = serde_json::from_value(json).unwrap();
1558        match parsed {
1559            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
1560            other => panic!("Expected Peer, got {other:?}"),
1561        }
1562    }
1563
1564    #[test]
1565    fn peer_response_terminal_input_owns_wire_status_mapping() {
1566        let peer_id = meerkat_core::comms::PeerId::from_uuid(
1567            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1568        );
1569        let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1570        let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1571            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1572        );
1573        let input = peer_response_terminal_input(
1574            peer_id,
1575            Some(display_name),
1576            request_id,
1577            meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
1578            serde_json::json!({"ok": false}),
1579        );
1580
1581        match input {
1582            Input::Peer(PeerInput {
1583                header:
1584                    InputHeader {
1585                        source:
1586                            InputOrigin::Peer {
1587                                peer_id,
1588                                display_identity,
1589                                runtime_id,
1590                            },
1591                        durability: InputDurability::Durable,
1592                        correlation_id,
1593                        ..
1594                    },
1595                convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
1596                payload: Some(payload),
1597                handling_mode: None,
1598                ..
1599            }) => {
1600                assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
1601                assert_eq!(display_identity.as_deref(), Some("analyst"));
1602                assert_eq!(runtime_id, None);
1603                assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
1604                assert_eq!(
1605                    correlation_id,
1606                    Some(CorrelationId::from_uuid(
1607                        uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
1608                    ))
1609                );
1610                assert_eq!(status, ResponseTerminalStatus::Cancelled);
1611                assert_eq!(payload["ok"], false);
1612            }
1613            other => panic!("expected terminal peer input, got {other:?}"),
1614        }
1615    }
1616
1617    #[test]
1618    fn peer_input_with_steer_handling_mode_roundtrips() {
1619        let input = Input::Peer(PeerInput {
1620            header: make_header(),
1621            convention: Some(PeerConvention::Message),
1622            body: "hi".into(),
1623            payload: None,
1624            blocks: None,
1625            handling_mode: Some(HandlingMode::Steer),
1626        });
1627        let json = serde_json::to_value(&input).unwrap();
1628        assert_eq!(json["handling_mode"], "steer");
1629        let parsed: Input = serde_json::from_value(json).unwrap();
1630        match parsed {
1631            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
1632            other => panic!("Expected Peer, got {other:?}"),
1633        }
1634    }
1635
1636    #[test]
1637    fn peer_input_handling_mode_not_serialized_when_none() {
1638        let input = Input::Peer(PeerInput {
1639            header: make_header(),
1640            convention: Some(PeerConvention::Message),
1641            body: "hi".into(),
1642            payload: None,
1643            blocks: None,
1644            handling_mode: None,
1645        });
1646        let json = serde_json::to_value(&input).unwrap();
1647        assert!(json.get("handling_mode").is_none());
1648    }
1649}