Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. The runtime's policy table resolves each Input
4//! to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10    ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11    RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::types::{
15    HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind, SystemNoticePeer,
16};
17use meerkat_core::{
18    BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
19    PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
20    PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
21    PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
22    PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
23    PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
24};
25use serde::{Deserialize, Serialize};
26
27use crate::identifiers::{
28    CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
29};
30use meerkat_core::types::RenderMetadata;
31
/// Common header for all input variants.
///
/// Every payload struct in this module embeds one as its `header` field, and
/// [`Input::header`] returns it regardless of the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputHeader {
    /// Unique ID for this input.
    pub id: InputId,
    /// When the input was created.
    pub timestamp: DateTime<Utc>,
    /// Source of the input.
    pub source: InputOrigin,
    /// Durability requirement.
    pub durability: InputDurability,
    /// Visibility controls.
    pub visibility: InputVisibility,
    /// Optional idempotency key for dedup. Left out of the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<IdempotencyKey>,
    /// Optional supersession key. Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supersession_key: Option<SupersessionKey>,
    /// Optional correlation ID. Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<CorrelationId>,
}
55
/// Where the input originated.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `type` key, next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputOrigin {
    /// Human operator / external API caller.
    Operator,
    /// Peer agent (comms).
    Peer {
        /// Canonical comms peer id used by machine/runtime policy and schema
        /// projection.
        peer_id: String,
        /// Optional display/source label admitted at peer ingress. This is
        /// presentation metadata only; routing and trust must use typed ingress
        /// facts before the runtime input seam or the canonical `peer_id`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        display_identity: Option<String>,
        /// Optional logical runtime id; left out of the serialized form when
        /// `None`. `peer_response_terminal_fact` parses it into the transport
        /// identity of a terminal response.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_id: Option<LogicalRuntimeId>,
    },
    /// Flow engine (mob orchestration).
    ///
    /// NOTE(review): `flow_id` / `step_index` presumably identify the flow and
    /// the position of the step inside it — confirm against the producer.
    Flow { flow_id: String, step_index: usize },
    /// System-generated (compaction, projection, etc.).
    System,
    /// External event source.
    ///
    /// A non-blank `source_name` is preferred over the event type when
    /// external events are rendered in this module.
    External { source_name: String },
}
83
/// Durability requirement for an input.
///
/// Serialized as a snake_case string (`durable`, `ephemeral`, `derived`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputDurability {
    /// Must be persisted before acknowledgment.
    Durable,
    /// In-memory only, may be lost on crash.
    Ephemeral,
    /// Derived from other inputs (can be reconstructed).
    Derived,
}
96
/// Visibility controls for an input.
///
/// Two independent flags; the `Default` implementation sets both to `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputVisibility {
    /// Whether this input appears in the conversation transcript.
    pub transcript_eligible: bool,
    /// Whether this input is visible to operator surfaces.
    pub operator_eligible: bool,
}
105
106impl Default for InputVisibility {
107    fn default() -> Self {
108        Self {
109            transcript_eligible: true,
110            operator_eligible: true,
111        }
112    }
113}
114
/// The 6 input variants accepted by the runtime layer.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// written under the `input_type` key alongside the payload struct's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "input_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Input {
    /// User/operator prompt.
    Prompt(PromptInput),
    /// Peer-originated input (comms).
    Peer(PeerInput),
    /// Flow step input (mob orchestration).
    FlowStep(FlowStepInput),
    /// External event input.
    ExternalEvent(ExternalEventInput),
    /// Explicit runtime continuation work.
    ///
    /// Also accepted under the tag `system_generated` when deserializing.
    #[serde(alias = "system_generated")]
    Continuation(ContinuationInput),
    /// Explicit non-content operation/lifecycle input.
    ///
    /// Also accepted under the tag `projected` when deserializing.
    #[serde(alias = "projected")]
    Operation(OperationInput),
}
135
136impl Input {
137    /// Get the input header.
138    pub fn header(&self) -> &InputHeader {
139        match self {
140            Input::Prompt(i) => &i.header,
141            Input::Peer(i) => &i.header,
142            Input::FlowStep(i) => &i.header,
143            Input::ExternalEvent(i) => &i.header,
144            Input::Continuation(i) => &i.header,
145            Input::Operation(i) => &i.header,
146        }
147    }
148
149    /// Get the input ID.
150    pub fn id(&self) -> &InputId {
151        &self.header().id
152    }
153
154    /// Typed kind for policy dispatch.
155    pub fn kind(&self) -> InputKind {
156        match self {
157            Input::Prompt(_) => InputKind::Prompt,
158            Input::Peer(p) => match &p.convention {
159                Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160                Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161                Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162                Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163            },
164            Input::FlowStep(_) => InputKind::FlowStep,
165            Input::ExternalEvent(_) => InputKind::ExternalEvent,
166            Input::Continuation(_) => InputKind::Continuation,
167            Input::Operation(_) => InputKind::Operation,
168        }
169    }
170
171    /// Wrapped kind identifier (for newtype-discipline call sites).
172    pub fn kind_id(&self) -> KindId {
173        KindId::new(self.kind())
174    }
175
176    /// Handling-mode hint for ordinary work admitted through the runtime.
177    pub fn handling_mode(&self) -> Option<HandlingMode> {
178        match self {
179            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181            Input::ExternalEvent(event) => Some(event.handling_mode),
182            Input::Continuation(continuation) => Some(continuation.handling_mode),
183            Input::Peer(peer) => peer.handling_mode,
184            Input::Operation(_) => None,
185        }
186    }
187}
188
189fn migrate_legacy_payload_blocks(event: &mut ExternalEventInput) -> Result<(), BlobStoreError> {
190    let Some(obj) = event.payload.as_object_mut() else {
191        return Ok(());
192    };
193    let Some(blocks_value) = obj.remove("blocks") else {
194        return Ok(());
195    };
196    if event.blocks.is_some() {
197        return Ok(());
198    }
199    let blocks = serde_json::from_value::<Vec<meerkat_core::types::ContentBlock>>(blocks_value)
200        .map_err(|err| {
201            BlobStoreError::Internal(format!("failed to decode payload blocks: {err}"))
202        })?;
203    event.blocks = Some(blocks);
204    Ok(())
205}
206
207pub async fn externalize_input_images(
208    blob_store: &dyn BlobStore,
209    input: &mut Input,
210) -> Result<(), BlobStoreError> {
211    match input {
212        Input::Prompt(prompt) => {
213            if let Some(blocks) = prompt.blocks.as_mut() {
214                externalize_content_blocks(blob_store, blocks).await?;
215            }
216        }
217        Input::Peer(peer) => {
218            if let Some(blocks) = peer.blocks.as_mut() {
219                externalize_content_blocks(blob_store, blocks).await?;
220            }
221        }
222        Input::FlowStep(flow_step) => {
223            if let Some(blocks) = flow_step.blocks.as_mut() {
224                externalize_content_blocks(blob_store, blocks).await?;
225            }
226        }
227        Input::ExternalEvent(event) => {
228            migrate_legacy_payload_blocks(event)?;
229            if let Some(blocks) = event.blocks.as_mut() {
230                externalize_content_blocks(blob_store, blocks).await?;
231            }
232        }
233        Input::Continuation(_) | Input::Operation(_) => {}
234    }
235    Ok(())
236}
237
238pub async fn hydrate_input_images(
239    blob_store: &dyn BlobStore,
240    input: &mut Input,
241    missing_behavior: MissingBlobBehavior,
242) -> Result<(), BlobStoreError> {
243    match input {
244        Input::Prompt(prompt) => {
245            if let Some(blocks) = prompt.blocks.as_mut() {
246                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
247            }
248        }
249        Input::Peer(peer) => {
250            if let Some(blocks) = peer.blocks.as_mut() {
251                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
252            }
253        }
254        Input::FlowStep(flow_step) => {
255            if let Some(blocks) = flow_step.blocks.as_mut() {
256                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
257            }
258        }
259        Input::ExternalEvent(event) => {
260            migrate_legacy_payload_blocks(event)?;
261            if let Some(blocks) = event.blocks.as_mut() {
262                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
263            }
264        }
265        Input::Continuation(_) | Input::Operation(_) => {}
266    }
267    Ok(())
268}
269
/// User/operator prompt input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInput {
    /// Common input header.
    pub header: InputHeader,
    /// The prompt text.
    pub text: String,
    /// Optional multimodal content blocks. When present, `text` serves as the
    /// text projection (backwards compat), and `blocks` carries the full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Runtime-authored typed transcript appends that travel with this turn.
    ///
    /// These are not operator-authored prompt content. The runtime projects
    /// them into model-facing text only when building the provider request.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub typed_turn_appends: Vec<ConversationAppend>,
    /// Optional runtime turn metadata. [`Input::handling_mode`] reads the
    /// handling-mode hint for a prompt from here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
289
290impl PromptInput {
291    /// Create a new operator prompt with default header.
292    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
293        Self {
294            header: InputHeader {
295                id: meerkat_core::lifecycle::InputId::new(),
296                timestamp: chrono::Utc::now(),
297                source: InputOrigin::Operator,
298                durability: InputDurability::Durable,
299                visibility: InputVisibility::default(),
300                idempotency_key: None,
301                supersession_key: None,
302                correlation_id: None,
303            },
304            text: text.into(),
305            blocks: None,
306            typed_turn_appends: Vec::new(),
307            turn_metadata,
308        }
309    }
310
311    /// Create a multimodal prompt from `ContentInput`.
312    pub fn from_content_input(
313        input: meerkat_core::types::ContentInput,
314        turn_metadata: Option<RuntimeTurnMetadata>,
315    ) -> Self {
316        let text = input.text_content();
317        let blocks = if input.has_images() {
318            Some(input.into_blocks())
319        } else {
320            None
321        };
322        Self {
323            header: InputHeader {
324                id: meerkat_core::lifecycle::InputId::new(),
325                timestamp: chrono::Utc::now(),
326                source: InputOrigin::Operator,
327                durability: InputDurability::Durable,
328                visibility: InputVisibility::default(),
329                idempotency_key: None,
330                supersession_key: None,
331                correlation_id: None,
332            },
333            text,
334            blocks,
335            typed_turn_appends: Vec::new(),
336            turn_metadata,
337        }
338    }
339}
340
/// Peer-originated input from comms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInput {
    /// Common input header. The peer helpers in this module only produce a
    /// result when `header.source` is `InputOrigin::Peer`.
    pub header: InputHeader,
    /// The peer convention (message, request, response).
    ///
    /// When unset, [`Input::kind`] classifies the input as a peer message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub convention: Option<PeerConvention>,
    /// Legacy textual body for this peer input.
    ///
    /// Message-style peer traffic uses this directly. Request/response prompt
    /// projection is runtime-owned and must be reconstructed from
    /// `convention + payload + source` rather than helper-rendered prose.
    pub body: String,
    /// Structured peer payload, when one exists.
    ///
    /// For `Request`, this is the request params. For `Response*`, this is the
    /// response result payload. Message traffic leaves this unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<serde_json::Value>,
    /// Optional multimodal content blocks. When present, `body` serves as the
    /// text projection (backwards compat), and `blocks` carries the full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional handling-mode override for actionable peer inputs.
    /// When present on Message/Request/no-convention, overrides kind-based
    /// policy defaults. Forbidden on ResponseProgress; ResponseTerminal may
    /// carry a typed override for requester reaction urgency (enforced by
    /// [`validate_peer_handling_mode`]).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handling_mode: Option<HandlingMode>,
}
372
/// Peer communication conventions.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// written under the `convention_type` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "convention_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerConvention {
    /// Simple peer-to-peer message.
    Message,
    /// Request expecting a response.
    ///
    /// `intent` is shown in the notice summary as `Peer request: {intent}`.
    Request { request_id: String, intent: String },
    /// Progress update for an ongoing response.
    ResponseProgress {
        /// Request identifier carried with the progress update
        /// (presumably that of the originating request — confirm).
        request_id: String,
        /// Progress phase reported by the peer.
        phase: ResponseProgressPhase,
    },
    /// Terminal response (completed or failed).
    ResponseTerminal {
        /// Request identifier; `peer_response_terminal_fact` parses it as the
        /// terminal fact's correlation id.
        request_id: String,
        /// Terminal status reported by the peer.
        status: ResponseTerminalStatus,
    },
}
393
/// Phase of a response progress update. This is the core projection enum, not
/// a runtime-local duplicate: the name is a plain alias for
/// [`PeerResponseProgressProjectionPhase`].
pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
397
/// Terminal status of a response. This is the core projection enum, not a
/// runtime-local duplicate: the name is a plain alias for
/// [`PeerResponseTerminalProjectionStatus`].
pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
401
402pub fn response_terminal_status_from_wire(
403    status: meerkat_contracts::PeerResponseTerminalStatusWire,
404) -> ResponseTerminalStatus {
405    match status {
406        meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
407            PeerResponseTerminalProjectionStatus::Completed
408        }
409        meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
410            PeerResponseTerminalProjectionStatus::Failed
411        }
412        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
413            PeerResponseTerminalProjectionStatus::Cancelled
414        }
415    }
416}
417
418pub fn peer_response_terminal_input(
419    peer_id: meerkat_core::comms::PeerId,
420    display_name: Option<meerkat_core::comms::PeerName>,
421    request_id: meerkat_core::PeerCorrelationId,
422    status: meerkat_contracts::PeerResponseTerminalStatusWire,
423    result: serde_json::Value,
424) -> Input {
425    let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
426    let request_id = request_id.to_string();
427    let peer_id = peer_id.to_string();
428    let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
429
430    Input::Peer(PeerInput {
431        header: InputHeader {
432            id: InputId::new(),
433            timestamp: Utc::now(),
434            source: InputOrigin::Peer {
435                peer_id,
436                display_identity: Some(display_identity),
437                runtime_id: None,
438            },
439            durability: InputDurability::Durable,
440            visibility: InputVisibility::default(),
441            idempotency_key: None,
442            supersession_key: None,
443            correlation_id: Some(correlation_id),
444        },
445        convention: Some(PeerConvention::ResponseTerminal {
446            request_id,
447            status: response_terminal_status_from_wire(status),
448        }),
449        body: String::new(),
450        payload: Some(result),
451        blocks: None,
452        handling_mode: None,
453    })
454}
455
/// Flow step input from mob orchestration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStepInput {
    /// Common input header.
    pub header: InputHeader,
    /// Flow step identifier.
    pub step_id: String,
    /// Step instructions/prompt.
    pub instructions: String,
    /// Optional multimodal content blocks. When present, `instructions` serves
    /// as the text projection (backwards compat), and `blocks` carries the
    /// full content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Optional runtime turn metadata. [`Input::handling_mode`] reads the
    /// handling-mode hint for a flow step from here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_metadata: Option<RuntimeTurnMetadata>,
}
472
/// External event input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEventInput {
    /// Common input header.
    pub header: InputHeader,
    /// Event type/name. Used as the source label when the header origin does
    /// not provide a non-blank external source name.
    pub event_type: String,
    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
    /// payloads during coalescing and projection — not a pure pass-through.
    /// Multimodal content does NOT live here canonically; use `blocks`.
    ///
    /// A string stored under the `body` key is used as the event's text when
    /// it is rendered.
    pub payload: serde_json::Value,
    /// Optional multimodal blocks carried by the external event. This is the
    /// canonical owner for multimodal external-event content.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
    /// Runtime-owned handling hint for this external event. Falls back to
    /// `HandlingMode::default()` when absent from the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional normalized render metadata carried with the event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub render_metadata: Option<RenderMetadata>,
}
494
/// Explicit continuation request that asks the runtime to keep draining
/// ordinary work after a boundary-local event (for example, terminal peer
/// responses injected into session state).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Stable reason for the continuation request. Rendered as
    /// `[Continuation] {reason}` in the prompt-text projection.
    pub reason: String,
    /// Ordinary-work handling mode for the continuation. Falls back to
    /// `HandlingMode::default()` when absent from the serialized form.
    #[serde(default)]
    pub handling_mode: HandlingMode,
    /// Optional request/correlation handle tied to the continuation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
510
511impl ContinuationInput {
512    /// Build a continuation for waking an idle session after a detached
513    /// background operation reaches terminal state.
514    ///
515    /// Properties: `Derived` durability, invisible to transcript and operator,
516    /// `System` origin, `Steer` handling mode.
517    pub fn detached_background_op_completed() -> Self {
518        Self {
519            header: InputHeader {
520                id: meerkat_core::lifecycle::InputId::new(),
521                timestamp: chrono::Utc::now(),
522                source: InputOrigin::System,
523                durability: InputDurability::Derived,
524                visibility: InputVisibility {
525                    transcript_eligible: false,
526                    operator_eligible: false,
527                },
528                idempotency_key: None,
529                supersession_key: None,
530                correlation_id: None,
531            },
532            reason: "detached_background_op_completed".to_string(),
533            handling_mode: HandlingMode::Steer,
534            request_id: None,
535        }
536    }
537}
538
/// Explicit operation/lifecycle input admitted through runtime instead of
/// being smuggled through transcript projections or peer-only paths.
///
/// [`Input::handling_mode`] always reports `None` for this variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationInput {
    /// Common input header.
    pub header: InputHeader,
    /// Stable operation identifier.
    pub operation_id: OperationId,
    /// Typed lifecycle event for the operation.
    pub event: OpEvent,
}
549
550/// Build the core-owned peer conversation projection for a runtime peer input.
551///
552/// Peer-response terminal context projection is deliberately excluded here:
553/// admission must not store it as pre-machine truth. Runtime-loop batch
554/// construction uses [`runtime_input_projection_for_machine_batch`] after the
555/// machine-selected input is dequeued.
556pub(crate) fn peer_projection_from_peer_input(
557    peer: &PeerInput,
558) -> Option<PeerConversationProjection> {
559    peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
560}
561
562fn peer_projection_from_peer_input_with_id(
563    peer: &PeerInput,
564    peer_id: &str,
565) -> Option<PeerConversationProjection> {
566    let peer_id = peer_id.to_string();
567
568    match &peer.convention {
569        Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
570        Some(PeerConvention::Request { request_id, intent }) => {
571            let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
572                Ok(peer_id) => peer_id,
573                Err(error) => {
574                    tracing::warn!(
575                        peer_id,
576                        error = %error,
577                        "dropping peer request projection with non-canonical peer_id"
578                    );
579                    return None;
580                }
581            };
582            Some(PeerConversationProjection::Request {
583                peer_id,
584                display_name: peer_display_label(peer),
585                request_id: request_id.clone(),
586                intent: intent.clone(),
587                payload: peer.payload.clone(),
588            })
589        }
590        Some(PeerConvention::ResponseProgress { request_id, phase }) => {
591            Some(PeerConversationProjection::ResponseProgress {
592                peer_id,
593                request_id: request_id.clone(),
594                phase: *phase,
595                payload: peer.payload.clone(),
596            })
597        }
598        Some(PeerConvention::ResponseTerminal { .. }) => None,
599        None => None,
600    }
601}
602
603pub(crate) fn peer_response_terminal_fact(
604    peer: &PeerInput,
605) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
606    let InputOrigin::Peer {
607        peer_id,
608        display_identity,
609        runtime_id,
610    } = &peer.header.source
611    else {
612        return Ok(None);
613    };
614    let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
615        return Ok(None);
616    };
617
618    let transport_identity = runtime_id
619        .as_ref()
620        .map(ToString::to_string)
621        .map(PeerResponseTerminalTransportIdentity::parse)
622        .transpose()?;
623    let source = PeerResponseTerminalSource::new(
624        transport_identity,
625        PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
626        PeerResponseTerminalDisplayIdentity::parse(
627            display_identity
628                .as_ref()
629                .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
630                .clone(),
631        )?,
632    );
633    Ok(Some(PeerResponseTerminalFact::new(
634        source,
635        PeerResponseTerminalCorrelationId::parse(request_id)?,
636        *status,
637        PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
638    )))
639}
640
641pub(crate) fn validate_peer_response_terminal_fact(
642    input: &Input,
643) -> Result<(), PeerResponseTerminalFactError> {
644    let Input::Peer(peer) = input else {
645        return Ok(());
646    };
647    peer_response_terminal_fact(peer).map(|_| ())
648}
649
/// Test-only helper: project an [`Input`] to its core peer projection.
/// Anything that is not a peer input yields `None`.
#[cfg(test)]
pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
    match input {
        Input::Peer(peer) => peer_projection_from_peer_input(peer),
        _ => None,
    }
}
659
660fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
661    let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
662        return None;
663    };
664    Some(peer_id.clone())
665}
666
667fn peer_display_label(peer: &PeerInput) -> Option<String> {
668    let InputOrigin::Peer {
669        display_identity, ..
670    } = &peer.header.source
671    else {
672        return None;
673    };
674
675    display_identity
676        .as_ref()
677        .map(|label| label.trim())
678        .filter(|label| !label.is_empty())
679        .map(ToOwned::to_owned)
680}
681
682/// Rendered prompt-text projection for a peer input.
683pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
684    peer_projection_from_peer_input(peer)
685        .map(|projection| {
686            let prompt = projection.prompt_text();
687            if prompt.is_empty() {
688                peer.body.clone()
689            } else {
690                prompt
691            }
692        })
693        .unwrap_or_else(|| peer.body.clone())
694}
695
696pub(crate) fn input_prompt_text(input: &Input) -> String {
697    match input {
698        Input::Prompt(p) => p.text.clone(),
699        Input::Peer(p) => peer_prompt_text(p),
700        Input::FlowStep(f) => f.instructions.clone(),
701        Input::ExternalEvent(e) => external_event_projection_text(e),
702        Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
703        Input::Operation(operation) => {
704            format!(
705                "[Operation {}] {:?}",
706                operation.operation_id, operation.event
707            )
708        }
709    }
710}
711
712fn external_event_projection_text(event: &ExternalEventInput) -> String {
713    let source_name = match &event.header.source {
714        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
715            source_name.as_str()
716        }
717        _ => event.event_type.as_str(),
718    };
719    let body = event
720        .payload
721        .get("body")
722        .and_then(serde_json::Value::as_str)
723        .map(str::trim);
724
725    meerkat_core::interaction::format_external_event_projection(source_name, body)
726}
727
728fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
729    let (peer_id, display_name) = match &peer.header.source {
730        InputOrigin::Peer {
731            peer_id,
732            display_identity,
733            ..
734        } => (peer_id.clone(), display_identity.clone()),
735        _ => return None,
736    };
737    let (kind, request_id, intent, status) = match &peer.convention {
738        Some(PeerConvention::Message) | None => ("message", None, None, None),
739        Some(PeerConvention::Request { request_id, intent }) => (
740            "request",
741            Some(request_id.clone()),
742            Some(intent.clone()),
743            None,
744        ),
745        Some(PeerConvention::ResponseProgress { request_id, phase }) => (
746            "response_progress",
747            Some(request_id.clone()),
748            None,
749            Some(format!("{phase:?}")),
750        ),
751        Some(PeerConvention::ResponseTerminal { request_id, status }) => (
752            "response_terminal",
753            Some(request_id.clone()),
754            None,
755            Some(format!("{status:?}")),
756        ),
757    };
758    let summary = match kind {
759        "request" => intent.as_ref().map_or_else(
760            || "Peer request".to_string(),
761            |intent| format!("Peer request: {intent}"),
762        ),
763        "response_progress" => "Peer response progress".to_string(),
764        "response_terminal" => "Peer response terminal".to_string(),
765        _ => "Peer message".to_string(),
766    };
767    let content = if let Some(blocks) = peer.blocks.clone() {
768        let body_already_in_blocks = blocks.iter().any(|block| {
769            matches!(block, meerkat_core::types::ContentBlock::Text { text } if text.trim() == peer.body.trim())
770        });
771        if peer.body.trim().is_empty() || body_already_in_blocks {
772            blocks
773        } else {
774            let mut content = vec![meerkat_core::types::ContentBlock::Text {
775                text: peer.body.clone(),
776            }];
777            content.extend(blocks);
778            content
779        }
780    } else if peer.body.is_empty() {
781        Vec::new()
782    } else {
783        vec![meerkat_core::types::ContentBlock::Text {
784            text: peer.body.clone(),
785        }]
786    };
787    Some(CoreRenderable::SystemNotice {
788        kind: SystemNoticeKind::Comms,
789        body: Some(summary.clone()),
790        blocks: vec![SystemNoticeBlock::Comms {
791            kind: kind.to_string(),
792            direction: SystemNoticeDirection::Incoming,
793            peer: Some(SystemNoticePeer {
794                id: peer_id,
795                display_name,
796            }),
797            request_id,
798            intent,
799            status,
800            summary: Some(summary),
801            payload: peer.payload.clone(),
802            content,
803        }],
804    })
805}
806
807fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
808    let source = match &event.header.source {
809        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
810            source_name.clone()
811        }
812        _ => event.event_type.clone(),
813    };
814    let body = event
815        .payload
816        .get("body")
817        .and_then(serde_json::Value::as_str)
818        .map(str::trim)
819        .filter(|body| !body.is_empty())
820        .map(ToOwned::to_owned);
821    let summary = body.as_ref().map_or_else(
822        || format!("External event via {source}"),
823        std::clone::Clone::clone,
824    );
825    CoreRenderable::SystemNotice {
826        kind: SystemNoticeKind::ExternalEvent,
827        body: Some(summary.clone()),
828        blocks: vec![SystemNoticeBlock::ExternalEvent {
829            source,
830            event_type: event.event_type.clone(),
831            summary: Some(summary),
832            body,
833            payload: Some(event.payload.clone()),
834            content: event.blocks.clone().unwrap_or_default(),
835        }],
836    }
837}
838
839fn input_to_append(input: &Input) -> Option<ConversationAppend> {
840    if matches!(
841        input,
842        Input::Peer(PeerInput {
843            convention: Some(PeerConvention::ResponseTerminal { .. }),
844            blocks: None,
845            ..
846        })
847    ) {
848        return None;
849    }
850
851    let (role, content) = match input {
852        Input::Prompt(p)
853            if !p.typed_turn_appends.is_empty()
854                && p.text.trim().is_empty()
855                && p.blocks.as_ref().is_none_or(Vec::is_empty) =>
856        {
857            return None;
858        }
859        Input::Prompt(p) if p.blocks.is_some() => (
860            ConversationAppendRole::User,
861            CoreRenderable::Blocks {
862                blocks: p.blocks.clone().unwrap_or_default(),
863            },
864        ),
865        Input::Prompt(_) => (
866            ConversationAppendRole::User,
867            CoreRenderable::Text {
868                text: input_prompt_text(input),
869            },
870        ),
871        Input::Peer(p) => peer_notice_renderable(p)
872            .map(|content| (ConversationAppendRole::SystemNotice, content))?,
873        Input::FlowStep(f) => (
874            ConversationAppendRole::SystemNotice,
875            CoreRenderable::SystemNotice {
876                kind: SystemNoticeKind::Generic,
877                body: Some(format!("Flow step {}", f.step_id)),
878                blocks: vec![SystemNoticeBlock::RuntimeNotice {
879                    category: "flow_step".to_string(),
880                    detail: Some(f.instructions.clone()),
881                    payload: None,
882                }],
883            },
884        ),
885        Input::ExternalEvent(e) => (
886            ConversationAppendRole::SystemNotice,
887            external_event_notice_renderable(e),
888        ),
889        Input::Continuation(_) | Input::Operation(_) => return None,
890    };
891
892    Some(ConversationAppend { role, content })
893}
894
895fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
896    let (projection, content) = match input {
897        Input::Peer(peer) => {
898            let projection = peer_projection_from_peer_input(peer)?;
899            let content = peer_notice_renderable(peer)?;
900            (projection, content)
901        }
902        _ => return None,
903    };
904
905    Some(ConversationContextAppend {
906        key: projection.context_key()?,
907        content,
908    })
909}
910
911fn peer_response_terminal_context_append(
912    peer: &PeerInput,
913) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
914    let Some(fact) = peer_response_terminal_fact(peer)? else {
915        return Ok(None);
916    };
917
918    Ok(Some(ConversationContextAppend {
919        key: fact.context_key(),
920        content: CoreRenderable::SystemNotice {
921            kind: SystemNoticeKind::Comms,
922            body: Some("Peer terminal response context".to_string()),
923            blocks: vec![SystemNoticeBlock::Comms {
924                kind: "response_terminal".to_string(),
925                direction: SystemNoticeDirection::Incoming,
926                peer: Some(SystemNoticePeer {
927                    id: fact.source.route_identity.to_string(),
928                    display_name: Some(fact.source.display_identity.to_string()),
929                }),
930                request_id: Some(fact.correlation_id.to_string()),
931                intent: None,
932                status: Some(
933                    match fact.status {
934                        PeerResponseTerminalProjectionStatus::Completed => "completed",
935                        PeerResponseTerminalProjectionStatus::Failed => "failed",
936                        PeerResponseTerminalProjectionStatus::Cancelled => "cancelled",
937                    }
938                    .to_string(),
939                ),
940                summary: Some("Peer terminal response".to_string()),
941                payload: fact.render_payload.as_ref().cloned(),
942                content: Vec::new(),
943            }],
944        },
945    }))
946}
947
948pub(crate) fn runtime_input_projection(
949    input: &Input,
950) -> crate::ingress_types::RuntimeInputProjection {
951    crate::ingress_types::RuntimeInputProjection {
952        append: input_to_append(input),
953        additional_appends: match input {
954            Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
955            _ => Vec::new(),
956        },
957        context_append: input_to_context_append(input),
958    }
959}
960
961pub(crate) fn runtime_input_projection_for_machine_batch(
962    input: &Input,
963) -> crate::ingress_types::RuntimeInputProjection {
964    let mut projection = runtime_input_projection(input);
965    if let Input::Peer(peer) = input
966        && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
967    {
968        projection.context_append = Some(context_append);
969    }
970    projection
971}
972
973pub(crate) fn context_append_to_pending_system_context_append(
974    append: &ConversationContextAppend,
975) -> meerkat_core::PendingSystemContextAppend {
976    let text = render_core_context_for_pending_system_context(&append.content);
977    meerkat_core::PendingSystemContextAppend {
978        text,
979        source: Some(append.key.clone()),
980        idempotency_key: Some(append.key.clone()),
981        accepted_at: meerkat_core::time_compat::SystemTime::now(),
982    }
983}
984
985pub(crate) fn projection_to_pending_system_context_appends(
986    input_id: &InputId,
987    projection: &crate::ingress_types::RuntimeInputProjection,
988) -> Vec<meerkat_core::PendingSystemContextAppend> {
989    if let Some(append) = projection.context_append.as_ref() {
990        return std::iter::once(context_append_to_pending_system_context_append(append))
991            .filter(|append| !append.text.trim().is_empty())
992            .collect();
993    }
994
995    projection
996        .append
997        .as_ref()
998        .map(|append| {
999            let key = format!("runtime:steer:{input_id}");
1000            meerkat_core::PendingSystemContextAppend {
1001                text: render_core_context_for_pending_system_context(&append.content),
1002                source: Some(key.clone()),
1003                idempotency_key: Some(key),
1004                accepted_at: meerkat_core::time_compat::SystemTime::now(),
1005            }
1006        })
1007        .into_iter()
1008        .filter(|append| !append.text.trim().is_empty())
1009        .collect()
1010}
1011
1012fn render_core_context_for_pending_system_context(content: &CoreRenderable) -> String {
1013    match content {
1014        CoreRenderable::Text { text } => text.clone(),
1015        CoreRenderable::Blocks { blocks } => meerkat_core::types::text_content(blocks),
1016        CoreRenderable::Json { value } => {
1017            serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string())
1018        }
1019        CoreRenderable::Reference { uri, label } => match label {
1020            Some(label) => format!("{label}: {uri}"),
1021            None => uri.clone(),
1022        },
1023        CoreRenderable::SystemNotice { kind, body, blocks } => {
1024            meerkat_core::types::SystemNoticeMessage::with_blocks(
1025                *kind,
1026                body.clone(),
1027                blocks.clone(),
1028            )
1029            .model_projection_text()
1030        }
1031        _ => String::new(),
1032    }
1033}
1034
1035#[cfg(test)]
1036#[allow(clippy::unwrap_used, clippy::panic)]
1037mod tests {
1038    use super::*;
1039    use chrono::Utc;
1040
1041    fn make_header() -> InputHeader {
1042        InputHeader {
1043            id: InputId::new(),
1044            timestamp: Utc::now(),
1045            source: InputOrigin::Operator,
1046            durability: InputDurability::Durable,
1047            visibility: InputVisibility::default(),
1048            idempotency_key: None,
1049            supersession_key: None,
1050            correlation_id: None,
1051        }
1052    }
1053
1054    fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
1055        ConversationAppend {
1056            role: ConversationAppendRole::SystemNotice,
1057            content: CoreRenderable::SystemNotice {
1058                kind: meerkat_core::types::SystemNoticeKind::Generic,
1059                body: Some(detail.to_string()),
1060                blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
1061                    category: "test".to_string(),
1062                    detail: Some(detail.to_string()),
1063                    payload: None,
1064                }],
1065            },
1066        }
1067    }
1068
1069    #[test]
1070    fn prompt_input_serde() {
1071        let input = Input::Prompt(PromptInput {
1072            header: make_header(),
1073            text: "hello".into(),
1074            blocks: None,
1075            typed_turn_appends: Vec::new(),
1076            turn_metadata: None,
1077        });
1078        let json = serde_json::to_value(&input).unwrap();
1079        assert_eq!(json["input_type"], "prompt");
1080        let parsed: Input = serde_json::from_value(json).unwrap();
1081        assert!(matches!(parsed, Input::Prompt(_)));
1082    }
1083
1084    #[test]
1085    fn prompt_input_typed_turn_appends_project_without_user_text() {
1086        let append = typed_runtime_notice_append("peer delivery");
1087        let input = Input::Prompt(PromptInput {
1088            header: make_header(),
1089            text: String::new(),
1090            blocks: None,
1091            typed_turn_appends: vec![append.clone()],
1092            turn_metadata: None,
1093        });
1094
1095        let projection = runtime_input_projection(&input);
1096        assert!(
1097            projection.append.is_none(),
1098            "empty runtime-authored prompt carrier must not synthesize a user append"
1099        );
1100        assert_eq!(projection.additional_appends, vec![append]);
1101    }
1102
1103    #[test]
1104    fn prompt_input_typed_turn_appends_serde_roundtrip() {
1105        let append = typed_runtime_notice_append("typed appends persist");
1106        let input = Input::Prompt(PromptInput {
1107            header: make_header(),
1108            text: String::new(),
1109            blocks: None,
1110            typed_turn_appends: vec![append.clone()],
1111            turn_metadata: None,
1112        });
1113
1114        let json = serde_json::to_value(&input).unwrap();
1115        let parsed: Input = serde_json::from_value(json).unwrap();
1116        let Input::Prompt(prompt) = parsed else {
1117            panic!("expected prompt input");
1118        };
1119        assert_eq!(prompt.text, "");
1120        assert_eq!(prompt.typed_turn_appends, vec![append]);
1121    }
1122
1123    #[test]
1124    fn peer_input_message_serde() {
1125        let input = Input::Peer(PeerInput {
1126            header: make_header(),
1127            convention: Some(PeerConvention::Message),
1128            body: "hi there".into(),
1129            payload: None,
1130            blocks: None,
1131            handling_mode: None,
1132        });
1133        let json = serde_json::to_value(&input).unwrap();
1134        assert_eq!(json["input_type"], "peer");
1135        let parsed: Input = serde_json::from_value(json).unwrap();
1136        assert!(matches!(parsed, Input::Peer(_)));
1137    }
1138
1139    #[test]
1140    fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
1141        let mut header = make_header();
1142        header.source = InputOrigin::Peer {
1143            peer_id: "canonical-peer-id".into(),
1144            display_identity: Some("display-agent".into()),
1145            runtime_id: None,
1146        };
1147        let input = Input::Peer(PeerInput {
1148            header,
1149            convention: Some(PeerConvention::Message),
1150            body: "caption".into(),
1151            payload: None,
1152            blocks: Some(vec![
1153                meerkat_core::types::ContentBlock::Text {
1154                    text: "caption".into(),
1155                },
1156                meerkat_core::types::ContentBlock::Image {
1157                    media_type: "image/png".into(),
1158                    data: "abc".into(),
1159                },
1160            ]),
1161            handling_mode: None,
1162        });
1163
1164        let Input::Peer(peer) = &input else {
1165            panic!("expected peer input");
1166        };
1167        assert_eq!(
1168            peer_projection_from_peer_input(peer)
1169                .and_then(|projection| projection.block_prefix_text())
1170                .as_deref(),
1171            Some("Peer message from canonical-peer-id")
1172        );
1173
1174        let projection = runtime_input_projection(&input);
1175        let append = projection.append.expect("conversation append");
1176        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1177            panic!("expected typed system notice");
1178        };
1179        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1180            blocks.first()
1181        else {
1182            panic!("expected comms block");
1183        };
1184        assert_eq!(
1185            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1186            Some("display-agent")
1187        );
1188        assert_eq!(
1189            content.first(),
1190            Some(&meerkat_core::types::ContentBlock::Text {
1191                text: "caption".into()
1192            })
1193        );
1194    }
1195
1196    #[test]
1197    fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
1198        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1199        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1200        let mut header = make_header();
1201        header.source = InputOrigin::Peer {
1202            peer_id: route_id.into(),
1203            display_identity: Some("display-agent".into()),
1204            runtime_id: None,
1205        };
1206        let input = Input::Peer(PeerInput {
1207            header,
1208            convention: Some(PeerConvention::ResponseTerminal {
1209                request_id: request_id.into(),
1210                status: ResponseTerminalStatus::Completed,
1211            }),
1212            body: "legacy response body".into(),
1213            payload: Some(serde_json::json!({"answer":"ok"})),
1214            blocks: None,
1215            handling_mode: None,
1216        });
1217
1218        let Input::Peer(peer) = &input else {
1219            panic!("expected peer input");
1220        };
1221        let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
1222        assert!(
1223            peer_projection_from_peer_input(peer).is_none(),
1224            "terminal peer response projection must not be built before machine batch selection"
1225        );
1226
1227        let projection = runtime_input_projection(&input);
1228        assert!(
1229            projection.context_append.is_none(),
1230            "admission projection must not store terminal peer response context"
1231        );
1232        let projection = runtime_input_projection_for_machine_batch(&input);
1233        let context = projection.context_append.expect("context append");
1234        assert_eq!(context.key, expected_canonical_key);
1235        let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
1236            panic!("expected typed context");
1237        };
1238        let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
1239        else {
1240            panic!("expected comms block");
1241        };
1242        assert_eq!(
1243            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1244            Some("display-agent")
1245        );
1246        assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some(route_id));
1247    }
1248
1249    #[test]
1250    fn steer_projection_uses_context_append_as_pending_system_context() {
1251        let input_id = InputId::new();
1252        let projection = crate::ingress_types::RuntimeInputProjection {
1253            append: Some(ConversationAppend {
1254                role: ConversationAppendRole::SystemNotice,
1255                content: CoreRenderable::Text {
1256                    text: "ordinary append must lose to context append".into(),
1257                },
1258            }),
1259            additional_appends: Vec::new(),
1260            context_append: Some(ConversationContextAppend {
1261                key: "peer_response_terminal:peer:req".into(),
1262                content: CoreRenderable::Text {
1263                    text: "terminal response is ready".into(),
1264                },
1265            }),
1266        };
1267
1268        let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1269
1270        assert_eq!(appends.len(), 1);
1271        assert_eq!(appends[0].text, "terminal response is ready");
1272        assert_eq!(
1273            appends[0].source.as_deref(),
1274            Some("peer_response_terminal:peer:req")
1275        );
1276        assert_eq!(
1277            appends[0].idempotency_key.as_deref(),
1278            Some("peer_response_terminal:peer:req")
1279        );
1280    }
1281
1282    #[test]
1283    fn steer_projection_falls_back_to_ordinary_peer_append() {
1284        let mut header = make_header();
1285        header.source = InputOrigin::Peer {
1286            peer_id: "peer-a".into(),
1287            display_identity: Some("Peer A".into()),
1288            runtime_id: None,
1289        };
1290        let input = Input::Peer(PeerInput {
1291            header,
1292            convention: Some(PeerConvention::Message),
1293            body: "please look at this while you work".into(),
1294            payload: None,
1295            blocks: None,
1296            handling_mode: Some(HandlingMode::Steer),
1297        });
1298        let input_id = input.id().clone();
1299        let projection = runtime_input_projection(&input);
1300
1301        let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1302
1303        assert_eq!(appends.len(), 1);
1304        assert!(
1305            appends[0]
1306                .text
1307                .contains("please look at this while you work"),
1308            "peer message append should be renderable as live system context: {:?}",
1309            appends[0].text
1310        );
1311        assert_eq!(
1312            appends[0].source.as_deref(),
1313            Some(format!("runtime:steer:{input_id}").as_str())
1314        );
1315        assert_eq!(
1316            appends[0].idempotency_key.as_deref(),
1317            Some(format!("runtime:steer:{input_id}").as_str())
1318        );
1319    }
1320
1321    #[test]
1322    fn steer_projection_filters_empty_context_and_empty_append() {
1323        let input_id = InputId::new();
1324        let context_projection = crate::ingress_types::RuntimeInputProjection {
1325            append: None,
1326            additional_appends: Vec::new(),
1327            context_append: Some(ConversationContextAppend {
1328                key: "empty-context".into(),
1329                content: CoreRenderable::Text { text: "  ".into() },
1330            }),
1331        };
1332        assert!(
1333            projection_to_pending_system_context_appends(&input_id, &context_projection).is_empty()
1334        );
1335
1336        let append_projection = crate::ingress_types::RuntimeInputProjection {
1337            append: Some(ConversationAppend {
1338                role: ConversationAppendRole::SystemNotice,
1339                content: CoreRenderable::Text { text: "\n".into() },
1340            }),
1341            additional_appends: Vec::new(),
1342            context_append: None,
1343        };
1344        assert!(
1345            projection_to_pending_system_context_appends(&input_id, &append_projection).is_empty()
1346        );
1347    }
1348
1349    #[test]
1350    fn peer_response_terminal_with_blocks_projects_append_and_context() {
1351        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1352        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1353        let mut header = make_header();
1354        header.source = InputOrigin::Peer {
1355            peer_id: route_id.into(),
1356            display_identity: Some("display-agent".into()),
1357            runtime_id: None,
1358        };
1359        let input = Input::Peer(PeerInput {
1360            header,
1361            convention: Some(PeerConvention::ResponseTerminal {
1362                request_id: request_id.into(),
1363                status: ResponseTerminalStatus::Completed,
1364            }),
1365            body: String::new(),
1366            payload: Some(serde_json::json!({"answer":"ok"})),
1367            blocks: Some(vec![meerkat_core::types::ContentBlock::Image {
1368                media_type: "image/jpeg".into(),
1369                data: "abc".into(),
1370            }]),
1371            handling_mode: None,
1372        });
1373
1374        let projection = runtime_input_projection_for_machine_batch(&input);
1375        let append = projection.append.expect("conversation append");
1376        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1377            panic!("expected typed append");
1378        };
1379        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1380            blocks.first()
1381        else {
1382            panic!("expected comms block");
1383        };
1384        assert_eq!(
1385            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1386            Some("display-agent")
1387        );
1388        assert!(matches!(
1389            content.first(),
1390            Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
1391                if media_type == "image/jpeg"
1392        ));
1393        assert!(
1394            projection.context_append.is_some(),
1395            "terminal response must still apply runtime-owned context"
1396        );
1397    }
1398
1399    #[test]
1400    fn peer_input_request_serde() {
1401        let input = Input::Peer(PeerInput {
1402            header: make_header(),
1403            convention: Some(PeerConvention::Request {
1404                request_id: "req-1".into(),
1405                intent: "mob.peer_added".into(),
1406            }),
1407            body: "Agent joined".into(),
1408            payload: Some(serde_json::json!({"name": "agent-1"})),
1409            blocks: None,
1410            handling_mode: None,
1411        });
1412        let json = serde_json::to_value(&input).unwrap();
1413        let parsed: Input = serde_json::from_value(json).unwrap();
1414        if let Input::Peer(p) = parsed {
1415            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
1416        } else {
1417            panic!("Expected PeerInput");
1418        }
1419    }
1420
1421    #[test]
1422    fn peer_input_response_terminal_serde() {
1423        let input = Input::Peer(PeerInput {
1424            header: make_header(),
1425            convention: Some(PeerConvention::ResponseTerminal {
1426                request_id: "req-1".into(),
1427                status: ResponseTerminalStatus::Completed,
1428            }),
1429            body: "Done".into(),
1430            payload: Some(serde_json::json!({"ok": true})),
1431            blocks: None,
1432            handling_mode: None,
1433        });
1434        let json = serde_json::to_value(&input).unwrap();
1435        let parsed: Input = serde_json::from_value(json).unwrap();
1436        assert!(matches!(parsed, Input::Peer(_)));
1437    }
1438
1439    #[test]
1440    fn peer_input_response_progress_serde() {
1441        let input = Input::Peer(PeerInput {
1442            header: make_header(),
1443            convention: Some(PeerConvention::ResponseProgress {
1444                request_id: "req-1".into(),
1445                phase: ResponseProgressPhase::InProgress,
1446            }),
1447            body: "Working...".into(),
1448            payload: Some(serde_json::json!({"progress": "working"})),
1449            blocks: None,
1450            handling_mode: None,
1451        });
1452        let json = serde_json::to_value(&input).unwrap();
1453        let parsed: Input = serde_json::from_value(json).unwrap();
1454        assert!(matches!(parsed, Input::Peer(_)));
1455    }
1456
1457    #[test]
1458    fn flow_step_input_serde() {
1459        let input = Input::FlowStep(FlowStepInput {
1460            header: make_header(),
1461            step_id: "step-1".into(),
1462            instructions: "analyze the data".into(),
1463            blocks: Some(vec![
1464                meerkat_core::types::ContentBlock::Text {
1465                    text: "analyze the data".into(),
1466                },
1467                meerkat_core::types::ContentBlock::Image {
1468                    media_type: "image/png".into(),
1469                    data: meerkat_core::types::ImageData::Inline {
1470                        data: "abc123".into(),
1471                    },
1472                },
1473            ]),
1474            turn_metadata: None,
1475        });
1476        let json = serde_json::to_value(&input).unwrap();
1477        assert_eq!(json["input_type"], "flow_step");
1478        let parsed: Input = serde_json::from_value(json).unwrap();
1479        assert!(matches!(parsed, Input::FlowStep(_)));
1480    }
1481
1482    #[test]
1483    fn external_event_input_serde() {
1484        let input = Input::ExternalEvent(ExternalEventInput {
1485            header: make_header(),
1486            event_type: "webhook.received".into(),
1487            payload: serde_json::json!({"url": "https://example.com"}),
1488            blocks: Some(vec![
1489                meerkat_core::types::ContentBlock::Text {
1490                    text: "look".into(),
1491                },
1492                meerkat_core::types::ContentBlock::Image {
1493                    media_type: "image/png".into(),
1494                    data: meerkat_core::types::ImageData::Inline {
1495                        data: "abc123".into(),
1496                    },
1497                },
1498            ]),
1499            handling_mode: HandlingMode::Queue,
1500            render_metadata: None,
1501        });
1502        let json = serde_json::to_value(&input).unwrap();
1503        assert_eq!(json["input_type"], "external_event");
1504        let parsed: Input = serde_json::from_value(json).unwrap();
1505        assert!(matches!(parsed, Input::ExternalEvent(_)));
1506    }
1507
1508    #[test]
1509    fn legacy_external_event_payload_blocks_migrate_to_canonical_blocks_owner() {
1510        let mut input = Input::ExternalEvent(ExternalEventInput {
1511            header: make_header(),
1512            event_type: "webhook.received".into(),
1513            payload: serde_json::json!({
1514                "body": "see image",
1515                "blocks": [
1516                    { "type": "text", "text": "caption text" },
1517                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
1518                ]
1519            }),
1520            blocks: None,
1521            handling_mode: HandlingMode::Queue,
1522            render_metadata: None,
1523        });
1524
1525        match &mut input {
1526            Input::ExternalEvent(event) => {
1527                migrate_legacy_payload_blocks(event).unwrap();
1528                assert!(event.payload.get("blocks").is_none());
1529                assert_eq!(event.payload["body"], "see image");
1530                assert_eq!(event.blocks.as_ref().map(Vec::len), Some(2));
1531            }
1532            other => panic!("Expected ExternalEvent, got {other:?}"),
1533        }
1534    }
1535
1536    #[test]
1537    fn continuation_input_serde() {
1538        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1539        let json = serde_json::to_value(&input).unwrap();
1540        assert_eq!(json["input_type"], "continuation");
1541        let parsed: Input = serde_json::from_value(json).unwrap();
1542        match parsed {
1543            Input::Continuation(continuation) => {
1544                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
1545                assert_eq!(continuation.reason, "detached_background_op_completed");
1546            }
1547            other => panic!("Expected Continuation, got {other:?}"),
1548        }
1549    }
1550
1551    #[test]
1552    fn continuation_input_accepts_legacy_system_generated_tag() {
1553        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1554        let mut json = serde_json::to_value(&input).unwrap();
1555        json["input_type"] = serde_json::Value::String("system_generated".into());
1556        let parsed: Input = serde_json::from_value(json).unwrap();
1557        match parsed {
1558            Input::Continuation(continuation) => {
1559                assert_eq!(continuation.reason, "detached_background_op_completed");
1560            }
1561            other => panic!("Expected Continuation, got {other:?}"),
1562        }
1563    }
1564
1565    #[test]
1566    fn operation_input_serde() {
1567        let input = Input::Operation(OperationInput {
1568            header: InputHeader {
1569                durability: InputDurability::Derived,
1570                ..make_header()
1571            },
1572            operation_id: OperationId::new(),
1573            event: OpEvent::Cancelled {
1574                id: OperationId::new(),
1575            },
1576        });
1577        let json = serde_json::to_value(&input).unwrap();
1578        assert_eq!(json["input_type"], "operation");
1579        let parsed: Input = serde_json::from_value(json).unwrap();
1580        assert!(matches!(parsed, Input::Operation(_)));
1581    }
1582
1583    #[test]
1584    fn operation_input_accepts_legacy_projected_tag() {
1585        let input = Input::Operation(OperationInput {
1586            header: InputHeader {
1587                durability: InputDurability::Derived,
1588                ..make_header()
1589            },
1590            operation_id: OperationId::new(),
1591            event: OpEvent::Cancelled {
1592                id: OperationId::new(),
1593            },
1594        });
1595        let mut json = serde_json::to_value(&input).unwrap();
1596        json["input_type"] = serde_json::Value::String("projected".into());
1597        let parsed: Input = serde_json::from_value(json).unwrap();
1598        assert!(matches!(parsed, Input::Operation(_)));
1599    }
1600
1601    #[test]
1602    fn input_kind_id() {
1603        let prompt = Input::Prompt(PromptInput {
1604            header: make_header(),
1605            text: "hi".into(),
1606            blocks: None,
1607            typed_turn_appends: Vec::new(),
1608            turn_metadata: None,
1609        });
1610        assert_eq!(prompt.kind(), InputKind::Prompt);
1611
1612        let peer_msg = Input::Peer(PeerInput {
1613            header: make_header(),
1614            convention: Some(PeerConvention::Message),
1615            body: "hi".into(),
1616            payload: None,
1617            blocks: None,
1618            handling_mode: None,
1619        });
1620        assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
1621
1622        let peer_req = Input::Peer(PeerInput {
1623            header: make_header(),
1624            convention: Some(PeerConvention::Request {
1625                request_id: "r".into(),
1626                intent: "i".into(),
1627            }),
1628            body: "hi".into(),
1629            payload: Some(serde_json::json!({"subject": "x"})),
1630            blocks: None,
1631            handling_mode: None,
1632        });
1633        assert_eq!(peer_req.kind(), InputKind::PeerRequest);
1634
1635        let continuation = Input::Continuation(ContinuationInput {
1636            header: make_header(),
1637            reason: "continue".into(),
1638            handling_mode: HandlingMode::Steer,
1639            request_id: None,
1640        });
1641        assert_eq!(continuation.kind(), InputKind::Continuation);
1642
1643        let operation = Input::Operation(OperationInput {
1644            header: make_header(),
1645            operation_id: OperationId::new(),
1646            event: OpEvent::Cancelled {
1647                id: OperationId::new(),
1648            },
1649        });
1650        assert_eq!(operation.kind(), InputKind::Operation);
1651    }
1652
1653    #[test]
1654    fn input_source_variants() {
1655        let sources = vec![
1656            InputOrigin::Operator,
1657            InputOrigin::Peer {
1658                peer_id: "p1".into(),
1659                display_identity: None,
1660                runtime_id: None,
1661            },
1662            InputOrigin::Flow {
1663                flow_id: "f1".into(),
1664                step_index: 0,
1665            },
1666            InputOrigin::System,
1667            InputOrigin::External {
1668                source_name: "webhook".into(),
1669            },
1670        ];
1671        for source in sources {
1672            let json = serde_json::to_value(&source).unwrap();
1673            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
1674            assert_eq!(source, parsed);
1675        }
1676    }
1677
1678    #[test]
1679    fn input_durability_serde() {
1680        for d in [
1681            InputDurability::Durable,
1682            InputDurability::Ephemeral,
1683            InputDurability::Derived,
1684        ] {
1685            let json = serde_json::to_value(d).unwrap();
1686            let parsed: InputDurability = serde_json::from_value(json).unwrap();
1687            assert_eq!(d, parsed);
1688        }
1689    }
1690
1691    #[test]
1692    fn peer_input_without_handling_mode_deserializes_as_none() {
1693        // Simulate old serialized PeerInput without the handling_mode field.
1694        let json = serde_json::json!({
1695            "input_type": "peer",
1696            "header": serde_json::to_value(make_header()).unwrap(),
1697            "convention": { "convention_type": "message" },
1698            "body": "hello"
1699        });
1700        let parsed: Input = serde_json::from_value(json).unwrap();
1701        match parsed {
1702            Input::Peer(p) => assert!(p.handling_mode.is_none()),
1703            other => panic!("Expected Peer, got {other:?}"),
1704        }
1705    }
1706
1707    #[test]
1708    fn peer_input_with_queue_handling_mode_roundtrips() {
1709        let input = Input::Peer(PeerInput {
1710            header: make_header(),
1711            convention: Some(PeerConvention::Message),
1712            body: "hi".into(),
1713            payload: None,
1714            blocks: None,
1715            handling_mode: Some(HandlingMode::Queue),
1716        });
1717        let json = serde_json::to_value(&input).unwrap();
1718        assert_eq!(json["handling_mode"], "queue");
1719        let parsed: Input = serde_json::from_value(json).unwrap();
1720        match parsed {
1721            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
1722            other => panic!("Expected Peer, got {other:?}"),
1723        }
1724    }
1725
1726    #[test]
1727    fn peer_response_terminal_input_owns_wire_status_mapping() {
1728        let peer_id = meerkat_core::comms::PeerId::from_uuid(
1729            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1730        );
1731        let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1732        let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1733            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1734        );
1735        let input = peer_response_terminal_input(
1736            peer_id,
1737            Some(display_name),
1738            request_id,
1739            meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
1740            serde_json::json!({"ok": false}),
1741        );
1742
1743        match input {
1744            Input::Peer(PeerInput {
1745                header:
1746                    InputHeader {
1747                        source:
1748                            InputOrigin::Peer {
1749                                peer_id,
1750                                display_identity,
1751                                runtime_id,
1752                            },
1753                        durability: InputDurability::Durable,
1754                        correlation_id,
1755                        ..
1756                    },
1757                convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
1758                payload: Some(payload),
1759                handling_mode: None,
1760                ..
1761            }) => {
1762                assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
1763                assert_eq!(display_identity.as_deref(), Some("analyst"));
1764                assert_eq!(runtime_id, None);
1765                assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
1766                assert_eq!(
1767                    correlation_id,
1768                    Some(CorrelationId::from_uuid(
1769                        uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
1770                    ))
1771                );
1772                assert_eq!(status, ResponseTerminalStatus::Cancelled);
1773                assert_eq!(payload["ok"], false);
1774            }
1775            other => panic!("expected terminal peer input, got {other:?}"),
1776        }
1777    }
1778
1779    #[test]
1780    fn peer_input_with_steer_handling_mode_roundtrips() {
1781        let input = Input::Peer(PeerInput {
1782            header: make_header(),
1783            convention: Some(PeerConvention::Message),
1784            body: "hi".into(),
1785            payload: None,
1786            blocks: None,
1787            handling_mode: Some(HandlingMode::Steer),
1788        });
1789        let json = serde_json::to_value(&input).unwrap();
1790        assert_eq!(json["handling_mode"], "steer");
1791        let parsed: Input = serde_json::from_value(json).unwrap();
1792        match parsed {
1793            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
1794            other => panic!("Expected Peer, got {other:?}"),
1795        }
1796    }
1797
1798    #[test]
1799    fn peer_input_handling_mode_not_serialized_when_none() {
1800        let input = Input::Peer(PeerInput {
1801            header: make_header(),
1802            convention: Some(PeerConvention::Message),
1803            body: "hi".into(),
1804            payload: None,
1805            blocks: None,
1806            handling_mode: None,
1807        });
1808        let json = serde_json::to_value(&input).unwrap();
1809        assert!(json.get("handling_mode").is_none());
1810    }
1811}