Skip to main content

meerkat_core/
session.rs

1//! Session management for Meerkat
2//!
3//! A session represents a conversation history that can be persisted and resumed.
4//!
5//! # Performance
6//!
7//! Sessions use Arc-based copy-on-write for message storage:
8//! - `fork()` shares the message buffer (O(1), no clone)
9//! - Mutation (push) triggers CoW only when refcount > 1
10//! - `push_batch()` adds multiple messages with a single timestamp update
11
12use crate::Provider;
13use crate::peer_meta::PeerMeta;
14use crate::realtime_transcript::{
15    RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage,
16    RealtimeTranscriptRole, SESSION_REALTIME_TRANSCRIPT_STATE_KEY, TranscriptLane,
17};
18use crate::service::{AppendSystemContextRequest, MobToolAuthorityContext};
19use crate::time_compat::SystemTime;
20use crate::tool_scope::ToolFilter;
21use crate::types::{
22    AssistantBlock, BlockAssistantMessage, ContentBlock, ContentInput, Message, SessionId,
23    StopReason, ToolDef, ToolProvenance, ToolResult, Usage, UserMessage,
24};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use std::collections::{BTreeMap, BTreeSet, HashMap};
27use std::sync::Arc;
28
/// Current session format version.
///
/// Version history:
/// - v1 — pre-wave-c. `SessionMetadata.auth_binding` inner fields were
///   untyped strings (`realm_id`, `binding_id`, `profile`); no per-entity
///   schema version byte on `SessionMetadata`.
/// - v2 — wave-c C-3. `AuthBindingRef` inner fields are typed
///   `RealmId`/`BindingId`/`ProfileId` newtypes; `SessionMetadata` carries
///   a `schema_version` byte. Opportunistic upgrade-on-read —
///   `meerkat_session::persistent::migrations::migrate` rewrites v1 rows
///   into v2 shape; the next `save()` persists v2.
///
/// `Session::new` stamps new sessions with this value, and it is also the
/// serde default used when a serialized session has no `version` field.
pub const SESSION_VERSION: u32 = 2;

/// Current `SessionMetadata` schema version. Distinct from `SESSION_VERSION`
/// so `SessionMetadata` can evolve independently of the Session envelope.
///
/// - v1 — pre-wave-c. Default on read for rows written before the byte
///   was introduced.
/// - v2 — wave-c C-3. Typed `AuthBindingRef` inner fields; any future
///   `SessionMetadata`-local shape change bumps this without moving
///   `SESSION_VERSION`.
pub const SESSION_METADATA_SCHEMA_VERSION: u32 = 2;
51
/// Typed transcript replacement used to create an edited fork.
///
/// Replacements never mutate the source session in place. The owning service
/// applies this to a forked prefix, producing a new `SessionId`.
///
/// Serialized as an internally tagged object: the snake_case variant name is
/// stored in a `"type"` field (e.g. `"user_content_block"`) alongside the
/// variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TranscriptReplacement {
    /// Replace the addressed message with a full canonical message.
    Message { message: Message },
    /// Replace one user-message content block.
    UserContentBlock {
        /// Position of the block to replace within the user message.
        block_index: usize,
        /// Replacement content block.
        block: ContentBlock,
    },
    /// Replace one block in a block-assistant message.
    AssistantBlock {
        /// Position of the block to replace within the assistant message.
        block_index: usize,
        /// Replacement assistant block.
        block: AssistantBlock,
    },
    /// Replace one content block inside one tool-result payload.
    ToolResultContentBlock {
        /// Which tool result inside the tool-results message to edit.
        result_index: usize,
        /// Position of the block to replace within that tool result.
        block_index: usize,
        /// Replacement content block.
        block: ContentBlock,
    },
}
78
/// Invalid typed transcript edit request.
///
/// Each variant carries enough context (indices and counts, or the expected
/// and actual role labels) to render a self-explanatory error message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum TranscriptEditError {
    /// The addressed message does not exist in the transcript.
    #[error("message index {message_index} out of bounds for {message_count} messages")]
    MessageIndexOutOfBounds {
        message_index: usize,
        message_count: usize,
    },
    /// The addressed block (of the kind named by `block_kind`) does not exist.
    #[error("{block_kind} index {block_index} out of bounds for {block_count} blocks")]
    BlockIndexOutOfBounds {
        block_kind: &'static str,
        block_index: usize,
        block_count: usize,
    },
    /// The replacement targets a message of a different role than the one found.
    #[error("replacement expected {expected} at message index {message_index}, found {actual}")]
    MessageRoleMismatch {
        message_index: usize,
        expected: &'static str,
        actual: &'static str,
    },
}
100
101fn message_role_name(message: &Message) -> &'static str {
102    match message {
103        Message::System(_) => "system",
104        Message::SystemNotice(_) => "system_notice",
105        Message::User(_) => "user",
106        Message::Assistant(_) => "assistant",
107        Message::BlockAssistant(_) => "block_assistant",
108        Message::ToolResults { .. } => "tool_results",
109    }
110}
111
/// Accumulated realtime-transcript bookkeeping for one session.
///
/// Every field is `#[serde(default)]`, so partial or older snapshots still
/// deserialize.
/// NOTE(review): presumably persisted in session metadata under
/// `SESSION_REALTIME_TRANSCRIPT_STATE_KEY` — confirm at the read/write sites.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionRealtimeTranscriptState {
    /// Per-item state, keyed by item id (presumably the provider item id).
    #[serde(default)]
    items: BTreeMap<String, RealtimeTranscriptItemState>,
    /// Item ids in the order they were first observed.
    #[serde(default)]
    first_seen_order: Vec<String>,
    /// Delta ids already applied (presumably used to ignore replayed deltas).
    #[serde(default)]
    seen_delta_ids: BTreeSet<String>,
    /// Completion details per assistant response (key presumably the response id).
    #[serde(default)]
    assistant_completions: BTreeMap<String, RealtimeAssistantCompletion>,
    /// Assistant response ids whose output was discarded; omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    discarded_assistant_response_ids: BTreeSet<String>,
}
126
/// Per-item state for one realtime transcript item (user or assistant).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeTranscriptItemState {
    /// Which side of the conversation produced this item.
    role: RealtimeTranscriptRole,
    /// Id of the item this one follows, when known.
    #[serde(default)]
    previous_item_id: Option<String>,
    /// Response id this item belongs to, when known.
    #[serde(default)]
    response_id: Option<String>,
    /// Text fragments keyed by segment index; `text()` concatenates them in
    /// ascending key order.
    #[serde(default)]
    content_segments: BTreeMap<u32, String>,
    /// `true` for placeholder items created via `skipped()`.
    #[serde(default)]
    skipped: bool,
    /// NOTE(review): presumably "all content received" — confirm at the
    /// event handlers that set it.
    #[serde(default)]
    ready: bool,
    /// NOTE(review): presumably "already appended to the canonical
    /// transcript" — confirm at the event handlers that set it.
    #[serde(default)]
    materialized: bool,
    /// T9/T10: output lane this assistant item carries. `Display` is the
    /// default (matches all pre-T10 sessions on disk); promoted to `Spoken`
    /// the first time an [`RealtimeTranscriptEvent::AssistantTranscriptDelta`]
    /// fragment arrives for the item. User-role items always carry
    /// `Display` (the field is unused for user transcripts).
    #[serde(default)]
    lane: TranscriptLane,
}
151
152impl RealtimeTranscriptItemState {
153    fn new(
154        role: RealtimeTranscriptRole,
155        previous_item_id: Option<String>,
156        response_id: Option<String>,
157    ) -> Self {
158        Self {
159            role,
160            previous_item_id,
161            response_id,
162            content_segments: BTreeMap::new(),
163            skipped: false,
164            ready: false,
165            materialized: false,
166            lane: TranscriptLane::Display,
167        }
168    }
169
170    fn skipped(previous_item_id: Option<String>) -> Self {
171        Self {
172            role: RealtimeTranscriptRole::Assistant,
173            previous_item_id,
174            response_id: None,
175            content_segments: BTreeMap::new(),
176            skipped: true,
177            ready: true,
178            materialized: false,
179            lane: TranscriptLane::Display,
180        }
181    }
182
183    fn text(&self) -> String {
184        self.content_segments.values().cloned().collect()
185    }
186}
187
/// Completion details recorded for one assistant response.
///
/// Unlike the surrounding realtime state, no field here has a serde default,
/// so every field must be present when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeAssistantCompletion {
    /// Why the response stopped.
    stop_reason: StopReason,
    /// Token usage reported for the response.
    usage: Usage,
    /// NOTE(review): presumably set once `usage` has been folded into the
    /// session totals — confirm at the write site.
    usage_consumed: bool,
}
195
/// A conversation session with full history
///
/// Uses Arc<Vec<Message>> internally for efficient forking (copy-on-write):
/// mutators go through `Arc::make_mut`, which copies the buffer only when
/// it is shared with another `Arc`. (De)serialized through the private
/// `SessionSerde` mirror, which flattens the `Arc`.
#[derive(Debug, Clone)]
pub struct Session {
    /// Format version for migrations (`SESSION_VERSION` for new sessions)
    version: u32,
    /// Unique identifier
    id: SessionId,
    /// All messages in order (Arc for CoW on fork)
    pub(crate) messages: Arc<Vec<Message>>,
    /// When the session was created
    created_at: SystemTime,
    /// When the session was last updated
    updated_at: SystemTime,
    /// Arbitrary metadata (also holds durable control state under the
    /// `SESSION_*_STATE_KEY` keys)
    metadata: serde_json::Map<String, serde_json::Value>,
    /// Cumulative token usage across all LLM calls in this session
    usage: Usage,
}
216
/// Serde helper for Session serialization (flattens Arc)
///
/// Defines the on-disk / wire shape of a `Session`. `metadata` and `usage`
/// default to empty when absent.
/// NOTE(review): a missing `version` defaults to the *current*
/// `SESSION_VERSION`, not v1 — confirm that v1 rows always wrote `version`.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionSerde {
    #[serde(default = "default_version")]
    version: u32,
    id: SessionId,
    messages: Vec<Message>,
    created_at: SystemTime,
    updated_at: SystemTime,
    #[serde(default)]
    metadata: serde_json::Map<String, serde_json::Value>,
    #[serde(default)]
    usage: Usage,
}
232
233impl Serialize for Session {
234    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
235    where
236        S: Serializer,
237    {
238        let serde_repr = SessionSerde {
239            version: self.version,
240            id: self.id.clone(),
241            messages: (*self.messages).clone(),
242            created_at: self.created_at,
243            updated_at: self.updated_at,
244            metadata: self.metadata.clone(),
245            usage: self.usage.clone(),
246        };
247        serde_repr.serialize(serializer)
248    }
249}
250
251impl<'de> Deserialize<'de> for Session {
252    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
253    where
254        D: Deserializer<'de>,
255    {
256        let serde_repr = SessionSerde::deserialize(deserializer)?;
257        Ok(Session {
258            version: serde_repr.version,
259            id: serde_repr.id,
260            messages: Arc::new(serde_repr.messages),
261            created_at: serde_repr.created_at,
262            updated_at: serde_repr.updated_at,
263            metadata: serde_repr.metadata,
264            usage: serde_repr.usage,
265        })
266    }
267}
268
/// Serde default for `SessionSerde::version`: the current `SESSION_VERSION`.
fn default_version() -> u32 {
    SESSION_VERSION
}

/// Metadata key used to store durable system-context control state.
pub const SESSION_SYSTEM_CONTEXT_STATE_KEY: &str = "session_system_context_state";

/// Metadata key used to store deferred-turn control state.
pub const SESSION_DEFERRED_TURN_STATE_KEY: &str = "session_deferred_turn_state";

/// Metadata key used to store recoverable build-only session state.
pub const SESSION_BUILD_STATE_KEY: &str = "session_build_state";

/// Metadata key used to store durable session-local tool visibility intent.
pub const SESSION_TOOL_VISIBILITY_STATE_KEY: &str = "session_tool_visibility_state_v1";

/// Canonical tool name gated by `image_tool_results` capability.
/// (Denied by `capability_base_filter_for_image_tool_results(false)`.)
pub const VIEW_IMAGE_TOOL_NAME: &str = "view_image";

/// Canonical separator between appended runtime system-context blocks.
pub const SYSTEM_CONTEXT_SEPARATOR: &str = "\n\n---\n\n";
290
/// Durable control state for runtime system-context append requests.
///
/// Each collection is omitted from the serialized form when empty.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SessionSystemContextState {
    /// Appends accepted by `stage_append` and not yet applied.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending: Vec<PendingSystemContextAppend>,
    /// Appends moved over from `pending` by `mark_pending_applied`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub applied: Vec<PendingSystemContextAppend>,
    /// Idempotency key → the text/source it was first staged with, plus its state.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub seen: std::collections::BTreeMap<String, SeenSystemContextKey>,
}
302
/// Pending append request accepted by the control plane but not yet applied at an LLM boundary.
///
/// The same shape is kept in `SessionSystemContextState::applied` after the
/// append is applied. `stage_append` stores `text` already trimmed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct PendingSystemContextAppend {
    pub text: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    pub accepted_at: SystemTime,
}
314
/// Durable control state for deferred first-turn prompt and staged callback tool results.
///
/// Default-valued fields (`Inactive` phase, no prompt, no results) are
/// omitted from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionDeferredTurnState {
    /// Lifecycle phase of the deferred first turn.
    #[serde(default, skip_serializing_if = "DeferredFirstTurnPhase::is_inactive")]
    pub first_turn_phase: DeferredFirstTurnPhase,
    /// Create-time prompt waiting for the first turn, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pending_initial_prompt: Option<PendingDeferredPrompt>,
    /// Callback tool-result messages staged for the next turn, in arrival order.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending_tool_results: Vec<PendingToolResultsMessage>,
}
326
/// Canonical lifecycle phase for the session's deferred first turn.
///
/// Transitions driven by `SessionDeferredTurnState`: `mark_initial_turn_pending`
/// and `restore_initial_turn_pending` set `Pending`; `mark_initial_turn_started`
/// moves `Pending` to `Consumed`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DeferredFirstTurnPhase {
    /// The session was not created in deferred-first-turn mode.
    #[default]
    Inactive,
    /// The session exists durably but the first turn has not started yet.
    Pending,
    /// The first turn has started; build-only overrides are no longer legal.
    Consumed,
}
339
340impl DeferredFirstTurnPhase {
341    pub fn is_inactive(&self) -> bool {
342        matches!(self, Self::Inactive)
343    }
344}
345
346fn is_default_hook_run_overrides(value: &crate::HookRunOverrides) -> bool {
347    value == &crate::HookRunOverrides::default()
348}
349
350fn is_default_call_timeout_override(value: &crate::CallTimeoutOverride) -> bool {
351    value == &crate::CallTimeoutOverride::default()
352}
353
/// `skip_serializing_if` predicate: `true` for `ToolFilter::All`, the
/// unrestricted filter, so it is left out of the serialized visibility state.
fn is_tool_filter_all(value: &ToolFilter) -> bool {
    matches!(value, ToolFilter::All)
}
357
/// `skip_serializing_if` predicate for revision counters: `true` exactly for `0`.
fn is_zero(value: &u64) -> bool {
    matches!(*value, 0)
}
361
362/// Derive the machine-owned capability base filter from the current image-tool-results support.
363pub fn capability_base_filter_for_image_tool_results(image_tool_results: bool) -> ToolFilter {
364    if image_tool_results {
365        ToolFilter::All
366    } else {
367        ToolFilter::Deny([VIEW_IMAGE_TOOL_NAME.to_string()].into_iter().collect())
368    }
369}
370
/// Persisted witness for a durable tool-visibility name.
///
/// Both witnesses are optional and omitted from the serialized form when
/// absent. The default value carries no witness at all.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct ToolVisibilityWitness {
    /// Stable key of the tool's owner, when one was known at capture time.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stable_owner_key: Option<String>,
    /// Provenance last observed for the tool name, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_provenance: Option<ToolProvenance>,
}
380
381impl ToolVisibilityWitness {
382    pub fn has_identity_witness(&self) -> bool {
383        self.stable_owner_key.is_some() || self.last_seen_provenance.is_some()
384    }
385
386    pub fn has_provenance_identity_witness(&self) -> bool {
387        self.last_seen_provenance.is_some()
388    }
389}
390
/// Typed authority value for a deferred-tool load request.
///
/// The public/effect seam carries the requested route name and provenance
/// witness as one value. Canonical owners may project this into name-indexed
/// maps internally, but callers do not get to make a map key the authority.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct DeferredToolLoadAuthority {
    /// Requested tool route name.
    pub name: String,
    /// Witness vouching for `name`.
    pub witness: ToolVisibilityWitness,
}
402
403impl DeferredToolLoadAuthority {
404    pub fn new(name: impl Into<String>, witness: ToolVisibilityWitness) -> Self {
405        Self {
406            name: name.into(),
407            witness,
408        }
409    }
410
411    pub fn into_parts(self) -> (String, ToolVisibilityWitness) {
412        (self.name, self.witness)
413    }
414}
415
/// Durable tool-filter intent paired with the witnesses that made the names
/// authoritative at capture time.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct WitnessedToolFilter {
    /// The filter itself.
    pub filter: ToolFilter,
    /// Tool name → witness; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
425
426impl WitnessedToolFilter {
427    pub fn new(filter: ToolFilter, witnesses: BTreeMap<String, ToolVisibilityWitness>) -> Self {
428        Self { filter, witnesses }
429    }
430
431    pub fn into_parts(self) -> (ToolFilter, BTreeMap<String, ToolVisibilityWitness>) {
432        (self.filter, self.witnesses)
433    }
434}
435
/// Canonical durable session-local tool visibility intent.
///
/// Every field is omitted from the serialized form while it holds its
/// "nothing to say" value (`ToolFilter::All`, an empty collection, or a zero
/// revision), which keeps persisted state compact.
/// NOTE(review): the `active_*` / `staged_*` pairs presumably mean "in
/// effect now" vs. "to take effect at a later boundary" — confirm with the
/// owning state machine.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SessionToolVisibilityState {
    /// Capability-derived filter (see `capability_base_filter_for_image_tool_results`).
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub capability_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub inherited_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub active_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub staged_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub active_requested_deferred_names: BTreeSet<String>,
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub staged_requested_deferred_names: BTreeSet<String>,
    #[serde(default, skip_serializing_if = "is_zero")]
    pub active_revision: u64,
    #[serde(default, skip_serializing_if = "is_zero")]
    pub staged_revision: u64,
    /// Witnesses for requested deferred tool names.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub requested_witnesses: BTreeMap<String, ToolVisibilityWitness>,
    /// Witnesses for names referenced by the filters.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub filter_witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
461
/// Durable build-only session state required to faithfully recover and rebuild
/// a persisted session without surface-local shadow config.
///
/// Every field defaults on read and is skipped on write while unset, empty,
/// or equal to its type's default, so only explicit overrides are persisted.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionBuildState {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_schema: Option<crate::OutputSchema>,
    #[serde(default, skip_serializing_if = "is_default_hook_run_overrides")]
    pub hooks_override: crate::HookRunOverrides,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub budget_limits: Option<crate::BudgetLimits>,
    /// Tool definitions that must be restored when the session is rebuilt.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub recoverable_tool_defs: Vec<ToolDef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub silent_comms_intents: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_inline_peer_notifications: Option<i32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub app_context: Option<serde_json::Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub additional_instructions: Option<Vec<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell_env: Option<HashMap<String, String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mob_tool_authority_context: Option<MobToolAuthorityContext>,
    #[serde(default, skip_serializing_if = "is_default_call_timeout_override")]
    pub call_timeout_override: crate::CallTimeoutOverride,
}
492
/// Deferred create-time prompt staged for the next turn.
///
/// Only non-blank prompts (text or images) are staged; see
/// `SessionDeferredTurnState::stage_initial_prompt`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct PendingDeferredPrompt {
    pub prompt: ContentInput,
    pub accepted_at: SystemTime,
}
500
/// Staged callback tool results waiting to be admitted on the next turn seam.
///
/// `PartialEq` is implemented by hand rather than derived: `results` are
/// compared through their JSON encoding.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PendingToolResultsMessage {
    /// Non-empty batch of tool results (empty batches are never staged).
    pub results: Vec<ToolResult>,
    pub accepted_at: SystemTime,
}
508
509impl PartialEq for PendingToolResultsMessage {
510    fn eq(&self, other: &Self) -> bool {
511        self.accepted_at == other.accepted_at
512            && serde_json::to_value(&self.results).ok() == serde_json::to_value(&other.results).ok()
513    }
514}
515
/// Seen idempotency-key entry for system-context append requests.
///
/// Records the (trimmed) text and source the key was first staged with.
/// `stage_append` compares later requests against these to decide between
/// `Duplicate` and `Conflict`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SeenSystemContextKey {
    pub text: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    pub state: SeenSystemContextState,
}
525
/// Lifecycle state for an accepted idempotency key.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SeenSystemContextState {
    /// Set by `stage_append` when the key is first accepted.
    Pending,
    /// Set by `mark_pending_applied` once the keyed append is applied.
    Applied,
}
533
534impl SessionSystemContextState {
535    /// Stage an append request, enforcing per-session idempotency.
536    pub fn stage_append(
537        &mut self,
538        req: &AppendSystemContextRequest,
539        accepted_at: SystemTime,
540    ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
541        let text = req.text.trim();
542        if text.is_empty() {
543            return Err(SystemContextStageError::InvalidRequest(
544                "system context text must not be empty".to_string(),
545            ));
546        }
547
548        if let Some(key) = req.idempotency_key.as_ref() {
549            match self.seen.get(key) {
550                Some(existing)
551                    if existing.text == text
552                        && existing.source.as_deref() == req.source.as_deref() =>
553                {
554                    return Ok(crate::service::AppendSystemContextStatus::Duplicate);
555                }
556                Some(existing) => {
557                    return Err(SystemContextStageError::Conflict {
558                        key: key.clone(),
559                        existing_text: existing.text.clone(),
560                        existing_source: existing.source.clone(),
561                    });
562                }
563                None => {}
564            }
565        }
566
567        let append = PendingSystemContextAppend {
568            text: text.to_string(),
569            source: req.source.clone(),
570            idempotency_key: req.idempotency_key.clone(),
571            accepted_at,
572        };
573        if let Some(key) = req.idempotency_key.as_ref() {
574            self.seen.insert(
575                key.clone(),
576                SeenSystemContextKey {
577                    text: append.text.clone(),
578                    source: append.source.clone(),
579                    state: SeenSystemContextState::Pending,
580                },
581            );
582        }
583        self.pending.push(append);
584        Ok(crate::service::AppendSystemContextStatus::Staged)
585    }
586
587    /// Mark all currently-pending appends as applied and clear the pending queue.
588    pub fn mark_pending_applied(&mut self) {
589        for pending in &self.pending {
590            if !self.applied.contains(pending) {
591                self.applied.push(pending.clone());
592            }
593        }
594        for pending in &self.pending {
595            if let Some(key) = pending.idempotency_key.as_ref()
596                && let Some(seen) = self.seen.get_mut(key)
597            {
598                seen.state = SeenSystemContextState::Applied;
599            }
600        }
601        self.pending.clear();
602    }
603}
604
605impl SessionDeferredTurnState {
606    /// Mark that this session has a deferred first turn waiting to start.
607    pub fn mark_initial_turn_pending(&mut self) {
608        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
609    }
610
611    /// Mark the deferred first turn as started.
612    ///
613    /// Returns true when the phase transitioned from `Pending`.
614    pub fn mark_initial_turn_started(&mut self) -> bool {
615        let was_pending = matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending);
616        if was_pending {
617            self.first_turn_phase = DeferredFirstTurnPhase::Consumed;
618        }
619        was_pending
620    }
621
622    /// Restore the deferred first-turn pending phase after a failed pre-run setup.
623    pub fn restore_initial_turn_pending(&mut self) {
624        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
625    }
626
627    /// Whether build-only first-turn overrides are still legal for this session.
628    pub fn allows_initial_turn_overrides(&self) -> bool {
629        matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending)
630    }
631
632    /// Stage the create-time prompt for a later first turn.
633    pub fn stage_initial_prompt(&mut self, prompt: ContentInput, accepted_at: SystemTime) {
634        if !prompt.has_images() && prompt.text_content().trim().is_empty() {
635            self.pending_initial_prompt = None;
636            return;
637        }
638
639        self.pending_initial_prompt = Some(PendingDeferredPrompt {
640            prompt,
641            accepted_at,
642        });
643    }
644
645    /// Stage one callback tool-results message for the next turn.
646    pub fn stage_tool_results(
647        &mut self,
648        results: Vec<ToolResult>,
649        accepted_at: SystemTime,
650    ) -> usize {
651        if results.is_empty() {
652            return 0;
653        }
654
655        let accepted = results.len();
656        self.pending_tool_results.push(PendingToolResultsMessage {
657            results,
658            accepted_at,
659        });
660        accepted
661    }
662
663    /// Consume the staged initial prompt, if any.
664    pub fn take_initial_prompt(&mut self) -> Option<ContentInput> {
665        self.pending_initial_prompt
666            .take()
667            .map(|pending| pending.prompt)
668    }
669
670    /// Consume all staged callback tool-results messages.
671    pub fn take_tool_results(&mut self) -> Vec<PendingToolResultsMessage> {
672        std::mem::take(&mut self.pending_tool_results)
673    }
674
675    /// Whether any callback tool results are currently staged.
676    pub fn has_pending_tool_results(&self) -> bool {
677        !self.pending_tool_results.is_empty()
678    }
679}
680
/// Failure when staging a system-context append request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SystemContextStageError {
    /// The request itself is malformed (e.g. blank text); carries a message.
    InvalidRequest(String),
    /// The idempotency key was already used with different text or source.
    /// Carries the key and the text/source it was originally staged with.
    Conflict {
        key: String,
        existing_text: String,
        existing_source: Option<String>,
    },
}
691
692fn render_system_context_block(append: &PendingSystemContextAppend) -> String {
693    let mut rendered = String::from("[Runtime System Context]");
694    if let Some(source) = &append.source {
695        rendered.push_str("\nsource: ");
696        rendered.push_str(source);
697    }
698    rendered.push_str("\n\n");
699    rendered.push_str(&append.text);
700    rendered
701}
702
703fn seen_system_context_matches(
704    seen: &SeenSystemContextKey,
705    append: &PendingSystemContextAppend,
706) -> bool {
707    seen.text == append.text && seen.source.as_deref() == append.source.as_deref()
708}
709
710fn pending_system_context_matches(
711    existing: &PendingSystemContextAppend,
712    append: &PendingSystemContextAppend,
713) -> bool {
714    existing.text == append.text && existing.source.as_deref() == append.source.as_deref()
715}
716
717impl Session {
718    /// Create a new empty session
719    pub fn new() -> Self {
720        let now = SystemTime::now();
721        Self {
722            version: SESSION_VERSION,
723            id: SessionId::new(),
724            messages: Arc::new(Vec::new()),
725            created_at: now,
726            updated_at: now,
727            metadata: serde_json::Map::new(),
728            usage: Usage::default(),
729        }
730    }
731
732    /// Create a session with a specific ID (for loading)
733    pub fn with_id(id: SessionId) -> Self {
734        let mut session = Self::new();
735        session.id = id;
736        session
737    }
738
739    /// Get the session ID
740    pub fn id(&self) -> &SessionId {
741        &self.id
742    }
743
744    /// Get the session version
745    pub fn version(&self) -> u32 {
746        self.version
747    }
748
749    /// Get all messages.
750    pub fn messages(&self) -> &[Message] {
751        &self.messages
752    }
753
    /// Mutable access to the message buffer — *append-only witness escape hatch*.
    ///
    /// Intentionally `pub(crate)`: the only legitimate mutations of the
    /// message history within a single `SessionId` are `push` / `push_batch`
    /// (extend) and the two in-crate rewrite operations used by the agent
    /// loop (compaction-summary replacement, synthetic-notice stripping).
    /// Cross-crate consumers must route in-place content rewrites through
    /// the typed proxy [`Session::externalize_media`]; shrink or replace
    /// operations must go through [`Session::fork_at`] which rotates
    /// `SessionId` (F1/F7 closure from the state-scope audit).
    ///
    /// Goes through `Arc::make_mut`, so a buffer shared with another `Arc`
    /// (e.g. a fork) is cloned first and the returned reference never aliases
    /// another session's history. Does not update `updated_at`; call
    /// `touch()` if the mutation should count as an update.
    pub(crate) fn messages_mut_internal(&mut self) -> &mut Vec<Message> {
        Arc::make_mut(&mut self.messages)
    }
767
768    /// Get creation time
769    pub fn created_at(&self) -> SystemTime {
770        self.created_at
771    }
772
773    /// Get last update time
774    pub fn updated_at(&self) -> SystemTime {
775        self.updated_at
776    }
777
778    /// Add a message to the session
779    ///
780    /// Updates the timestamp. For adding multiple messages, prefer `push_batch`.
781    pub fn push(&mut self, message: Message) {
782        Arc::make_mut(&mut self.messages).push(message);
783        self.updated_at = SystemTime::now();
784    }
785
786    /// Add multiple messages in one operation (single timestamp update)
787    ///
788    /// More efficient than multiple `push` calls when adding many messages.
789    pub fn push_batch(&mut self, messages: Vec<Message>) {
790        if messages.is_empty() {
791            return;
792        }
793        let inner = Arc::make_mut(&mut self.messages);
794        inner.extend(messages);
795        self.updated_at = SystemTime::now();
796    }
797
798    /// Rewrite inline media payloads in-place as `BlobRef` pointers.
799    ///
800    /// Message count is invariant across this operation — `externalize`
801    /// only swaps inline image/media bytes for opaque blob references.
802    /// This is the cross-crate-legitimate rewrite operation that used
803    /// to require public `messages_mut()`; post-C-H1 callers in
804    /// `meerkat-session` go through this typed method.
805    ///
806    /// Does not touch `updated_at` — externalization is bookkeeping, not
807    /// a semantic session mutation.
808    pub async fn externalize_media(
809        &mut self,
810        blob_store: &dyn crate::BlobStore,
811        start: usize,
812    ) -> Result<(), crate::blob::BlobStoreError> {
813        let messages = Arc::make_mut(&mut self.messages);
814        crate::image_content::externalize_messages_from(blob_store, messages, start).await
815    }
816
817    /// Explicitly update the timestamp
818    ///
819    /// Call this after bulk operations that don't update timestamps automatically.
820    pub fn touch(&mut self) {
821        self.updated_at = SystemTime::now();
822    }
823
824    /// Whether the conversation has a pending turn boundary.
825    ///
826    /// Returns `true` if the last message is `User` or `ToolResults`, meaning
827    /// the conversation is waiting for an assistant turn and `run_pending` can
828    /// resume without a new user message.
829    pub fn has_pending_boundary(&self) -> bool {
830        self.messages
831            .last()
832            .is_some_and(|m| matches!(m, Message::User(_) | Message::ToolResults { .. }))
833    }
834
835    /// Get the last N messages
836    pub fn last_n(&self, n: usize) -> &[Message] {
837        let start = self.messages.len().saturating_sub(n);
838        &self.messages[start..]
839    }
840
841    /// Count total tokens used.
842    pub fn total_tokens(&self) -> u64 {
843        self.usage.total_tokens()
844    }
845
846    /// Get total usage statistics for the session.
847    pub fn total_usage(&self) -> Usage {
848        self.usage.clone()
849    }
850
851    /// Update cumulative usage after an LLM call.
852    pub fn record_usage(&mut self, turn_usage: Usage) {
853        self.usage.add(&turn_usage);
854        self.updated_at = SystemTime::now();
855    }
856
857    /// Append externally-produced user content to the canonical transcript.
858    pub fn append_external_user_content(&mut self, content: ContentInput) {
859        self.push(Message::User(UserMessage::with_blocks(
860            content.into_blocks(),
861        )));
862    }
863
864    /// Append externally-produced assistant output to the canonical transcript.
865    pub fn append_external_assistant_blocks(
866        &mut self,
867        blocks: Vec<AssistantBlock>,
868        stop_reason: StopReason,
869        usage: Usage,
870    ) {
871        if !blocks.is_empty() {
872            self.push(Message::BlockAssistant(BlockAssistantMessage::new(
873                blocks,
874                stop_reason,
875            )));
876        }
877        if usage != Usage::default() {
878            self.record_usage(usage);
879        }
880    }
881
    /// Apply an identity-bearing provider realtime transcript event.
    ///
    /// This is the canonical append authority for provider-managed realtime
    /// turns: provider item ids, predecessor links, and content segment ids are
    /// persisted in session metadata so duplicate websocket delivery,
    /// reconnect replay, and causally equivalent event ordering cannot create
    /// duplicate or misordered canonical messages.
    ///
    /// Flow: load the staged state from session metadata, apply `event` to
    /// it, run the materializer to commit whatever became ready, write the
    /// state back, and return the materializer's outcome.
    ///
    /// Early exits:
    /// - `normalize_realtime_response_id` returns `None` for the event's
    ///   response id: returns `RealtimeTranscriptApplyOutcome::default()`
    ///   without touching metadata.
    /// - An assistant delta carries a non-blank `delta_id` that was already
    ///   seen (replay): same as above.
    /// - `AssistantTranscriptFinalText` targets an already-materialized item:
    ///   the staged state is persisted, then the default outcome is returned
    ///   without running the materializer.
    /// - The event belongs to a response id already in
    ///   `discarded_assistant_response_ids`: the item is recorded as skipped
    ///   (or, for `AssistantTurnCompleted`, the discard is re-applied), then
    ///   the materializer runs and state is persisted immediately.
    pub fn append_realtime_transcript_event(
        &mut self,
        event: RealtimeTranscriptEvent,
    ) -> RealtimeTranscriptApplyOutcome {
        let mut state = self.realtime_transcript_state();
        match event {
            RealtimeTranscriptEvent::ItemObserved {
                item_id,
                previous_item_id,
                role,
                response_id,
            } => {
                let response_id = normalize_realtime_optional_response_id(response_id);
                // An assistant item whose response was already discarded is
                // recorded as skipped rather than staged for commit.
                if role == RealtimeTranscriptRole::Assistant
                    && response_id
                        .as_ref()
                        .is_some_and(|id| state.discarded_assistant_response_ids.contains(id))
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                } else {
                    observe_realtime_item(&mut state, item_id, previous_item_id, role, response_id);
                }
            }
            RealtimeTranscriptEvent::ItemSkipped {
                item_id,
                previous_item_id,
            } => {
                observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
            }
            RealtimeTranscriptEvent::UserTranscriptFinal {
                item_id,
                previous_item_id,
                content_index,
                text,
            } => {
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::User,
                    None,
                ) {
                    // First non-empty text for a segment wins; a differing
                    // non-empty replay is logged and ignored. The item is
                    // marked ready unconditionally.
                    let segment = item.content_segments.entry(content_index).or_default();
                    if segment.is_empty() && !text.is_empty() {
                        *segment = text;
                    } else if !text.is_empty() && segment.as_str() != text {
                        tracing::warn!(
                            content_index,
                            "ignoring conflicting realtime user transcript segment replay"
                        );
                    }
                    item.ready = true;
                }
            }
            RealtimeTranscriptEvent::AssistantTextDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                // Replay dedup: a non-blank `delta_id` is applied at most once.
                // Blank ids cannot be deduplicated and are always applied.
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                // If the response already completed (delta arrived after
                // `AssistantTurnCompleted`), the item becomes ready as soon as
                // it holds non-empty text.
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Display) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        // R5-6 sibling: this delta was routed at a
                        // Spoken-classified item (e.g. truncation arrived
                        // first and locked the lane). `promote_item_lane`
                        // already warned about the lane conflict; drop the
                        // delta to preserve the lane invariant rather than
                        // clobbering Spoken content with Display text.
                        tracing::warn!(
                            "AssistantTextDelta routed to a Spoken-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                // T9/T10: spoken-transcript lane. Identical staging shape to
                // `AssistantTextDelta` (same idempotency / ordering / dedup
                // logic owns both lanes); the only difference is that the
                // owning item is tagged `Spoken` so the materializer flushes
                // `AssistantBlock::Transcript` instead of `AssistantBlock::Text`.
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                // Replay dedup, as for `AssistantTextDelta`.
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        // R5-6 sibling: this transcript delta arrived for a
                        // Display-classified item (a Display delta or other
                        // Display-lane event landed first). `promote_item_lane`
                        // already warned; drop the delta rather than appending
                        // Spoken text into a Display-locked content_segment.
                        tracing::warn!(
                            "AssistantTranscriptDelta routed to a Display-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                // `item_id` / `response_id` are moved into
                // `observe_realtime_item`; keep copies for the structured log.
                let item_id_for_log = item_id.clone();
                let response_id_for_log = response_id.clone();
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    // R5-7-sibling: a late truncation arriving after the
                    // canonical message has already committed cannot mutate
                    // history (append-only invariant). Same shape as the
                    // late-FinalText guard in the `AssistantTranscriptFinalText`
                    // arm below — warn and skip.
                    if item.materialized {
                        tracing::warn!(
                            target: "meerkat::session",
                            item_id = %item_id_for_log,
                            response_id = %response_id_for_log,
                            "AssistantTranscriptTruncated arrived after item already materialized; canonical message is locked, late truncation dropped",
                        );
                    } else if promote_item_lane(item, TranscriptLane::Spoken) {
                        // R5-6: truncation is a Spoken-lane-only semantic —
                        // it describes the audio output that was actually
                        // heard before barge-in cut it short. Promote to
                        // Spoken so the materializer commits as
                        // `AssistantBlock::Transcript`. If a Display delta
                        // arrived first, `promote_item_lane` keeps the
                        // existing lane and returns `false` — that's a
                        // provider bug; warn already emitted inside
                        // `promote_item_lane`.
                        //
                        // `insert` replaces the segment wholesale, so a
                        // replayed truncation is idempotent without delta-id
                        // dedup.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptFinalText {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                // R5-7: authoritative final text overrides any incomplete
                // delta accumulation for this `(response_id, item_id,
                // content_index)`. If no item is staged yet (final-only
                // provider, or all deltas dropped), create one on the
                // Spoken lane so the materializer flushes it as
                // `AssistantBlock::Transcript`. Flush gating is unchanged:
                // `AssistantTurnCompleted` still drives readiness.
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    // R5-7: late `AssistantTranscriptFinalText` after the
                    // item has already been committed to canonical history.
                    // The staged item is `materialized = true` and the
                    // session's `Message::BlockAssistant` already carries
                    // the delta-accumulated text. Append-only history is a
                    // stronger invariant than typed text repair: rewriting
                    // the canonical message would violate it (consumers may
                    // already have observed the prior text via the event
                    // stream / projector). Warn-and-skip; the SDK or human
                    // can decide whether the provider-side ordering bug
                    // matters.
                    //
                    // Unlike the truncation arm, this path persists state and
                    // returns directly without running the materializer.
                    if item.materialized {
                        tracing::warn!(
                            "AssistantTranscriptFinalText arrived after item already materialized; canonical message is locked, late repair dropped"
                        );
                        self.store_realtime_transcript_state(&state);
                        return RealtimeTranscriptApplyOutcome::default();
                    }
                    // Spoken lane: this variant is the authoritative
                    // transcript-final text path. Display-text finals come
                    // through a different seam. If the item is locked to
                    // Display (e.g. an earlier `AssistantTextDelta` staged
                    // it), drop the final to preserve the lane invariant —
                    // append-only history would otherwise silently switch
                    // block type at materialize time.
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        // Replace, not append: the final's text is
                        // authoritative and supersedes any (possibly
                        // partial) accumulated segment text.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        tracing::warn!(
                            "AssistantTranscriptFinalText routed to a Display-lane item; dropping authoritative final to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnCompleted {
                response_id,
                stop_reason,
                usage,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    discard_realtime_assistant_response(&mut state, &response_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                match stop_reason {
                    // Cancelled turns discard whatever is staged for the response.
                    StopReason::Cancelled => {
                        discard_realtime_assistant_response(&mut state, &response_id);
                    }
                    // Tool-use stops drop any recorded completion, so staged
                    // items for this response are not released yet
                    // (presumably the response continues after tool results).
                    StopReason::ToolUse => {
                        state.assistant_completions.remove(&response_id);
                    }
                    // Any other stop records the completion; `or_insert`
                    // keeps the first completion seen, so a replayed
                    // completion cannot overwrite stop reason or usage.
                    _ => {
                        state
                            .assistant_completions
                            .entry(response_id.clone())
                            .or_insert(RealtimeAssistantCompletion {
                                stop_reason,
                                usage,
                                usage_consumed: false,
                            });
                        mark_realtime_assistant_response_ready(&mut state, &response_id);
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnInterrupted { response_id } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // R5-5: barge-in invalidates the spoken/audio lane (the user
                // is speaking over what they heard) but preserves the
                // Display lane (sideband display text from the same response
                // is not "spoken over"). The interrupt is also terminal for
                // the response on the realtime-staging path: any later
                // `AssistantTurnCompleted` arrives with `StopReason::Cancelled`
                // and short-circuits via the discarded-set guard. Therefore
                // we must materialize retained Display items immediately here
                // by inserting a synthetic `assistant_completions` entry —
                // otherwise they would never commit.
                discard_realtime_assistant_response_by_lane(&mut state, &response_id);
                state
                    .assistant_completions
                    .entry(response_id.clone())
                    .or_insert(RealtimeAssistantCompletion {
                        stop_reason: StopReason::Cancelled,
                        usage: Usage::default(),
                        usage_consumed: false,
                    });
                mark_realtime_assistant_response_ready(&mut state, &response_id);
            }
        }

        // Commit everything the event made ready, then persist the updated
        // staging state back into session metadata.
        let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
        self.store_realtime_transcript_state(&state);
        outcome
    }
1248
1249    /// Return every distinct provider `response_id` currently staged in the
1250    /// realtime-transcript metadata that has at least one **unmaterialized**
1251    /// assistant item and is **not already discarded**.
1252    ///
1253    /// CC4 (Round-4 architectural reconciliation): when the live boundary
1254    /// signals a barge-in (`TurnInterrupted`), the projection sink does not
1255    /// know which provider response_ids have streaming deltas staged in
1256    /// session metadata. This accessor lets the sink fan
1257    /// [`RealtimeTranscriptEvent::AssistantTurnInterrupted`] events out to
1258    /// each in-flight response so staged-but-not-yet-materialized transcript
1259    /// fragments are discarded — preventing them from silently committing
1260    /// when the *next* turn's `AssistantTurnCompleted` (synthesized by the
1261    /// CC2 fix in `signal_turn_completed`) sweeps the materializer.
1262    ///
1263    /// Order is the [`SessionRealtimeTranscriptState::first_seen_order`]
1264    /// projection so callers see deterministic iteration. Items already
1265    /// materialized or skipped are excluded — only response_ids with at
1266    /// least one live unmaterialized assistant item are returned.
1267    #[must_use]
1268    pub fn in_flight_realtime_assistant_response_ids(&self) -> Vec<String> {
1269        let state = self.realtime_transcript_state();
1270        let mut seen: BTreeSet<String> = BTreeSet::new();
1271        let mut out: Vec<String> = Vec::new();
1272        for item_id in &state.first_seen_order {
1273            let Some(item) = state.items.get(item_id) else {
1274                continue;
1275            };
1276            if item.role != RealtimeTranscriptRole::Assistant {
1277                continue;
1278            }
1279            if item.materialized || item.skipped {
1280                continue;
1281            }
1282            let Some(response_id) = item.response_id.as_ref() else {
1283                continue;
1284            };
1285            if state.discarded_assistant_response_ids.contains(response_id) {
1286                continue;
1287            }
1288            if seen.insert(response_id.clone()) {
1289                out.push(response_id.clone());
1290            }
1291        }
1292        out
1293    }
1294
1295    fn realtime_transcript_state(&self) -> SessionRealtimeTranscriptState {
1296        self.metadata
1297            .get(SESSION_REALTIME_TRANSCRIPT_STATE_KEY)
1298            .cloned()
1299            .and_then(|value| serde_json::from_value(value).ok())
1300            .unwrap_or_default()
1301    }
1302
1303    fn store_realtime_transcript_state(&mut self, state: &SessionRealtimeTranscriptState) {
1304        match serde_json::to_value(state) {
1305            Ok(value) => self.set_metadata(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, value),
1306            Err(error) => {
1307                tracing::warn!(error = %error, "failed to serialize realtime transcript state");
1308            }
1309        }
1310    }
1311
1312    fn materialize_realtime_transcript_ready_items(
1313        &mut self,
1314        state: &mut SessionRealtimeTranscriptState,
1315    ) -> RealtimeTranscriptApplyOutcome {
1316        let mut materialized = Vec::new();
1317
1318        // Round-4 CC7: when a single response_id produces both display-text
1319        // and spoken-transcript items (mixed-modality response), we emit a
1320        // SINGLE `Message::BlockAssistant` whose `blocks` interleave
1321        // `AssistantBlock::Text` and `AssistantBlock::Transcript` in
1322        // arrival order — not multiple messages. Pending state lives across
1323        // outer-loop batches: when chained items (`previous_item_id`) force
1324        // serial materialization, batch N may emit the display item and
1325        // batch N+1 may emit the spoken item under the same response_id —
1326        // we still want one combined message.
1327        //
1328        // The pending group is flushed when:
1329        //   - a User item lands (canonical-history ordering boundary)
1330        //   - an assistant item with a different response_id lands
1331        //   - the outer materialization loop terminates
1332        let mut pending_blocks: Vec<AssistantBlock> = Vec::new();
1333        let mut pending_response_id: Option<String> = None;
1334        let mut pending_stop_reason: StopReason = StopReason::EndTurn;
1335        let mut pending_usage: Usage = Usage::default();
1336
1337        loop {
1338            let order = realtime_transcript_order(state);
1339            let mut skipped_batch = Vec::new();
1340            let mut batch = Vec::new();
1341            for item_id in order {
1342                let Some(item) = state.items.get(&item_id) else {
1343                    continue;
1344                };
1345                if item.materialized {
1346                    continue;
1347                }
1348                if !realtime_predecessor_materialized(state, item.previous_item_id.as_deref()) {
1349                    continue;
1350                }
1351                if item.skipped {
1352                    skipped_batch.push(item_id.clone());
1353                    continue;
1354                }
1355                if !item.ready {
1356                    continue;
1357                }
1358                let text = item.text();
1359                if text.is_empty() {
1360                    continue;
1361                }
1362                match item.role {
1363                    RealtimeTranscriptRole::User => {
1364                        batch.push(RealtimeTranscriptMaterializedMessage::User {
1365                            item_id: item_id.clone(),
1366                            text,
1367                        });
1368                    }
1369                    RealtimeTranscriptRole::Assistant => {
1370                        let Some(response_id) = item.response_id.as_ref() else {
1371                            continue;
1372                        };
1373                        let Some(completion) = state.assistant_completions.get(response_id) else {
1374                            continue;
1375                        };
1376                        let usage = if completion.usage_consumed {
1377                            Usage::default()
1378                        } else {
1379                            completion.usage.clone()
1380                        };
1381                        batch.push(RealtimeTranscriptMaterializedMessage::Assistant {
1382                            item_id: item_id.clone(),
1383                            response_id: response_id.clone(),
1384                            text,
1385                            stop_reason: completion.stop_reason,
1386                            usage,
1387                            lane: item.lane,
1388                        });
1389                    }
1390                }
1391            }
1392            if skipped_batch.is_empty() && batch.is_empty() {
1393                break;
1394            }
1395            for item_id in skipped_batch {
1396                if let Some(item) = state.items.get_mut(&item_id) {
1397                    item.materialized = true;
1398                }
1399            }
1400
1401            for message in batch {
1402                match &message {
1403                    RealtimeTranscriptMaterializedMessage::User { item_id, text } => {
1404                        if !pending_blocks.is_empty() {
1405                            let drained = std::mem::take(&mut pending_blocks);
1406                            self.append_external_assistant_blocks(
1407                                drained,
1408                                pending_stop_reason,
1409                                std::mem::take(&mut pending_usage),
1410                            );
1411                            pending_response_id = None;
1412                        }
1413                        if let Some(item) = state.items.get_mut(item_id) {
1414                            item.materialized = true;
1415                        }
1416                        self.append_external_user_content(ContentInput::Text(text.clone()));
1417                    }
1418                    RealtimeTranscriptMaterializedMessage::Assistant {
1419                        item_id,
1420                        response_id,
1421                        text,
1422                        stop_reason,
1423                        usage,
1424                        lane,
1425                    } => {
1426                        // Flush if this assistant item belongs to a different
1427                        // response than the pending group. (Same response_id
1428                        // → accumulate; different → emit prior message.)
1429                        if pending_response_id
1430                            .as_ref()
1431                            .is_some_and(|existing| existing != response_id)
1432                            && !pending_blocks.is_empty()
1433                        {
1434                            let drained = std::mem::take(&mut pending_blocks);
1435                            self.append_external_assistant_blocks(
1436                                drained,
1437                                pending_stop_reason,
1438                                std::mem::take(&mut pending_usage),
1439                            );
1440                            pending_response_id = None;
1441                        }
1442                        if let Some(item) = state.items.get_mut(item_id) {
1443                            item.materialized = true;
1444                        }
1445                        if let Some(completion) = state.assistant_completions.get_mut(response_id) {
1446                            completion.usage_consumed = true;
1447                        }
1448                        // T9/T10: route to the correct canonical block by
1449                        // lane. `Display` keeps the legacy `AssistantBlock::Text`
1450                        // shape; `Spoken` flushes
1451                        // `AssistantBlock::Transcript { source: Spoken }` so
1452                        // OpenAI realtime audio transcripts stop being
1453                        // persisted as authored display text.
1454                        let block = match lane {
1455                            TranscriptLane::Display => AssistantBlock::Text {
1456                                text: text.clone(),
1457                                meta: None,
1458                            },
1459                            TranscriptLane::Spoken => AssistantBlock::Transcript {
1460                                text: text.clone(),
1461                                source: crate::types::TranscriptSource::Spoken,
1462                                meta: None,
1463                            },
1464                        };
1465                        // First item in a group seeds the response_id /
1466                        // stop_reason / usage. `usage_consumed` is flipped
1467                        // to true above as the first item is processed, so
1468                        // every subsequent item sees `Usage::default()` from
1469                        // the materializer's per-item builder — accumulating
1470                        // across items in one group is naturally
1471                        // single-counted.
1472                        if pending_response_id.is_none() {
1473                            pending_response_id = Some(response_id.clone());
1474                            pending_stop_reason = *stop_reason;
1475                            pending_usage = usage.clone();
1476                        }
1477                        pending_blocks.push(block);
1478                    }
1479                }
1480                materialized.push(message);
1481            }
1482        }
1483
1484        // Final flush after the outer materialization loop has drained.
1485        if !pending_blocks.is_empty() {
1486            self.append_external_assistant_blocks(
1487                pending_blocks,
1488                pending_stop_reason,
1489                pending_usage,
1490            );
1491        }
1492
1493        RealtimeTranscriptApplyOutcome {
1494            materialized_messages: materialized,
1495        }
1496    }
1497
1498    /// Set a system prompt (adds or replaces System message at start)
1499    pub fn set_system_prompt(&mut self, prompt: String) {
1500        use crate::types::SystemMessage;
1501
1502        let inner = Arc::make_mut(&mut self.messages);
1503        // Check if first message is system
1504        if let Some(Message::System(_)) = inner.first() {
1505            inner[0] = Message::System(SystemMessage::new(prompt));
1506        } else {
1507            inner.insert(0, Message::System(SystemMessage::new(prompt)));
1508        }
1509        self.updated_at = SystemTime::now();
1510    }
1511
    /// Append one or more runtime system-context blocks to the canonical system prompt.
    ///
    /// Each surviving append is rendered with `render_system_context_block`, and
    /// all of them are joined with `SYSTEM_CONTEXT_SEPARATOR` onto the content of
    /// the first `System` message. If there is no `System` message, or its content
    /// is empty, the rendered text becomes the whole prompt (written via
    /// [`Session::set_system_prompt`]).
    ///
    /// Filtering, applied per append in input order:
    /// - appends whose `text` is empty after trimming are ignored;
    /// - keyed appends (`idempotency_key` is `Some`) are skipped with a warning
    ///   when the key's recorded `seen`/`applied` entry does not match this
    ///   append (per `seen_system_context_matches` /
    ///   `pending_system_context_matches`);
    /// - a keyed append whose key already appeared earlier in this batch is
    ///   skipped (with a warning only if the two differ);
    /// - a keyed append whose rendered block is already in the prompt is not
    ///   re-appended, but is recorded as applied/seen in the control state;
    /// - unkeyed appends are skipped when an equal append is earlier in the batch
    ///   or their rendered block is already in the prompt.
    ///
    /// The durable `SessionSystemContextState` is updated for every append
    /// applied or recognised as already present. A failure to persist that
    /// state is logged via `tracing::warn!`, not returned.
    pub fn append_system_context_blocks(&mut self, appends: &[PendingSystemContextAppend]) {
        if appends.is_empty() {
            return;
        }

        // Snapshot of the current system prompt ("" when absent) used for
        // "already present" checks below.
        let current_system_prompt = self
            .messages
            .first()
            .and_then(|message| match message {
                Message::System(system) => Some(system.content.as_str()),
                _ => None,
            })
            .unwrap_or_default();
        // `system_context_state()` yields None for absent or undecodable state,
        // so either case starts from an empty default here.
        let mut state = self.system_context_state().unwrap_or_default();
        // Tracks whether `state` changed without any new prompt text, so the
        // early-return path below still persists it.
        let mut state_dirty = false;
        let mut new_appends: Vec<PendingSystemContextAppend> = Vec::new();
        for append in appends {
            if append.text.trim().is_empty() {
                continue;
            }
            let rendered = render_system_context_block(append);
            if let Some(key) = append.idempotency_key.as_ref() {
                // Key previously seen with different content: refuse to overwrite.
                if let Some(existing) = state.seen.get(key)
                    && !seen_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Key previously applied with different content: same refusal.
                if let Some(existing) = state
                    .applied
                    .iter()
                    .find(|applied| applied.idempotency_key.as_ref() == Some(key))
                    && !pending_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Duplicate key within this batch: first occurrence wins.
                if let Some(existing) = new_appends
                    .iter()
                    .find(|pending| pending.idempotency_key.as_ref() == Some(key))
                {
                    if !pending_system_context_matches(existing, append) {
                        tracing::warn!(
                            idempotency_key = %key,
                            "skipping conflicting runtime system-context append"
                        );
                    }
                    continue;
                }
                // Block text already in the prompt: only reconcile control state.
                if current_system_prompt.contains(&rendered) {
                    if !state
                        .applied
                        .iter()
                        .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                    {
                        state.applied.push(append.clone());
                        state_dirty = true;
                    }
                    if state
                        .seen
                        .get(key)
                        .is_none_or(|seen| seen.state != SeenSystemContextState::Applied)
                    {
                        state.seen.insert(
                            key.clone(),
                            SeenSystemContextKey {
                                text: append.text.clone(),
                                source: append.source.clone(),
                                state: SeenSystemContextState::Applied,
                            },
                        );
                        state_dirty = true;
                    }
                    continue;
                }
                // NOTE(review): a key already in `state.applied` (with matching
                // content) whose block is no longer in the prompt — e.g. after a
                // `set_system_prompt` replacement — falls through and is re-appended.
            } else if new_appends.contains(append) || current_system_prompt.contains(&rendered) {
                continue;
            }
            new_appends.push(append.clone());
        }
        if new_appends.is_empty() {
            if state_dirty && let Err(err) = self.set_system_context_state(state) {
                tracing::warn!(error = %err, "failed to persist applied system-context state");
            }
            return;
        }

        let rendered = new_appends
            .iter()
            .map(render_system_context_block)
            .collect::<Vec<_>>()
            .join(SYSTEM_CONTEXT_SEPARATOR);

        // Append after existing non-empty system content; otherwise the rendered
        // blocks become the entire system prompt.
        let next = match self.messages.first() {
            Some(Message::System(sys)) if !sys.content.is_empty() => {
                format!("{}{}{}", sys.content, SYSTEM_CONTEXT_SEPARATOR, rendered)
            }
            _ => rendered,
        };
        self.set_system_prompt(next);

        // Record every newly appended block in the durable control state,
        // without duplicating entries already present in `applied`.
        for append in new_appends {
            if let Some(key) = append.idempotency_key.as_ref() {
                state.seen.insert(
                    key.clone(),
                    SeenSystemContextKey {
                        text: append.text.clone(),
                        source: append.source.clone(),
                        state: SeenSystemContextState::Applied,
                    },
                );
                if state
                    .applied
                    .iter()
                    .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                {
                    continue;
                }
            } else if state.applied.contains(&append) {
                continue;
            }
            state.applied.push(append);
        }
        if let Err(err) = self.set_system_context_state(state) {
            tracing::warn!(error = %err, "failed to persist applied system-context state");
        }
    }
1646
1647    /// Get the last assistant message text content.
1648    ///
1649    /// Concatenates both `Text` (display) and `Transcript` (spoken) blocks
1650    /// in document order, since both lanes project to the same human-readable
1651    /// stream. Lane provenance is preserved on the underlying `AssistantBlock`
1652    /// for callers that need it.
1653    pub fn last_assistant_text(&self) -> Option<String> {
1654        self.messages.iter().rev().find_map(|m| match m {
1655            Message::BlockAssistant(a) => {
1656                let mut buf = String::new();
1657                for block in &a.blocks {
1658                    match block {
1659                        crate::types::AssistantBlock::Text { text, .. }
1660                        | crate::types::AssistantBlock::Transcript { text, .. } => {
1661                            buf.push_str(text);
1662                        }
1663                        _ => {}
1664                    }
1665                }
1666                if buf.is_empty() { None } else { Some(buf) }
1667            }
1668            Message::Assistant(a) if !a.content.is_empty() => Some(a.content.clone()),
1669            _ => None,
1670        })
1671    }
1672
1673    /// Count tool calls made
1674    pub fn tool_call_count(&self) -> usize {
1675        self.messages
1676            .iter()
1677            .filter_map(|m| match m {
1678                Message::BlockAssistant(a) => Some(
1679                    a.blocks
1680                        .iter()
1681                        .filter(|b| matches!(b, crate::types::AssistantBlock::ToolUse { .. }))
1682                        .count(),
1683                ),
1684                Message::Assistant(a) => Some(a.tool_calls.len()),
1685                _ => None,
1686            })
1687            .sum()
1688    }
1689
1690    /// Get metadata
1691    pub fn metadata(&self) -> &serde_json::Map<String, serde_json::Value> {
1692        &self.metadata
1693    }
1694
1695    /// Set a metadata value
1696    pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
1697        self.metadata.insert(key.to_string(), value);
1698        self.updated_at = SystemTime::now();
1699    }
1700
1701    /// Backfill a missing metadata value without changing `updated_at`.
1702    ///
1703    /// This is only for compatibility reads that need to hydrate metadata from
1704    /// an older projection. Semantic metadata mutations must use
1705    /// [`Session::set_metadata`] so the session timestamp advances.
1706    pub fn backfill_metadata_if_absent(&mut self, key: &str, value: serde_json::Value) -> bool {
1707        if self.metadata.contains_key(key) {
1708            false
1709        } else {
1710            self.metadata.insert(key.to_string(), value);
1711            true
1712        }
1713    }
1714
1715    /// Remove a metadata value.
1716    pub fn remove_metadata(&mut self, key: &str) {
1717        self.metadata.remove(key);
1718        self.updated_at = SystemTime::now();
1719    }
1720
1721    /// Store SessionMetadata in the session metadata map.
1722    pub fn set_session_metadata(
1723        &mut self,
1724        metadata: SessionMetadata,
1725    ) -> Result<(), serde_json::Error> {
1726        let value = serde_json::to_value(metadata)?;
1727        self.set_metadata(SESSION_METADATA_KEY, value);
1728        Ok(())
1729    }
1730
1731    /// Load SessionMetadata from the session metadata map.
1732    pub fn session_metadata(&self) -> Option<SessionMetadata> {
1733        self.metadata
1734            .get(SESSION_METADATA_KEY)
1735            .and_then(|value| serde_json::from_value(value.clone()).ok())
1736    }
1737
1738    /// Store durable system-context control state in the session metadata map.
1739    pub fn set_system_context_state(
1740        &mut self,
1741        state: SessionSystemContextState,
1742    ) -> Result<(), serde_json::Error> {
1743        let value = serde_json::to_value(state)?;
1744        self.set_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY, value);
1745        Ok(())
1746    }
1747
1748    /// Load durable system-context control state from the session metadata map.
1749    pub fn system_context_state(&self) -> Option<SessionSystemContextState> {
1750        self.metadata
1751            .get(SESSION_SYSTEM_CONTEXT_STATE_KEY)
1752            .and_then(|value| serde_json::from_value(value.clone()).ok())
1753    }
1754
1755    /// Store durable deferred-turn control state in the session metadata map.
1756    pub fn set_deferred_turn_state(
1757        &mut self,
1758        state: SessionDeferredTurnState,
1759    ) -> Result<(), serde_json::Error> {
1760        let value = serde_json::to_value(state)?;
1761        self.set_metadata(SESSION_DEFERRED_TURN_STATE_KEY, value);
1762        Ok(())
1763    }
1764
1765    /// Load durable deferred-turn control state from the session metadata map.
1766    pub fn deferred_turn_state(&self) -> Option<SessionDeferredTurnState> {
1767        self.metadata
1768            .get(SESSION_DEFERRED_TURN_STATE_KEY)
1769            .and_then(|value| serde_json::from_value(value.clone()).ok())
1770    }
1771
1772    /// Store recoverable build-only session state in the session metadata map.
1773    pub fn set_build_state(&mut self, state: SessionBuildState) -> Result<(), serde_json::Error> {
1774        let value = serde_json::to_value(state)?;
1775        self.set_metadata(SESSION_BUILD_STATE_KEY, value);
1776        Ok(())
1777    }
1778
1779    /// Load recoverable build-only session state from the session metadata map.
1780    pub fn build_state(&self) -> Option<SessionBuildState> {
1781        self.metadata
1782            .get(SESSION_BUILD_STATE_KEY)
1783            .and_then(|value| serde_json::from_value(value.clone()).ok())
1784    }
1785
1786    /// Store durable tool-visibility control state in the session metadata map.
1787    pub fn set_tool_visibility_state(
1788        &mut self,
1789        state: SessionToolVisibilityState,
1790    ) -> Result<(), serde_json::Error> {
1791        let value = serde_json::to_value(state)?;
1792        self.set_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY, value);
1793        Ok(())
1794    }
1795
1796    /// Load durable tool-visibility control state from the session metadata map.
1797    pub fn tool_visibility_state(
1798        &self,
1799    ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
1800        self.try_tool_visibility_state()
1801    }
1802
1803    /// Load durable tool-visibility control state while distinguishing absent
1804    /// metadata from malformed canonical metadata.
1805    pub fn try_tool_visibility_state(
1806        &self,
1807    ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
1808        self.metadata
1809            .get(SESSION_TOOL_VISIBILITY_STATE_KEY)
1810            .map(|value| serde_json::from_value(value.clone()))
1811            .transpose()
1812    }
1813
1814    /// Store typed mob operator authority inside canonical build-state metadata.
1815    pub fn set_mob_tool_authority_context(
1816        &mut self,
1817        authority_context: Option<MobToolAuthorityContext>,
1818    ) -> Result<(), serde_json::Error> {
1819        let mut build_state = self.build_state().unwrap_or_default();
1820        build_state.mob_tool_authority_context = authority_context;
1821        self.set_build_state(build_state)
1822    }
1823
1824    /// Load typed mob operator authority from canonical build-state metadata.
1825    pub fn mob_tool_authority_context(&self) -> Option<MobToolAuthorityContext> {
1826        self.build_state()
1827            .and_then(|state| state.mob_tool_authority_context)
1828    }
1829
1830    /// Fork the session at a specific message index
1831    ///
1832    /// Creates a new session with a subset of messages. The messages are copied
1833    /// (not shared) since the new session has a different prefix.
1834    pub fn fork_at(&self, index: usize) -> Self {
1835        let now = SystemTime::now();
1836        let truncated = self.messages[..index.min(self.messages.len())].to_vec();
1837        Self {
1838            version: SESSION_VERSION,
1839            id: SessionId::new(),
1840            messages: Arc::new(truncated),
1841            created_at: now,
1842            updated_at: now,
1843            metadata: self.metadata.clone(),
1844            usage: self.usage.clone(),
1845        }
1846    }
1847
    /// Fork the session and replace the message at `message_index`.
    ///
    /// The returned session contains the original prefix before
    /// `message_index`, followed by the typed replacement. Later source
    /// messages are intentionally omitted so follow-up work continues from the
    /// edited branch rather than replaying stale descendants. The fork is built
    /// with [`Session::fork_at`], so it gets a new id and a copy of the metadata
    /// and usage. `self` is never modified.
    ///
    /// `TranscriptReplacement::Message` swaps in an arbitrary message without a
    /// role check. The block-level variants patch one block of a clone of the
    /// original message and require the original to have the matching kind.
    ///
    /// # Errors
    ///
    /// - [`TranscriptEditError::MessageIndexOutOfBounds`] when `message_index`
    ///   is not a valid index into the transcript.
    /// - [`TranscriptEditError::MessageRoleMismatch`] when a block-level
    ///   replacement targets a message of a different kind (`"user"`,
    ///   `"block_assistant"` or `"tool_results"` expected).
    /// - [`TranscriptEditError::BlockIndexOutOfBounds`] when `block_index` (or,
    ///   for tool results, `result_index`) is past the end of the targeted list.
    pub fn fork_replacing(
        &self,
        message_index: usize,
        replacement: TranscriptReplacement,
    ) -> Result<Self, TranscriptEditError> {
        let Some(original) = self.messages.get(message_index) else {
            return Err(TranscriptEditError::MessageIndexOutOfBounds {
                message_index,
                message_count: self.messages.len(),
            });
        };

        // Build the replacement message; every block-level arm validates the
        // message kind and index before editing a clone of the original.
        let replacement_message = match replacement {
            TranscriptReplacement::Message { message } => message,
            TranscriptReplacement::UserContentBlock { block_index, block } => {
                let Message::User(user) = original else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "user",
                        actual: message_role_name(original),
                    });
                };
                if block_index >= user.content.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "user content block",
                        block_index,
                        block_count: user.content.len(),
                    });
                }
                let mut edited = user.clone();
                edited.content[block_index] = block;
                Message::User(edited)
            }
            TranscriptReplacement::AssistantBlock { block_index, block } => {
                // Only block-structured assistant messages are editable per block.
                let Message::BlockAssistant(assistant) = original else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "block_assistant",
                        actual: message_role_name(original),
                    });
                };
                if block_index >= assistant.blocks.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "assistant block",
                        block_index,
                        block_count: assistant.blocks.len(),
                    });
                }
                let mut edited = assistant.clone();
                edited.blocks[block_index] = block;
                Message::BlockAssistant(edited)
            }
            TranscriptReplacement::ToolResultContentBlock {
                result_index,
                block_index,
                block,
            } => {
                let Message::ToolResults {
                    results,
                    created_at,
                } = original
                else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "tool_results",
                        actual: message_role_name(original),
                    });
                };
                // Two-level lookup: first the tool result, then its content block.
                let Some(result) = results.get(result_index) else {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "tool result",
                        block_index: result_index,
                        block_count: results.len(),
                    });
                };
                if block_index >= result.content.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "tool result content block",
                        block_index,
                        block_count: result.content.len(),
                    });
                }
                let mut edited_results = results.clone();
                edited_results[result_index].content[block_index] = block;
                // The original message's `created_at` is carried over unchanged.
                Message::ToolResults {
                    results: edited_results,
                    created_at: *created_at,
                }
            }
        };

        // Truncate to the prefix before the edited message, then append the edit.
        let mut forked = self.fork_at(message_index);
        forked.push(replacement_message);
        Ok(forked)
    }
1949
1950    /// Fork the entire session (full history)
1951    ///
1952    /// This is O(1) - the new session shares the message buffer via Arc.
1953    /// Copy-on-write occurs when either session mutates its messages.
1954    pub fn fork(&self) -> Self {
1955        let now = SystemTime::now();
1956        Self {
1957            version: SESSION_VERSION,
1958            id: SessionId::new(),
1959            messages: Arc::clone(&self.messages),
1960            created_at: now,
1961            updated_at: now,
1962            metadata: self.metadata.clone(),
1963            usage: self.usage.clone(),
1964        }
1965    }
1966}
1967
1968impl Default for Session {
1969    fn default() -> Self {
1970        Self::new()
1971    }
1972}
1973
/// Summary metadata for listing sessions.
///
/// Carries identity, timestamps, counters and the raw metadata map of a
/// session, but none of its messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMeta {
    /// Identifier of the summarized session.
    pub id: SessionId,
    /// When the session was created.
    pub created_at: SystemTime,
    /// When the session was last updated.
    pub updated_at: SystemTime,
    /// Number of messages in the session.
    pub message_count: usize,
    /// Total token count for the session (presumably derived from its
    /// accumulated `Usage` — confirm at the construction site).
    pub total_tokens: u64,
    /// Raw session metadata map; deserializes as an empty map when absent.
    #[serde(default)]
    pub metadata: serde_json::Map<String, serde_json::Value>,
}
1986
/// Metadata required to reliably resume a session across interfaces.
///
/// Stored as JSON under `SESSION_METADATA_KEY` in the session metadata map
/// (see `Session::set_session_metadata` / `Session::session_metadata`). The
/// LLM identity subset (model, provider, self-hosted server id, provider
/// params, auth binding) is projected by `SessionMetadata::llm_identity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMetadata {
    /// Per-entity schema version byte.
    ///
    /// Defaults to `1` on read so pre-wave-c rows without the field
    /// deserialize cleanly; rewritten as `SESSION_METADATA_SCHEMA_VERSION`
    /// on the next `save()` after a successful migration pass.
    #[serde(default = "default_session_metadata_schema_version")]
    pub schema_version: u32,
    /// Model identifier the session runs against.
    pub model: String,
    /// `max_tokens` setting recorded for the session.
    pub max_tokens: u32,
    /// Structured-output retry budget; defaults to `2` when absent on read.
    #[serde(default = "default_structured_output_retries")]
    pub structured_output_retries: u32,
    /// Provider serving `model`.
    pub provider: Provider,
    /// Self-hosted server id; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific request parameters; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Tooling configuration. No serde default, so it must be present on read.
    pub tooling: SessionTooling,
    /// Defaults to `false` when absent on read.
    #[serde(default)]
    pub keep_alive: bool,
    /// Comms name, if any. Serialized as `null` when `None` (no skip attribute).
    pub comms_name: Option<String>,
    /// Friendly metadata for peer discovery (populated when comms is enabled).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_meta: Option<PeerMeta>,
    /// Realm identity for cross-surface storage sharing/isolation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub realm_id: Option<String>,
    /// Optional process/agent instance identifier within a realm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub instance_id: Option<String>,
    /// Backend pinned by the realm manifest (e.g. "sqlite", "jsonl").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// Config generation used when this session was created/resumed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_generation: Option<u64>,
    /// Realm-scoped auth binding (Phase 3 provider-auth redesign).
    ///
    /// Persisted intent for the auth/backend binding this session resolved
    /// through. On resume, `apply_resumed_session_metadata` writes this
    /// back into `AgentBuildConfig.auth_binding` so the same realm
    /// binding is re-resolved. Never carries secret material — leases
    /// are rebuilt from the active realm connection set at resume time.
    /// Older persisted sessions without the field deserialize as `None`
    /// (backward compatible via `#[serde(default)]`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
2038
/// Serde default for `SessionMetadata::structured_output_retries` when the
/// field is missing from a persisted row.
fn default_structured_output_retries() -> u32 {
    const DEFAULT_RETRIES: u32 = 2;
    DEFAULT_RETRIES
}
2042
/// Serde default for `SessionMetadata::schema_version`: rows written before
/// the field existed are treated as schema version 1.
fn default_session_metadata_schema_version() -> u32 {
    const LEGACY_SCHEMA_VERSION: u32 = 1;
    LEGACY_SCHEMA_VERSION
}
2046
/// Canonical durable LLM identity for a session.
///
/// Projected from `SessionMetadata` by `SessionMetadata::llm_identity` and
/// written back by `SessionMetadata::apply_llm_identity`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmIdentity {
    /// Model identifier.
    pub model: String,
    /// Provider serving `model`.
    pub provider: Provider,
    /// Self-hosted server id. `resolve_session_llm_identity_override` only
    /// produces `Some` here when `provider` is `Provider::SelfHosted`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific request parameters.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Realm-scoped auth binding this session resolves credentials
    /// through. Carried on the identity so mid-session hot-swaps
    /// (`apply_live_session_llm_identity`) re-resolve against the
    /// same realm the session was created with — preventing
    /// cross-realm credential bleed in multi-tenant setups. Dogma
    /// §12 (dynamic policy follows dynamic identity): on swap the
    /// factory re-enters `ProviderRuntimeRegistry::resolve` against
    /// this binding, not a new synthesized env-default realm.
    ///
    /// Projection (dogma §1/§13): canonical owner is
    /// `SessionMetadata.auth_binding`; this field is the
    /// read/write projection used by hot-swap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
2072
2073/// Typed per-turn override request for a session LLM identity.
2074pub struct SessionLlmIdentityOverride<'a> {
2075    pub model: Option<&'a str>,
2076    pub provider: Option<Provider>,
2077    pub provider_params: Option<&'a serde_json::Value>,
2078    pub clear_provider_params: bool,
2079    pub auth_binding: Option<&'a crate::AuthBindingRef>,
2080    pub clear_auth_binding: bool,
2081}
2082
/// Reasons [`resolve_session_llm_identity_override`] rejects an override request.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionLlmIdentityOverrideError {
    /// `provider` was supplied without `model`.
    #[error("provider override requires model on an existing session")]
    ProviderRequiresModel,
    /// Both `provider_params` and `clear_provider_params` were supplied.
    #[error("clear_provider_params cannot be combined with provider_params")]
    SetAndClearProviderParams,
    /// Both `auth_binding` and `clear_auth_binding` were supplied.
    #[error("clear_auth_binding cannot be combined with auth_binding")]
    SetAndClearAuthBinding,
    /// The model registry rejected the provider/model pairing; carries the
    /// registry's reason text verbatim.
    #[error("{0}")]
    ProviderModelMismatch(String),
    /// A model override resolved to the self-hosted provider, but the
    /// registry has no self-hosted entry for that model.
    #[error("self-hosted provider requires a registered model alias; '{model}' is not configured")]
    MissingSelfHostedAlias { model: String },
}
2096
/// Resolve a turn-time model/provider/auth override against the current
/// durable session identity.
///
/// The model registry is the authority for catalog ownership. A model-only
/// override follows catalog ownership when the target model is registered;
/// uncatalogued models keep the current provider so custom aliases remain
/// possible.
///
/// Resolution per field:
/// - `model`: the override, else the current model.
/// - `provider`: the explicit override. Otherwise, for a model override, the
///   registry owner of that model (falling back to the current provider).
///   Otherwise the current provider.
/// - `provider_params`: `None` when `clear_provider_params` is set, else the
///   override, else the current params.
/// - `self_hosted_server_id`: `None` unless the resolved provider is
///   `Provider::SelfHosted`. Without a model override, the current id is kept
///   (or looked up in the registry when absent). With a model override, the id
///   comes from that model's self-hosted registry entry.
/// - `auth_binding`: `None` when `clear_auth_binding` is set, or when the
///   provider changes without an explicit binding. Otherwise the override,
///   else the current binding.
///
/// # Errors
///
/// - [`SessionLlmIdentityOverrideError::ProviderRequiresModel`]: `provider`
///   was set without `model`.
/// - [`SessionLlmIdentityOverrideError::SetAndClearProviderParams`] /
///   [`SessionLlmIdentityOverrideError::SetAndClearAuthBinding`]: a value and
///   its clear flag were both supplied.
/// - [`SessionLlmIdentityOverrideError::ProviderModelMismatch`]: the registry
///   reports a mismatch for the resolved provider/model. Only checked when
///   the model or provider is overridden.
/// - [`SessionLlmIdentityOverrideError::MissingSelfHostedAlias`]: a model
///   override resolved to `Provider::SelfHosted`, but the registry has no
///   self-hosted entry for it.
pub fn resolve_session_llm_identity_override(
    current: &SessionLlmIdentity,
    registry: &crate::ModelRegistry,
    overrides: SessionLlmIdentityOverride<'_>,
) -> Result<SessionLlmIdentity, SessionLlmIdentityOverrideError> {
    // Reject underspecified or contradictory requests up front.
    if overrides.provider.is_some() && overrides.model.is_none() {
        return Err(SessionLlmIdentityOverrideError::ProviderRequiresModel);
    }
    if overrides.clear_provider_params && overrides.provider_params.is_some() {
        return Err(SessionLlmIdentityOverrideError::SetAndClearProviderParams);
    }
    if overrides.clear_auth_binding && overrides.auth_binding.is_some() {
        return Err(SessionLlmIdentityOverrideError::SetAndClearAuthBinding);
    }

    let model = overrides
        .model
        .map(str::to_string)
        .unwrap_or_else(|| current.model.clone());
    // Explicit provider wins; a bare model override follows the registry's
    // catalog owner, and uncatalogued models keep the current provider.
    let provider = if let Some(provider) = overrides.provider {
        provider
    } else if overrides.model.is_some() {
        registry
            .entry(&model)
            .map_or(current.provider, |entry| entry.provider)
    } else {
        current.provider
    };

    // Only validate the pairing when the caller actually changed something.
    if (overrides.model.is_some() || overrides.provider.is_some())
        && let Some(reason) = registry.provider_override_mismatch_reason(provider, &model)
    {
        return Err(SessionLlmIdentityOverrideError::ProviderModelMismatch(
            reason,
        ));
    }

    // NOTE(review): current params are carried over even when `provider`
    // changes (unlike `auth_binding` below) — confirm provider-specific params
    // are safe to inherit across providers.
    let provider_params = if overrides.clear_provider_params {
        None
    } else {
        overrides
            .provider_params
            .cloned()
            .or_else(|| current.provider_params.clone())
    };
    let self_hosted_server_id = if provider == Provider::SelfHosted {
        if overrides.model.is_none() {
            // Model unchanged: keep the pinned server, else look it up.
            current.self_hosted_server_id.clone().or_else(|| {
                registry
                    .entry_for_provider(Provider::SelfHosted, &model)
                    .and_then(|entry| entry.self_hosted.as_ref())
                    .map(|server| server.server_id.clone())
            })
        } else {
            // New model: it must be a registered self-hosted alias. An entry
            // without `self_hosted` details yields `None` rather than an error.
            let entry = registry
                .entry_for_provider(Provider::SelfHosted, &model)
                .ok_or_else(|| SessionLlmIdentityOverrideError::MissingSelfHostedAlias {
                    model: model.clone(),
                })?;
            entry
                .self_hosted
                .as_ref()
                .map(|server| server.server_id.clone())
        }
    } else {
        None
    };

    // A provider switch without an explicit binding drops the inherited
    // binding instead of carrying it to a different provider.
    let auth_binding = if overrides.clear_auth_binding
        || (provider != current.provider && overrides.auth_binding.is_none())
    {
        None
    } else {
        overrides
            .auth_binding
            .cloned()
            .or_else(|| current.auth_binding.clone())
    };

    Ok(SessionLlmIdentity {
        model,
        provider,
        self_hosted_server_id,
        provider_params,
        auth_binding,
    })
}
2191
/// Live request policy paired with a session LLM identity hot-swap.
///
/// `SessionLlmIdentity` is the durable semantic identity. This projection is
/// the per-turn request policy the live agent must use for the next LLM call,
/// including provider params and provider-native tool defaults resolved for
/// the same target model/provider.
///
/// Optional fields are omitted from the serialized form when `None` and
/// default to `None` when absent on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmRequestPolicy {
    /// Model identifier the next LLM request targets.
    pub model: String,
    /// Provider-specific request parameters, carried as untyped JSON.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Provider-native tool defaults resolved for `model`, carried as untyped JSON.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_tool_defaults: Option<serde_json::Value>,
}
2207
2208impl SessionMetadata {
2209    /// Return the current durable LLM identity for this session.
2210    pub fn llm_identity(&self) -> SessionLlmIdentity {
2211        SessionLlmIdentity {
2212            model: self.model.clone(),
2213            provider: self.provider,
2214            self_hosted_server_id: self.self_hosted_server_id.clone(),
2215            provider_params: self.provider_params.clone(),
2216            auth_binding: self.auth_binding.clone(),
2217        }
2218    }
2219
2220    /// Overwrite the durable LLM identity while preserving unrelated session metadata.
2221    pub fn apply_llm_identity(&mut self, identity: &SessionLlmIdentity) {
2222        self.model = identity.model.clone();
2223        self.provider = identity.provider;
2224        self.self_hosted_server_id = identity.self_hosted_server_id.clone();
2225        self.provider_params = identity.provider_params.clone();
2226        self.auth_binding = identity.auth_binding.clone();
2227    }
2228}
2229
/// Key under which [`SessionMetadata`] is stored in a [`Session`]'s
/// metadata map.
pub const SESSION_METADATA_KEY: &str = "session_metadata";
2232
/// Caller intent for a tool category.
///
/// Distinguishes "no opinion / didn't exist" (`Inherit`) from explicit
/// `Enable` / `Disable` so that resumed sessions don't freeze tool
/// availability at the capabilities of the Meerkat version that created them.
///
/// Serializes as the snake_case strings `"inherit"`, `"enable"` and
/// `"disable"`. The [`Default`] is `Inherit`.
///
/// **Dogma §10:** Inherit, disable, and set are different facts.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCategoryOverride {
    /// No explicit intent — inherit runtime/factory default.
    #[default]
    Inherit,
    /// Explicitly enabled by caller.
    Enable,
    /// Explicitly disabled by caller.
    Disable,
}
2251
2252impl ToolCategoryOverride {
2253    /// Resolve this override against a runtime default.
2254    ///
2255    /// - `Enable` → `true`
2256    /// - `Disable` → `false`
2257    /// - `Inherit` → `runtime_default`
2258    #[must_use]
2259    pub fn resolve(self, runtime_default: bool) -> bool {
2260        match self {
2261            Self::Enable => true,
2262            Self::Disable => false,
2263            Self::Inherit => runtime_default,
2264        }
2265    }
2266
2267    /// Convert to `Option<bool>` for feeding `AgentBuildConfig` override fields.
2268    ///
2269    /// - `Enable` → `Some(true)`
2270    /// - `Disable` → `Some(false)`
2271    /// - `Inherit` → `None` (factory default wins)
2272    #[must_use]
2273    pub fn to_override(self) -> Option<bool> {
2274        match self {
2275            Self::Enable => Some(true),
2276            Self::Disable => Some(false),
2277            Self::Inherit => None,
2278        }
2279    }
2280
2281    /// Construct from a resolved effective bool.
2282    ///
2283    /// **Warning:** this collapses `Inherit` into `Enable`/`Disable`. Prefer
2284    /// [`from_override`] when persisting session metadata so that `Inherit`
2285    /// survives across save/resume cycles. Only use `from_effective` in test
2286    /// helpers or when constructing metadata from external sources that only
2287    /// provide a resolved bool.
2288    #[must_use]
2289    pub fn from_effective(enabled: bool) -> Self {
2290        if enabled { Self::Enable } else { Self::Disable }
2291    }
2292
2293    /// Construct from an `Option<bool>` override field, preserving `Inherit`.
2294    ///
2295    /// - `Some(true)` → `Enable`
2296    /// - `Some(false)` → `Disable`
2297    /// - `None` → `Inherit` (factory default was used, no explicit intent)
2298    ///
2299    /// This is the inverse of [`to_override`] and should be used when persisting
2300    /// session tooling metadata so that `Inherit` survives across save/resume
2301    /// cycles.
2302    #[must_use]
2303    pub fn from_override(value: Option<bool>) -> Self {
2304        match value {
2305            Some(true) => Self::Enable,
2306            Some(false) => Self::Disable,
2307            None => Self::Inherit,
2308        }
2309    }
2310}
2311
2312/// Backward-compatible deserializer: accepts both old `bool` JSON and new
2313/// tri-state `"inherit"` / `"enable"` / `"disable"` strings.
2314///
2315/// Old persisted sessions have `"mob": false` or `"builtins": true`.
2316/// - `true`  → `Enable`  (user explicitly had it on)
2317/// - `false` → `Inherit` (can't distinguish "disabled" from "didn't exist")
2318/// - string  → normal enum deserialization
2319fn deserialize_tool_category_compat<'de, D>(
2320    deserializer: D,
2321) -> Result<ToolCategoryOverride, D::Error>
2322where
2323    D: serde::Deserializer<'de>,
2324{
2325    use serde::de;
2326
2327    struct ToolCategoryVisitor;
2328
2329    impl de::Visitor<'_> for ToolCategoryVisitor {
2330        type Value = ToolCategoryOverride;
2331
2332        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2333            formatter.write_str("a boolean or one of \"inherit\", \"enable\", \"disable\"")
2334        }
2335
2336        fn visit_bool<E: de::Error>(self, v: bool) -> Result<Self::Value, E> {
2337            Ok(if v {
2338                ToolCategoryOverride::Enable
2339            } else {
2340                ToolCategoryOverride::Inherit
2341            })
2342        }
2343
2344        fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
2345            match v {
2346                "inherit" => Ok(ToolCategoryOverride::Inherit),
2347                "enable" => Ok(ToolCategoryOverride::Enable),
2348                "disable" => Ok(ToolCategoryOverride::Disable),
2349                _ => Err(de::Error::unknown_variant(
2350                    v,
2351                    &["inherit", "enable", "disable"],
2352                )),
2353            }
2354        }
2355    }
2356
2357    deserializer.deserialize_any(ToolCategoryVisitor)
2358}
2359
/// Trim surrounding whitespace from a provider item id. An id that is empty
/// after trimming is treated as absent.
fn normalize_realtime_item_id(item_id: String) -> Option<String> {
    match item_id.trim() {
        "" => None,
        trimmed => Some(trimmed.to_owned()),
    }
}

/// Optional-input form of `normalize_realtime_item_id`, used for
/// `previous_item_id` causal links.
fn normalize_realtime_previous_item_id(previous_item_id: Option<String>) -> Option<String> {
    let raw = previous_item_id?;
    normalize_realtime_item_id(raw)
}

/// Normalize a provider response id using the same rules as item ids.
fn normalize_realtime_response_id(response_id: String) -> Option<String> {
    normalize_realtime_item_id(response_id)
}

/// Optional-input form of `normalize_realtime_response_id`.
fn normalize_realtime_optional_response_id(response_id: Option<String>) -> Option<String> {
    let raw = response_id?;
    normalize_realtime_response_id(raw)
}
2376
2377/// T9/T10: tag the assistant item with its output lane (Display vs Spoken).
2378///
2379/// First lane wins: if a later delta tries to promote an item already
2380/// classified as the other lane, we keep the existing lane and emit a
2381/// `tracing::warn!`. The materializer can only flush one block-type per
2382/// item; mixed-lane content on the same `item_id` is not expected from
2383/// any provider and would be a provider-side bug. Items with empty
2384/// content (no deltas yet observed) accept any lane.
2385///
2386/// Returns `true` when the item now carries the requested `lane` (either
2387/// the lane already matched, or the item was empty so the lane was
2388/// promoted). Returns `false` when the item already carried the *other*
2389/// lane with staged content — in that case the caller MUST skip any
2390/// content insert it was about to perform on `lane`, otherwise it would
2391/// clobber the locked-in lane content (R5-6 sibling: silent-clobber bug).
2392#[must_use]
2393fn promote_item_lane(item: &mut RealtimeTranscriptItemState, lane: TranscriptLane) -> bool {
2394    if item.lane == lane {
2395        return true;
2396    }
2397    let has_content = item.content_segments.values().any(|s| !s.is_empty());
2398    if !has_content {
2399        // No content has been staged on the original lane yet; safe to
2400        // re-classify.
2401        item.lane = lane;
2402        return true;
2403    }
2404    tracing::warn!(
2405        existing_lane = ?item.lane,
2406        observed_lane = ?lane,
2407        "ignoring realtime transcript lane conflict on item with staged content"
2408    );
2409    false
2410}
2411
/// Register an observation of a content-bearing realtime item and return a
/// mutable handle to its staged state, or `None` when the observation must be
/// ignored.
///
/// All ids are normalized first (trimmed; blank means absent). A blank
/// `item_id` returns `None` without touching `state`. Otherwise the id is
/// appended to `first_seen_order` on first sighting, and an entry is created
/// from `role`, `previous_item_id` and `response_id` if none exists. Both of
/// these happen even when the observation is then rejected.
///
/// Returns `None` (after a `tracing::warn!`) when:
/// - the stored item is already `skipped` (a contentless causal anchor);
/// - the stored role differs from `role`;
/// - the stored `response_id` differs from a non-blank `response_id`.
///
/// A missing `previous_item_id` on the stored item is backfilled from this
/// observation unless the roles conflict. A missing `response_id` is adopted
/// when the item is accepted. Existing values are never overwritten.
fn observe_realtime_item(
    state: &mut SessionRealtimeTranscriptState,
    item_id: String,
    previous_item_id: Option<String>,
    role: RealtimeTranscriptRole,
    response_id: Option<String>,
) -> Option<&mut RealtimeTranscriptItemState> {
    let item_id = normalize_realtime_item_id(item_id)?;
    let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
    let response_id = normalize_realtime_optional_response_id(response_id);
    // Linear membership scan: record only the first sighting of this id.
    if !state
        .first_seen_order
        .iter()
        .any(|existing| existing == &item_id)
    {
        state.first_seen_order.push(item_id.clone());
    }
    let item = state.items.entry(item_id.clone()).or_insert_with(|| {
        RealtimeTranscriptItemState::new(role, previous_item_id.clone(), response_id.clone())
    });
    if item.skipped {
        // Skipped items still anchor causal chains, so keep learning their
        // predecessor even though the content is ignored.
        if item.previous_item_id.is_none() && previous_item_id.is_some() {
            item.previous_item_id = previous_item_id;
        }
        tracing::warn!(
            item_id = %item_id,
            observed_role = ?role,
            "ignoring realtime transcript content for item already marked as a contentless causal anchor"
        );
        return None;
    }
    if item.role != role {
        tracing::warn!(
            item_id = %item_id,
            existing_role = ?item.role,
            observed_role = ?role,
            "ignoring realtime transcript item role conflict"
        );
        return None;
    }
    // First non-blank predecessor wins; later observations never overwrite it.
    if item.previous_item_id.is_none() && previous_item_id.is_some() {
        item.previous_item_id = previous_item_id;
    }
    if let Some(response_id) = response_id {
        match item.response_id.as_ref() {
            Some(existing) if existing != &response_id => {
                tracing::warn!(
                    item_id = %item_id,
                    existing_response_id = %existing,
                    observed_response_id = %response_id,
                    "ignoring realtime transcript item response conflict"
                );
                return None;
            }
            Some(_) => {}
            None => item.response_id = Some(response_id),
        }
    }
    Some(item)
}
2472
2473fn observe_realtime_skipped_item(
2474    state: &mut SessionRealtimeTranscriptState,
2475    item_id: String,
2476    previous_item_id: Option<String>,
2477) {
2478    let Some(item_id) = normalize_realtime_item_id(item_id) else {
2479        return;
2480    };
2481    let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
2482    if !state
2483        .first_seen_order
2484        .iter()
2485        .any(|existing| existing == &item_id)
2486    {
2487        state.first_seen_order.push(item_id.clone());
2488    }
2489    let item = state
2490        .items
2491        .entry(item_id)
2492        .or_insert_with(|| RealtimeTranscriptItemState::skipped(previous_item_id.clone()));
2493    if item.previous_item_id.is_none() && previous_item_id.is_some() {
2494        item.previous_item_id = previous_item_id;
2495    }
2496    if item.materialized || item.skipped {
2497        return;
2498    }
2499    if item.role != RealtimeTranscriptRole::Assistant {
2500        tracing::warn!(
2501            existing_role = ?item.role,
2502            "ignoring realtime skipped-item observation for non-assistant item"
2503        );
2504        return;
2505    }
2506    if !item.content_segments.is_empty() {
2507        tracing::warn!("ignoring realtime skipped-item observation for content-bearing item");
2508        return;
2509    }
2510    item.skipped = true;
2511    item.ready = true;
2512}
2513
2514fn mark_realtime_assistant_response_ready(
2515    state: &mut SessionRealtimeTranscriptState,
2516    response_id: &str,
2517) {
2518    for item in state.items.values_mut() {
2519        if item.role == RealtimeTranscriptRole::Assistant
2520            && item.response_id.as_deref() == Some(response_id)
2521            && !item.materialized
2522            && !item.text().is_empty()
2523        {
2524            item.ready = true;
2525        }
2526    }
2527}
2528
2529fn discard_realtime_assistant_response(
2530    state: &mut SessionRealtimeTranscriptState,
2531    response_id: &str,
2532) {
2533    state
2534        .discarded_assistant_response_ids
2535        .insert(response_id.to_string());
2536    for item in state.items.values_mut() {
2537        if item.role == RealtimeTranscriptRole::Assistant
2538            && item.response_id.as_deref() == Some(response_id)
2539            && !item.materialized
2540        {
2541            item.content_segments.clear();
2542            item.skipped = true;
2543            item.ready = true;
2544        }
2545    }
2546    state.assistant_completions.remove(response_id);
2547}
2548
2549/// R5-5: lane-scoped discard for barge-in (`AssistantTurnInterrupted`).
2550///
2551/// Drops Spoken-lane staged items (the heard audio + transcript the user
2552/// spoke over) and preserves Display-lane items as committable content.
2553/// Items with empty content (no deltas observed yet, lane still defaulting
2554/// to `Display`) are also discarded — they carry no preserveable Display
2555/// text. The caller is responsible for inserting a synthetic
2556/// `assistant_completions` entry so the materializer can flush retained
2557/// Display items immediately.
2558///
2559/// Unlike [`discard_realtime_assistant_response`], this helper does NOT
2560/// remove the response from `assistant_completions`: barge-in must seed a
2561/// completion entry so retained Display items can materialize before any
2562/// late `AssistantTurnCompleted` (which would short-circuit on the
2563/// discarded-set membership) arrives.
2564fn discard_realtime_assistant_response_by_lane(
2565    state: &mut SessionRealtimeTranscriptState,
2566    response_id: &str,
2567) {
2568    state
2569        .discarded_assistant_response_ids
2570        .insert(response_id.to_string());
2571    for item in state.items.values_mut() {
2572        if item.role != RealtimeTranscriptRole::Assistant
2573            || item.response_id.as_deref() != Some(response_id)
2574            || item.materialized
2575        {
2576            continue;
2577        }
2578        let has_content = item.content_segments.values().any(|s| !s.is_empty());
2579        if item.lane == TranscriptLane::Display && has_content {
2580            // Retain the Display content; the caller's synthetic
2581            // completion entry + `mark_realtime_assistant_response_ready`
2582            // will flag it ready for the next materializer pass.
2583            continue;
2584        }
2585        // Spoken (or empty Display, which carries no preserveable text):
2586        // drop content and mark as a contentless predecessor so chained
2587        // items downstream of it can still materialize.
2588        item.content_segments.clear();
2589        item.skipped = true;
2590        item.ready = true;
2591    }
2592}
2593
2594fn realtime_transcript_order(state: &SessionRealtimeTranscriptState) -> Vec<String> {
2595    let mut roots = Vec::new();
2596    let mut children: BTreeMap<String, Vec<String>> = BTreeMap::new();
2597    for item_id in &state.first_seen_order {
2598        let Some(item) = state.items.get(item_id) else {
2599            continue;
2600        };
2601        if let Some(previous) = item.previous_item_id.as_ref()
2602            && state.items.contains_key(previous)
2603        {
2604            children
2605                .entry(previous.clone())
2606                .or_default()
2607                .push(item_id.clone());
2608        } else {
2609            roots.push(item_id.clone());
2610        }
2611    }
2612    roots.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
2613    for child_ids in children.values_mut() {
2614        child_ids.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
2615    }
2616
2617    let mut ordered = Vec::new();
2618    let mut visited = BTreeSet::new();
2619    for root in roots {
2620        visit_realtime_transcript_item(&root, &children, &mut visited, &mut ordered);
2621    }
2622    for item_id in &state.first_seen_order {
2623        visit_realtime_transcript_item(item_id, &children, &mut visited, &mut ordered);
2624    }
2625    ordered
2626}
2627
2628fn realtime_first_seen_index(state: &SessionRealtimeTranscriptState, item_id: &str) -> usize {
2629    state
2630        .first_seen_order
2631        .iter()
2632        .position(|existing| existing == item_id)
2633        .unwrap_or(usize::MAX)
2634}
2635
/// Append `item_id` and every item reachable from it through `children` to
/// `ordered`, in depth-first pre-order. Each parent's children are visited in
/// the order they appear in its `children` entry.
///
/// An id already present in `visited` is skipped, and so is its subtree unless
/// that subtree is reachable some other way. This means repeated calls never
/// emit an item twice, and the walk terminates on cyclic `previous_item_id`
/// links.
///
/// Uses an explicit work stack instead of recursion. The children map is
/// built from `previous_item_id` links, so a long linear chain of items would
/// otherwise need one call frame per item and could overflow the thread stack
/// on long transcripts.
fn visit_realtime_transcript_item(
    item_id: &str,
    children: &BTreeMap<String, Vec<String>>,
    visited: &mut BTreeSet<String>,
    ordered: &mut Vec<String>,
) {
    let mut pending: Vec<&str> = vec![item_id];
    while let Some(current) = pending.pop() {
        // Check on pop (not on push) so the emitted order matches the
        // recursive pre-order exactly.
        if !visited.insert(current.to_string()) {
            continue;
        }
        ordered.push(current.to_string());
        if let Some(child_ids) = children.get(current) {
            // Push in reverse so the first child is popped (visited) first.
            pending.extend(child_ids.iter().rev().map(String::as_str));
        }
    }
}
2652
2653fn realtime_predecessor_materialized(
2654    state: &SessionRealtimeTranscriptState,
2655    previous_item_id: Option<&str>,
2656) -> bool {
2657    match previous_item_id {
2658        None => true,
2659        Some(previous_item_id) => state
2660            .items
2661            .get(previous_item_id)
2662            .is_some_and(|item| item.materialized),
2663    }
2664}
2665
/// Tooling intent captured at session creation time.
///
/// Fields use [`ToolCategoryOverride`] to distinguish "no opinion" from
/// explicit enable/disable (Dogma §10). On resume, `Inherit` falls through
/// to the factory's current runtime default, allowing new tool categories
/// to become available without re-creating the session.
///
/// Every category field defaults to `Inherit` when absent from persisted data.
/// Categories are read through `deserialize_tool_category_compat`, which also
/// accepts the legacy boolean encoding (`true` → `Enable`, `false` → `Inherit`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionTooling {
    /// Built-in tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub builtins: ToolCategoryOverride,
    /// Shell tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub shell: ToolCategoryOverride,
    /// Comms tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub comms: ToolCategoryOverride,
    /// Mob (multi-agent orchestration) tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub mob: ToolCategoryOverride,
    /// Semantic memory.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub memory: ToolCategoryOverride,
    /// Scheduler tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub schedule: ToolCategoryOverride,
    /// WorkGraph durable work tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub workgraph: ToolCategoryOverride,
    /// Assistant image generation.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub image_generation: ToolCategoryOverride,
    /// Meerkat-owned fallback web search.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub web_search: ToolCategoryOverride,
    /// Active skills at session creation time (for deterministic resume).
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub active_skills: Option<Vec<crate::skills::SkillKey>>,
}
2703
2704impl From<&Session> for SessionMeta {
2705    fn from(session: &Session) -> Self {
2706        Self {
2707            id: session.id.clone(),
2708            created_at: session.created_at,
2709            updated_at: session.updated_at,
2710            message_count: session.messages.len(),
2711            total_tokens: session.total_tokens(),
2712            metadata: session.metadata.clone(),
2713        }
2714    }
2715}
2716
2717#[cfg(test)]
2718#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2719mod tests {
2720    use super::*;
2721    use crate::types::{
2722        AssistantMessage, BlockAssistantMessage, StopReason, SystemMessage, UserMessage,
2723    };
2724    use std::sync::Arc;
2725
2726    fn block_assistant_text(message: &BlockAssistantMessage) -> String {
2727        message
2728            .blocks
2729            .iter()
2730            .filter_map(|block| match block {
2731                AssistantBlock::Text { text, .. } => Some(text.as_str()),
2732                _ => None,
2733            })
2734            .collect()
2735    }
2736
2737    #[test]
2738    fn test_session_new() {
2739        let session = Session::new();
2740        assert_eq!(session.version(), SESSION_VERSION);
2741        assert!(session.messages().is_empty());
2742        assert!(session.created_at() <= session.updated_at());
2743    }
2744
2745    #[test]
2746    fn llm_identity_model_override_switches_to_catalog_provider() {
2747        let registry = crate::Config::default().model_registry().unwrap();
2748        let current = SessionLlmIdentity {
2749            model: "claude-sonnet-4-5".to_string(),
2750            provider: Provider::Anthropic,
2751            self_hosted_server_id: None,
2752            provider_params: None,
2753            auth_binding: Some(crate::AuthBindingRef {
2754                realm: crate::RealmId::parse("tenant_a").unwrap(),
2755                binding: crate::BindingId::parse("anthropic_default").unwrap(),
2756                profile: None,
2757            }),
2758        };
2759
2760        let resolved = resolve_session_llm_identity_override(
2761            &current,
2762            &registry,
2763            SessionLlmIdentityOverride {
2764                model: Some("gpt-5.5"),
2765                provider: None,
2766                provider_params: None,
2767                clear_provider_params: false,
2768                auth_binding: None,
2769                clear_auth_binding: false,
2770            },
2771        )
2772        .unwrap();
2773
2774        assert_eq!(resolved.model, "gpt-5.5");
2775        assert_eq!(resolved.provider, Provider::OpenAI);
2776        assert!(
2777            resolved.auth_binding.is_none(),
2778            "provider switches must not inherit a binding from the previous provider"
2779        );
2780    }
2781
2782    #[test]
2783    fn llm_identity_model_override_keeps_uncatalogued_model_on_current_provider() {
2784        let registry = crate::Config::default().model_registry().unwrap();
2785        let current = SessionLlmIdentity {
2786            model: "custom-model".to_string(),
2787            provider: Provider::Anthropic,
2788            self_hosted_server_id: None,
2789            provider_params: None,
2790            auth_binding: None,
2791        };
2792
2793        let resolved = resolve_session_llm_identity_override(
2794            &current,
2795            &registry,
2796            SessionLlmIdentityOverride {
2797                model: Some("uncatalogued-custom-model"),
2798                provider: None,
2799                provider_params: None,
2800                clear_provider_params: false,
2801                auth_binding: None,
2802                clear_auth_binding: false,
2803            },
2804        )
2805        .unwrap();
2806
2807        assert_eq!(resolved.model, "uncatalogued-custom-model");
2808        assert_eq!(resolved.provider, Provider::Anthropic);
2809    }
2810
2811    #[test]
2812    fn realtime_transcript_append_is_idempotent_by_provider_item_and_delta_id() {
2813        let mut session = Session::new();
2814
2815        let user = RealtimeTranscriptEvent::UserTranscriptFinal {
2816            item_id: "item_user".to_string(),
2817            previous_item_id: None,
2818            content_index: 0,
2819            text: "hello".to_string(),
2820        };
2821        assert!(
2822            !session
2823                .append_realtime_transcript_event(user.clone())
2824                .is_inert()
2825        );
2826        assert!(session.append_realtime_transcript_event(user).is_inert());
2827
2828        let delta = RealtimeTranscriptEvent::AssistantTextDelta {
2829            response_id: "resp_assistant".to_string(),
2830            delta_id: "evt_delta_1".to_string(),
2831            item_id: "item_assistant".to_string(),
2832            previous_item_id: Some("item_user".to_string()),
2833            content_index: 0,
2834            delta: "hi".to_string(),
2835        };
2836        assert!(
2837            session
2838                .append_realtime_transcript_event(delta.clone())
2839                .is_inert()
2840        );
2841        assert!(session.append_realtime_transcript_event(delta).is_inert());
2842
2843        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
2844            response_id: "resp_assistant".to_string(),
2845            stop_reason: StopReason::EndTurn,
2846            usage: Usage::default(),
2847        };
2848        assert!(
2849            !session
2850                .append_realtime_transcript_event(terminal.clone())
2851                .is_inert()
2852        );
2853        assert!(
2854            session
2855                .append_realtime_transcript_event(terminal)
2856                .is_inert()
2857        );
2858
2859        assert_eq!(session.messages().len(), 2);
2860        assert!(matches!(
2861            &session.messages()[0],
2862            Message::User(user) if user.text_content() == "hello"
2863        ));
2864        assert!(matches!(
2865            &session.messages()[1],
2866            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "hi"
2867        ));
2868    }
2869
2870    /// R5-7: `AssistantTranscriptFinalText` injects authoritative final text
2871    /// into the staged item. Verifies the override semantics: a partial
2872    /// delta is replaced, not concatenated, and the item promotes to the
2873    /// Spoken lane so flush emits `AssistantBlock::Transcript`.
2874    #[test]
2875    fn realtime_transcript_final_text_overrides_partial_delta_and_promotes_to_spoken_lane() {
2876        let mut session = Session::new();
2877
2878        // Partial delta accumulates "incom" — simulating delta loss before
2879        // the final arrives.
2880        assert!(
2881            session
2882                .append_realtime_transcript_event(
2883                    RealtimeTranscriptEvent::AssistantTranscriptDelta {
2884                        response_id: "resp_a".to_string(),
2885                        delta_id: "evt_1".to_string(),
2886                        item_id: "item_a".to_string(),
2887                        previous_item_id: None,
2888                        content_index: 0,
2889                        delta: "incom".to_string(),
2890                    }
2891                )
2892                .is_inert()
2893        );
2894
2895        // Authoritative final text overrides the staged content.
2896        assert!(
2897            session
2898                .append_realtime_transcript_event(
2899                    RealtimeTranscriptEvent::AssistantTranscriptFinalText {
2900                        response_id: "resp_a".to_string(),
2901                        item_id: "item_a".to_string(),
2902                        content_index: 0,
2903                        text: "complete answer".to_string(),
2904                    }
2905                )
2906                .is_inert()
2907        );
2908
2909        // Turn completion drives the flush.
2910        let outcome = session.append_realtime_transcript_event(
2911            RealtimeTranscriptEvent::AssistantTurnCompleted {
2912                response_id: "resp_a".to_string(),
2913                stop_reason: StopReason::EndTurn,
2914                usage: Usage::default(),
2915            },
2916        );
2917        assert!(!outcome.is_inert());
2918
2919        // Verify the materialized block has the final's authoritative text
2920        // (not the partial "incom") and the Spoken lane.
2921        assert_eq!(session.messages().len(), 1);
2922        match &session.messages()[0] {
2923            Message::BlockAssistant(assistant) => {
2924                let mut found_transcript = false;
2925                for block in &assistant.blocks {
2926                    if let AssistantBlock::Transcript { text, .. } = block {
2927                        assert_eq!(text, "complete answer");
2928                        found_transcript = true;
2929                    }
2930                }
2931                assert!(
2932                    found_transcript,
2933                    "AssistantTranscriptFinalText must promote to the Spoken lane and \
2934                     materialize as AssistantBlock::Transcript"
2935                );
2936            }
2937            other => unreachable!("expected BlockAssistant, got {other:?}"),
2938        }
2939    }
2940
2941    /// R5-7: `AssistantTranscriptFinalText` works for final-only providers
2942    /// where no prior delta has staged an item.
2943    #[test]
2944    fn realtime_transcript_final_text_creates_item_when_no_delta_staged() {
2945        let mut session = Session::new();
2946
2947        assert!(
2948            session
2949                .append_realtime_transcript_event(
2950                    RealtimeTranscriptEvent::AssistantTranscriptFinalText {
2951                        response_id: "resp_a".to_string(),
2952                        item_id: "item_a".to_string(),
2953                        content_index: 0,
2954                        text: "spoken-final-only".to_string(),
2955                    }
2956                )
2957                .is_inert()
2958        );
2959
2960        let outcome = session.append_realtime_transcript_event(
2961            RealtimeTranscriptEvent::AssistantTurnCompleted {
2962                response_id: "resp_a".to_string(),
2963                stop_reason: StopReason::EndTurn,
2964                usage: Usage::default(),
2965            },
2966        );
2967        assert!(!outcome.is_inert());
2968
2969        assert_eq!(session.messages().len(), 1);
2970        match &session.messages()[0] {
2971            Message::BlockAssistant(assistant) => {
2972                let has_transcript = assistant.blocks.iter().any(|b| {
2973                    matches!(b, AssistantBlock::Transcript { text, .. } if text == "spoken-final-only")
2974                });
2975                assert!(
2976                    has_transcript,
2977                    "final-only provider path must materialize as Transcript on the Spoken lane"
2978                );
2979            }
2980            other => unreachable!("expected BlockAssistant, got {other:?}"),
2981        }
2982    }
2983
2984    #[test]
2985    fn realtime_transcript_append_orders_causally_equivalent_out_of_order_items() {
2986        let mut session = Session::new();
2987
2988        assert!(
2989            session
2990                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
2991                    response_id: "resp_assistant".to_string(),
2992                    delta_id: "evt_delta_1".to_string(),
2993                    item_id: "item_assistant".to_string(),
2994                    previous_item_id: Some("item_user".to_string()),
2995                    content_index: 0,
2996                    delta: "answer".to_string(),
2997                })
2998                .is_inert()
2999        );
3000        assert!(
3001            session
3002                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3003                    response_id: "resp_assistant".to_string(),
3004                    stop_reason: StopReason::EndTurn,
3005                    usage: Usage::default(),
3006                })
3007                .is_inert()
3008        );
3009
3010        let outcome = session.append_realtime_transcript_event(
3011            RealtimeTranscriptEvent::UserTranscriptFinal {
3012                item_id: "item_user".to_string(),
3013                previous_item_id: None,
3014                content_index: 0,
3015                text: "question".to_string(),
3016            },
3017        );
3018
3019        assert_eq!(outcome.materialized_messages.len(), 2);
3020        assert_eq!(session.messages().len(), 2);
3021        assert!(matches!(
3022            &session.messages()[0],
3023            Message::User(user) if user.text_content() == "question"
3024        ));
3025        assert!(matches!(
3026            &session.messages()[1],
3027            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer"
3028        ));
3029    }
3030
3031    #[test]
3032    fn realtime_transcript_replay_of_seen_provider_items_is_inert() {
3033        let mut session = Session::new();
3034        let events = vec![
3035            RealtimeTranscriptEvent::UserTranscriptFinal {
3036                item_id: "item_user".to_string(),
3037                previous_item_id: None,
3038                content_index: 0,
3039                text: "hello".to_string(),
3040            },
3041            RealtimeTranscriptEvent::AssistantTextDelta {
3042                response_id: "resp_assistant".to_string(),
3043                delta_id: "evt_delta_1".to_string(),
3044                item_id: "item_assistant".to_string(),
3045                previous_item_id: Some("item_user".to_string()),
3046                content_index: 0,
3047                delta: "world".to_string(),
3048            },
3049            RealtimeTranscriptEvent::AssistantTurnCompleted {
3050                response_id: "resp_assistant".to_string(),
3051                stop_reason: StopReason::EndTurn,
3052                usage: Usage::default(),
3053            },
3054        ];
3055
3056        for event in events.iter().cloned() {
3057            let _ = session.append_realtime_transcript_event(event);
3058        }
3059        let first_messages = serde_json::to_value(session.messages()).unwrap();
3060
3061        for event in events {
3062            assert!(session.append_realtime_transcript_event(event).is_inert());
3063        }
3064
3065        assert_eq!(
3066            serde_json::to_value(session.messages()).unwrap(),
3067            first_messages
3068        );
3069    }
3070
3071    #[test]
3072    fn realtime_transcript_user_final_replay_cannot_erase_existing_segment() {
3073        let mut session = Session::new();
3074
3075        let user = RealtimeTranscriptEvent::UserTranscriptFinal {
3076            item_id: "item_user".to_string(),
3077            previous_item_id: None,
3078            content_index: 0,
3079            text: "remember amber lantern".to_string(),
3080        };
3081        assert!(
3082            !session
3083                .append_realtime_transcript_event(user.clone())
3084                .is_inert()
3085        );
3086        let first_messages = serde_json::to_value(session.messages()).unwrap();
3087
3088        assert!(
3089            session
3090                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3091                    item_id: "item_user".to_string(),
3092                    previous_item_id: None,
3093                    content_index: 0,
3094                    text: String::new(),
3095                })
3096                .is_inert()
3097        );
3098        assert!(session.append_realtime_transcript_event(user).is_inert());
3099        assert_eq!(
3100            serde_json::to_value(session.messages()).unwrap(),
3101            first_messages
3102        );
3103    }
3104
3105    #[test]
3106    fn realtime_transcript_empty_user_final_can_be_filled_by_later_nonempty_replay() {
3107        let mut session = Session::new();
3108
3109        assert!(
3110            session
3111                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3112                    item_id: "item_user".to_string(),
3113                    previous_item_id: None,
3114                    content_index: 0,
3115                    text: String::new(),
3116                })
3117                .is_inert()
3118        );
3119        assert!(session.messages().is_empty());
3120
3121        let outcome = session.append_realtime_transcript_event(
3122            RealtimeTranscriptEvent::UserTranscriptFinal {
3123                item_id: "item_user".to_string(),
3124                previous_item_id: None,
3125                content_index: 0,
3126                text: "remember amber lantern".to_string(),
3127            },
3128        );
3129        assert_eq!(outcome.materialized_messages.len(), 1);
3130        assert_eq!(session.messages().len(), 1);
3131        assert!(matches!(
3132            &session.messages()[0],
3133            Message::User(user) if user.text_content() == "remember amber lantern"
3134        ));
3135    }
3136
3137    #[test]
3138    fn realtime_transcript_skipped_provider_items_preserve_causal_order_without_content() {
3139        let mut session = Session::new();
3140
3141        let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
3142            response_id: "resp_assistant".to_string(),
3143            delta_id: "evt_delta_1".to_string(),
3144            item_id: "item_assistant".to_string(),
3145            previous_item_id: Some("item_tool".to_string()),
3146            content_index: 0,
3147            delta: "done".to_string(),
3148        };
3149        assert!(
3150            session
3151                .append_realtime_transcript_event(assistant_delta.clone())
3152                .is_inert()
3153        );
3154        let assistant_complete = RealtimeTranscriptEvent::AssistantTurnCompleted {
3155            response_id: "resp_assistant".to_string(),
3156            stop_reason: StopReason::EndTurn,
3157            usage: Usage::default(),
3158        };
3159        assert!(
3160            session
3161                .append_realtime_transcript_event(assistant_complete.clone())
3162                .is_inert()
3163        );
3164
3165        let skipped = RealtimeTranscriptEvent::ItemSkipped {
3166            item_id: "item_tool".to_string(),
3167            previous_item_id: Some("item_user".to_string()),
3168        };
3169        assert!(
3170            session
3171                .append_realtime_transcript_event(skipped.clone())
3172                .is_inert(),
3173            "a skipped provider item must not append transcript content"
3174        );
3175        assert!(session.messages().is_empty());
3176
3177        let outcome = session.append_realtime_transcript_event(
3178            RealtimeTranscriptEvent::UserTranscriptFinal {
3179                item_id: "item_user".to_string(),
3180                previous_item_id: None,
3181                content_index: 0,
3182                text: "please use the tool".to_string(),
3183            },
3184        );
3185        assert_eq!(outcome.materialized_messages.len(), 2);
3186        assert_eq!(session.messages().len(), 2);
3187        assert!(matches!(
3188            &session.messages()[0],
3189            Message::User(user) if user.text_content() == "please use the tool"
3190        ));
3191        assert!(matches!(
3192            &session.messages()[1],
3193            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "done"
3194        ));
3195
3196        let first_messages = serde_json::to_value(session.messages()).unwrap();
3197        assert!(session.append_realtime_transcript_event(skipped).is_inert());
3198        assert!(
3199            session
3200                .append_realtime_transcript_event(assistant_delta)
3201                .is_inert()
3202        );
3203        assert!(
3204            session
3205                .append_realtime_transcript_event(assistant_complete)
3206                .is_inert()
3207        );
3208        assert_eq!(
3209            serde_json::to_value(session.messages()).unwrap(),
3210            first_messages
3211        );
3212    }
3213
3214    #[test]
3215    fn realtime_transcript_interrupted_assistant_item_unblocks_later_provider_items() {
3216        // R5-5 (Round-5): the staged assistant content is a Display-lane item
3217        // (`AssistantTextDelta`). Under the new lane-aware barge-in contract,
3218        // the Display lane survives interruption and materializes. The User
3219        // "Stop." item, gated on the chained Display item being materialized,
3220        // also unblocks. Round-4's "must stay non-canonical" assertion was
3221        // wrong — that contract was lane-blind.
3222        let mut session = Session::new();
3223
3224        let _ = session.append_realtime_transcript_event(
3225            RealtimeTranscriptEvent::UserTranscriptFinal {
3226                item_id: "item_repeat".to_string(),
3227                previous_item_id: None,
3228                content_index: 0,
3229                text: "repeat until stop".to_string(),
3230            },
3231        );
3232        assert!(
3233            session
3234                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3235                    response_id: "resp_loop".to_string(),
3236                    delta_id: "evt_loop_1".to_string(),
3237                    item_id: "item_loop".to_string(),
3238                    previous_item_id: Some("item_repeat".to_string()),
3239                    content_index: 0,
3240                    delta: "Looping now".to_string(),
3241                })
3242                .is_inert()
3243        );
3244        assert!(
3245            session
3246                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3247                    item_id: "item_stop".to_string(),
3248                    previous_item_id: Some("item_loop".to_string()),
3249                    content_index: 0,
3250                    text: "Stop.".to_string(),
3251                })
3252                .is_inert(),
3253            "the stop turn waits until the interrupted assistant provider item is resolved"
3254        );
3255
3256        let outcome = session.append_realtime_transcript_event(
3257            RealtimeTranscriptEvent::AssistantTurnInterrupted {
3258                response_id: "resp_loop".to_string(),
3259            },
3260        );
3261
3262        // R5-5: materializer commits 2 messages (the retained Display item +
3263        // the unblocked "Stop." User message).
3264        assert_eq!(outcome.materialized_messages.len(), 2);
3265        // Canonical history: User-repeat, BlockAssistant(Display "Looping now"), User-Stop.
3266        assert_eq!(session.messages().len(), 3);
3267        assert!(matches!(
3268            &session.messages()[0],
3269            Message::User(user) if user.text_content() == "repeat until stop"
3270        ));
3271        match &session.messages()[1] {
3272            Message::BlockAssistant(assistant) => {
3273                let text = block_assistant_text(assistant);
3274                assert_eq!(text, "Looping now");
3275            }
3276            other => unreachable!(
3277                "Display lane assistant item must be retained on Interrupted, got {other:?}"
3278            ),
3279        }
3280        assert!(matches!(
3281            &session.messages()[2],
3282            Message::User(user) if user.text_content() == "Stop."
3283        ));
3284    }
3285
3286    #[test]
3287    fn realtime_transcript_late_interrupted_assistant_delta_stays_noncanonical() {
3288        let mut session = Session::new();
3289
3290        let _ = session.append_realtime_transcript_event(
3291            RealtimeTranscriptEvent::UserTranscriptFinal {
3292                item_id: "item_repeat".to_string(),
3293                previous_item_id: None,
3294                content_index: 0,
3295                text: "repeat until stop".to_string(),
3296            },
3297        );
3298        assert!(
3299            session
3300                .append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
3301                    item_id: "item_loop".to_string(),
3302                    previous_item_id: Some("item_repeat".to_string()),
3303                    role: RealtimeTranscriptRole::Assistant,
3304                    response_id: None,
3305                })
3306                .is_inert(),
3307            "provider can observe an assistant item before the adapter learns its response id"
3308        );
3309        assert!(
3310            session
3311                .append_realtime_transcript_event(
3312                    RealtimeTranscriptEvent::AssistantTurnInterrupted {
3313                        response_id: "resp_loop".to_string(),
3314                    }
3315                )
3316                .is_inert(),
3317            "an interruption can arrive before delayed transcript deltas for the response"
3318        );
3319        assert!(
3320            session
3321                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3322                    item_id: "item_stop".to_string(),
3323                    previous_item_id: Some("item_loop".to_string()),
3324                    content_index: 0,
3325                    text: "Stop.".to_string(),
3326                })
3327                .is_inert(),
3328            "the stop turn waits for the provider's interrupted assistant item anchor"
3329        );
3330
3331        let late_delta_outcome =
3332            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3333                response_id: "resp_loop".to_string(),
3334                delta_id: "evt_loop_late".to_string(),
3335                item_id: "item_loop".to_string(),
3336                previous_item_id: Some("item_repeat".to_string()),
3337                content_index: 0,
3338                delta: "Looping now".to_string(),
3339            });
3340        assert_eq!(late_delta_outcome.materialized_messages.len(), 1);
3341        assert!(matches!(
3342            &session.messages()[1],
3343            Message::User(user) if user.text_content() == "Stop."
3344        ));
3345        assert!(
3346            session
3347                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3348                    response_id: "resp_loop".to_string(),
3349                    stop_reason: StopReason::EndTurn,
3350                    usage: Usage::default(),
3351                })
3352                .is_inert(),
3353            "late completion for an interrupted response must not resurrect its deltas"
3354        );
3355        assert!(
3356            session
3357                .messages()
3358                .iter()
3359                .filter_map(|message| match message {
3360                    Message::BlockAssistant(assistant) => Some(block_assistant_text(assistant)),
3361                    _ => None,
3362                })
3363                .all(|text| !text.contains("Looping now")),
3364            "late interrupted assistant text must remain non-canonical"
3365        );
3366    }
3367
3368    #[test]
3369    fn realtime_transcript_completion_only_finalizes_matching_response() {
3370        let mut session = Session::new();
3371
3372        let _ = session.append_realtime_transcript_event(
3373            RealtimeTranscriptEvent::UserTranscriptFinal {
3374                item_id: "item_user".to_string(),
3375                previous_item_id: None,
3376                content_index: 0,
3377                text: "question".to_string(),
3378            },
3379        );
3380        assert!(
3381            session
3382                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3383                    response_id: "resp_a".to_string(),
3384                    delta_id: "evt_a".to_string(),
3385                    item_id: "item_a".to_string(),
3386                    previous_item_id: Some("item_user".to_string()),
3387                    content_index: 0,
3388                    delta: "answer a".to_string(),
3389                })
3390                .is_inert()
3391        );
3392
3393        assert!(
3394            session
3395                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3396                    response_id: "resp_b".to_string(),
3397                    stop_reason: StopReason::EndTurn,
3398                    usage: Usage::default(),
3399                })
3400                .is_inert(),
3401            "a completion for another response must not finalize buffered assistant text"
3402        );
3403        assert_eq!(session.messages().len(), 1);
3404
3405        let outcome = session.append_realtime_transcript_event(
3406            RealtimeTranscriptEvent::AssistantTurnCompleted {
3407                response_id: "resp_a".to_string(),
3408                stop_reason: StopReason::EndTurn,
3409                usage: Usage::default(),
3410            },
3411        );
3412        assert_eq!(outcome.materialized_messages.len(), 1);
3413        assert_eq!(session.messages().len(), 2);
3414        assert!(matches!(
3415            &session.messages()[1],
3416            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer a"
3417        ));
3418    }
3419
3420    #[test]
3421    fn realtime_transcript_completion_before_later_delta_is_response_scoped() {
3422        let mut session = Session::new();
3423
3424        let _ = session.append_realtime_transcript_event(
3425            RealtimeTranscriptEvent::UserTranscriptFinal {
3426                item_id: "item_user".to_string(),
3427                previous_item_id: None,
3428                content_index: 0,
3429                text: "question".to_string(),
3430            },
3431        );
3432        assert!(
3433            session
3434                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3435                    response_id: "resp_a".to_string(),
3436                    stop_reason: StopReason::EndTurn,
3437                    usage: Usage::default(),
3438                })
3439                .is_inert()
3440        );
3441        assert!(
3442            session
3443                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3444                    response_id: "resp_b".to_string(),
3445                    delta_id: "evt_b".to_string(),
3446                    item_id: "item_b".to_string(),
3447                    previous_item_id: Some("item_user".to_string()),
3448                    content_index: 0,
3449                    delta: "wrong response".to_string(),
3450                })
3451                .is_inert(),
3452            "a later delta for another response must not be finalized by resp_a's pending completion"
3453        );
3454
3455        let outcome =
3456            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3457                response_id: "resp_a".to_string(),
3458                delta_id: "evt_a".to_string(),
3459                item_id: "item_a".to_string(),
3460                previous_item_id: Some("item_user".to_string()),
3461                content_index: 0,
3462                delta: "right response".to_string(),
3463            });
3464
3465        assert_eq!(outcome.materialized_messages.len(), 1);
3466        assert_eq!(session.messages().len(), 2);
3467        assert!(matches!(
3468            &session.messages()[1],
3469            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "right response"
3470        ));
3471    }
3472
3473    #[test]
3474    fn realtime_transcript_late_duplicate_completion_cannot_finalize_unrelated_response() {
3475        let mut session = Session::new();
3476
3477        let _ = session.append_realtime_transcript_event(
3478            RealtimeTranscriptEvent::UserTranscriptFinal {
3479                item_id: "item_user".to_string(),
3480                previous_item_id: None,
3481                content_index: 0,
3482                text: "question".to_string(),
3483            },
3484        );
3485        let _ =
3486            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3487                response_id: "resp_a".to_string(),
3488                delta_id: "evt_a".to_string(),
3489                item_id: "item_a".to_string(),
3490                previous_item_id: Some("item_user".to_string()),
3491                content_index: 0,
3492                delta: "first".to_string(),
3493            });
3494        let _ = session.append_realtime_transcript_event(
3495            RealtimeTranscriptEvent::AssistantTurnCompleted {
3496                response_id: "resp_a".to_string(),
3497                stop_reason: StopReason::EndTurn,
3498                usage: Usage::default(),
3499            },
3500        );
3501        assert_eq!(session.messages().len(), 2);
3502
3503        assert!(
3504            session
3505                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3506                    response_id: "resp_b".to_string(),
3507                    delta_id: "evt_b".to_string(),
3508                    item_id: "item_b".to_string(),
3509                    previous_item_id: Some("item_a".to_string()),
3510                    content_index: 0,
3511                    delta: "second".to_string(),
3512                })
3513                .is_inert()
3514        );
3515        assert!(
3516            session
3517                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3518                    response_id: "resp_a".to_string(),
3519                    stop_reason: StopReason::EndTurn,
3520                    usage: Usage::default(),
3521                })
3522                .is_inert(),
3523            "a duplicate late terminal for resp_a must not finalize resp_b"
3524        );
3525        assert_eq!(session.messages().len(), 2);
3526
3527        let outcome = session.append_realtime_transcript_event(
3528            RealtimeTranscriptEvent::AssistantTurnCompleted {
3529                response_id: "resp_b".to_string(),
3530                stop_reason: StopReason::EndTurn,
3531                usage: Usage::default(),
3532            },
3533        );
3534        assert_eq!(outcome.materialized_messages.len(), 1);
3535        assert_eq!(session.messages().len(), 3);
3536    }
3537
3538    #[test]
3539    fn realtime_transcript_interruption_discards_only_matching_response() {
3540        // R5-5: cross-response isolation invariant — Interrupted on resp_a
3541        // does NOT touch resp_b's staged content. Both responses use
3542        // `AssistantTextDelta` (Display lane); under R5-5 resp_a's Display
3543        // item is RETAINED at Interrupted time and resp_b's continues
3544        // unaffected, materializing on its later TurnCompleted.
3545        let mut session = Session::new();
3546
3547        let _ = session.append_realtime_transcript_event(
3548            RealtimeTranscriptEvent::UserTranscriptFinal {
3549                item_id: "item_user".to_string(),
3550                previous_item_id: None,
3551                content_index: 0,
3552                text: "question".to_string(),
3553            },
3554        );
3555        let _ =
3556            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3557                response_id: "resp_a".to_string(),
3558                delta_id: "evt_a".to_string(),
3559                item_id: "item_a".to_string(),
3560                previous_item_id: Some("item_user".to_string()),
3561                content_index: 0,
3562                delta: "interrupted display".to_string(),
3563            });
3564        let _ =
3565            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3566                response_id: "resp_b".to_string(),
3567                delta_id: "evt_b".to_string(),
3568                item_id: "item_b".to_string(),
3569                previous_item_id: Some("item_user".to_string()),
3570                content_index: 0,
3571                delta: "keep me".to_string(),
3572            });
3573
3574        // R5-5: Interrupted commits the resp_a Display item; resp_b
3575        // remains untouched.
3576        let interrupt_outcome = session.append_realtime_transcript_event(
3577            RealtimeTranscriptEvent::AssistantTurnInterrupted {
3578                response_id: "resp_a".to_string(),
3579            },
3580        );
3581        assert_eq!(
3582            interrupt_outcome.materialized_messages.len(),
3583            1,
3584            "resp_a's Display item commits on Interrupted"
3585        );
3586
3587        let outcome = session.append_realtime_transcript_event(
3588            RealtimeTranscriptEvent::AssistantTurnCompleted {
3589                response_id: "resp_b".to_string(),
3590                stop_reason: StopReason::EndTurn,
3591                usage: Usage::default(),
3592            },
3593        );
3594        assert_eq!(
3595            outcome.materialized_messages.len(),
3596            1,
3597            "resp_b commits on its TurnCompleted, untouched by resp_a's Interrupted"
3598        );
3599
3600        // 1 user + 2 assistant messages.
3601        assert_eq!(session.messages().len(), 3);
3602        assert!(matches!(
3603            &session.messages()[1],
3604            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "interrupted display"
3605        ));
3606        assert!(matches!(
3607            &session.messages()[2],
3608            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "keep me"
3609        ));
3610    }
3611
    // Copy-on-write tests for the Arc-backed message buffer
3613
3614    #[test]
3615    fn test_fork_shares_arc_no_clone() {
3616        let mut session = Session::new();
3617        for i in 0..100 {
3618            session.push(Message::User(UserMessage::text(format!("Message {i}"))));
3619        }
3620
3621        // Fork should share the same Arc, not clone messages
3622        let forked = session.fork();
3623
3624        // Both should point to the same underlying data (Arc refcount > 1)
3625        assert!(Arc::ptr_eq(&session.messages, &forked.messages));
3626        assert_eq!(forked.messages().len(), 100);
3627    }
3628
3629    #[test]
3630    fn test_fork_at_shares_arc_prefix() {
3631        let mut session = Session::new();
3632        for i in 0..100 {
3633            session.push(Message::User(UserMessage::text(format!("Message {i}"))));
3634        }
3635
3636        // Fork at 50 should create new Arc with copied prefix
3637        let forked = session.fork_at(50);
3638        assert_eq!(forked.messages().len(), 50);
3639
3640        // Original should be unchanged
3641        assert_eq!(session.messages().len(), 100);
3642    }
3643
3644    #[test]
3645    fn test_push_cow_behavior() {
3646        let mut session = Session::new();
3647        session.push(Message::User(UserMessage::text("First".to_string())));
3648
3649        // Fork shares the Arc
3650        let forked = session.fork();
3651        assert!(Arc::ptr_eq(&session.messages, &forked.messages));
3652
3653        // Push on original triggers CoW - original gets new Arc
3654        session.push(Message::User(UserMessage::text("Second".to_string())));
3655
3656        // Now they should have different Arcs
3657        assert!(!Arc::ptr_eq(&session.messages, &forked.messages));
3658        assert_eq!(session.messages().len(), 2);
3659        assert_eq!(forked.messages().len(), 1);
3660    }
3661
3662    // Performance tests for lazy timestamp updates
3663
3664    #[test]
3665    fn test_push_batch_single_timestamp() {
3666        let mut session = Session::new();
3667        let initial_updated = session.updated_at();
3668
3669        // Use push_batch to add multiple messages without repeated syscalls
3670        session.push_batch(vec![
3671            Message::User(UserMessage::text("First".to_string())),
3672            Message::User(UserMessage::text("Second".to_string())),
3673            Message::User(UserMessage::text("Third".to_string())),
3674        ]);
3675
3676        assert_eq!(session.messages().len(), 3);
3677        // Timestamp should have been updated once
3678        assert!(session.updated_at() >= initial_updated);
3679    }
3680
3681    #[test]
3682    fn test_touch_updates_timestamp() {
3683        let mut session = Session::new();
3684        let initial = session.updated_at();
3685
3686        std::thread::sleep(std::time::Duration::from_millis(10));
3687
3688        // Explicit touch to update timestamp
3689        session.touch();
3690
3691        assert!(session.updated_at() > initial);
3692    }
3693
3694    #[test]
3695    fn test_session_push() {
3696        let mut session = Session::new();
3697        let initial_updated = session.updated_at();
3698
3699        // Small delay to ensure time changes
3700        std::thread::sleep(std::time::Duration::from_millis(10));
3701
3702        session.push(Message::User(UserMessage::text("Hello".to_string())));
3703
3704        assert_eq!(session.messages().len(), 1);
3705        assert!(session.updated_at() > initial_updated);
3706    }
3707
3708    #[test]
3709    fn test_session_fork() {
3710        let mut session = Session::new();
3711        session.push(Message::System(SystemMessage::new("System prompt")));
3712        session.push(Message::User(UserMessage::text("Hello".to_string())));
3713        session.push(Message::Assistant(AssistantMessage {
3714            content: "Hi!".to_string(),
3715            tool_calls: vec![],
3716            stop_reason: StopReason::EndTurn,
3717            usage: Usage::default(),
3718            created_at: crate::types::message_timestamp_now(),
3719        }));
3720
3721        // Fork at index 2 (system + user)
3722        let forked = session.fork_at(2);
3723        assert_eq!(forked.messages().len(), 2);
3724        assert_ne!(forked.id(), session.id());
3725
3726        // Full fork
3727        let full_fork = session.fork();
3728        assert_eq!(full_fork.messages().len(), 3);
3729    }
3730
3731    #[test]
3732    fn test_session_metadata() {
3733        let mut session = Session::new();
3734        session.set_metadata("key", serde_json::json!("value"));
3735
3736        assert_eq!(session.metadata().get("key").unwrap(), "value");
3737    }
3738
3739    #[test]
3740    fn test_session_metadata_backfill_preserves_timestamp() {
3741        let mut session = Session::new();
3742        let initial_updated = session.updated_at();
3743
3744        std::thread::sleep(std::time::Duration::from_millis(10));
3745
3746        assert!(session.backfill_metadata_if_absent("key", serde_json::json!("value")));
3747        assert_eq!(session.metadata().get("key").unwrap(), "value");
3748        assert_eq!(session.updated_at(), initial_updated);
3749        assert!(!session.backfill_metadata_if_absent("key", serde_json::json!("other")));
3750        assert_eq!(session.metadata().get("key").unwrap(), "value");
3751        assert_eq!(session.updated_at(), initial_updated);
3752    }
3753
3754    #[test]
3755    fn test_session_mob_tool_authority_context_roundtrip() {
3756        let mut session = Session::new();
3757        let authority = MobToolAuthorityContext::new(
3758            crate::service::OpaquePrincipalToken::new("opaque-principal"),
3759            false,
3760        )
3761        .with_managed_mob_scope(["mob-a"])
3762        .with_audit_invocation_id("audit-1");
3763
3764        session
3765            .set_mob_tool_authority_context(Some(authority.clone()))
3766            .expect("authority should serialize");
3767        assert_eq!(session.mob_tool_authority_context(), Some(authority));
3768
3769        session
3770            .set_mob_tool_authority_context(None)
3771            .expect("authority should clear");
3772        assert!(session.mob_tool_authority_context().is_none());
3773    }
3774
3775    #[test]
3776    fn test_session_tool_visibility_state_roundtrip() {
3777        let mut session = Session::new();
3778        let state = SessionToolVisibilityState {
3779            inherited_base_filter: ToolFilter::Allow(["visible".to_string()].into_iter().collect()),
3780            active_filter: ToolFilter::Allow(
3781                ["visible".to_string(), "missing".to_string()]
3782                    .into_iter()
3783                    .collect(),
3784            ),
3785            staged_filter: ToolFilter::Allow(
3786                ["visible".to_string(), "missing".to_string()]
3787                    .into_iter()
3788                    .collect(),
3789            ),
3790            active_revision: 1,
3791            staged_revision: 2,
3792            ..Default::default()
3793        };
3794
3795        session
3796            .set_tool_visibility_state(state.clone())
3797            .expect("tool visibility state should serialize");
3798        assert_eq!(session.tool_visibility_state().unwrap(), Some(state));
3799    }
3800
3801    #[test]
3802    fn test_session_tool_visibility_state_malformed_returns_error() {
3803        let mut session = Session::new();
3804        session.set_metadata(
3805            SESSION_TOOL_VISIBILITY_STATE_KEY,
3806            serde_json::json!({
3807                "active_filter": {
3808                    "unexpected_filter_kind": ["secret"]
3809                }
3810            }),
3811        );
3812
3813        assert!(
3814            session.tool_visibility_state().is_err(),
3815            "malformed canonical visibility metadata must not decode as absent/default"
3816        );
3817    }
3818
3819    #[test]
3820    fn test_session_serialization() {
3821        let mut session = Session::new();
3822        session.push(Message::User(UserMessage::text("Test".to_string())));
3823
3824        let json = serde_json::to_string(&session).unwrap();
3825        let parsed: Session = serde_json::from_str(&json).unwrap();
3826
3827        assert_eq!(parsed.id(), session.id());
3828        assert_eq!(parsed.messages().len(), 1);
3829        assert_eq!(parsed.version(), SESSION_VERSION);
3830    }
3831
3832    #[test]
3833    fn test_session_meta_from_session() {
3834        let mut session = Session::new();
3835        session.push(Message::User(UserMessage::text("Hello".to_string())));
3836        session.push(Message::Assistant(AssistantMessage {
3837            content: "Hi!".to_string(),
3838            tool_calls: vec![],
3839            stop_reason: StopReason::EndTurn,
3840            usage: Usage {
3841                input_tokens: 10,
3842                output_tokens: 5,
3843                cache_creation_tokens: None,
3844                cache_read_tokens: None,
3845            },
3846            created_at: crate::types::message_timestamp_now(),
3847        }));
3848        session.record_usage(Usage {
3849            input_tokens: 10,
3850            output_tokens: 5,
3851            cache_creation_tokens: None,
3852            cache_read_tokens: None,
3853        });
3854
3855        let meta = SessionMeta::from(&session);
3856        assert_eq!(meta.id, *session.id());
3857        assert_eq!(meta.message_count, 2);
3858        assert_eq!(meta.total_tokens, 15);
3859    }
3860
3861    #[test]
3862    fn has_pending_boundary_empty_session() {
3863        let session = Session::new();
3864        assert!(!session.has_pending_boundary());
3865    }
3866
3867    #[test]
3868    fn has_pending_boundary_after_user_message() {
3869        let mut session = Session::new();
3870        session.push(Message::User(UserMessage::text("hello")));
3871        assert!(session.has_pending_boundary());
3872    }
3873
3874    #[test]
3875    fn has_pending_boundary_after_assistant_message() {
3876        let mut session = Session::new();
3877        session.push(Message::User(UserMessage::text("hello")));
3878        session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3879            vec![],
3880            StopReason::EndTurn,
3881        )));
3882        assert!(!session.has_pending_boundary());
3883    }
3884
3885    #[test]
3886    fn has_pending_boundary_after_tool_results() {
3887        let mut session = Session::new();
3888        session.push(Message::User(UserMessage::text("hello")));
3889        session.push(Message::tool_results(vec![]));
3890        assert!(session.has_pending_boundary());
3891    }
3892
3893    #[test]
3894    fn has_pending_boundary_after_system() {
3895        let mut session = Session::new();
3896        session.push(Message::System(SystemMessage::new("system")));
3897        assert!(!session.has_pending_boundary());
3898    }
3899
3900    #[test]
3901    fn system_context_state_preserves_applied_runtime_context() {
3902        let accepted_at = SystemTime::UNIX_EPOCH;
3903        let mut state = SessionSystemContextState::default();
3904        state
3905            .stage_append(
3906                &AppendSystemContextRequest {
3907                    text: "Authoritative peer token is birch seventeen.".to_string(),
3908                    source: Some(
3909                        "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
3910                            .to_string(),
3911                    ),
3912                    idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
3913                },
3914                accepted_at,
3915            )
3916            .expect("append should stage");
3917
3918        state.mark_pending_applied();
3919
3920        assert!(state.pending.is_empty());
3921        assert_eq!(state.applied.len(), 1);
3922        assert_eq!(
3923            state.applied[0].text,
3924            "Authoritative peer token is birch seventeen."
3925        );
3926        assert_eq!(
3927            state.applied[0].source.as_deref(),
3928            Some("peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
3929        );
3930
3931        let round_tripped: SessionSystemContextState =
3932            serde_json::from_value(serde_json::to_value(&state).expect("serialize state"))
3933                .expect("deserialize state");
3934        assert_eq!(round_tripped.applied, state.applied);
3935    }
3936
3937    #[test]
3938    fn append_system_context_blocks_records_typed_applied_context() {
3939        let append = PendingSystemContextAppend {
3940            text: "Authoritative peer token is birch seventeen.".to_string(),
3941            source: Some(
3942                "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string(),
3943            ),
3944            idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
3945            accepted_at: SystemTime::UNIX_EPOCH,
3946        };
3947        let mut session = Session::new();
3948
3949        session.append_system_context_blocks(std::slice::from_ref(&append));
3950
3951        let state = session
3952            .system_context_state()
3953            .expect("append should persist typed context state");
3954        assert_eq!(state.applied, vec![append]);
3955    }
3956
3957    #[test]
3958    fn append_system_context_blocks_renders_pre_marked_pending_context() {
3959        let accepted_at = SystemTime::UNIX_EPOCH;
3960        let mut state = SessionSystemContextState::default();
3961        state
3962            .stage_append(
3963                &AppendSystemContextRequest {
3964                    text: "Apply this staged context at the request boundary.".to_string(),
3965                    source: Some("rpc/session_inject_context".to_string()),
3966                    idempotency_key: Some("ctx-boundary".to_string()),
3967                },
3968                accepted_at,
3969            )
3970            .expect("append should stage");
3971        let pending = state.pending.clone();
3972        state.mark_pending_applied();
3973        let mut session = Session::new();
3974        session
3975            .set_system_context_state(state)
3976            .expect("state should serialize");
3977
3978        session.append_system_context_blocks(&pending);
3979
3980        let system_prompt = session
3981            .messages()
3982            .first()
3983            .and_then(|message| match message {
3984                Message::System(system) => Some(system.content.as_str()),
3985                _ => None,
3986            })
3987            .unwrap_or_default();
3988        assert!(system_prompt.contains("Apply this staged context at the request boundary."));
3989        let state = session
3990            .system_context_state()
3991            .expect("append should persist typed context state");
3992        assert_eq!(state.applied.len(), 1);
3993        assert_eq!(
3994            state.seen["ctx-boundary"].state,
3995            SeenSystemContextState::Applied
3996        );
3997    }
3998
3999    #[test]
4000    fn append_system_context_blocks_renders_pre_marked_context_without_idempotency_key() {
4001        let accepted_at = SystemTime::UNIX_EPOCH;
4002        let mut state = SessionSystemContextState::default();
4003        state
4004            .stage_append(
4005                &AppendSystemContextRequest {
4006                    text: "Apply this unkeyed staged context at the request boundary.".to_string(),
4007                    source: Some("rpc/session_inject_context".to_string()),
4008                    idempotency_key: None,
4009                },
4010                accepted_at,
4011            )
4012            .expect("append should stage");
4013        let pending = state.pending.clone();
4014        state.mark_pending_applied();
4015        let mut session = Session::new();
4016        session
4017            .set_system_context_state(state)
4018            .expect("state should serialize");
4019
4020        session.append_system_context_blocks(&pending);
4021
4022        let system_prompt = session
4023            .messages()
4024            .first()
4025            .and_then(|message| match message {
4026                Message::System(system) => Some(system.content.as_str()),
4027                _ => None,
4028            })
4029            .unwrap_or_default();
4030        assert!(
4031            system_prompt.contains("Apply this unkeyed staged context at the request boundary.")
4032        );
4033    }
4034
4035    #[test]
4036    fn append_system_context_blocks_skips_duplicate_idempotency_key() {
4037        let first = PendingSystemContextAppend {
4038            text: "Authoritative peer token is birch seventeen.".to_string(),
4039            source: Some("peer_response_terminal:analyst:req-1".to_string()),
4040            idempotency_key: Some("req-1".to_string()),
4041            accepted_at: SystemTime::UNIX_EPOCH,
4042        };
4043        let duplicate = PendingSystemContextAppend {
4044            accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
4045            ..first.clone()
4046        };
4047        let mut session = Session::new();
4048
4049        session.append_system_context_blocks(std::slice::from_ref(&first));
4050        session.append_system_context_blocks(std::slice::from_ref(&duplicate));
4051
4052        let state = session
4053            .system_context_state()
4054            .expect("append should persist typed context state");
4055        assert_eq!(state.applied, vec![first]);
4056        let system_prompt = session
4057            .messages()
4058            .first()
4059            .and_then(|message| match message {
4060                Message::System(system) => Some(system.content.as_str()),
4061                _ => None,
4062            })
4063            .unwrap_or_default();
4064        assert_eq!(
4065            system_prompt
4066                .matches("Authoritative peer token is birch seventeen.")
4067                .count(),
4068            1
4069        );
4070    }
4071
4072    #[test]
4073    fn append_system_context_blocks_skips_conflicting_duplicate_idempotency_key() {
4074        let first = PendingSystemContextAppend {
4075            text: "Authoritative peer token is birch seventeen.".to_string(),
4076            source: Some("peer_response_terminal:analyst:req-1".to_string()),
4077            idempotency_key: Some("req-1".to_string()),
4078            accepted_at: SystemTime::UNIX_EPOCH,
4079        };
4080        let conflicting = PendingSystemContextAppend {
4081            text: "Conflicting peer token should not reach the prompt.".to_string(),
4082            accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
4083            ..first.clone()
4084        };
4085        let mut session = Session::new();
4086
4087        session.append_system_context_blocks(std::slice::from_ref(&first));
4088        session.append_system_context_blocks(std::slice::from_ref(&conflicting));
4089
4090        let state = session
4091            .system_context_state()
4092            .expect("append should persist typed context state");
4093        assert_eq!(state.applied, vec![first]);
4094        let system_prompt = session
4095            .messages()
4096            .first()
4097            .and_then(|message| match message {
4098                Message::System(system) => Some(system.content.as_str()),
4099                _ => None,
4100            })
4101            .unwrap_or_default();
4102        assert!(system_prompt.contains("Authoritative peer token is birch seventeen."));
4103        assert!(!system_prompt.contains("Conflicting peer token should not reach the prompt."));
4104    }
4105
4106    // ------------------------------------------------------------------
4107    // T9/T10: realtime transcript lane materialization.
4108    //
4109    // The display-text lane (`AssistantTextDelta`) materializes as
4110    // `AssistantBlock::Text`; the spoken-transcript lane
4111    // (`AssistantTranscriptDelta`) materializes as
4112    // `AssistantBlock::Transcript { source: TranscriptSource::Spoken }`.
    // These regression tests pin both flush paths and prove that the
    // materializer dispatches on the per-item `TranscriptLane`.
4115    // ------------------------------------------------------------------
4116
4117    #[test]
4118    fn realtime_transcript_assistant_transcript_delta_materializes_transcript_block() {
4119        let mut session = Session::new();
4120
4121        let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4122            response_id: "resp_spoken".to_string(),
4123            delta_id: "evt_delta_spoken_1".to_string(),
4124            item_id: "item_spoken".to_string(),
4125            previous_item_id: None,
4126            content_index: 0,
4127            delta: "I said hi".to_string(),
4128        };
4129        assert!(
4130            session.append_realtime_transcript_event(delta).is_inert(),
4131            "delta alone is inert until turn-completed flushes"
4132        );
4133
4134        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
4135            response_id: "resp_spoken".to_string(),
4136            stop_reason: StopReason::EndTurn,
4137            usage: Usage::default(),
4138        };
4139        let outcome = session.append_realtime_transcript_event(terminal);
4140        assert_eq!(outcome.materialized_messages.len(), 1);
4141
4142        // T9/T10: must be a Transcript block, NOT Text.
4143        let messages = session.messages();
4144        assert_eq!(messages.len(), 1);
4145        match &messages[0] {
4146            Message::BlockAssistant(assistant) => {
4147                assert_eq!(assistant.blocks.len(), 1);
4148                match &assistant.blocks[0] {
4149                    AssistantBlock::Transcript { text, source, .. } => {
4150                        assert_eq!(text, "I said hi");
4151                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4152                    }
4153                    other => unreachable!(
4154                        "AssistantTranscriptDelta must materialize as AssistantBlock::Transcript, got {other:?}"
4155                    ),
4156                }
4157            }
4158            other => unreachable!("expected BlockAssistant message, got {other:?}"),
4159        }
4160    }
4161
4162    #[test]
4163    fn round4_cc4_in_flight_response_ids_lists_distinct_unmaterialized_responses() {
4164        // CC4 (Round-4 architectural reconciliation): the helper that
4165        // powers `signal_turn_interrupt`'s cross-layer fan-out must
4166        // return every distinct provider response_id that has at least
4167        // one unmaterialized assistant item, EXCLUDING already-discarded
4168        // responses and EXCLUDING the user role.
4169        let mut session = Session::new();
4170
4171        // Two transcript-delta items on resp_a (different content_index
4172        // ranges), one on resp_b. resp_c gets a delta and is then
4173        // discarded explicitly via AssistantTurnInterrupted.
4174        for (i, response_id) in [
4175            ("resp_a", "resp_a"),
4176            ("resp_a_extra", "resp_a"),
4177            ("resp_b", "resp_b"),
4178            ("resp_c", "resp_c"),
4179        ]
4180        .iter()
4181        .enumerate()
4182        {
4183            let event = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4184                response_id: response_id.1.to_string(),
4185                delta_id: format!("delta_{i}"),
4186                item_id: response_id.0.to_string(),
4187                previous_item_id: None,
4188                content_index: 0,
4189                delta: "x".to_string(),
4190            };
4191            let _ = session.append_realtime_transcript_event(event);
4192        }
4193
4194        // Discard resp_c — it should not appear in the in-flight list.
4195        let _ = session.append_realtime_transcript_event(
4196            RealtimeTranscriptEvent::AssistantTurnInterrupted {
4197                response_id: "resp_c".to_string(),
4198            },
4199        );
4200
4201        // User-role item should never appear (CC4 only fans interrupts
4202        // to assistant responses).
4203        let _ = session.append_realtime_transcript_event(
4204            RealtimeTranscriptEvent::UserTranscriptFinal {
4205                item_id: "u_item".to_string(),
4206                previous_item_id: None,
4207                content_index: 0,
4208                text: "hi".to_string(),
4209            },
4210        );
4211
4212        let in_flight = session.in_flight_realtime_assistant_response_ids();
4213        assert!(in_flight.contains(&"resp_a".to_string()), "{in_flight:?}");
4214        assert!(in_flight.contains(&"resp_b".to_string()), "{in_flight:?}");
4215        assert!(
4216            !in_flight.contains(&"resp_c".to_string()),
4217            "discarded response must not appear in in_flight: {in_flight:?}"
4218        );
4219        // resp_a appears exactly once even though two items reference it.
4220        assert_eq!(
4221            in_flight.iter().filter(|r| *r == "resp_a").count(),
4222            1,
4223            "distinct response_ids only: {in_flight:?}"
4224        );
4225    }
4226
4227    #[test]
4228    fn round4_cc2_assistant_turn_completed_after_transcript_deltas_materializes_transcript() {
4229        // CC2 (Round-4 architectural reconciliation): once
4230        // `signal_turn_completed` synthesizes
4231        // `RealtimeTranscriptEvent::AssistantTurnCompleted`, the staging
4232        // materializer commits every staged transcript-delta item for
4233        // that response_id as `AssistantBlock::Transcript { Spoken }`.
4234        // This pins the production end-to-end shape the sink relies on.
4235        let mut session = Session::new();
4236
4237        let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4238            response_id: "resp_cc2".to_string(),
4239            delta_id: "delta_cc2_1".to_string(),
4240            item_id: "item_cc2".to_string(),
4241            previous_item_id: None,
4242            content_index: 0,
4243            delta: "hello world".to_string(),
4244        };
4245        assert!(session.append_realtime_transcript_event(delta).is_inert());
4246
4247        // Pre-completion: in-flight list reports resp_cc2.
4248        assert_eq!(
4249            session.in_flight_realtime_assistant_response_ids(),
4250            vec!["resp_cc2".to_string()]
4251        );
4252
4253        let outcome = session.append_realtime_transcript_event(
4254            RealtimeTranscriptEvent::AssistantTurnCompleted {
4255                response_id: "resp_cc2".to_string(),
4256                stop_reason: StopReason::EndTurn,
4257                usage: Usage::default(),
4258            },
4259        );
4260        assert_eq!(outcome.materialized_messages.len(), 1);
4261
4262        // Post-completion: in-flight list is empty (item is materialized).
4263        assert!(
4264            session
4265                .in_flight_realtime_assistant_response_ids()
4266                .is_empty(),
4267            "materialized items must not appear in in_flight_realtime_assistant_response_ids"
4268        );
4269
4270        let messages = session.messages();
4271        let assistant = messages.iter().find_map(|m| match m {
4272            Message::BlockAssistant(a) => Some(a),
4273            _ => None,
4274        });
4275        let assistant = assistant.expect("assistant block message expected");
4276        assert_eq!(assistant.blocks.len(), 1);
4277        assert!(matches!(
4278            &assistant.blocks[0],
4279            AssistantBlock::Transcript {
4280                source: crate::types::TranscriptSource::Spoken,
4281                ..
4282            }
4283        ));
4284    }
4285
4286    #[test]
4287    fn realtime_transcript_assistant_text_delta_still_materializes_text_block() {
4288        // Counter-regression: the display-text lane must continue to
4289        // produce `AssistantBlock::Text` after T9/T10. Prevents an
4290        // accidental cross-lane flip.
4291        let mut session = Session::new();
4292
4293        let delta = RealtimeTranscriptEvent::AssistantTextDelta {
4294            response_id: "resp_display".to_string(),
4295            delta_id: "evt_delta_display_1".to_string(),
4296            item_id: "item_display".to_string(),
4297            previous_item_id: None,
4298            content_index: 0,
4299            delta: "I wrote".to_string(),
4300        };
4301        let _ = session.append_realtime_transcript_event(delta);
4302
4303        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
4304            response_id: "resp_display".to_string(),
4305            stop_reason: StopReason::EndTurn,
4306            usage: Usage::default(),
4307        };
4308        let outcome = session.append_realtime_transcript_event(terminal);
4309        assert_eq!(outcome.materialized_messages.len(), 1);
4310
4311        let messages = session.messages();
4312        match &messages[0] {
4313            Message::BlockAssistant(assistant) => match &assistant.blocks[0] {
4314                AssistantBlock::Text { text, .. } => assert_eq!(text, "I wrote"),
4315                other => unreachable!(
4316                    "AssistantTextDelta must keep materializing AssistantBlock::Text, got {other:?}"
4317                ),
4318            },
4319            other => unreachable!("expected BlockAssistant message, got {other:?}"),
4320        }
4321    }
4322
4323    #[test]
4324    fn round4_cc7_mixed_response_persists_text_and_transcript_in_order() {
4325        // CC7 (Round-4 adversarial-verifier follow-up): a single mixed-modality
4326        // realtime response that emits BOTH display-text deltas
4327        // (`AssistantTextDelta`) AND spoken-transcript deltas
4328        // (`AssistantTranscriptDelta`) under the same response_id must
4329        // materialize as ONE `Message::BlockAssistant` whose `blocks` field
4330        // contains exactly two ordered entries:
4331        //   1. AssistantBlock::Text       (display-text lane)
4332        //   2. AssistantBlock::Transcript { source: Spoken } (spoken lane)
4333        // Pre-fix the materializer emitted one Message::BlockAssistant per
4334        // staged item, splitting the mixed response into two messages.
4335        //
4336        // This test drives the production materializer end-to-end: deltas
4337        // stage in `SessionRealtimeTranscriptState`; `AssistantTurnCompleted`
4338        // triggers the materializer; canonical history is the assertion
4339        // surface — exactly the same code path that
4340        // `SessionServiceProjectionSink::signal_turn_completed` invokes via
4341        // `runtime.append_realtime_transcript_event` in production.
4342        let mut session = Session::new();
4343
4344        // Provider-arrival order: display first, then spoken.
4345        let display_a = RealtimeTranscriptEvent::AssistantTextDelta {
4346            response_id: "resp_mixed_1".to_string(),
4347            delta_id: "delta_disp_1".to_string(),
4348            item_id: "item_display".to_string(),
4349            previous_item_id: None,
4350            content_index: 0,
4351            delta: "Here's the report:".to_string(),
4352        };
4353        assert!(
4354            session
4355                .append_realtime_transcript_event(display_a)
4356                .is_inert()
4357        );
4358
4359        let display_b = RealtimeTranscriptEvent::AssistantTextDelta {
4360            response_id: "resp_mixed_1".to_string(),
4361            delta_id: "delta_disp_2".to_string(),
4362            item_id: "item_display".to_string(),
4363            previous_item_id: None,
4364            content_index: 0,
4365            delta: " (still writing)".to_string(),
4366        };
4367        assert!(
4368            session
4369                .append_realtime_transcript_event(display_b)
4370                .is_inert()
4371        );
4372
4373        // Spoken items chain after the display item to mirror provider
4374        // arrival semantics — `previous_item_id` carries arrival ordering
4375        // that the materializer must preserve as block ordering inside the
4376        // single emitted message.
4377        let spoken_a = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4378            response_id: "resp_mixed_1".to_string(),
4379            delta_id: "delta_spoken_1".to_string(),
4380            item_id: "item_spoken".to_string(),
4381            previous_item_id: Some("item_display".to_string()),
4382            content_index: 0,
4383            delta: "I'm reading the report aloud:".to_string(),
4384        };
4385        assert!(
4386            session
4387                .append_realtime_transcript_event(spoken_a)
4388                .is_inert()
4389        );
4390
4391        let spoken_b = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4392            response_id: "resp_mixed_1".to_string(),
4393            delta_id: "delta_spoken_2".to_string(),
4394            item_id: "item_spoken".to_string(),
4395            previous_item_id: Some("item_display".to_string()),
4396            content_index: 0,
4397            delta: " sentence two.".to_string(),
4398        };
4399        assert!(
4400            session
4401                .append_realtime_transcript_event(spoken_b)
4402                .is_inert()
4403        );
4404
4405        // TurnCompleted triggers the materializer to flush all staged items
4406        // for this response_id into ONE BlockAssistant message.
4407        let outcome = session.append_realtime_transcript_event(
4408            RealtimeTranscriptEvent::AssistantTurnCompleted {
4409                response_id: "resp_mixed_1".to_string(),
4410                stop_reason: StopReason::EndTurn,
4411                usage: Usage {
4412                    input_tokens: 11,
4413                    output_tokens: 22,
4414                    cache_creation_tokens: None,
4415                    cache_read_tokens: None,
4416                },
4417            },
4418        );
4419        // Materializer reports two staged items got materialized.
4420        assert_eq!(outcome.materialized_messages.len(), 2);
4421
4422        // Canonical history MUST contain exactly ONE BlockAssistant message
4423        // (the CC7 fix: mixed lanes interleave into one message, not two).
4424        let messages = session.messages();
4425        let assistants: Vec<&BlockAssistantMessage> = messages
4426            .iter()
4427            .filter_map(|m| match m {
4428                Message::BlockAssistant(a) => Some(a),
4429                _ => None,
4430            })
4431            .collect();
4432        assert_eq!(
4433            assistants.len(),
4434            1,
4435            "mixed display+spoken response under one response_id must produce exactly ONE BlockAssistant message, got: {assistants:?}"
4436        );
4437        let assistant = assistants[0];
4438        assert_eq!(
4439            assistant.blocks.len(),
4440            2,
4441            "mixed response message must carry both blocks: {:?}",
4442            assistant.blocks
4443        );
4444
4445        // Block 0: display-text (concatenated deltas).
4446        match &assistant.blocks[0] {
4447            AssistantBlock::Text { text, .. } => {
4448                assert_eq!(text, "Here's the report: (still writing)");
4449            }
4450            other => unreachable!(
4451                "first block must be AssistantBlock::Text (display lane), got {other:?}"
4452            ),
4453        }
4454        // Block 1: spoken transcript (concatenated deltas), tagged Spoken.
4455        match &assistant.blocks[1] {
4456            AssistantBlock::Transcript { text, source, .. } => {
4457                assert_eq!(text, "I'm reading the report aloud: sentence two.");
4458                assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4459            }
4460            other => unreachable!(
4461                "second block must be AssistantBlock::Transcript {{ source: Spoken }}, got {other:?}"
4462            ),
4463        }
4464
4465        // Usage was recorded once for the turn.
4466        assert_eq!(session.usage.input_tokens, 11);
4467        assert_eq!(session.usage.output_tokens, 22);
4468    }
4469
4470    #[test]
4471    fn round5_r55_mixed_response_barge_in_preserves_display_drops_spoken() {
4472        // R5-5 (Round-5 contract update): barge-in MUST filter staged items
4473        // by lane — `Spoken` is invalidated (the user spoke over the audio
4474        // they were hearing) but `Display` survives as committed history
4475        // (sideband display text from the same response is not "spoken
4476        // over"). Round-4's `round4_cc7_mixed_response_barge_in_discards_*`
4477        // pinned the wrong invariant; this test replaces it.
4478        //
4479        // Architectural decision: `AssistantTurnInterrupted` is terminal for
4480        // the response on the realtime-staging path — any later
4481        // `AssistantTurnCompleted { stop_reason: Cancelled }` short-circuits
4482        // via the `discarded_assistant_response_ids` guard. So the
4483        // Interrupted handler must seed a synthetic
4484        // `assistant_completions` entry (`StopReason::Cancelled`,
4485        // `Usage::default()`) so retained Display items materialize
4486        // immediately rather than stranding forever.
4487        let mut session = Session::new();
4488
4489        let display = RealtimeTranscriptEvent::AssistantTextDelta {
4490            response_id: "resp_mixed_2".to_string(),
4491            delta_id: "delta_disp_1".to_string(),
4492            item_id: "item_display_2".to_string(),
4493            previous_item_id: None,
4494            content_index: 0,
4495            delta: "Working on the report...".to_string(),
4496        };
4497        let _ = session.append_realtime_transcript_event(display);
4498
4499        let spoken = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4500            response_id: "resp_mixed_2".to_string(),
4501            delta_id: "delta_spoken_1".to_string(),
4502            item_id: "item_spoken_2".to_string(),
4503            previous_item_id: Some("item_display_2".to_string()),
4504            content_index: 0,
4505            delta: "I'm reading the report".to_string(),
4506        };
4507        let _ = session.append_realtime_transcript_event(spoken);
4508
4509        // Barge-in arrives BEFORE TurnCompleted. The Display item with
4510        // staged content materializes immediately under the synthetic
4511        // Cancelled completion.
4512        let outcome = session.append_realtime_transcript_event(
4513            RealtimeTranscriptEvent::AssistantTurnInterrupted {
4514                response_id: "resp_mixed_2".to_string(),
4515            },
4516        );
4517        assert_eq!(
4518            outcome.materialized_messages.len(),
4519            1,
4520            "Display lane item must materialize on Interrupted: {outcome:?}"
4521        );
4522
4523        // A late `AssistantTurnCompleted` (the provider's response.done
4524        // emitted after cancel) must be a no-op: the Display item is
4525        // already materialized; the Spoken item was dropped at Interrupted.
4526        let late_completion = session.append_realtime_transcript_event(
4527            RealtimeTranscriptEvent::AssistantTurnCompleted {
4528                response_id: "resp_mixed_2".to_string(),
4529                stop_reason: StopReason::Cancelled,
4530                usage: Usage::default(),
4531            },
4532        );
4533        assert_eq!(
4534            late_completion.materialized_messages.len(),
4535            0,
4536            "post-barge-in TurnCompleted must not resurrect anything"
4537        );
4538
4539        // Canonical history: exactly one BlockAssistant carrying the
4540        // Display text (no Transcript block — Spoken was dropped).
4541        let messages = session.messages();
4542        let assistants: Vec<&BlockAssistantMessage> = messages
4543            .iter()
4544            .filter_map(|m| match m {
4545                Message::BlockAssistant(a) => Some(a),
4546                _ => None,
4547            })
4548            .collect();
4549        assert_eq!(
4550            assistants.len(),
4551            1,
4552            "barge-in must commit exactly one BlockAssistant containing the Display lane: {assistants:?}"
4553        );
4554        let assistant = assistants[0];
4555        assert_eq!(assistant.blocks.len(), 1, "blocks: {:?}", assistant.blocks);
4556        match &assistant.blocks[0] {
4557            AssistantBlock::Text { text, .. } => {
4558                assert_eq!(text, "Working on the report...");
4559            }
4560            other => {
4561                unreachable!("Display lane must materialize as AssistantBlock::Text, got {other:?}")
4562            }
4563        }
4564        // No Transcript block — Spoken lane was dropped.
4565        assert!(
4566            !assistant
4567                .blocks
4568                .iter()
4569                .any(|b| matches!(b, AssistantBlock::Transcript { .. })),
4570            "Spoken lane must be dropped on barge-in"
4571        );
4572
4573        // The in-flight tracker reports the response as no longer in flight
4574        // (the Display item is materialized; the Spoken item is skipped).
4575        assert!(
4576            !session
4577                .in_flight_realtime_assistant_response_ids()
4578                .contains(&"resp_mixed_2".to_string()),
4579            "barged-in response must not appear in in_flight_realtime_assistant_response_ids"
4580        );
4581    }
4582
4583    #[test]
4584    fn round5_r55_barge_in_preserves_display_lane_drops_spoken() {
4585        // R5-5 unit test: pin the lane-filter behavior at the staged-item
4586        // level (no chained predecessor). One Display item, one Spoken item,
4587        // both unchained, both staged before Interrupted.
4588        let mut session = Session::new();
4589
4590        let _ =
4591            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4592                response_id: "resp_a".to_string(),
4593                delta_id: "delta_d_1".to_string(),
4594                item_id: "item_display".to_string(),
4595                previous_item_id: None,
4596                content_index: 0,
4597                delta: "display-text".to_string(),
4598            });
4599        let _ = session.append_realtime_transcript_event(
4600            RealtimeTranscriptEvent::AssistantTranscriptDelta {
4601                response_id: "resp_a".to_string(),
4602                delta_id: "delta_s_1".to_string(),
4603                item_id: "item_spoken".to_string(),
4604                previous_item_id: None,
4605                content_index: 0,
4606                delta: "spoken-transcript".to_string(),
4607            },
4608        );
4609
4610        let outcome = session.append_realtime_transcript_event(
4611            RealtimeTranscriptEvent::AssistantTurnInterrupted {
4612                response_id: "resp_a".to_string(),
4613            },
4614        );
4615        // Display materializes, Spoken does not.
4616        assert_eq!(outcome.materialized_messages.len(), 1);
4617
4618        let messages = session.messages();
4619        let assistants: Vec<&BlockAssistantMessage> = messages
4620            .iter()
4621            .filter_map(|m| match m {
4622                Message::BlockAssistant(a) => Some(a),
4623                _ => None,
4624            })
4625            .collect();
4626        assert_eq!(assistants.len(), 1);
4627        // Single Text block (the Display lane) — no Transcript.
4628        assert_eq!(assistants[0].blocks.len(), 1);
4629        match &assistants[0].blocks[0] {
4630            AssistantBlock::Text { text, .. } => assert_eq!(text, "display-text"),
4631            other => unreachable!("expected Text, got {other:?}"),
4632        }
4633    }
4634
4635    #[test]
4636    fn round5_r55_barge_in_finalizes_retained_display_into_committed_block() {
4637        // R5-5: the architectural decision — Interrupted is terminal for the
4638        // response. Display lane must commit at Interrupted time, not wait
4639        // on a hypothetical AssistantTurnCompleted that may never arrive
4640        // (or arrives Cancelled and short-circuits).
4641        let mut session = Session::new();
4642
4643        let _ =
4644            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4645                response_id: "resp_a".to_string(),
4646                delta_id: "delta_d_1".to_string(),
4647                item_id: "item_display".to_string(),
4648                previous_item_id: None,
4649                content_index: 0,
4650                delta: "committed-display-text".to_string(),
4651            });
4652
4653        // Pre-condition: nothing committed yet.
4654        assert!(session.messages().is_empty());
4655
4656        let outcome = session.append_realtime_transcript_event(
4657            RealtimeTranscriptEvent::AssistantTurnInterrupted {
4658                response_id: "resp_a".to_string(),
4659            },
4660        );
4661        assert_eq!(
4662            outcome.materialized_messages.len(),
4663            1,
4664            "Interrupted must finalize retained Display lane immediately"
4665        );
4666
4667        // Post-condition: BlockAssistant in canonical history, no Transcript.
4668        let messages = session.messages();
4669        assert_eq!(messages.len(), 1);
4670        match &messages[0] {
4671            Message::BlockAssistant(assistant) => {
4672                assert_eq!(assistant.blocks.len(), 1);
4673                match &assistant.blocks[0] {
4674                    AssistantBlock::Text { text, .. } => {
4675                        assert_eq!(text, "committed-display-text");
4676                    }
4677                    other => unreachable!("expected Text, got {other:?}"),
4678                }
4679            }
4680            other => unreachable!("expected BlockAssistant, got {other:?}"),
4681        }
4682    }
4683
4684    #[test]
4685    fn round5_r56_truncation_promotes_default_lane_item_to_spoken() {
4686        // R5-6: when truncation is the first content-bearing event for an
4687        // item (no prior delta), the staged item's lane MUST be promoted to
4688        // Spoken so the materializer commits as `AssistantBlock::Transcript`.
4689        // Without the explicit promotion, the lane stays `Display` (the
4690        // default) and the heard audio transcript persists as
4691        // `AssistantBlock::Text`.
4692        let mut session = Session::new();
4693
4694        let _ = session.append_realtime_transcript_event(
4695            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4696                response_id: "resp_a".to_string(),
4697                item_id: "item_a".to_string(),
4698                content_index: 0,
4699                text: "what was actually heard".to_string(),
4700            },
4701        );
4702
4703        let outcome = session.append_realtime_transcript_event(
4704            RealtimeTranscriptEvent::AssistantTurnCompleted {
4705                response_id: "resp_a".to_string(),
4706                stop_reason: StopReason::EndTurn,
4707                usage: Usage::default(),
4708            },
4709        );
4710        assert_eq!(outcome.materialized_messages.len(), 1);
4711
4712        assert_eq!(session.messages().len(), 1);
4713        match &session.messages()[0] {
4714            Message::BlockAssistant(assistant) => {
4715                assert_eq!(assistant.blocks.len(), 1);
4716                match &assistant.blocks[0] {
4717                    AssistantBlock::Transcript { text, source, .. } => {
4718                        assert_eq!(text, "what was actually heard");
4719                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4720                    }
4721                    other => unreachable!(
4722                        "truncation-only path must materialize as AssistantBlock::Transcript, got {other:?}"
4723                    ),
4724                }
4725            }
4726            other => unreachable!("expected BlockAssistant, got {other:?}"),
4727        }
4728    }
4729
4730    #[test]
4731    fn round5_r56_truncation_after_display_delta_is_no_op_keeping_display_content() {
4732        // R5-6 edge case: a Display delta arrived first and staged Display
4733        // content; a truncation event arrives for the SAME item id
4734        // (provider bug — truncation only applies to spoken/audio output).
4735        // Contract: the staged Display content must NOT be clobbered by
4736        // the truncation text. `promote_item_lane` keeps the existing
4737        // Display lane and emits a `tracing::warn!`; the truncation arm
4738        // sees the lane stayed Display and skips the segment-write.
4739        let mut session = Session::new();
4740
4741        let _ =
4742            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4743                response_id: "resp_a".to_string(),
4744                delta_id: "delta_d_1".to_string(),
4745                item_id: "item_a".to_string(),
4746                previous_item_id: None,
4747                content_index: 0,
4748                delta: "display-text-from-delta".to_string(),
4749            });
4750
4751        let _ = session.append_realtime_transcript_event(
4752            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4753                response_id: "resp_a".to_string(),
4754                item_id: "item_a".to_string(),
4755                content_index: 0,
4756                text: "spoken-truncation-text".to_string(),
4757            },
4758        );
4759
4760        let _ = session.append_realtime_transcript_event(
4761            RealtimeTranscriptEvent::AssistantTurnCompleted {
4762                response_id: "resp_a".to_string(),
4763                stop_reason: StopReason::EndTurn,
4764                usage: Usage::default(),
4765            },
4766        );
4767
4768        // Display content survives unchanged — the truncation text was
4769        // refused. Materializes as `AssistantBlock::Text` (Display lane).
4770        assert_eq!(session.messages().len(), 1);
4771        match &session.messages()[0] {
4772            Message::BlockAssistant(assistant) => {
4773                assert_eq!(assistant.blocks.len(), 1);
4774                match &assistant.blocks[0] {
4775                    AssistantBlock::Text { text, .. } => {
4776                        assert_eq!(text, "display-text-from-delta");
4777                    }
4778                    other => unreachable!(
4779                        "Display content must survive misrouted truncation, got {other:?}"
4780                    ),
4781                }
4782            }
4783            other => unreachable!("expected BlockAssistant, got {other:?}"),
4784        }
4785    }
4786
4787    /// R5-6 sibling: a Spoken-classified item (transcript-truncation
4788    /// arrived first and locked the lane to Spoken) must reject a later
4789    /// `AssistantTextDelta` rather than silently appending the Display
4790    /// text into the Spoken-locked content_segment. Pre-fix the delta
4791    /// arm called `promote_item_lane` and unconditionally pushed the
4792    /// delta — clobbering the lane invariant. Post-fix the delta is
4793    /// dropped (warn fires) and the Spoken-truncation text survives.
4794    #[test]
4795    fn round5_r56_sibling_display_delta_skipped_on_spoken_item() {
4796        let mut session = Session::new();
4797
4798        // Truncation arrives first and locks the item to the Spoken lane.
4799        let _ = session.append_realtime_transcript_event(
4800            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4801                response_id: "resp_a".to_string(),
4802                item_id: "item_a".to_string(),
4803                content_index: 0,
4804                text: "what was actually heard".to_string(),
4805            },
4806        );
4807
4808        // A Display delta arrives later for the SAME item id (provider
4809        // lane-classification bug). It MUST be dropped.
4810        let _ =
4811            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4812                response_id: "resp_a".to_string(),
4813                delta_id: "delta_d_1".to_string(),
4814                item_id: "item_a".to_string(),
4815                previous_item_id: None,
4816                content_index: 0,
4817                delta: "should-not-appear".to_string(),
4818            });
4819
4820        let _ = session.append_realtime_transcript_event(
4821            RealtimeTranscriptEvent::AssistantTurnCompleted {
4822                response_id: "resp_a".to_string(),
4823                stop_reason: StopReason::EndTurn,
4824                usage: Usage::default(),
4825            },
4826        );
4827
4828        // The Spoken-truncation text survives intact; no Display text
4829        // leaked into the Spoken lane content.
4830        assert_eq!(session.messages().len(), 1);
4831        match &session.messages()[0] {
4832            Message::BlockAssistant(assistant) => {
4833                assert_eq!(assistant.blocks.len(), 1);
4834                match &assistant.blocks[0] {
4835                    AssistantBlock::Transcript { text, source, .. } => {
4836                        assert_eq!(text, "what was actually heard");
4837                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4838                    }
4839                    other => unreachable!(
4840                        "Spoken-locked item must materialize as Transcript, got {other:?}"
4841                    ),
4842                }
4843            }
4844            other => unreachable!("expected BlockAssistant, got {other:?}"),
4845        }
4846    }
4847
4848    /// R5-6 sibling: a Display-classified item (a Display delta arrived
4849    /// first and locked the lane to Display) must reject a later
4850    /// `AssistantTranscriptDelta` rather than appending the Spoken text
4851    /// into the Display-locked content_segment. Pre-fix the transcript
4852    /// delta arm called `promote_item_lane` and unconditionally pushed —
4853    /// silently mixing a Spoken stream into a Display block.
4854    #[test]
4855    fn round5_r56_sibling_spoken_delta_skipped_on_display_item() {
4856        let mut session = Session::new();
4857
4858        // Display delta arrives first and locks the item to the Display lane.
4859        let _ =
4860            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4861                response_id: "resp_a".to_string(),
4862                delta_id: "delta_d_1".to_string(),
4863                item_id: "item_a".to_string(),
4864                previous_item_id: None,
4865                content_index: 0,
4866                delta: "display-locked-text".to_string(),
4867            });
4868
4869        // A spoken-transcript delta arrives later for the SAME item id
4870        // (provider lane-classification bug). It MUST be dropped.
4871        let _ = session.append_realtime_transcript_event(
4872            RealtimeTranscriptEvent::AssistantTranscriptDelta {
4873                response_id: "resp_a".to_string(),
4874                delta_id: "delta_s_1".to_string(),
4875                item_id: "item_a".to_string(),
4876                previous_item_id: None,
4877                content_index: 0,
4878                delta: "should-not-appear".to_string(),
4879            },
4880        );
4881
4882        let _ = session.append_realtime_transcript_event(
4883            RealtimeTranscriptEvent::AssistantTurnCompleted {
4884                response_id: "resp_a".to_string(),
4885                stop_reason: StopReason::EndTurn,
4886                usage: Usage::default(),
4887            },
4888        );
4889
4890        // The Display text survives intact; no Spoken text leaked in.
4891        assert_eq!(session.messages().len(), 1);
4892        match &session.messages()[0] {
4893            Message::BlockAssistant(assistant) => {
4894                assert_eq!(assistant.blocks.len(), 1);
4895                match &assistant.blocks[0] {
4896                    AssistantBlock::Text { text, .. } => {
4897                        assert_eq!(text, "display-locked-text");
4898                    }
4899                    other => {
4900                        unreachable!("Display-locked item must materialize as Text, got {other:?}")
4901                    }
4902                }
4903            }
4904            other => unreachable!("expected BlockAssistant, got {other:?}"),
4905        }
4906    }
4907
4908    /// R5-7: a late `AssistantTranscriptFinalText` arriving AFTER
4909    /// `AssistantTurnCompleted` already materialized the item must NOT
4910    /// mutate `content_segments` and must NOT rewrite the canonical
4911    /// `Message::BlockAssistant` (append-only history is a stronger
4912    /// invariant than typed text repair). The committed message keeps
4913    /// the delta-accumulated text; the late final is dropped with a
4914    /// warn; the materializer outcome is inert (no new messages).
4915    #[test]
4916    fn round5_r57_late_final_text_after_turn_completed_warns_and_skips() {
4917        let mut session = Session::new();
4918
4919        // Delta accumulates partial text on the Spoken lane.
4920        let _ = session.append_realtime_transcript_event(
4921            RealtimeTranscriptEvent::AssistantTranscriptDelta {
4922                response_id: "resp_a".to_string(),
4923                delta_id: "delta_s_1".to_string(),
4924                item_id: "item_a".to_string(),
4925                previous_item_id: None,
4926                content_index: 0,
4927                delta: "delta-accumulated".to_string(),
4928            },
4929        );
4930
4931        // TurnCompleted materializes the item with the delta-accumulated text.
4932        let commit_outcome = session.append_realtime_transcript_event(
4933            RealtimeTranscriptEvent::AssistantTurnCompleted {
4934                response_id: "resp_a".to_string(),
4935                stop_reason: StopReason::EndTurn,
4936                usage: Usage::default(),
4937            },
4938        );
4939        assert_eq!(commit_outcome.materialized_messages.len(), 1);
4940
4941        // Late FinalText arrives — provider-side ordering bug. It MUST
4942        // be dropped: no canonical message rewrite, no segment mutation,
4943        // outcome is inert.
4944        let late_outcome = session.append_realtime_transcript_event(
4945            RealtimeTranscriptEvent::AssistantTranscriptFinalText {
4946                response_id: "resp_a".to_string(),
4947                item_id: "item_a".to_string(),
4948                content_index: 0,
4949                text: "authoritative-final-that-must-not-land".to_string(),
4950            },
4951        );
4952        assert!(
4953            late_outcome.is_inert(),
4954            "late FinalText after materialization must produce inert outcome"
4955        );
4956
4957        // Canonical history: still one message with the original
4958        // delta-accumulated text — NOT the authoritative final.
4959        assert_eq!(session.messages().len(), 1);
4960        match &session.messages()[0] {
4961            Message::BlockAssistant(assistant) => {
4962                assert_eq!(assistant.blocks.len(), 1);
4963                match &assistant.blocks[0] {
4964                    AssistantBlock::Transcript { text, .. } => {
4965                        assert_eq!(
4966                            text, "delta-accumulated",
4967                            "canonical message must preserve delta-accumulated text; \
4968                             append-only history forbids late FinalText repair"
4969                        );
4970                    }
4971                    other => unreachable!("expected Transcript, got {other:?}"),
4972                }
4973            }
4974            other => unreachable!("expected BlockAssistant, got {other:?}"),
4975        }
4976    }
4977}