Skip to main content

meerkat_runtime/
input.rs

1//! §8 Input types — the 6 input variants accepted by the runtime layer.
2//!
3//! Core never sees these. Generated admission authority resolves each accepted
4//! Input to a PolicyDecision, then the runtime translates accepted Inputs into
5//! RunPrimitive for core consumption.
6
7use chrono::{DateTime, Utc};
8use meerkat_core::lifecycle::InputId;
9use meerkat_core::lifecycle::run_primitive::{
10    ConversationAppend, ConversationAppendRole, ConversationContextAppend, CoreRenderable,
11    RuntimeTurnMetadata,
12};
13use meerkat_core::ops::{OpEvent, OperationId};
14use meerkat_core::service::TurnToolOverlay;
15use meerkat_core::types::{
16    ContentInput, HandlingMode, SystemNoticeBlock, SystemNoticeDirection, SystemNoticeKind,
17    SystemNoticePeer,
18};
19use meerkat_core::{
20    BlobStore, BlobStoreError, MissingBlobBehavior, PeerConversationProjection,
21    PeerResponseProgressProjectionPhase, PeerResponseTerminalCorrelationId,
22    PeerResponseTerminalDisplayIdentity, PeerResponseTerminalFact, PeerResponseTerminalFactError,
23    PeerResponseTerminalProjectionStatus, PeerResponseTerminalRenderPayload,
24    PeerResponseTerminalRouteIdentity, PeerResponseTerminalSource,
25    PeerResponseTerminalTransportIdentity, externalize_content_blocks, hydrate_content_blocks,
26};
27use serde::{Deserialize, Serialize};
28
29use crate::identifiers::{
30    CorrelationId, IdempotencyKey, InputKind, KindId, LogicalRuntimeId, SupersessionKey,
31};
32use meerkat_core::types::RenderMetadata;
33
34/// Common header for all input variants.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct InputHeader {
37    /// Unique ID for this input.
38    pub id: InputId,
39    /// When the input was created.
40    pub timestamp: DateTime<Utc>,
41    /// Source of the input.
42    pub source: InputOrigin,
43    /// Durability requirement.
44    pub durability: InputDurability,
45    /// Visibility controls.
46    pub visibility: InputVisibility,
47    /// Optional idempotency key for dedup.
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub idempotency_key: Option<IdempotencyKey>,
50    /// Optional supersession key.
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub supersession_key: Option<SupersessionKey>,
53    /// Optional correlation ID.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub correlation_id: Option<CorrelationId>,
56}
57
58/// Where the input originated.
59#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(tag = "type", rename_all = "snake_case")]
61#[non_exhaustive]
62pub enum InputOrigin {
63    /// Human operator / external API caller.
64    Operator,
65    /// Peer agent (comms).
66    Peer {
67        /// Canonical comms peer id used by machine/runtime policy and schema
68        /// projection.
69        peer_id: String,
70        /// Optional display/source label admitted at peer ingress. This is
71        /// presentation metadata only; routing and trust must use typed ingress
72        /// facts before the runtime input seam or the canonical `peer_id`.
73        #[serde(default, skip_serializing_if = "Option::is_none")]
74        display_identity: Option<String>,
75        #[serde(skip_serializing_if = "Option::is_none")]
76        runtime_id: Option<LogicalRuntimeId>,
77    },
78    /// Flow engine (mob orchestration).
79    Flow { flow_id: String, step_index: usize },
80    /// System-generated (compaction, projection, etc.).
81    System,
82    /// External event source.
83    External { source_name: String },
84}
85
86/// Durability requirement for an input.
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89#[non_exhaustive]
90pub enum InputDurability {
91    /// Must be persisted before acknowledgment.
92    Durable,
93    /// In-memory only, may be lost on crash.
94    Ephemeral,
95    /// Derived from other inputs (can be reconstructed).
96    Derived,
97}
98
99/// Visibility controls for an input.
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
101pub struct InputVisibility {
102    /// Whether this input appears in the conversation transcript.
103    pub transcript_eligible: bool,
104    /// Whether this input is visible to operator surfaces.
105    pub operator_eligible: bool,
106}
107
108impl Default for InputVisibility {
109    fn default() -> Self {
110        Self {
111            transcript_eligible: true,
112            operator_eligible: true,
113        }
114    }
115}
116
117/// The 6 input variants accepted by the runtime layer.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119#[serde(tag = "input_type", rename_all = "snake_case")]
120#[non_exhaustive]
121pub enum Input {
122    /// User/operator prompt.
123    Prompt(PromptInput),
124    /// Peer-originated input (comms).
125    Peer(PeerInput),
126    /// Flow step input (mob orchestration).
127    FlowStep(FlowStepInput),
128    /// External event input.
129    ExternalEvent(ExternalEventInput),
130    /// Explicit runtime continuation work.
131    Continuation(ContinuationInput),
132    /// Explicit non-content operation/lifecycle input.
133    Operation(OperationInput),
134}
135
136impl Input {
137    /// Get the input header.
138    pub fn header(&self) -> &InputHeader {
139        match self {
140            Input::Prompt(i) => &i.header,
141            Input::Peer(i) => &i.header,
142            Input::FlowStep(i) => &i.header,
143            Input::ExternalEvent(i) => &i.header,
144            Input::Continuation(i) => &i.header,
145            Input::Operation(i) => &i.header,
146        }
147    }
148
149    /// Get the input ID.
150    pub fn id(&self) -> &InputId {
151        &self.header().id
152    }
153
154    /// Typed kind for policy dispatch.
155    pub fn kind(&self) -> InputKind {
156        match self {
157            Input::Prompt(_) => InputKind::Prompt,
158            Input::Peer(p) => match &p.convention {
159                Some(PeerConvention::Message) | None => InputKind::PeerMessage,
160                Some(PeerConvention::Request { .. }) => InputKind::PeerRequest,
161                Some(PeerConvention::ResponseProgress { .. }) => InputKind::PeerResponseProgress,
162                Some(PeerConvention::ResponseTerminal { .. }) => InputKind::PeerResponseTerminal,
163            },
164            Input::FlowStep(_) => InputKind::FlowStep,
165            Input::ExternalEvent(_) => InputKind::ExternalEvent,
166            Input::Continuation(_) => InputKind::Continuation,
167            Input::Operation(_) => InputKind::Operation,
168        }
169    }
170
171    /// Wrapped kind identifier (for newtype-discipline call sites).
172    pub fn kind_id(&self) -> KindId {
173        KindId::new(self.kind())
174    }
175
176    /// Handling-mode hint for ordinary work admitted through the runtime.
177    pub fn handling_mode(&self) -> Option<HandlingMode> {
178        match self {
179            Input::Prompt(prompt) => prompt.turn_metadata.as_ref()?.handling_mode,
180            Input::FlowStep(flow_step) => flow_step.turn_metadata.as_ref()?.handling_mode,
181            Input::ExternalEvent(event) => Some(event.handling_mode),
182            Input::Continuation(continuation) => Some(continuation.handling_mode),
183            Input::Peer(peer) => peer.handling_mode,
184            Input::Operation(_) => None,
185        }
186    }
187
188    /// Typed continuation discriminant threaded into admission. Only
189    /// continuations carry a non-ordinary kind; every other input family is
190    /// [`ContinuationKind::Ordinary`]. This is a typed pass-through of the
191    /// producer-declared fact — admission never re-classifies continuation
192    /// routing from reason strings or overlay dispatch-context keys.
193    pub fn continuation_kind(&self) -> ContinuationKind {
194        match self {
195            Input::Continuation(continuation) => continuation.continuation_kind,
196            _ => ContinuationKind::Ordinary,
197        }
198    }
199}
200
201/// Fail closed on the retired external-event shape that smuggled multimodal
202/// blocks inside the payload JSON object. The typed [`ExternalEventInput::blocks`]
203/// field is the only content owner; a payload-level `blocks` key is rejected
204/// with a typed error instead of being migrated or silently passed through.
205fn reject_legacy_payload_blocks(event: &ExternalEventInput) -> Result<(), BlobStoreError> {
206    if event
207        .payload
208        .as_object()
209        .is_some_and(|obj| obj.contains_key("blocks"))
210    {
211        return Err(BlobStoreError::Internal(format!(
212            "external-event payload for event_type `{}` carries the retired payload-level \
213             `blocks` key; multimodal content must use the typed `ExternalEventInput.blocks` owner",
214            event.event_type
215        )));
216    }
217    Ok(())
218}
219
220pub async fn externalize_input_images(
221    blob_store: &dyn BlobStore,
222    input: &mut Input,
223) -> Result<(), BlobStoreError> {
224    match input {
225        Input::Prompt(prompt) => {
226            if let ContentInput::Blocks(blocks) = &mut prompt.content {
227                externalize_content_blocks(blob_store, blocks).await?;
228            }
229        }
230        Input::Peer(peer) => {
231            if let ContentInput::Blocks(blocks) = &mut peer.content {
232                externalize_content_blocks(blob_store, blocks).await?;
233            }
234        }
235        Input::FlowStep(flow_step) => {
236            if let ContentInput::Blocks(blocks) = &mut flow_step.content {
237                externalize_content_blocks(blob_store, blocks).await?;
238            }
239        }
240        Input::ExternalEvent(event) => {
241            reject_legacy_payload_blocks(event)?;
242            if let Some(blocks) = event.blocks.as_mut() {
243                externalize_content_blocks(blob_store, blocks).await?;
244            }
245        }
246        Input::Continuation(_) | Input::Operation(_) => {}
247    }
248    Ok(())
249}
250
251pub async fn hydrate_input_images(
252    blob_store: &dyn BlobStore,
253    input: &mut Input,
254    missing_behavior: MissingBlobBehavior,
255) -> Result<(), BlobStoreError> {
256    match input {
257        Input::Prompt(prompt) => {
258            if let ContentInput::Blocks(blocks) = &mut prompt.content {
259                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
260            }
261        }
262        Input::Peer(peer) => {
263            if let ContentInput::Blocks(blocks) = &mut peer.content {
264                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
265            }
266        }
267        Input::FlowStep(flow_step) => {
268            if let ContentInput::Blocks(blocks) = &mut flow_step.content {
269                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
270            }
271        }
272        Input::ExternalEvent(event) => {
273            reject_legacy_payload_blocks(event)?;
274            if let Some(blocks) = event.blocks.as_mut() {
275                hydrate_content_blocks(blob_store, blocks, missing_behavior).await?;
276            }
277        }
278        Input::Continuation(_) | Input::Operation(_) => {}
279    }
280    Ok(())
281}
282
283/// User/operator prompt input.
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct PromptInput {
286    pub header: InputHeader,
287    /// The prompt content — the single typed owner of this input's content
288    /// fact (plain text or multimodal blocks). The text projection is derived
289    /// at read time via [`ContentInput::text_content`]; it is never stored
290    /// separately.
291    pub content: ContentInput,
292    /// Runtime-authored typed transcript appends that travel with this turn.
293    ///
294    /// These are not operator-authored prompt content. The runtime projects
295    /// them into model-facing text only when building the provider request.
296    #[serde(default, skip_serializing_if = "Vec::is_empty")]
297    pub typed_turn_appends: Vec<ConversationAppend>,
298    #[serde(default, skip_serializing_if = "Option::is_none")]
299    pub turn_metadata: Option<RuntimeTurnMetadata>,
300}
301
302impl PromptInput {
303    /// Create a new operator prompt with default header.
304    pub fn new(text: impl Into<String>, turn_metadata: Option<RuntimeTurnMetadata>) -> Self {
305        Self {
306            header: InputHeader {
307                id: meerkat_core::lifecycle::InputId::new(),
308                timestamp: chrono::Utc::now(),
309                source: InputOrigin::Operator,
310                durability: InputDurability::Durable,
311                visibility: InputVisibility::default(),
312                idempotency_key: None,
313                supersession_key: None,
314                correlation_id: None,
315            },
316            content: ContentInput::Text(text.into()),
317            typed_turn_appends: Vec::new(),
318            turn_metadata,
319        }
320    }
321
322    /// Create a prompt from `ContentInput` (text or multimodal blocks).
323    pub fn from_content_input(
324        input: ContentInput,
325        turn_metadata: Option<RuntimeTurnMetadata>,
326    ) -> Self {
327        Self {
328            header: InputHeader {
329                id: meerkat_core::lifecycle::InputId::new(),
330                timestamp: chrono::Utc::now(),
331                source: InputOrigin::Operator,
332                durability: InputDurability::Durable,
333                visibility: InputVisibility::default(),
334                idempotency_key: None,
335                supersession_key: None,
336                correlation_id: None,
337            },
338            content: input,
339            typed_turn_appends: Vec::new(),
340            turn_metadata,
341        }
342    }
343}
344
345/// Peer-originated input from comms.
346#[derive(Debug, Clone, Serialize, Deserialize)]
347pub struct PeerInput {
348    pub header: InputHeader,
349    /// The peer convention (message, request, response).
350    #[serde(skip_serializing_if = "Option::is_none")]
351    pub convention: Option<PeerConvention>,
352    /// The peer content — the single typed owner of this input's content fact
353    /// (plain text or multimodal blocks). Message-style peer traffic uses this
354    /// directly. Request/response prompt projection is runtime-owned and must
355    /// be reconstructed from `convention + payload + source` rather than
356    /// helper-rendered prose. The text projection is derived at read time via
357    /// [`ContentInput::text_content`]; it is never stored separately.
358    pub content: ContentInput,
359    /// Structured peer payload, when one exists.
360    ///
361    /// For `Request`, this is the request params. For `Response*`, this is the
362    /// response result payload. Message traffic leaves this unset.
363    #[serde(default, skip_serializing_if = "Option::is_none")]
364    pub payload: Option<serde_json::Value>,
365    /// Optional handling-mode override for actionable peer inputs.
366    /// When present on Message/Request/no-convention, overrides kind-based
367    /// policy defaults. Forbidden on ResponseProgress; ResponseTerminal may
368    /// carry a typed override for requester reaction urgency (enforced by
369    /// [`validate_peer_handling_mode`]).
370    #[serde(default, skip_serializing_if = "Option::is_none")]
371    pub handling_mode: Option<HandlingMode>,
372}
373
374/// Peer communication conventions.
375#[derive(Debug, Clone, Serialize, Deserialize)]
376#[serde(tag = "convention_type", rename_all = "snake_case")]
377#[non_exhaustive]
378pub enum PeerConvention {
379    /// Simple peer-to-peer message.
380    Message,
381    /// Request expecting a response.
382    Request { request_id: String, intent: String },
383    /// Progress update for an ongoing response.
384    ResponseProgress {
385        request_id: String,
386        phase: ResponseProgressPhase,
387    },
388    /// Terminal response (completed or failed).
389    ResponseTerminal {
390        request_id: String,
391        status: ResponseTerminalStatus,
392    },
393}
394
395/// Phase of a response progress update. This is the core projection enum, not
396/// a runtime-local duplicate.
397pub type ResponseProgressPhase = PeerResponseProgressProjectionPhase;
398
399/// Terminal status of a response. This is the core projection enum, not a
400/// runtime-local duplicate.
401pub type ResponseTerminalStatus = PeerResponseTerminalProjectionStatus;
402
403pub fn response_terminal_status_from_wire(
404    status: meerkat_contracts::PeerResponseTerminalStatusWire,
405) -> ResponseTerminalStatus {
406    match status {
407        meerkat_contracts::PeerResponseTerminalStatusWire::Completed => {
408            PeerResponseTerminalProjectionStatus::Completed
409        }
410        meerkat_contracts::PeerResponseTerminalStatusWire::Failed => {
411            PeerResponseTerminalProjectionStatus::Failed
412        }
413        meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled => {
414            PeerResponseTerminalProjectionStatus::Cancelled
415        }
416    }
417}
418
419pub fn peer_response_terminal_input(
420    peer_id: meerkat_core::comms::PeerId,
421    display_name: Option<meerkat_core::comms::PeerName>,
422    request_id: meerkat_core::PeerCorrelationId,
423    status: meerkat_contracts::PeerResponseTerminalStatusWire,
424    result: serde_json::Value,
425) -> Input {
426    let correlation_id = CorrelationId::from_uuid(request_id.as_uuid());
427    let request_id = request_id.to_string();
428    let peer_id = peer_id.to_string();
429    let display_identity = display_name.map_or_else(|| peer_id.clone(), |name| name.as_string());
430
431    Input::Peer(PeerInput {
432        header: InputHeader {
433            id: InputId::new(),
434            timestamp: Utc::now(),
435            source: InputOrigin::Peer {
436                peer_id,
437                display_identity: Some(display_identity),
438                runtime_id: None,
439            },
440            durability: InputDurability::Durable,
441            visibility: InputVisibility::default(),
442            idempotency_key: None,
443            supersession_key: None,
444            correlation_id: Some(correlation_id),
445        },
446        convention: Some(PeerConvention::ResponseTerminal {
447            request_id,
448            status: response_terminal_status_from_wire(status),
449        }),
450        content: ContentInput::Text(String::new()),
451        payload: Some(result),
452        handling_mode: None,
453    })
454}
455
456/// Flow step input from mob orchestration.
457#[derive(Debug, Clone, Serialize, Deserialize)]
458pub struct FlowStepInput {
459    pub header: InputHeader,
460    /// Flow step identifier.
461    pub step_id: String,
462    /// Step instructions — the single typed owner of this input's content
463    /// fact (plain text or multimodal blocks). The text projection is derived
464    /// at read time via [`ContentInput::text_content`]; it is never stored
465    /// separately.
466    pub content: ContentInput,
467    #[serde(default, skip_serializing_if = "Option::is_none")]
468    pub turn_metadata: Option<RuntimeTurnMetadata>,
469}
470
471/// External event input.
472#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct ExternalEventInput {
474    pub header: InputHeader,
475    /// Event type/name.
476    pub event_type: String,
477    /// Event payload. Uses `Value` because the runtime layer may inspect/merge
478    /// payloads during coalescing and projection — not a pure pass-through.
479    /// Multimodal content does NOT live here canonically; use `blocks`.
480    pub payload: serde_json::Value,
481    /// Optional multimodal blocks carried by the external event. This is the
482    /// canonical owner for multimodal external-event content.
483    #[serde(default, skip_serializing_if = "Option::is_none")]
484    pub blocks: Option<Vec<meerkat_core::types::ContentBlock>>,
485    /// Runtime-owned handling hint for this external event.
486    #[serde(default)]
487    pub handling_mode: HandlingMode,
488    /// Optional normalized render metadata carried with the event.
489    #[serde(default, skip_serializing_if = "Option::is_none")]
490    pub render_metadata: Option<RenderMetadata>,
491}
492
493/// Typed continuation discriminant carried on a [`ContinuationInput`].
494///
495/// The producer of a continuation declares how the runtime must re-enter the
496/// session: an ordinary continuation resumes the pending run, while a WorkGraph
497/// attention continuation re-enters as a fresh queued content turn. This is a
498/// typed fact owned by the producer; admission threads it into
499/// `MeerkatMachine::ResolveAdmissionPlan`, which owns the lane and run-apply
500/// semantics derived from it. No downstream consumer re-classifies continuation
501/// routing from continuation reason strings or overlay dispatch-context keys.
502#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
503#[serde(rename_all = "snake_case")]
504pub enum ContinuationKind {
505    /// Ordinary continuation: resume the pending run at the run boundary.
506    #[default]
507    Ordinary,
508    /// WorkGraph attention continuation: re-enter as a fresh queued content turn.
509    WorkgraphAttention,
510}
511
512/// Explicit continuation request that asks the runtime to keep draining
513/// ordinary work after a boundary-local event (for example, terminal peer
514/// responses injected into session state).
515#[derive(Debug, Clone, Serialize, Deserialize)]
516pub struct ContinuationInput {
517    pub header: InputHeader,
518    /// Stable reason for the continuation request.
519    pub reason: String,
520    /// Typed continuation discriminant owned by the producer. Admission threads
521    /// it into `MeerkatMachine::ResolveAdmissionPlan` so the machine owns the
522    /// lane and run-apply semantics for WorkGraph attention re-entry.
523    #[serde(default)]
524    pub continuation_kind: ContinuationKind,
525    /// Ordinary-work handling mode for the continuation.
526    #[serde(default)]
527    pub handling_mode: HandlingMode,
528    /// Optional request/correlation handle tied to the continuation.
529    #[serde(default, skip_serializing_if = "Option::is_none")]
530    pub request_id: Option<String>,
531    /// Optional per-turn tool visibility overlay for scoped continuations.
532    #[serde(default, skip_serializing_if = "Option::is_none")]
533    pub flow_tool_overlay: Option<TurnToolOverlay>,
534    /// Optional runtime-owned context projected into the next turn boundary.
535    #[serde(default, skip_serializing_if = "Option::is_none")]
536    pub context_append: Option<ConversationContextAppend>,
537    /// Optional runtime-owned turn append used to force a continuation turn.
538    #[serde(default, skip_serializing_if = "Option::is_none")]
539    pub turn_append: Option<ConversationAppend>,
540}
541
542impl ContinuationInput {
543    /// Build a continuation for waking an idle session after a detached
544    /// background operation reaches terminal state.
545    ///
546    /// Properties: `Derived` durability, invisible to transcript and operator,
547    /// `System` origin, `Steer` handling mode.
548    pub fn detached_background_op_completed() -> Self {
549        Self {
550            header: InputHeader {
551                id: meerkat_core::lifecycle::InputId::new(),
552                timestamp: chrono::Utc::now(),
553                source: InputOrigin::System,
554                durability: InputDurability::Derived,
555                visibility: InputVisibility {
556                    transcript_eligible: false,
557                    operator_eligible: false,
558                },
559                idempotency_key: None,
560                supersession_key: None,
561                correlation_id: None,
562            },
563            reason: "detached_background_op_completed".to_string(),
564            continuation_kind: ContinuationKind::Ordinary,
565            handling_mode: HandlingMode::Steer,
566            request_id: None,
567            flow_tool_overlay: None,
568            context_append: None,
569            turn_append: None,
570        }
571    }
572}
573
574/// Explicit operation/lifecycle input admitted through runtime instead of
575/// being smuggled through transcript projections or peer-only paths.
576#[derive(Debug, Clone, Serialize, Deserialize)]
577pub struct OperationInput {
578    pub header: InputHeader,
579    /// Stable operation identifier.
580    pub operation_id: OperationId,
581    /// Typed lifecycle event for the operation.
582    pub event: OpEvent,
583}
584
585/// Build the core-owned peer conversation projection for a runtime peer input.
586///
587/// Peer-response terminal context projection is deliberately excluded here:
588/// admission must not store it as pre-machine truth. Runtime-loop batch
589/// construction uses [`runtime_input_projection_for_machine_batch`] after the
590/// machine-selected input is dequeued.
591pub(crate) fn peer_projection_from_peer_input(
592    peer: &PeerInput,
593) -> Option<PeerConversationProjection> {
594    peer_projection_from_peer_input_with_id(peer, peer_canonical_id(peer)?.as_str())
595}
596
597fn peer_projection_from_peer_input_with_id(
598    peer: &PeerInput,
599    peer_id: &str,
600) -> Option<PeerConversationProjection> {
601    let peer_id = peer_id.to_string();
602
603    match &peer.convention {
604        Some(PeerConvention::Message) => Some(PeerConversationProjection::Message { peer_id }),
605        Some(PeerConvention::Request { request_id, intent }) => {
606            let peer_id = match meerkat_core::comms::PeerId::parse(peer_id.as_str()) {
607                Ok(peer_id) => peer_id,
608                Err(error) => {
609                    tracing::warn!(
610                        peer_id,
611                        error = %error,
612                        "dropping peer request projection with non-canonical peer_id"
613                    );
614                    return None;
615                }
616            };
617            Some(PeerConversationProjection::Request {
618                peer_id,
619                display_name: peer_display_label(peer),
620                request_id: request_id.clone(),
621                intent: intent.clone(),
622                payload: peer.payload.clone(),
623            })
624        }
625        Some(PeerConvention::ResponseProgress { request_id, phase }) => {
626            Some(PeerConversationProjection::ResponseProgress {
627                peer_id,
628                request_id: request_id.clone(),
629                phase: *phase,
630                payload: peer.payload.clone(),
631            })
632        }
633        Some(PeerConvention::ResponseTerminal { .. }) => None,
634        None => None,
635    }
636}
637
638pub(crate) fn peer_response_terminal_fact(
639    peer: &PeerInput,
640) -> Result<Option<PeerResponseTerminalFact>, PeerResponseTerminalFactError> {
641    let InputOrigin::Peer {
642        peer_id,
643        display_identity,
644        runtime_id,
645    } = &peer.header.source
646    else {
647        return Ok(None);
648    };
649    let Some(PeerConvention::ResponseTerminal { request_id, status }) = &peer.convention else {
650        return Ok(None);
651    };
652
653    let transport_identity = runtime_id
654        .as_ref()
655        .map(ToString::to_string)
656        .map(PeerResponseTerminalTransportIdentity::parse)
657        .transpose()?;
658    let source = PeerResponseTerminalSource::new(
659        transport_identity,
660        PeerResponseTerminalRouteIdentity::parse(peer_id.clone())?,
661        PeerResponseTerminalDisplayIdentity::parse(
662            display_identity
663                .as_ref()
664                .ok_or(PeerResponseTerminalFactError::MissingDisplayIdentity)?
665                .clone(),
666        )?,
667    );
668    Ok(Some(PeerResponseTerminalFact::new(
669        source,
670        PeerResponseTerminalCorrelationId::parse(request_id)?,
671        *status,
672        PeerResponseTerminalRenderPayload::new(peer.payload.clone()),
673    )))
674}
675
676pub(crate) fn validate_peer_response_terminal_fact(
677    input: &Input,
678) -> Result<(), PeerResponseTerminalFactError> {
679    let Input::Peer(peer) = input else {
680        return Ok(());
681    };
682    peer_response_terminal_fact(peer).map(|_| ())
683}
684
685/// Lift an [`Input`] to its core peer projection when it is a peer input with a
686/// peer-origin header.
687#[cfg(test)]
688pub(crate) fn peer_projection(input: &Input) -> Option<PeerConversationProjection> {
689    let Input::Peer(peer) = input else {
690        return None;
691    };
692    peer_projection_from_peer_input(peer)
693}
694
695fn peer_canonical_id(peer: &PeerInput) -> Option<String> {
696    let InputOrigin::Peer { peer_id, .. } = &peer.header.source else {
697        return None;
698    };
699    Some(peer_id.clone())
700}
701
702fn peer_display_label(peer: &PeerInput) -> Option<String> {
703    let InputOrigin::Peer {
704        display_identity, ..
705    } = &peer.header.source
706    else {
707        return None;
708    };
709
710    display_identity
711        .as_ref()
712        .map(|label| label.trim())
713        .filter(|label| !label.is_empty())
714        .map(ToOwned::to_owned)
715}
716
717/// Rendered prompt-text projection for a peer input.
718pub(crate) fn peer_prompt_text(peer: &PeerInput) -> String {
719    peer_projection_from_peer_input(peer)
720        .map(|projection| {
721            let prompt = projection.prompt_text();
722            if prompt.is_empty() {
723                peer.content.text_content()
724            } else {
725                prompt
726            }
727        })
728        .unwrap_or_else(|| peer.content.text_content())
729}
730
731pub(crate) fn input_prompt_text(input: &Input) -> String {
732    match input {
733        Input::Prompt(p) => p.content.text_content(),
734        Input::Peer(p) => peer_prompt_text(p),
735        Input::FlowStep(f) => f.content.text_content(),
736        Input::ExternalEvent(e) => external_event_projection_text(e),
737        Input::Continuation(continuation) => format!("[Continuation] {}", continuation.reason),
738        Input::Operation(operation) => {
739            format!(
740                "[Operation {}] {:?}",
741                operation.operation_id, operation.event
742            )
743        }
744    }
745}
746
747fn external_event_projection_text(event: &ExternalEventInput) -> String {
748    let source_name = match &event.header.source {
749        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
750            source_name.as_str()
751        }
752        _ => event.event_type.as_str(),
753    };
754    let body = event
755        .payload
756        .get("body")
757        .and_then(serde_json::Value::as_str)
758        .map(str::trim);
759
760    meerkat_core::interaction::format_external_event_projection(source_name, body)
761}
762
763fn peer_notice_renderable(peer: &PeerInput) -> Option<CoreRenderable> {
764    let (peer_id, display_name) = match &peer.header.source {
765        InputOrigin::Peer {
766            peer_id,
767            display_identity,
768            ..
769        } => (peer_id.clone(), display_identity.clone()),
770        _ => return None,
771    };
772    use meerkat_core::types::CommsNoticeKind;
773    let (kind, request_id, intent, status) = match &peer.convention {
774        Some(PeerConvention::Message) | None => (CommsNoticeKind::Message, None, None, None),
775        Some(PeerConvention::Request { request_id, intent }) => (
776            CommsNoticeKind::Request,
777            Some(request_id.clone()),
778            Some(intent.clone()),
779            None,
780        ),
781        Some(PeerConvention::ResponseProgress { request_id, phase }) => (
782            CommsNoticeKind::ResponseProgress,
783            Some(request_id.clone()),
784            None,
785            Some(format!("{phase:?}")),
786        ),
787        Some(PeerConvention::ResponseTerminal { request_id, status }) => (
788            CommsNoticeKind::ResponseTerminal,
789            Some(request_id.clone()),
790            None,
791            Some(format!("{status:?}")),
792        ),
793    };
794    let summary = match kind {
795        CommsNoticeKind::Request => intent.as_ref().map_or_else(
796            || "Peer request".to_string(),
797            |intent| format!("Peer request: {intent}"),
798        ),
799        CommsNoticeKind::ResponseProgress => "Peer response progress".to_string(),
800        CommsNoticeKind::ResponseTerminal => "Peer response terminal".to_string(),
801        CommsNoticeKind::Message | CommsNoticeKind::Other(_) => "Peer message".to_string(),
802    };
803    let content = match &peer.content {
804        ContentInput::Text(body) if body.is_empty() => Vec::new(),
805        ContentInput::Text(body) => {
806            vec![meerkat_core::types::ContentBlock::Text { text: body.clone() }]
807        }
808        ContentInput::Blocks(blocks) => blocks.clone(),
809    };
810    // The peer routing identity is the canonical typed `PeerId`. Production
811    // peer inputs always carry a hyphenated UUID here (the comms bridge stamps
812    // `canonical_peer_id_string()`); parse once at this producer boundary. A
813    // value that is not a valid `PeerId` cannot populate the typed identity, so
814    // the notice renders without a peer identity (degraded projection) rather
815    // than smuggling an unparseable string through the transcript.
816    let notice_peer = meerkat_core::comms::PeerId::parse(&peer_id)
817        .ok()
818        .map(|id| SystemNoticePeer { id, display_name });
819    Some(CoreRenderable::SystemNotice {
820        kind: SystemNoticeKind::Comms,
821        body: Some(summary.clone()),
822        blocks: vec![SystemNoticeBlock::Comms {
823            kind,
824            direction: SystemNoticeDirection::Incoming,
825            peer: notice_peer,
826            request_id,
827            intent,
828            status,
829            summary: Some(summary),
830            payload: peer.payload.clone(),
831            content,
832        }],
833    })
834}
835
836fn external_event_notice_renderable(event: &ExternalEventInput) -> CoreRenderable {
837    let source = match &event.header.source {
838        InputOrigin::External { source_name } if !source_name.trim().is_empty() => {
839            source_name.clone()
840        }
841        _ => event.event_type.clone(),
842    };
843    let body = event
844        .payload
845        .get("body")
846        .and_then(serde_json::Value::as_str)
847        .map(str::trim)
848        .filter(|body| !body.is_empty())
849        .map(ToOwned::to_owned);
850    let summary = body.as_ref().map_or_else(
851        || format!("External event via {source}"),
852        std::clone::Clone::clone,
853    );
854    CoreRenderable::SystemNotice {
855        kind: SystemNoticeKind::ExternalEvent,
856        body: Some(summary.clone()),
857        blocks: vec![SystemNoticeBlock::ExternalEvent {
858            source,
859            event_type: event.event_type.clone(),
860            summary: Some(summary),
861            body,
862            payload: Some(event.payload.clone()),
863            content: event.blocks.clone().unwrap_or_default(),
864        }],
865    }
866}
867
868fn input_to_append(input: &Input) -> Option<ConversationAppend> {
869    // Terminal peer responses always carry their typed comms notice as the
870    // turn's conversation append — with or without multimodal blocks. The
871    // former `blocks: None => no append` special case left the mandatory
872    // `AppendContextAndRun` reaction turn with a fabricated empty prompt,
873    // which providers reject (Anthropic: "user messages must have non-empty
874    // content") and which violates the typed-run-input doctrine that no
875    // empty-string prompt is ever synthesized.
876    let (role, content) = match input {
877        Input::Prompt(p)
878            if !p.typed_turn_appends.is_empty()
879                && match &p.content {
880                    ContentInput::Text(text) => text.trim().is_empty(),
881                    ContentInput::Blocks(blocks) => blocks.is_empty(),
882                } =>
883        {
884            return None;
885        }
886        Input::Prompt(p) => match &p.content {
887            ContentInput::Blocks(blocks) => (
888                ConversationAppendRole::User,
889                CoreRenderable::Blocks {
890                    blocks: blocks.clone(),
891                },
892            ),
893            ContentInput::Text(_) => (
894                ConversationAppendRole::User,
895                CoreRenderable::Text {
896                    text: input_prompt_text(input),
897                },
898            ),
899        },
900        Input::Peer(p) => peer_notice_renderable(p)
901            .map(|content| (ConversationAppendRole::SystemNotice, content))?,
902        Input::FlowStep(f) => (
903            ConversationAppendRole::SystemNotice,
904            CoreRenderable::SystemNotice {
905                kind: SystemNoticeKind::Generic,
906                body: Some(format!("Flow step {}", f.step_id)),
907                blocks: vec![SystemNoticeBlock::RuntimeNotice {
908                    category: "flow_step".to_string(),
909                    detail: Some(f.content.text_content()),
910                    payload: None,
911                }],
912            },
913        ),
914        Input::ExternalEvent(e) => (
915            ConversationAppendRole::SystemNotice,
916            external_event_notice_renderable(e),
917        ),
918        Input::Continuation(continuation) => return continuation.turn_append.clone(),
919        Input::Operation(_) => return None,
920    };
921
922    Some(ConversationAppend { role, content })
923}
924
925fn input_to_context_append(input: &Input) -> Option<ConversationContextAppend> {
926    let (projection, content) = match input {
927        Input::Continuation(continuation) => {
928            return continuation.context_append.clone();
929        }
930        Input::Peer(peer) => {
931            let projection = peer_projection_from_peer_input(peer)?;
932            let content = peer_notice_renderable(peer)?;
933            (projection, content)
934        }
935        _ => return None,
936    };
937
938    Some(ConversationContextAppend {
939        key: projection.context_key()?,
940        content,
941    })
942}
943
944fn peer_response_terminal_context_append(
945    peer: &PeerInput,
946) -> Result<Option<ConversationContextAppend>, PeerResponseTerminalFactError> {
947    let Some(fact) = peer_response_terminal_fact(peer)? else {
948        return Ok(None);
949    };
950
951    Ok(Some(ConversationContextAppend {
952        key: fact.context_key(),
953        content: CoreRenderable::SystemNotice {
954            kind: SystemNoticeKind::Comms,
955            body: Some("Peer terminal response context".to_string()),
956            blocks: vec![SystemNoticeBlock::Comms {
957                kind: meerkat_core::types::CommsNoticeKind::ResponseTerminal,
958                direction: SystemNoticeDirection::Incoming,
959                peer: Some(SystemNoticePeer {
960                    id: fact.source.route_identity.peer_id(),
961                    display_name: Some(fact.source.display_identity.to_string()),
962                }),
963                request_id: Some(fact.correlation_id.to_string()),
964                intent: None,
965                status: Some(fact.status.label().to_string()),
966                summary: Some("Peer terminal response".to_string()),
967                payload: fact.render_payload.as_ref().cloned(),
968                content: Vec::new(),
969            }],
970        },
971    }))
972}
973
974pub(crate) fn runtime_input_projection(
975    input: &Input,
976) -> crate::ingress_types::RuntimeInputProjection {
977    crate::ingress_types::RuntimeInputProjection {
978        append: input_to_append(input),
979        additional_appends: match input {
980            Input::Prompt(prompt) => prompt.typed_turn_appends.clone(),
981            _ => Vec::new(),
982        },
983        context_append: input_to_context_append(input),
984        peer_response_terminal: None,
985    }
986}
987
988pub(crate) fn runtime_input_projection_for_machine_batch(
989    input: &Input,
990) -> crate::ingress_types::RuntimeInputProjection {
991    let mut projection = runtime_input_projection(input);
992    if let Input::Peer(peer) = input
993        && let Ok(Some(context_append)) = peer_response_terminal_context_append(peer)
994    {
995        projection.context_append = Some(context_append);
996        // Carry the typed terminal-peer-response fact alongside the rendered
997        // context append so the realtime/live consumer reads the typed fact
998        // directly instead of re-parsing the flattened prose.
999        if let Ok(fact) = peer_response_terminal_fact(peer) {
1000            projection.peer_response_terminal = fact;
1001        }
1002    }
1003    projection
1004}
1005
1006pub(crate) fn context_append_to_pending_system_context_append(
1007    append: &ConversationContextAppend,
1008    peer_response_terminal: Option<&meerkat_core::PeerResponseTerminalFact>,
1009) -> meerkat_core::PendingSystemContextAppend {
1010    meerkat_core::PendingSystemContextAppend {
1011        content: append.content.clone(),
1012        source: Some(append.key.clone()),
1013        idempotency_key: Some(append.key.clone()),
1014        // Durable keyed context append (peer responses, etc.) — not a steer.
1015        source_kind: meerkat_core::session::SystemContextSource::Normal,
1016        // Carry the typed `PeerResponseTerminalFact` so the realtime consumer
1017        // reads it directly (mirrors the `source_kind` precedent that retired
1018        // the `runtime:steer:` string prefix). The fact threads from the
1019        // admitted `RuntimeInputProjection.peer_response_terminal` field.
1020        peer_response_terminal: peer_response_terminal.cloned(),
1021        accepted_at: meerkat_core::time_compat::SystemTime::now(),
1022    }
1023}
1024
1025pub(crate) fn projection_to_pending_system_context_appends(
1026    input_id: &InputId,
1027    projection: &crate::ingress_types::RuntimeInputProjection,
1028) -> Vec<meerkat_core::PendingSystemContextAppend> {
1029    if let Some(append) = projection.context_append.as_ref() {
1030        return std::iter::once(context_append_to_pending_system_context_append(
1031            append,
1032            projection.peer_response_terminal.as_ref(),
1033        ))
1034        .filter(|append| !append.content.render_text().trim().is_empty())
1035        .collect();
1036    }
1037
1038    projection
1039        .append
1040        .as_ref()
1041        .map(|append| {
1042            // The PRODUCER of a runtime-steer append sets the typed marker
1043            // here, at construction. This is the single source of truth for
1044            // the runtime-steer fact — no downstream code reclassifies the
1045            // `source` string. The `runtime:steer:` source/idempotency key is
1046            // retained only as a stable per-input idempotency identifier.
1047            let key = format!("runtime:steer:{input_id}");
1048            meerkat_core::PendingSystemContextAppend {
1049                content: append.content.clone(),
1050                source: Some(key.clone()),
1051                idempotency_key: Some(key),
1052                source_kind: meerkat_core::session::SystemContextSource::RuntimeSteer,
1053                // A runtime steer is never a terminal-peer-response projection.
1054                peer_response_terminal: None,
1055                accepted_at: meerkat_core::time_compat::SystemTime::now(),
1056            }
1057        })
1058        .into_iter()
1059        .filter(|append| !append.content.render_text().trim().is_empty())
1060        .collect()
1061}
1062
1063#[cfg(test)]
1064#[allow(clippy::unwrap_used, clippy::panic)]
1065mod tests {
1066    use super::*;
1067    use chrono::Utc;
1068
1069    fn make_header() -> InputHeader {
1070        InputHeader {
1071            id: InputId::new(),
1072            timestamp: Utc::now(),
1073            source: InputOrigin::Operator,
1074            durability: InputDurability::Durable,
1075            visibility: InputVisibility::default(),
1076            idempotency_key: None,
1077            supersession_key: None,
1078            correlation_id: None,
1079        }
1080    }
1081
1082    fn typed_runtime_notice_append(detail: &str) -> ConversationAppend {
1083        ConversationAppend {
1084            role: ConversationAppendRole::SystemNotice,
1085            content: CoreRenderable::SystemNotice {
1086                kind: meerkat_core::types::SystemNoticeKind::Generic,
1087                body: Some(detail.to_string()),
1088                blocks: vec![meerkat_core::types::SystemNoticeBlock::RuntimeNotice {
1089                    category: "test".to_string(),
1090                    detail: Some(detail.to_string()),
1091                    payload: None,
1092                }],
1093            },
1094        }
1095    }
1096
1097    #[test]
1098    fn prompt_input_serde() {
1099        let input = Input::Prompt(PromptInput {
1100            header: make_header(),
1101            content: "hello".into(),
1102            typed_turn_appends: Vec::new(),
1103            turn_metadata: None,
1104        });
1105        let json = serde_json::to_value(&input).unwrap();
1106        assert_eq!(json["input_type"], "prompt");
1107        let parsed: Input = serde_json::from_value(json).unwrap();
1108        assert!(matches!(parsed, Input::Prompt(_)));
1109    }
1110
1111    #[test]
1112    fn prompt_input_typed_turn_appends_project_without_user_text() {
1113        let append = typed_runtime_notice_append("peer delivery");
1114        let input = Input::Prompt(PromptInput {
1115            header: make_header(),
1116            content: ContentInput::Text(String::new()),
1117            typed_turn_appends: vec![append.clone()],
1118            turn_metadata: None,
1119        });
1120
1121        let projection = runtime_input_projection(&input);
1122        assert!(
1123            projection.append.is_none(),
1124            "empty runtime-authored prompt carrier must not synthesize a user append"
1125        );
1126        assert_eq!(projection.additional_appends, vec![append]);
1127    }
1128
1129    #[test]
1130    fn prompt_input_typed_turn_appends_serde_roundtrip() {
1131        let append = typed_runtime_notice_append("typed appends persist");
1132        let input = Input::Prompt(PromptInput {
1133            header: make_header(),
1134            content: ContentInput::Text(String::new()),
1135            typed_turn_appends: vec![append.clone()],
1136            turn_metadata: None,
1137        });
1138
1139        let json = serde_json::to_value(&input).unwrap();
1140        let parsed: Input = serde_json::from_value(json).unwrap();
1141        let Input::Prompt(prompt) = parsed else {
1142            panic!("expected prompt input");
1143        };
1144        assert_eq!(prompt.content.text_content(), "");
1145        assert_eq!(prompt.typed_turn_appends, vec![append]);
1146    }
1147
1148    #[test]
1149    fn peer_input_message_serde() {
1150        let input = Input::Peer(PeerInput {
1151            header: make_header(),
1152            convention: Some(PeerConvention::Message),
1153            content: "hi there".into(),
1154            payload: None,
1155            handling_mode: None,
1156        });
1157        let json = serde_json::to_value(&input).unwrap();
1158        assert_eq!(json["input_type"], "peer");
1159        let parsed: Input = serde_json::from_value(json).unwrap();
1160        assert!(matches!(parsed, Input::Peer(_)));
1161    }
1162
1163    #[test]
1164    fn peer_message_blocks_preserve_typed_comms_content_without_prefix_injection() {
1165        let peer_id = "018f6f79-7a82-7c4e-a552-a3b86f963005";
1166        let mut header = make_header();
1167        header.source = InputOrigin::Peer {
1168            peer_id: peer_id.into(),
1169            display_identity: Some("display-agent".into()),
1170            runtime_id: None,
1171        };
1172        let input = Input::Peer(PeerInput {
1173            header,
1174            convention: Some(PeerConvention::Message),
1175            content: ContentInput::Blocks(vec![
1176                meerkat_core::types::ContentBlock::Text {
1177                    text: "caption".into(),
1178                },
1179                meerkat_core::types::ContentBlock::Image {
1180                    media_type: "image/png".into(),
1181                    data: "abc".into(),
1182                },
1183            ]),
1184            payload: None,
1185            handling_mode: None,
1186        });
1187
1188        let Input::Peer(peer) = &input else {
1189            panic!("expected peer input");
1190        };
1191        assert_eq!(
1192            peer_projection_from_peer_input(peer)
1193                .and_then(|projection| projection.block_prefix_text())
1194                .as_deref(),
1195            Some(format!("Peer message from {peer_id}").as_str())
1196        );
1197
1198        let projection = runtime_input_projection(&input);
1199        let append = projection.append.expect("conversation append");
1200        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1201            panic!("expected typed system notice");
1202        };
1203        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1204            blocks.first()
1205        else {
1206            panic!("expected comms block");
1207        };
1208        assert_eq!(
1209            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1210            Some("display-agent")
1211        );
1212        assert_eq!(
1213            content.first(),
1214            Some(&meerkat_core::types::ContentBlock::Text {
1215                text: "caption".into()
1216            })
1217        );
1218    }
1219
1220    #[test]
1221    fn peer_response_terminal_context_is_deferred_to_machine_batch_projection() {
1222        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1223        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1224        let mut header = make_header();
1225        header.source = InputOrigin::Peer {
1226            peer_id: route_id.into(),
1227            display_identity: Some("display-agent".into()),
1228            runtime_id: None,
1229        };
1230        let input = Input::Peer(PeerInput {
1231            header,
1232            convention: Some(PeerConvention::ResponseTerminal {
1233                request_id: request_id.into(),
1234                status: ResponseTerminalStatus::Completed,
1235            }),
1236            content: "response body".into(),
1237            payload: Some(serde_json::json!({"answer":"ok"})),
1238            handling_mode: None,
1239        });
1240
1241        let Input::Peer(peer) = &input else {
1242            panic!("expected peer input");
1243        };
1244        let expected_canonical_key = format!("peer_response_terminal:{route_id}:{request_id}");
1245        assert!(
1246            peer_projection_from_peer_input(peer).is_none(),
1247            "terminal peer response projection must not be built before machine batch selection"
1248        );
1249
1250        let projection = runtime_input_projection(&input);
1251        assert!(
1252            projection.context_append.is_none(),
1253            "admission projection must not store terminal peer response context"
1254        );
1255        let projection = runtime_input_projection_for_machine_batch(&input);
1256        let context = projection.context_append.expect("context append");
1257        assert_eq!(context.key, expected_canonical_key);
1258        let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
1259            panic!("expected typed context");
1260        };
1261        let Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, .. }) = blocks.first()
1262        else {
1263            panic!("expected comms block");
1264        };
1265        assert_eq!(
1266            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1267            Some("display-agent")
1268        );
1269        assert_eq!(
1270            peer.as_ref().map(|peer| peer.id),
1271            Some(meerkat_core::comms::PeerId::parse(route_id).expect("valid route id"))
1272        );
1273    }
1274
1275    #[test]
1276    fn steer_projection_uses_context_append_as_pending_system_context() {
1277        let input_id = InputId::new();
1278        let projection = crate::ingress_types::RuntimeInputProjection {
1279            append: Some(ConversationAppend {
1280                role: ConversationAppendRole::SystemNotice,
1281                content: CoreRenderable::Text {
1282                    text: "ordinary append must lose to context append".into(),
1283                },
1284            }),
1285            additional_appends: Vec::new(),
1286            context_append: Some(ConversationContextAppend {
1287                key: "peer_response_terminal:peer:req".into(),
1288                content: CoreRenderable::Text {
1289                    text: "terminal response is ready".into(),
1290                },
1291            }),
1292            peer_response_terminal: None,
1293        };
1294
1295        let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1296
1297        assert_eq!(appends.len(), 1);
1298        assert_eq!(
1299            appends[0].content.render_text(),
1300            "terminal response is ready"
1301        );
1302        assert_eq!(
1303            appends[0].source.as_deref(),
1304            Some("peer_response_terminal:peer:req")
1305        );
1306        assert_eq!(
1307            appends[0].idempotency_key.as_deref(),
1308            Some("peer_response_terminal:peer:req")
1309        );
1310    }
1311
1312    #[test]
1313    fn continuation_projection_can_carry_runtime_context_append() {
1314        let input = Input::Continuation(ContinuationInput {
1315            header: make_header(),
1316            reason: "workgraph_attention".into(),
1317            continuation_kind: ContinuationKind::WorkgraphAttention,
1318            handling_mode: HandlingMode::Steer,
1319            request_id: Some("binding-1".into()),
1320            flow_tool_overlay: Some(TurnToolOverlay {
1321                allowed_tools: Some(vec!["workgraph_add_evidence".into()]),
1322                blocked_tools: None,
1323                dispatch_context: Default::default(),
1324            }),
1325            context_append: Some(ConversationContextAppend {
1326                key: "workgraph_attention:binding-1:2:5".into(),
1327                content: CoreRenderable::Text {
1328                    text: "WorkGraph attention projection".into(),
1329                },
1330            }),
1331            turn_append: None,
1332        });
1333        let projection = runtime_input_projection_for_machine_batch(&input);
1334        let appends = projection_to_pending_system_context_appends(input.id(), &projection);
1335
1336        assert_eq!(appends.len(), 1);
1337        assert_eq!(
1338            appends[0].content.render_text(),
1339            "WorkGraph attention projection"
1340        );
1341        assert_eq!(
1342            appends[0].source.as_deref(),
1343            Some("workgraph_attention:binding-1:2:5")
1344        );
1345        let metadata = crate::runtime_loop::for_input(
1346            &input,
1347            crate::ingress_types::RuntimeInputSemantics {
1348                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary::RunStart,
1349                execution_kind: meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn,
1350                execution_handling_mode: None,
1351                peer_response_terminal_apply_intent: None,
1352                live_interrupt_required: false,
1353            },
1354        );
1355        assert_eq!(
1356            metadata
1357                .flow_tool_overlay
1358                .and_then(|overlay| overlay.allowed_tools),
1359            Some(vec!["workgraph_add_evidence".into()])
1360        );
1361    }
1362
1363    #[test]
1364    fn steer_projection_falls_back_to_ordinary_peer_append() {
1365        let mut header = make_header();
1366        header.source = InputOrigin::Peer {
1367            peer_id: "peer-a".into(),
1368            display_identity: Some("Peer A".into()),
1369            runtime_id: None,
1370        };
1371        let input = Input::Peer(PeerInput {
1372            header,
1373            convention: Some(PeerConvention::Message),
1374            content: "please look at this while you work".into(),
1375            payload: None,
1376            handling_mode: Some(HandlingMode::Steer),
1377        });
1378        let input_id = input.id().clone();
1379        let projection = runtime_input_projection(&input);
1380
1381        let appends = projection_to_pending_system_context_appends(&input_id, &projection);
1382
1383        assert_eq!(appends.len(), 1);
1384        let rendered = appends[0].content.render_text();
1385        assert!(
1386            rendered.contains("please look at this while you work"),
1387            "peer message append should be renderable as live system context: {rendered:?}"
1388        );
1389        assert_eq!(
1390            appends[0].source.as_deref(),
1391            Some(format!("runtime:steer:{input_id}").as_str())
1392        );
1393        assert_eq!(
1394            appends[0].idempotency_key.as_deref(),
1395            Some(format!("runtime:steer:{input_id}").as_str())
1396        );
1397    }
1398
1399    #[test]
1400    fn steer_projection_filters_empty_context_and_empty_append() {
1401        let input_id = InputId::new();
1402        let context_projection = crate::ingress_types::RuntimeInputProjection {
1403            append: None,
1404            additional_appends: Vec::new(),
1405            context_append: Some(ConversationContextAppend {
1406                key: "empty-context".into(),
1407                content: CoreRenderable::Text { text: "  ".into() },
1408            }),
1409            peer_response_terminal: None,
1410        };
1411        assert!(
1412            projection_to_pending_system_context_appends(&input_id, &context_projection).is_empty()
1413        );
1414
1415        let append_projection = crate::ingress_types::RuntimeInputProjection {
1416            append: Some(ConversationAppend {
1417                role: ConversationAppendRole::SystemNotice,
1418                content: CoreRenderable::Text { text: "\n".into() },
1419            }),
1420            additional_appends: Vec::new(),
1421            context_append: None,
1422            peer_response_terminal: None,
1423        };
1424        assert!(
1425            projection_to_pending_system_context_appends(&input_id, &append_projection).is_empty()
1426        );
1427    }
1428
1429    #[test]
1430    fn peer_response_terminal_with_blocks_projects_append_and_context() {
1431        let route_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f2";
1432        let request_id = "018f6f79-7a82-7c4e-a552-a3b86f9630f1";
1433        let mut header = make_header();
1434        header.source = InputOrigin::Peer {
1435            peer_id: route_id.into(),
1436            display_identity: Some("display-agent".into()),
1437            runtime_id: None,
1438        };
1439        let input = Input::Peer(PeerInput {
1440            header,
1441            convention: Some(PeerConvention::ResponseTerminal {
1442                request_id: request_id.into(),
1443                status: ResponseTerminalStatus::Completed,
1444            }),
1445            content: ContentInput::Blocks(vec![meerkat_core::types::ContentBlock::Image {
1446                media_type: "image/jpeg".into(),
1447                data: "abc".into(),
1448            }]),
1449            payload: Some(serde_json::json!({"answer":"ok"})),
1450            handling_mode: None,
1451        });
1452
1453        let projection = runtime_input_projection_for_machine_batch(&input);
1454        let append = projection.append.expect("conversation append");
1455        let CoreRenderable::SystemNotice { blocks, .. } = append.content else {
1456            panic!("expected typed append");
1457        };
1458        let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
1459            blocks.first()
1460        else {
1461            panic!("expected comms block");
1462        };
1463        assert_eq!(
1464            peer.as_ref().and_then(|peer| peer.display_name.as_deref()),
1465            Some("display-agent")
1466        );
1467        assert!(matches!(
1468            content.first(),
1469            Some(meerkat_core::types::ContentBlock::Image { media_type, .. })
1470                if media_type == "image/jpeg"
1471        ));
1472        assert!(
1473            projection.context_append.is_some(),
1474            "terminal response must still apply runtime-owned context"
1475        );
1476    }
1477
1478    #[test]
1479    fn peer_input_request_serde() {
1480        let input = Input::Peer(PeerInput {
1481            header: make_header(),
1482            convention: Some(PeerConvention::Request {
1483                request_id: "req-1".into(),
1484                intent: "mob.peer_added".into(),
1485            }),
1486            content: "Agent joined".into(),
1487            payload: Some(serde_json::json!({"name": "agent-1"})),
1488            handling_mode: None,
1489        });
1490        let json = serde_json::to_value(&input).unwrap();
1491        let parsed: Input = serde_json::from_value(json).unwrap();
1492        if let Input::Peer(p) = parsed {
1493            assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
1494        } else {
1495            panic!("Expected PeerInput");
1496        }
1497    }
1498
1499    #[test]
1500    fn peer_input_response_terminal_serde() {
1501        let input = Input::Peer(PeerInput {
1502            header: make_header(),
1503            convention: Some(PeerConvention::ResponseTerminal {
1504                request_id: "req-1".into(),
1505                status: ResponseTerminalStatus::Completed,
1506            }),
1507            content: "Done".into(),
1508            payload: Some(serde_json::json!({"ok": true})),
1509            handling_mode: None,
1510        });
1511        let json = serde_json::to_value(&input).unwrap();
1512        let parsed: Input = serde_json::from_value(json).unwrap();
1513        assert!(matches!(parsed, Input::Peer(_)));
1514    }
1515
1516    #[test]
1517    fn peer_input_response_progress_serde() {
1518        let input = Input::Peer(PeerInput {
1519            header: make_header(),
1520            convention: Some(PeerConvention::ResponseProgress {
1521                request_id: "req-1".into(),
1522                phase: ResponseProgressPhase::InProgress,
1523            }),
1524            content: "Working...".into(),
1525            payload: Some(serde_json::json!({"progress": "working"})),
1526            handling_mode: None,
1527        });
1528        let json = serde_json::to_value(&input).unwrap();
1529        let parsed: Input = serde_json::from_value(json).unwrap();
1530        assert!(matches!(parsed, Input::Peer(_)));
1531    }
1532
1533    #[test]
1534    fn flow_step_input_serde() {
1535        let input = Input::FlowStep(FlowStepInput {
1536            header: make_header(),
1537            step_id: "step-1".into(),
1538            content: ContentInput::Blocks(vec![
1539                meerkat_core::types::ContentBlock::Text {
1540                    text: "analyze the data".into(),
1541                },
1542                meerkat_core::types::ContentBlock::Image {
1543                    media_type: "image/png".into(),
1544                    data: meerkat_core::types::ImageData::Inline {
1545                        data: "abc123".into(),
1546                    },
1547                },
1548            ]),
1549            turn_metadata: None,
1550        });
1551        let json = serde_json::to_value(&input).unwrap();
1552        assert_eq!(json["input_type"], "flow_step");
1553        let parsed: Input = serde_json::from_value(json).unwrap();
1554        assert!(matches!(parsed, Input::FlowStep(_)));
1555    }
1556
1557    #[test]
1558    fn external_event_input_serde() {
1559        let input = Input::ExternalEvent(ExternalEventInput {
1560            header: make_header(),
1561            event_type: "webhook.received".into(),
1562            payload: serde_json::json!({"url": "https://example.com"}),
1563            blocks: Some(vec![
1564                meerkat_core::types::ContentBlock::Text {
1565                    text: "look".into(),
1566                },
1567                meerkat_core::types::ContentBlock::Image {
1568                    media_type: "image/png".into(),
1569                    data: meerkat_core::types::ImageData::Inline {
1570                        data: "abc123".into(),
1571                    },
1572                },
1573            ]),
1574            handling_mode: HandlingMode::Queue,
1575            render_metadata: None,
1576        });
1577        let json = serde_json::to_value(&input).unwrap();
1578        assert_eq!(json["input_type"], "external_event");
1579        let parsed: Input = serde_json::from_value(json).unwrap();
1580        assert!(matches!(parsed, Input::ExternalEvent(_)));
1581    }
1582
1583    #[test]
1584    fn legacy_external_event_payload_blocks_are_rejected() {
1585        // The retired shape smuggled multimodal blocks inside the payload
1586        // JSON. It must fail closed with a typed error — never migrate.
1587        let event = ExternalEventInput {
1588            header: make_header(),
1589            event_type: "webhook.received".into(),
1590            payload: serde_json::json!({
1591                "body": "see image",
1592                "blocks": [
1593                    { "type": "text", "text": "caption text" },
1594                    { "type": "image", "media_type": "image/png", "source": "inline", "data": "abc123" }
1595                ]
1596            }),
1597            blocks: None,
1598            handling_mode: HandlingMode::Queue,
1599            render_metadata: None,
1600        };
1601
1602        let err = reject_legacy_payload_blocks(&event)
1603            .expect_err("payload-level blocks must fail closed");
1604        assert!(matches!(err, BlobStoreError::Internal(_)));
1605        // Payload is untouched: rejection never strips or rewrites it.
1606        assert!(event.payload.get("blocks").is_some());
1607        assert!(event.blocks.is_none());
1608    }
1609
1610    #[test]
1611    fn external_event_payload_without_blocks_key_passes_rejection_gate() {
1612        let event = ExternalEventInput {
1613            header: make_header(),
1614            event_type: "webhook.received".into(),
1615            payload: serde_json::json!({ "body": "plain payload" }),
1616            blocks: Some(vec![meerkat_core::types::ContentBlock::Text {
1617                text: "typed owner content".into(),
1618            }]),
1619            handling_mode: HandlingMode::Queue,
1620            render_metadata: None,
1621        };
1622
1623        reject_legacy_payload_blocks(&event)
1624            .expect("payload without a legacy blocks key must pass");
1625    }
1626
1627    #[test]
1628    fn continuation_input_serde() {
1629        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1630        let json = serde_json::to_value(&input).unwrap();
1631        assert_eq!(json["input_type"], "continuation");
1632        let parsed: Input = serde_json::from_value(json).unwrap();
1633        match parsed {
1634            Input::Continuation(continuation) => {
1635                assert_eq!(continuation.handling_mode, HandlingMode::Steer);
1636                assert_eq!(continuation.reason, "detached_background_op_completed");
1637            }
1638            other => panic!("Expected Continuation, got {other:?}"),
1639        }
1640    }
1641
1642    #[test]
1643    fn continuation_input_rejects_legacy_system_generated_tag() {
1644        // The pre-rename `system_generated` tag is a retired persisted shape:
1645        // it must fail closed instead of being folded into `continuation`.
1646        let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
1647        let mut json = serde_json::to_value(&input).unwrap();
1648        json["input_type"] = serde_json::Value::String("system_generated".into());
1649        serde_json::from_value::<Input>(json)
1650            .expect_err("legacy system_generated input_type tag must be rejected");
1651    }
1652
1653    #[test]
1654    fn operation_input_serde() {
1655        let input = Input::Operation(OperationInput {
1656            header: InputHeader {
1657                durability: InputDurability::Derived,
1658                ..make_header()
1659            },
1660            operation_id: OperationId::new(),
1661            event: OpEvent::Cancelled {
1662                id: OperationId::new(),
1663            },
1664        });
1665        let json = serde_json::to_value(&input).unwrap();
1666        assert_eq!(json["input_type"], "operation");
1667        let parsed: Input = serde_json::from_value(json).unwrap();
1668        assert!(matches!(parsed, Input::Operation(_)));
1669    }
1670
1671    #[test]
1672    fn operation_input_rejects_legacy_projected_tag() {
1673        // The pre-rename `projected` tag is a retired persisted shape: it
1674        // must fail closed instead of being folded into `operation`.
1675        let input = Input::Operation(OperationInput {
1676            header: InputHeader {
1677                durability: InputDurability::Derived,
1678                ..make_header()
1679            },
1680            operation_id: OperationId::new(),
1681            event: OpEvent::Cancelled {
1682                id: OperationId::new(),
1683            },
1684        });
1685        let mut json = serde_json::to_value(&input).unwrap();
1686        json["input_type"] = serde_json::Value::String("projected".into());
1687        serde_json::from_value::<Input>(json)
1688            .expect_err("legacy projected input_type tag must be rejected");
1689    }
1690
1691    #[test]
1692    fn legacy_dual_carrier_input_shapes_are_rejected() {
1693        // The retired persisted shape stored the content fact twice: a textual
1694        // carrier (`text` / `body` / `instructions`) plus optional `blocks`.
1695        // The single typed `content` owner replaced both; old shapes must fail
1696        // closed instead of being coerced.
1697        let header = serde_json::to_value(make_header()).unwrap();
1698
1699        let legacy_prompt = serde_json::json!({
1700            "input_type": "prompt",
1701            "header": header.clone(),
1702            "text": "hello",
1703            "blocks": null
1704        });
1705        serde_json::from_value::<Input>(legacy_prompt)
1706            .expect_err("legacy prompt text+blocks shape must be rejected");
1707
1708        let legacy_peer = serde_json::json!({
1709            "input_type": "peer",
1710            "header": header.clone(),
1711            "convention": { "convention_type": "message" },
1712            "body": "hi there"
1713        });
1714        serde_json::from_value::<Input>(legacy_peer)
1715            .expect_err("legacy peer body+blocks shape must be rejected");
1716
1717        let legacy_flow_step = serde_json::json!({
1718            "input_type": "flow_step",
1719            "header": header,
1720            "step_id": "step-1",
1721            "instructions": "analyze the data"
1722        });
1723        serde_json::from_value::<Input>(legacy_flow_step)
1724            .expect_err("legacy flow-step instructions+blocks shape must be rejected");
1725    }
1726
1727    #[test]
1728    fn input_kind_id() {
1729        let prompt = Input::Prompt(PromptInput {
1730            header: make_header(),
1731            content: "hi".into(),
1732            typed_turn_appends: Vec::new(),
1733            turn_metadata: None,
1734        });
1735        assert_eq!(prompt.kind(), InputKind::Prompt);
1736
1737        let peer_msg = Input::Peer(PeerInput {
1738            header: make_header(),
1739            convention: Some(PeerConvention::Message),
1740            content: "hi".into(),
1741            payload: None,
1742            handling_mode: None,
1743        });
1744        assert_eq!(peer_msg.kind(), InputKind::PeerMessage);
1745
1746        let peer_req = Input::Peer(PeerInput {
1747            header: make_header(),
1748            convention: Some(PeerConvention::Request {
1749                request_id: "r".into(),
1750                intent: "i".into(),
1751            }),
1752            content: "hi".into(),
1753            payload: Some(serde_json::json!({"subject": "x"})),
1754            handling_mode: None,
1755        });
1756        assert_eq!(peer_req.kind(), InputKind::PeerRequest);
1757
1758        let continuation = Input::Continuation(ContinuationInput {
1759            header: make_header(),
1760            reason: "continue".into(),
1761            continuation_kind: ContinuationKind::Ordinary,
1762            handling_mode: HandlingMode::Steer,
1763            request_id: None,
1764            flow_tool_overlay: None,
1765            context_append: None,
1766            turn_append: None,
1767        });
1768        assert_eq!(continuation.kind(), InputKind::Continuation);
1769
1770        let operation = Input::Operation(OperationInput {
1771            header: make_header(),
1772            operation_id: OperationId::new(),
1773            event: OpEvent::Cancelled {
1774                id: OperationId::new(),
1775            },
1776        });
1777        assert_eq!(operation.kind(), InputKind::Operation);
1778    }
1779
1780    #[test]
1781    fn input_source_variants() {
1782        let sources = vec![
1783            InputOrigin::Operator,
1784            InputOrigin::Peer {
1785                peer_id: "p1".into(),
1786                display_identity: None,
1787                runtime_id: None,
1788            },
1789            InputOrigin::Flow {
1790                flow_id: "f1".into(),
1791                step_index: 0,
1792            },
1793            InputOrigin::System,
1794            InputOrigin::External {
1795                source_name: "webhook".into(),
1796            },
1797        ];
1798        for source in sources {
1799            let json = serde_json::to_value(&source).unwrap();
1800            let parsed: InputOrigin = serde_json::from_value(json).unwrap();
1801            assert_eq!(source, parsed);
1802        }
1803    }
1804
1805    #[test]
1806    fn input_durability_serde() {
1807        for d in [
1808            InputDurability::Durable,
1809            InputDurability::Ephemeral,
1810            InputDurability::Derived,
1811        ] {
1812            let json = serde_json::to_value(d).unwrap();
1813            let parsed: InputDurability = serde_json::from_value(json).unwrap();
1814            assert_eq!(d, parsed);
1815        }
1816    }
1817
1818    #[test]
1819    fn peer_input_without_handling_mode_deserializes_as_none() {
1820        // Serialized PeerInput without the optional handling_mode field.
1821        let json = serde_json::json!({
1822            "input_type": "peer",
1823            "header": serde_json::to_value(make_header()).unwrap(),
1824            "convention": { "convention_type": "message" },
1825            "content": "hello"
1826        });
1827        let parsed: Input = serde_json::from_value(json).unwrap();
1828        match parsed {
1829            Input::Peer(p) => assert!(p.handling_mode.is_none()),
1830            other => panic!("Expected Peer, got {other:?}"),
1831        }
1832    }
1833
1834    #[test]
1835    fn peer_input_with_queue_handling_mode_roundtrips() {
1836        let input = Input::Peer(PeerInput {
1837            header: make_header(),
1838            convention: Some(PeerConvention::Message),
1839            content: "hi".into(),
1840            payload: None,
1841            handling_mode: Some(HandlingMode::Queue),
1842        });
1843        let json = serde_json::to_value(&input).unwrap();
1844        assert_eq!(json["handling_mode"], "queue");
1845        let parsed: Input = serde_json::from_value(json).unwrap();
1846        match parsed {
1847            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Queue)),
1848            other => panic!("Expected Peer, got {other:?}"),
1849        }
1850    }
1851
1852    #[test]
1853    fn peer_response_terminal_input_owns_wire_status_mapping() {
1854        let peer_id = meerkat_core::comms::PeerId::from_uuid(
1855            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1856        );
1857        let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1858        let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1859            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1860        );
1861        let input = peer_response_terminal_input(
1862            peer_id,
1863            Some(display_name),
1864            request_id,
1865            meerkat_contracts::PeerResponseTerminalStatusWire::Completed,
1866            serde_json::json!({"ok": true}),
1867        );
1868
1869        match input {
1870            Input::Peer(PeerInput {
1871                header:
1872                    InputHeader {
1873                        source:
1874                            InputOrigin::Peer {
1875                                peer_id,
1876                                display_identity,
1877                                runtime_id,
1878                            },
1879                        durability: InputDurability::Durable,
1880                        correlation_id,
1881                        ..
1882                    },
1883                convention: Some(PeerConvention::ResponseTerminal { request_id, status }),
1884                payload: Some(payload),
1885                handling_mode: None,
1886                ..
1887            }) => {
1888                assert_eq!(peer_id, "00000000-0000-4000-8000-000000000161");
1889                assert_eq!(display_identity.as_deref(), Some("analyst"));
1890                assert_eq!(runtime_id, None);
1891                assert_eq!(request_id, "00000000-0000-4000-8000-000000000162");
1892                assert_eq!(
1893                    correlation_id,
1894                    Some(CorrelationId::from_uuid(
1895                        uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap()
1896                    ))
1897                );
1898                assert_eq!(status, ResponseTerminalStatus::Completed);
1899                assert_eq!(payload["ok"], true);
1900            }
1901            other => panic!("expected terminal peer input, got {other:?}"),
1902        }
1903    }
1904
1905    #[test]
1906    fn peer_response_terminal_validation_is_structural_only() {
1907        let peer_id = meerkat_core::comms::PeerId::from_uuid(
1908            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000161").unwrap(),
1909        );
1910        let display_name = meerkat_core::comms::PeerName::new("analyst").unwrap();
1911        let request_id = meerkat_core::PeerCorrelationId::from_uuid(
1912            uuid::Uuid::parse_str("00000000-0000-4000-8000-000000000162").unwrap(),
1913        );
1914        let input = peer_response_terminal_input(
1915            peer_id,
1916            Some(display_name),
1917            request_id,
1918            meerkat_contracts::PeerResponseTerminalStatusWire::Cancelled,
1919            serde_json::json!({"ok": false}),
1920        );
1921
1922        validate_peer_response_terminal_fact(&input)
1923            .expect("status support is generated admission authority, structural fact validation should pass");
1924    }
1925
1926    #[test]
1927    fn peer_input_with_steer_handling_mode_roundtrips() {
1928        let input = Input::Peer(PeerInput {
1929            header: make_header(),
1930            convention: Some(PeerConvention::Message),
1931            content: "hi".into(),
1932            payload: None,
1933            handling_mode: Some(HandlingMode::Steer),
1934        });
1935        let json = serde_json::to_value(&input).unwrap();
1936        assert_eq!(json["handling_mode"], "steer");
1937        let parsed: Input = serde_json::from_value(json).unwrap();
1938        match parsed {
1939            Input::Peer(p) => assert_eq!(p.handling_mode, Some(HandlingMode::Steer)),
1940            other => panic!("Expected Peer, got {other:?}"),
1941        }
1942    }
1943
1944    #[test]
1945    fn peer_input_handling_mode_not_serialized_when_none() {
1946        let input = Input::Peer(PeerInput {
1947            header: make_header(),
1948            convention: Some(PeerConvention::Message),
1949            content: "hi".into(),
1950            payload: None,
1951            handling_mode: None,
1952        });
1953        let json = serde_json::to_value(&input).unwrap();
1954        assert!(json.get("handling_mode").is_none());
1955    }
1956}