Skip to main content

meerkat_core/
session.rs

1//! Session management for Meerkat
2//!
3//! A session represents a conversation history that can be persisted and resumed.
4//!
5//! # Performance
6//!
7//! Sessions use Arc-based copy-on-write for message storage:
8//! - `fork()` shares the message buffer (O(1), no clone)
9//! - Mutation (push) triggers CoW only when refcount > 1
10//! - `push_batch()` adds multiple messages with a single timestamp update
11
12use crate::Provider;
13use crate::peer_meta::PeerMeta;
14use crate::realtime_transcript::{
15    RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage,
16    RealtimeTranscriptRole, SESSION_REALTIME_TRANSCRIPT_STATE_KEY, TranscriptLane,
17};
18use crate::service::{AppendSystemContextRequest, MobToolAuthorityContext};
19use crate::time_compat::SystemTime;
20use crate::tool_scope::ToolFilter;
21use crate::types::{
22    AssistantBlock, BlockAssistantMessage, ContentBlock, ContentInput, Message, SessionId,
23    StopReason, ToolDef, ToolProvenance, ToolResult, Usage, UserMessage,
24};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use sha2::{Digest, Sha256};
27use std::collections::{BTreeMap, BTreeSet, HashMap};
28use std::sync::Arc;
29
/// Current session format version.
///
/// This is also the value `default_version` supplies when a serialized
/// session carries no `version` field.
///
/// Version history:
/// - v1 — pre-wave-c. `SessionMetadata.auth_binding` inner fields were
///   untyped strings (`realm_id`, `binding_id`, `profile`); no per-entity
///   schema version byte on `SessionMetadata`.
/// - v2 — wave-c C-3. `AuthBindingRef` inner fields are typed
///   `RealmId`/`BindingId`/`ProfileId` newtypes; `SessionMetadata` carries
///   a `schema_version` byte. Opportunistic upgrade-on-read —
///   `meerkat_session::persistent::migrations::migrate` rewrites v1 rows
///   into v2 shape; the next `save()` persists v2.
pub const SESSION_VERSION: u32 = 2;

/// Current `SessionMetadata` schema version. Distinct from `SESSION_VERSION`
/// so `SessionMetadata` can evolve independently of the Session envelope.
///
/// The two constants currently hold the same number (2), but they are bumped
/// independently.
///
/// - v1 — pre-wave-c. Default on read for rows written before the byte
///   was introduced.
/// - v2 — wave-c C-3. Typed `AuthBindingRef` inner fields; any future
///   `SessionMetadata`-local shape change bumps this without moving
///   `SESSION_VERSION`.
pub const SESSION_METADATA_SCHEMA_VERSION: u32 = 2;
52
/// Typed transcript replacement used to create an edited fork.
///
/// Replacements never mutate the source session in place. The owning service
/// applies this to a forked prefix, producing a new `SessionId`.
///
/// Serialized as an internally tagged enum: the `type` field carries the
/// snake_case variant name (`message`, `user_content_block`,
/// `assistant_block`, `tool_result_content_block`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TranscriptReplacement {
    /// Replace the addressed message with a full canonical message.
    Message { message: Message },
    /// Replace one user-message content block.
    UserContentBlock {
        /// Position of the block to replace within the addressed message.
        block_index: usize,
        /// Content block that takes its place.
        block: ContentBlock,
    },
    /// Replace one block in a block-assistant message.
    AssistantBlock {
        /// Position of the block to replace within the addressed message.
        block_index: usize,
        /// Assistant block that takes its place.
        block: AssistantBlock,
    },
    /// Replace one content block inside one tool-result payload.
    ToolResultContentBlock {
        /// Position of the tool result within the addressed message.
        result_index: usize,
        /// Position of the content block within that tool result.
        block_index: usize,
        /// Content block that takes its place.
        block: ContentBlock,
    },
}
79
/// Session metadata key for the typed transcript revision graph head.
///
/// The `_v1` suffix is part of the stored key string itself.
// NOTE(review): presumably the value stored under this key is a serialized
// `TranscriptHistoryState` — confirm against the accessor that reads it.
pub const SESSION_TRANSCRIPT_HISTORY_STATE_KEY: &str = "session_transcript_history_state_v1";
82
/// A concrete transcript span selected for same-session rewrite.
///
/// Serialized as an internally tagged enum whose `type` field carries the
/// snake_case variant name (`message_range`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TranscriptRewriteSelection {
    /// Replace messages in `[start, end)`.
    ///
    /// `start` is inclusive and `end` is exclusive; `start == end` selects an
    /// empty span.
    MessageRange { start: usize, end: usize },
}
91
92impl TranscriptRewriteSelection {
93    fn bounds(&self) -> (usize, usize) {
94        match self {
95            Self::MessageRange { start, end } => (*start, *end),
96        }
97    }
98}
99
/// Audit annotation carried with a transcript rewrite commit.
///
/// The free-form kind is for review, debugging, and provenance. It is not a
/// second policy authority; rewrite admission is enforced by the typed
/// selection, digest, parent-revision, and store-guard contracts.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteReason {
    /// Free-form label for why the rewrite happened.
    pub kind: String,
    /// Optional free-form elaboration; omitted from the serialized form when
    /// `None` and treated as `None` when absent on read.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
}
113
114impl TranscriptRewriteReason {
115    pub fn new(kind: impl Into<String>) -> Self {
116        Self {
117            kind: kind.into(),
118            note: None,
119        }
120    }
121}
122
/// Immutable rewrite commit that advances a session transcript head.
///
/// `validate_transcript_rewrite_record` checks every digest and count below
/// against the parent and revision bodies the commit refers to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteCommit {
    /// Revision being rewritten; must equal the digest of the parent body's
    /// messages.
    pub parent_revision: String,
    /// Revision produced by the rewrite; must equal the digest of the
    /// revision body's messages and differ from `parent_revision`.
    pub revision: String,
    /// Span of the parent transcript that was replaced.
    pub selection: TranscriptRewriteSelection,
    /// Digest of the parent messages inside the selected span.
    pub original_span_digest: String,
    /// Digest of the messages that replaced the selected span.
    pub replacement_digest: String,
    /// Message count of the parent transcript.
    pub messages_before: usize,
    /// Message count of the rewritten transcript.
    pub messages_after: usize,
    /// Audit annotation describing why the rewrite happened.
    pub reason: TranscriptRewriteReason,
    /// Optional free-form identity of whoever committed the rewrite; omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actor: Option<String>,
    /// Timestamp recorded on the commit.
    #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
    pub committed_at: SystemTime,
}
141
/// Immutable transcript revision body retained by the session-local graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRevisionBody {
    /// Revision identifier; the validators in this file require it to equal
    /// `transcript_messages_digest(&messages)`.
    pub revision: String,
    /// Revision this body was derived from, if any; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_revision: Option<String>,
    /// Full message list of this revision (described as an array of arbitrary
    /// JSON values in the generated schema).
    #[cfg_attr(feature = "schema", schemars(with = "Vec<serde_json::Value>"))]
    pub messages: Vec<Message>,
    /// Timestamp recorded on the body.
    #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
    pub created_at: SystemTime,
}
155
/// Schema-only stand-in for `SystemTime`.
///
/// Referenced through `schemars(with = "SchemaSystemTime")` on timestamp
/// fields and published under the schema name `SystemTime`. It only exists
/// when the `schema` feature is enabled and is not constructed anywhere in
/// the code shown here, hence `allow(dead_code)`.
#[cfg(feature = "schema")]
#[allow(dead_code)]
#[derive(schemars::JsonSchema)]
#[schemars(rename = "SystemTime")]
struct SchemaSystemTime {
    secs_since_epoch: u64,
    nanos_since_epoch: u32,
}
164
/// Self-contained append-only transcript rewrite record.
///
/// Bundles a commit with both transcript bodies it refers to, so the record
/// can be validated without consulting any other state.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteRecord {
    /// The rewrite commit itself.
    pub commit: TranscriptRewriteCommit,
    /// Body of the revision named by `commit.parent_revision`.
    pub parent_body: TranscriptRevisionBody,
    /// Body of the revision named by `commit.revision`.
    pub revision_body: TranscriptRevisionBody,
}
174
175impl TranscriptRewriteRecord {
176    pub fn new(
177        commit: TranscriptRewriteCommit,
178        parent_body: TranscriptRevisionBody,
179        revision_body: TranscriptRevisionBody,
180    ) -> Result<Self, TranscriptEditError> {
181        validate_transcript_rewrite_record(&commit, &parent_body, &revision_body)?;
182        Ok(Self {
183            commit,
184            parent_body,
185            revision_body,
186        })
187    }
188}
189
/// Typed session-local transcript revision graph state.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptHistoryState {
    /// Revision identifier of the current transcript head.
    pub head: String,
    /// Rewrite commits in the order they were applied; omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub commits: Vec<TranscriptRewriteCommit>,
    /// Revision bodies referenced by `head` and by the commits; omitted from
    /// the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub revisions: Vec<TranscriptRevisionBody>,
}
201
202impl TranscriptHistoryState {
203    /// Rebuild transcript revision graph state from append-only rewrite records.
204    pub fn from_rewrite_records<I>(records: I) -> Result<Option<Self>, TranscriptEditError>
205    where
206        I: IntoIterator<Item = TranscriptRewriteRecord>,
207    {
208        let mut state: Option<Self> = None;
209        for record in records {
210            validate_transcript_rewrite_record(
211                &record.commit,
212                &record.parent_body,
213                &record.revision_body,
214            )?;
215            let state = state.get_or_insert_with(|| Self {
216                head: record.commit.parent_revision.clone(),
217                commits: Vec::new(),
218                revisions: Vec::new(),
219            });
220            if record.commit.parent_revision != state.head {
221                if revision_body_extends_head(&record.parent_body, &state.revisions, &state.head)? {
222                    state.head = record.commit.parent_revision.clone();
223                } else {
224                    return Err(TranscriptEditError::HistoryStateMalformed(format!(
225                        "rewrite record parent {} does not extend transcript head {}",
226                        record.commit.parent_revision, state.head
227                    )));
228                }
229            }
230            if !state
231                .revisions
232                .iter()
233                .any(|body| body.revision == record.parent_body.revision)
234            {
235                state.revisions.push(record.parent_body);
236            }
237            if !state
238                .revisions
239                .iter()
240                .any(|body| body.revision == record.revision_body.revision)
241            {
242                state.revisions.push(record.revision_body);
243            }
244            state.head = record.commit.revision.clone();
245            state.commits.push(record.commit);
246        }
247        Ok(state)
248    }
249}
250
/// Invalid typed transcript edit request.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Clone, thiserror::Error)]
pub enum TranscriptEditError {
    /// The addressed message index is not below the transcript length.
    #[error("message index {message_index} out of bounds for {message_count} messages")]
    MessageIndexOutOfBounds {
        message_index: usize,
        message_count: usize,
    },
    /// A block index is not below the number of blocks; `block_kind` names
    /// the kind of block that was addressed.
    #[error("{block_kind} index {block_index} out of bounds for {block_count} blocks")]
    BlockIndexOutOfBounds {
        block_kind: &'static str,
        block_index: usize,
        block_count: usize,
    },
    /// The message at `message_index` has a different role than the
    /// replacement expects.
    #[error("replacement expected {expected} at message index {message_index}, found {actual}")]
    MessageRoleMismatch {
        message_index: usize,
        expected: &'static str,
        actual: &'static str,
    },
    /// The rewrite range is reversed or reaches past the end of the
    /// transcript.
    #[error("invalid transcript rewrite range {start}..{end} for {message_count} messages")]
    InvalidRewriteRange {
        start: usize,
        end: usize,
        message_count: usize,
    },
    /// The rewrite would produce the revision it started from.
    #[error("transcript rewrite does not change transcript revision {revision}")]
    NoOpRewrite { revision: String },
    /// The parent revision named by the rewrite is not the one expected.
    #[error("transcript rewrite parent revision mismatch: expected {expected}, actual {actual}")]
    RevisionConflict { expected: String, actual: String },
    /// Stored history, or a rewrite record, is internally inconsistent; the
    /// payload describes the inconsistency.
    #[error("transcript history state is malformed: {0}")]
    HistoryStateMalformed(String),
    /// The transcript violates the tool-use / tool-results pairing rules; the
    /// payload describes the violation.
    #[error("invalid transcript shape after rewrite: {0}")]
    InvalidTranscriptShape(String),
}
286
287fn message_role_name(message: &Message) -> &'static str {
288    match message {
289        Message::System(_) => "system",
290        Message::SystemNotice(_) => "system_notice",
291        Message::User(_) => "user",
292        Message::Assistant(_) => "assistant",
293        Message::BlockAssistant(_) => "block_assistant",
294        Message::ToolResults { .. } => "tool_results",
295    }
296}
297
298fn assistant_tool_use_ids(message: &Message) -> Vec<&str> {
299    match message {
300        Message::Assistant(assistant) => assistant
301            .tool_calls
302            .iter()
303            .map(|tool_call| tool_call.id.as_str())
304            .collect(),
305        Message::BlockAssistant(assistant) => assistant
306            .blocks
307            .iter()
308            .filter_map(|block| match block {
309                AssistantBlock::ToolUse { id, .. } => Some(id.as_str()),
310                _ => None,
311            })
312            .collect(),
313        _ => Vec::new(),
314    }
315}
316
317fn validate_transcript_tool_result_shape(messages: &[Message]) -> Result<(), TranscriptEditError> {
318    for (index, message) in messages.iter().enumerate() {
319        if let Message::ToolResults { results, .. } = message {
320            let Some(previous) = index
321                .checked_sub(1)
322                .and_then(|previous| messages.get(previous))
323            else {
324                return Err(TranscriptEditError::InvalidTranscriptShape(format!(
325                    "tool_results at message {index} has no preceding assistant tool-use message"
326                )));
327            };
328            let expected = assistant_tool_use_ids(previous);
329            if expected.is_empty() {
330                return Err(TranscriptEditError::InvalidTranscriptShape(format!(
331                    "tool_results at message {index} follows {}, not an assistant tool-use message",
332                    message_role_name(previous)
333                )));
334            }
335            let actual = results
336                .iter()
337                .map(|result| result.tool_use_id.as_str())
338                .collect::<Vec<_>>();
339            let actual_set = actual.iter().copied().collect::<BTreeSet<_>>();
340            let expected_set = expected.iter().copied().collect::<BTreeSet<_>>();
341            if actual.len() != actual_set.len() {
342                return Err(TranscriptEditError::InvalidTranscriptShape(format!(
343                    "tool_results at message {index} contains duplicate tool ids"
344                )));
345            }
346            if expected.len() != expected_set.len() {
347                return Err(TranscriptEditError::InvalidTranscriptShape(format!(
348                    "assistant tool-use message before tool_results at message {index} contains duplicate tool ids"
349                )));
350            }
351            if actual_set != expected_set {
352                return Err(TranscriptEditError::InvalidTranscriptShape(format!(
353                    "tool_results at message {index} resolve tool ids {actual_set:?}, expected {expected_set:?}"
354                )));
355            }
356        }
357
358        let tool_use_ids = assistant_tool_use_ids(message);
359        if tool_use_ids.is_empty() {
360            continue;
361        }
362        let Some(next) = messages.get(index + 1) else {
363            return Err(TranscriptEditError::InvalidTranscriptShape(format!(
364                "assistant tool-use message {index} has no following tool_results"
365            )));
366        };
367        if !matches!(next, Message::ToolResults { .. }) {
368            return Err(TranscriptEditError::InvalidTranscriptShape(format!(
369                "assistant tool-use message {index} is followed by {}, not tool_results",
370                message_role_name(next)
371            )));
372        }
373    }
374    Ok(())
375}
376
/// Computes the revision digest of a message list.
///
/// The messages are serialized to JSON and hashed by `sha256_json_digest`,
/// giving a string of the form `sha256:<lowercase hex>`. The validators in
/// this file compare transcript revision identifiers against this value.
///
/// # Errors
///
/// Returns the `serde_json` error if the messages cannot be serialized.
pub fn transcript_messages_digest(messages: &[Message]) -> Result<String, serde_json::Error> {
    sha256_json_digest(messages)
}
380
381fn validate_transcript_rewrite_record(
382    commit: &TranscriptRewriteCommit,
383    parent_body: &TranscriptRevisionBody,
384    revision_body: &TranscriptRevisionBody,
385) -> Result<(), TranscriptEditError> {
386    if parent_body.revision != commit.parent_revision {
387        return Err(TranscriptEditError::HistoryStateMalformed(format!(
388            "parent body revision {} does not match commit parent {}",
389            parent_body.revision, commit.parent_revision
390        )));
391    }
392    if revision_body.revision != commit.revision {
393        return Err(TranscriptEditError::HistoryStateMalformed(format!(
394            "revision body {} does not match commit revision {}",
395            revision_body.revision, commit.revision
396        )));
397    }
398    if commit.parent_revision == commit.revision {
399        return Err(TranscriptEditError::NoOpRewrite {
400            revision: commit.revision.clone(),
401        });
402    }
403    let parent_digest = transcript_messages_digest(&parent_body.messages)
404        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
405    if parent_digest != commit.parent_revision {
406        return Err(TranscriptEditError::HistoryStateMalformed(format!(
407            "parent body digest {parent_digest} does not match commit parent {}",
408            commit.parent_revision
409        )));
410    }
411    let revision_digest = transcript_messages_digest(&revision_body.messages)
412        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
413    if revision_digest != commit.revision {
414        return Err(TranscriptEditError::HistoryStateMalformed(format!(
415            "revision body digest {revision_digest} does not match commit revision {}",
416            commit.revision
417        )));
418    }
419    let (start, end) = commit.selection.bounds();
420    if start > end || end > parent_body.messages.len() {
421        return Err(TranscriptEditError::InvalidRewriteRange {
422            start,
423            end,
424            message_count: parent_body.messages.len(),
425        });
426    }
427    if commit.messages_before != parent_body.messages.len()
428        || commit.messages_after != revision_body.messages.len()
429    {
430        return Err(TranscriptEditError::HistoryStateMalformed(format!(
431            "commit message counts {} -> {} do not match revision bodies {} -> {}",
432            commit.messages_before,
433            commit.messages_after,
434            parent_body.messages.len(),
435            revision_body.messages.len()
436        )));
437    }
438    let original_span_digest = sha256_json_digest(&parent_body.messages[start..end])
439        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
440    if original_span_digest != commit.original_span_digest {
441        return Err(TranscriptEditError::HistoryStateMalformed(format!(
442            "original span digest {original_span_digest} does not match commit digest {}",
443            commit.original_span_digest
444        )));
445    }
446    let removed_len = end - start;
447    let retained_len = commit
448        .messages_before
449        .checked_sub(removed_len)
450        .ok_or_else(|| {
451            TranscriptEditError::HistoryStateMalformed(
452                "commit removed more messages than it recorded before rewrite".to_string(),
453            )
454        })?;
455    let replacement_len = commit
456        .messages_after
457        .checked_sub(retained_len)
458        .ok_or_else(|| {
459            TranscriptEditError::HistoryStateMalformed(
460                "commit message counts cannot describe a replacement span".to_string(),
461            )
462        })?;
463    let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
464        TranscriptEditError::HistoryStateMalformed("replacement span end overflowed".to_string())
465    })?;
466    if replacement_end > revision_body.messages.len() {
467        return Err(TranscriptEditError::InvalidRewriteRange {
468            start,
469            end: replacement_end,
470            message_count: revision_body.messages.len(),
471        });
472    }
473    let parent_prefix_digest = transcript_messages_digest(&parent_body.messages[..start])
474        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
475    let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
476        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
477    if parent_prefix_digest != revision_prefix_digest {
478        return Err(TranscriptEditError::HistoryStateMalformed(
479            "rewrite revision changed messages before the selected span".to_string(),
480        ));
481    }
482    let parent_suffix_digest = transcript_messages_digest(&parent_body.messages[end..])
483        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
484    let revision_suffix_digest =
485        transcript_messages_digest(&revision_body.messages[replacement_end..])
486            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
487    if parent_suffix_digest != revision_suffix_digest {
488        return Err(TranscriptEditError::HistoryStateMalformed(
489            "rewrite revision changed messages after the selected span".to_string(),
490        ));
491    }
492    let replacement_digest = sha256_json_digest(&revision_body.messages[start..replacement_end])
493        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
494    if replacement_digest != commit.replacement_digest {
495        return Err(TranscriptEditError::HistoryStateMalformed(format!(
496            "replacement span digest {replacement_digest} does not match commit digest {}",
497            commit.replacement_digest
498        )));
499    }
500    Ok(())
501}
502
503fn validate_transcript_history_state(
504    state: &TranscriptHistoryState,
505) -> Result<(), TranscriptEditError> {
506    if state
507        .revisions
508        .iter()
509        .all(|body| body.revision != state.head)
510    {
511        return Err(TranscriptEditError::HistoryStateMalformed(format!(
512            "missing transcript head body {}",
513            state.head
514        )));
515    }
516    for body in &state.revisions {
517        let digest = transcript_messages_digest(&body.messages)
518            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
519        if digest != body.revision {
520            return Err(TranscriptEditError::HistoryStateMalformed(format!(
521                "transcript revision body {} has digest {digest}",
522                body.revision
523            )));
524        }
525    }
526    for commit in &state.commits {
527        let parent_body = state
528            .revisions
529            .iter()
530            .find(|body| body.revision == commit.parent_revision)
531            .ok_or_else(|| {
532                TranscriptEditError::HistoryStateMalformed(format!(
533                    "missing parent transcript body {}",
534                    commit.parent_revision
535                ))
536            })?;
537        let revision_body = state
538            .revisions
539            .iter()
540            .find(|body| body.revision == commit.revision)
541            .ok_or_else(|| {
542                TranscriptEditError::HistoryStateMalformed(format!(
543                    "missing transcript revision body {}",
544                    commit.revision
545                ))
546            })?;
547        validate_transcript_rewrite_record(commit, parent_body, revision_body)?;
548    }
549    let Some(first_commit) = state.commits.first() else {
550        return Ok(());
551    };
552    let mut expected_head = first_commit.parent_revision.clone();
553    for commit in &state.commits {
554        let parent_body = state
555            .revisions
556            .iter()
557            .find(|body| body.revision == commit.parent_revision)
558            .ok_or_else(|| {
559                TranscriptEditError::HistoryStateMalformed(format!(
560                    "missing parent transcript body {}",
561                    commit.parent_revision
562                ))
563            })?;
564        if commit.parent_revision != expected_head
565            && !revision_body_extends_head(parent_body, &state.revisions, &expected_head)?
566        {
567            return Err(TranscriptEditError::HistoryStateMalformed(format!(
568                "rewrite commit parent {} does not extend transcript head {}",
569                commit.parent_revision, expected_head
570            )));
571        }
572        expected_head = commit.revision.clone();
573    }
574    let mut cursor = state.head.clone();
575    while cursor != expected_head {
576        let Some(head_body) = state.revisions.iter().find(|body| body.revision == cursor) else {
577            break;
578        };
579        match head_body.parent_revision.as_deref() {
580            Some(parent) => cursor = parent.to_string(),
581            None => break,
582        }
583    }
584    if cursor != expected_head {
585        return Err(TranscriptEditError::HistoryStateMalformed(format!(
586            "transcript head {} does not extend the rewrite chain",
587            state.head
588        )));
589    }
590    Ok(())
591}
592
593fn revision_body_extends_head(
594    candidate: &TranscriptRevisionBody,
595    revisions: &[TranscriptRevisionBody],
596    head: &str,
597) -> Result<bool, TranscriptEditError> {
598    if candidate.parent_revision.as_deref() == Some(head) {
599        return Ok(true);
600    }
601    let Some(head_body) = revisions.iter().find(|body| body.revision == head) else {
602        return Ok(false);
603    };
604    if candidate.messages.len() < head_body.messages.len() {
605        return Ok(false);
606    }
607    let prefix_digest = transcript_messages_digest(&candidate.messages[..head_body.messages.len()])
608        .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
609    Ok(prefix_digest == head)
610}
611
612fn sha256_json_digest<T: Serialize + ?Sized>(value: &T) -> Result<String, serde_json::Error> {
613    let bytes = serde_json::to_vec(value)?;
614    let digest = Sha256::digest(bytes);
615    let mut out = String::with_capacity(digest.len() * 2);
616    const HEX: &[u8; 16] = b"0123456789abcdef";
617    for byte in digest {
618        out.push(HEX[(byte >> 4) as usize] as char);
619        out.push(HEX[(byte & 0x0f) as usize] as char);
620    }
621    Ok(format!("sha256:{out}"))
622}
623
/// Serialized bookkeeping for realtime transcript events.
///
/// Every field falls back to its empty default when absent from the
/// serialized form.
// NOTE(review): the map and set keys look like realtime item / delta /
// response identifiers — confirm against the code that fills them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionRealtimeTranscriptState {
    /// Per-item state, keyed by string.
    #[serde(default)]
    items: BTreeMap<String, RealtimeTranscriptItemState>,
    /// Ordered list of strings; by its name, the order items were first seen.
    #[serde(default)]
    first_seen_order: Vec<String>,
    /// Set of delta ids already recorded.
    #[serde(default)]
    seen_delta_ids: BTreeSet<String>,
    /// Completion records for assistant output, keyed by string.
    #[serde(default)]
    assistant_completions: BTreeMap<String, RealtimeAssistantCompletion>,
    /// Assistant response ids marked as discarded; omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    discarded_assistant_response_ids: BTreeSet<String>,
}
638
/// State tracked for one realtime transcript item.
///
/// Only `role` is required in the serialized form; every other field
/// defaults when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeTranscriptItemState {
    /// Role of the item.
    role: RealtimeTranscriptRole,
    /// Id of the item that precedes this one, if any.
    #[serde(default)]
    previous_item_id: Option<String>,
    /// Id of the response this item belongs to, if any.
    #[serde(default)]
    response_id: Option<String>,
    /// Text fragments keyed by a `u32`; `text()` joins them in ascending key
    /// order.
    #[serde(default)]
    content_segments: BTreeMap<u32, String>,
    /// Set by the `skipped` constructor, cleared by `new`.
    #[serde(default)]
    skipped: bool,
    /// Set by the `skipped` constructor, cleared by `new`.
    #[serde(default)]
    ready: bool,
    /// Cleared by both constructors.
    #[serde(default)]
    materialized: bool,
    /// T9/T10: output lane this assistant item carries. `Display` is the
    /// default (matches all pre-T10 sessions on disk); promoted to `Spoken`
    /// the first time an [`RealtimeTranscriptEvent::AssistantTranscriptDelta`]
    /// fragment arrives for the item. User-role items always carry
    /// `Display` (the field is unused for user transcripts).
    #[serde(default)]
    lane: TranscriptLane,
}
663
664impl RealtimeTranscriptItemState {
665    fn new(
666        role: RealtimeTranscriptRole,
667        previous_item_id: Option<String>,
668        response_id: Option<String>,
669    ) -> Self {
670        Self {
671            role,
672            previous_item_id,
673            response_id,
674            content_segments: BTreeMap::new(),
675            skipped: false,
676            ready: false,
677            materialized: false,
678            lane: TranscriptLane::Display,
679        }
680    }
681
682    fn skipped(previous_item_id: Option<String>) -> Self {
683        Self {
684            role: RealtimeTranscriptRole::Assistant,
685            previous_item_id,
686            response_id: None,
687            content_segments: BTreeMap::new(),
688            skipped: true,
689            ready: true,
690            materialized: false,
691            lane: TranscriptLane::Display,
692        }
693    }
694
695    fn text(&self) -> String {
696        self.content_segments.values().cloned().collect()
697    }
698}
699
/// Completion data recorded for realtime assistant output.
///
/// No field has a serde default, so all three must be present when
/// deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeAssistantCompletion {
    /// Stop reason recorded for the completion.
    stop_reason: StopReason,
    /// Token usage recorded for the completion.
    usage: Usage,
    // NOTE(review): presumably marks that `usage` has already been counted
    // elsewhere — confirm against the code that sets it.
    usage_consumed: bool,
}
707
/// A conversation session with full history
///
/// Uses Arc<Vec<Message>> internally for efficient forking (copy-on-write).
///
/// Serialization does not go through a derive: the manual `Serialize` and
/// `Deserialize` impls convert to and from `SessionSerde`, which stores the
/// messages as a plain `Vec<Message>`.
#[derive(Debug, Clone)]
pub struct Session {
    /// Format version for migrations
    version: u32,
    /// Unique identifier
    id: SessionId,
    /// All messages in order (Arc for CoW on fork)
    pub(crate) messages: Arc<Vec<Message>>,
    /// When the session was created
    created_at: SystemTime,
    /// When the session was last updated
    updated_at: SystemTime,
    /// Arbitrary metadata
    metadata: serde_json::Map<String, serde_json::Value>,
    /// Cumulative token usage across all LLM calls in this session
    usage: Usage,
}
728
/// Serde helper for Session serialization (flattens Arc)
///
/// Mirrors the fields of `Session`, except that `messages` is an owned
/// `Vec<Message>` rather than an `Arc`. Field names and order here define the
/// serialized layout of a session.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionSerde {
    /// Falls back to `default_version()` when absent from the input.
    #[serde(default = "default_version")]
    version: u32,
    id: SessionId,
    messages: Vec<Message>,
    created_at: SystemTime,
    updated_at: SystemTime,
    /// Empty map when absent from the input.
    #[serde(default)]
    metadata: serde_json::Map<String, serde_json::Value>,
    /// `Usage::default()` when absent from the input.
    #[serde(default)]
    usage: Usage,
}
744
745impl Serialize for Session {
746    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
747    where
748        S: Serializer,
749    {
750        let serde_repr = SessionSerde {
751            version: self.version,
752            id: self.id.clone(),
753            messages: (*self.messages).clone(),
754            created_at: self.created_at,
755            updated_at: self.updated_at,
756            metadata: self.metadata.clone(),
757            usage: self.usage.clone(),
758        };
759        serde_repr.serialize(serializer)
760    }
761}
762
763impl<'de> Deserialize<'de> for Session {
764    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
765    where
766        D: Deserializer<'de>,
767    {
768        let serde_repr = SessionSerde::deserialize(deserializer)?;
769        Ok(Session {
770            version: serde_repr.version,
771            id: serde_repr.id,
772            messages: Arc::new(serde_repr.messages),
773            created_at: serde_repr.created_at,
774            updated_at: serde_repr.updated_at,
775            metadata: serde_repr.metadata,
776            usage: serde_repr.usage,
777        })
778    }
779}
780
/// Serde default for `SessionSerde::version`: input without a `version`
/// field is read as the current `SESSION_VERSION`.
// NOTE(review): this labels version-less input as the *current* format, not
// v1 — confirm that every persisted row carries an explicit `version`.
fn default_version() -> u32 {
    SESSION_VERSION
}
784
/// Metadata key used to store durable system-context control state.
// NOTE(review): presumably the stored value is a serialized
// `SessionSystemContextState` — confirm against the accessor.
pub const SESSION_SYSTEM_CONTEXT_STATE_KEY: &str = "session_system_context_state";

/// Metadata key used to store deferred-turn control state.
pub const SESSION_DEFERRED_TURN_STATE_KEY: &str = "session_deferred_turn_state";

/// Metadata key used to store recoverable build-only session state.
pub const SESSION_BUILD_STATE_KEY: &str = "session_build_state";

/// Metadata key used to store durable session-local tool visibility intent.
///
/// The `_v1` suffix is part of the stored key string itself.
pub const SESSION_TOOL_VISIBILITY_STATE_KEY: &str = "session_tool_visibility_state_v1";

/// Canonical tool name gated by `image_tool_results` capability.
pub const VIEW_IMAGE_TOOL_NAME: &str = "view_image";

/// Canonical separator between appended runtime system-context blocks.
///
/// A `---` line with one blank line on each side.
pub const SYSTEM_CONTEXT_SEPARATOR: &str = "\n\n---\n\n";
802
803/// Durable control state for runtime system-context append requests.
804#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
805#[serde(rename_all = "snake_case")]
806pub struct SessionSystemContextState {
807    #[serde(default, skip_serializing_if = "Vec::is_empty")]
808    pub pending: Vec<PendingSystemContextAppend>,
809    #[serde(default, skip_serializing_if = "Vec::is_empty")]
810    pub applied: Vec<PendingSystemContextAppend>,
811    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
812    pub seen: std::collections::BTreeMap<String, SeenSystemContextKey>,
813    #[serde(default, skip_serializing_if = "std::collections::BTreeSet::is_empty")]
814    pub active_turn_pending_keys: std::collections::BTreeSet<String>,
815}
816
/// Pending append request accepted by the control plane but not yet applied at an LLM boundary.
///
/// `source` and `idempotency_key` are optional on the wire and omitted from
/// the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct PendingSystemContextAppend {
    /// Context text to append.
    pub text: String,
    /// Optional label for where the append came from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Optional caller-supplied key used to deduplicate requests.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Timestamp recorded when the request was accepted.
    pub accepted_at: SystemTime,
}
828
/// Durable control state for deferred first-turn prompt and staged callback tool results.
///
/// All fields are optional on the wire; default-valued fields are omitted
/// from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionDeferredTurnState {
    /// Lifecycle phase of the deferred first turn (omitted while `Inactive`).
    #[serde(default, skip_serializing_if = "DeferredFirstTurnPhase::is_inactive")]
    pub first_turn_phase: DeferredFirstTurnPhase,
    /// Create-time prompt staged for the first turn, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pending_initial_prompt: Option<PendingDeferredPrompt>,
    /// Callback tool-results messages staged for the next turn, in staging order.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending_tool_results: Vec<PendingToolResultsMessage>,
}
840
/// Canonical lifecycle phase for the session's deferred first turn.
///
/// Variants are serialized in snake_case (`inactive`, `pending`, `consumed`);
/// `Inactive` is the default.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DeferredFirstTurnPhase {
    /// The session was not created in deferred-first-turn mode.
    #[default]
    Inactive,
    /// The session exists durably but the first turn has not started yet.
    Pending,
    /// The first turn has started; build-only overrides are no longer legal.
    Consumed,
}
853
854impl DeferredFirstTurnPhase {
855    pub fn is_inactive(&self) -> bool {
856        matches!(self, Self::Inactive)
857    }
858}
859
860fn is_default_hook_run_overrides(value: &crate::HookRunOverrides) -> bool {
861    value == &crate::HookRunOverrides::default()
862}
863
864fn is_default_call_timeout_override(value: &crate::CallTimeoutOverride) -> bool {
865    value == &crate::CallTimeoutOverride::default()
866}
867
868fn is_tool_filter_all(value: &ToolFilter) -> bool {
869    matches!(value, ToolFilter::All)
870}
871
/// Serde `skip_serializing_if` predicate: `true` when the counter is zero.
fn is_zero(value: &u64) -> bool {
    matches!(*value, 0)
}
875
876/// Derive the machine-owned capability base filter from the current image-tool-results support.
877pub fn capability_base_filter_for_image_tool_results(image_tool_results: bool) -> ToolFilter {
878    if image_tool_results {
879        ToolFilter::All
880    } else {
881        ToolFilter::Deny([VIEW_IMAGE_TOOL_NAME.to_string()].into_iter().collect())
882    }
883}
884
/// Persisted witness for a durable tool-visibility name.
///
/// Both fields are optional on the wire and omitted when `None`; the default
/// value carries no witness at all.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct ToolVisibilityWitness {
    /// Stable owner key for the tool, when one is known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stable_owner_key: Option<String>,
    /// Provenance last observed for the tool, when one is known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_provenance: Option<ToolProvenance>,
}
894
895impl ToolVisibilityWitness {
896    pub fn has_identity_witness(&self) -> bool {
897        self.stable_owner_key.is_some() || self.last_seen_provenance.is_some()
898    }
899
900    pub fn has_provenance_identity_witness(&self) -> bool {
901        self.last_seen_provenance.is_some()
902    }
903}
904
/// Typed authority value for a deferred-tool load request.
///
/// The public/effect seam carries the requested route name and provenance
/// witness as one value. Canonical owners may project this into name-indexed
/// maps internally, but callers do not get to make a map key the authority.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct DeferredToolLoadAuthority {
    /// Requested route name.
    pub name: String,
    /// Witness that accompanies `name`.
    pub witness: ToolVisibilityWitness,
}
916
917impl DeferredToolLoadAuthority {
918    pub fn new(name: impl Into<String>, witness: ToolVisibilityWitness) -> Self {
919        Self {
920            name: name.into(),
921            witness,
922        }
923    }
924
925    pub fn into_parts(self) -> (String, ToolVisibilityWitness) {
926        (self.name, self.witness)
927    }
928}
929
/// Durable tool-filter intent paired with the witnesses that made the names
/// authoritative at capture time.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct WitnessedToolFilter {
    /// The filter itself; required on the wire (no serde default).
    pub filter: ToolFilter,
    /// Witnesses keyed by name; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
939
940impl WitnessedToolFilter {
941    pub fn new(filter: ToolFilter, witnesses: BTreeMap<String, ToolVisibilityWitness>) -> Self {
942        Self { filter, witnesses }
943    }
944
945    pub fn into_parts(self) -> (ToolFilter, BTreeMap<String, ToolVisibilityWitness>) {
946        (self.filter, self.witnesses)
947    }
948}
949
/// Canonical durable session-local tool visibility intent.
///
/// Every field is optional on the wire. Filters equal to `ToolFilter::All`,
/// empty sets/maps and zero revisions are omitted when serializing, so a
/// default state serializes to an empty object.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SessionToolVisibilityState {
    // The four filters are skipped on output when they are `ToolFilter::All`.
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub capability_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub inherited_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub active_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub staged_filter: ToolFilter,
    // Requested deferred-tool names, tracked separately for active and staged.
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub active_requested_deferred_names: BTreeSet<String>,
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub staged_requested_deferred_names: BTreeSet<String>,
    // Revision counters; a value of zero is not written out.
    #[serde(default, skip_serializing_if = "is_zero")]
    pub active_revision: u64,
    #[serde(default, skip_serializing_if = "is_zero")]
    pub staged_revision: u64,
    // Name-keyed witnesses for requested names and for filter names.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub requested_witnesses: BTreeMap<String, ToolVisibilityWitness>,
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub filter_witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
975
/// Durable build-only session state required to faithfully recover and rebuild
/// a persisted session without surface-local shadow config.
///
/// Every field is optional on the wire: missing fields deserialize to their
/// default, and `None` / empty / default-valued fields are omitted when
/// serializing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionBuildState {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_schema: Option<crate::OutputSchema>,
    // Skipped on output when equal to `HookRunOverrides::default()`.
    #[serde(default, skip_serializing_if = "is_default_hook_run_overrides")]
    pub hooks_override: crate::HookRunOverrides,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub budget_limits: Option<crate::BudgetLimits>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub recoverable_tool_defs: Vec<ToolDef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub silent_comms_intents: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_inline_peer_notifications: Option<i32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub app_context: Option<serde_json::Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub additional_instructions: Option<Vec<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell_env: Option<HashMap<String, String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mob_tool_authority_context: Option<MobToolAuthorityContext>,
    // Skipped on output when equal to `CallTimeoutOverride::default()`.
    #[serde(default, skip_serializing_if = "is_default_call_timeout_override")]
    pub call_timeout_override: crate::CallTimeoutOverride,
}
1006
/// Deferred create-time prompt staged for the next turn.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct PendingDeferredPrompt {
    /// The staged prompt content.
    pub prompt: ContentInput,
    /// Timestamp recorded when the prompt was staged.
    pub accepted_at: SystemTime,
}
1014
/// Staged callback tool results waiting to be admitted on the next turn seam.
///
/// `PartialEq` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PendingToolResultsMessage {
    /// Tool results staged together as one message.
    pub results: Vec<ToolResult>,
    /// Timestamp recorded when the results were staged.
    pub accepted_at: SystemTime,
}
1022
1023impl PartialEq for PendingToolResultsMessage {
1024    fn eq(&self, other: &Self) -> bool {
1025        self.accepted_at == other.accepted_at
1026            && serde_json::to_value(&self.results).ok() == serde_json::to_value(&other.results).ok()
1027    }
1028}
1029
/// Seen idempotency-key entry for system-context append requests.
///
/// Records the payload a key was first staged with, so a later request using
/// the same key can be classified as a duplicate or a conflict.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SeenSystemContextKey {
    /// Text the key was staged with.
    pub text: String,
    /// Source the key was staged with, if any (omitted on the wire when `None`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Whether the keyed append is still pending or has been applied.
    pub state: SeenSystemContextState,
}
1039
/// Lifecycle state for an accepted idempotency key.
///
/// Serialized in snake_case (`pending`, `applied`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SeenSystemContextState {
    /// The keyed append has been staged but not yet marked applied.
    Pending,
    /// The keyed append has been marked applied.
    Applied,
}
1047
1048impl SessionSystemContextState {
1049    /// Stage an append request, enforcing per-session idempotency.
1050    pub fn stage_append(
1051        &mut self,
1052        req: &AppendSystemContextRequest,
1053        accepted_at: SystemTime,
1054    ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1055        let text = req.text.trim();
1056        if text.is_empty() {
1057            return Err(SystemContextStageError::InvalidRequest(
1058                "system context text must not be empty".to_string(),
1059            ));
1060        }
1061
1062        if let Some(key) = req.idempotency_key.as_ref() {
1063            match self.seen.get(key) {
1064                Some(existing)
1065                    if existing.text == text
1066                        && existing.source.as_deref() == req.source.as_deref() =>
1067                {
1068                    return Ok(crate::service::AppendSystemContextStatus::Duplicate);
1069                }
1070                Some(existing) => {
1071                    return Err(SystemContextStageError::Conflict {
1072                        key: key.clone(),
1073                        existing_text: existing.text.clone(),
1074                        existing_source: existing.source.clone(),
1075                    });
1076                }
1077                None => {}
1078            }
1079        }
1080
1081        let append = PendingSystemContextAppend {
1082            text: text.to_string(),
1083            source: req.source.clone(),
1084            idempotency_key: req.idempotency_key.clone(),
1085            accepted_at,
1086        };
1087        if let Some(key) = req.idempotency_key.as_ref() {
1088            self.seen.insert(
1089                key.clone(),
1090                SeenSystemContextKey {
1091                    text: append.text.clone(),
1092                    source: append.source.clone(),
1093                    state: SeenSystemContextState::Pending,
1094                },
1095            );
1096        }
1097        self.pending.push(append);
1098        Ok(crate::service::AppendSystemContextStatus::Staged)
1099    }
1100
1101    /// Stage an append that is scoped to the currently-active turn only.
1102    ///
1103    /// If the active turn reaches another model boundary, normal pending
1104    /// consumption moves it to `applied`. If the turn completes first, callers
1105    /// should discard the still-pending active-turn keys so the context cannot
1106    /// leak into an unrelated later run.
1107    pub fn stage_active_turn_append(
1108        &mut self,
1109        req: &AppendSystemContextRequest,
1110        accepted_at: SystemTime,
1111    ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1112        let idempotency_key = req.idempotency_key.clone();
1113        let status = self.stage_append(req, accepted_at)?;
1114        if matches!(status, crate::service::AppendSystemContextStatus::Staged)
1115            && let Some(key) = idempotency_key
1116        {
1117            self.active_turn_pending_keys.insert(key);
1118        }
1119        Ok(status)
1120    }
1121
1122    /// Mark all currently-pending appends as applied and clear the pending queue.
1123    pub fn mark_pending_applied(&mut self) {
1124        for pending in &self.pending {
1125            if is_runtime_steer_append(pending) {
1126                continue;
1127            }
1128            if !self.applied.contains(pending) {
1129                self.applied.push(pending.clone());
1130            }
1131        }
1132        let mut seen_to_remove = Vec::new();
1133        for pending in &self.pending {
1134            if let Some(key) = pending.idempotency_key.as_ref()
1135                && let Some(seen) = self.seen.get_mut(key)
1136            {
1137                if is_runtime_steer_append(pending) {
1138                    seen_to_remove.push(key.clone());
1139                } else {
1140                    seen.state = SeenSystemContextState::Applied;
1141                }
1142            }
1143        }
1144        for key in seen_to_remove {
1145            self.seen.remove(&key);
1146        }
1147        self.pending.clear();
1148        self.active_turn_pending_keys.clear();
1149    }
1150
1151    /// Discard active-turn-only appends that were not consumed by the turn's
1152    /// next LLM boundary.
1153    pub fn discard_unapplied_active_turn_pending(&mut self) -> Vec<PendingSystemContextAppend> {
1154        if self.active_turn_pending_keys.is_empty() {
1155            return Vec::new();
1156        }
1157
1158        let active_keys = std::mem::take(&mut self.active_turn_pending_keys);
1159        let mut discarded = Vec::new();
1160        self.pending.retain(|append| {
1161            let should_discard = append
1162                .idempotency_key
1163                .as_ref()
1164                .is_some_and(|key| active_keys.contains(key));
1165            if should_discard {
1166                discarded.push(append.clone());
1167            }
1168            !should_discard
1169        });
1170
1171        for append in &discarded {
1172            if let Some(key) = append.idempotency_key.as_ref()
1173                && self
1174                    .seen
1175                    .get(key)
1176                    .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
1177            {
1178                self.seen.remove(key);
1179            }
1180        }
1181
1182        discarded
1183    }
1184
1185    /// Discard specific active-turn-only appends that are still pending.
1186    ///
1187    /// This is the rollback companion for live-boundary staging. The runtime
1188    /// owns the accepted input, so if that commit fails after the session has
1189    /// staged context, the session-side projection must be removed by the same
1190    /// idempotency keys before the caller reports failure.
1191    pub fn discard_active_turn_pending_by_keys(
1192        &mut self,
1193        idempotency_keys: &[String],
1194    ) -> Vec<PendingSystemContextAppend> {
1195        if idempotency_keys.is_empty() || self.active_turn_pending_keys.is_empty() {
1196            return Vec::new();
1197        }
1198
1199        let requested_keys: std::collections::BTreeSet<&str> =
1200            idempotency_keys.iter().map(String::as_str).collect();
1201        let mut discarded = Vec::new();
1202        let mut discarded_keys = Vec::new();
1203        self.pending.retain(|append| {
1204            let should_discard = append.idempotency_key.as_ref().is_some_and(|key| {
1205                requested_keys.contains(key.as_str()) && self.active_turn_pending_keys.contains(key)
1206            });
1207            if should_discard {
1208                if let Some(key) = append.idempotency_key.as_ref() {
1209                    discarded_keys.push(key.clone());
1210                }
1211                discarded.push(append.clone());
1212            }
1213            !should_discard
1214        });
1215
1216        for key in discarded_keys {
1217            self.active_turn_pending_keys.remove(&key);
1218            if self
1219                .seen
1220                .get(&key)
1221                .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
1222            {
1223                self.seen.remove(&key);
1224            }
1225        }
1226
1227        discarded
1228    }
1229}
1230
1231impl SessionDeferredTurnState {
1232    /// Mark that this session has a deferred first turn waiting to start.
1233    pub fn mark_initial_turn_pending(&mut self) {
1234        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
1235    }
1236
1237    /// Mark the deferred first turn as started.
1238    ///
1239    /// Returns true when the phase transitioned from `Pending`.
1240    pub fn mark_initial_turn_started(&mut self) -> bool {
1241        let was_pending = matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending);
1242        if was_pending {
1243            self.first_turn_phase = DeferredFirstTurnPhase::Consumed;
1244        }
1245        was_pending
1246    }
1247
1248    /// Restore the deferred first-turn pending phase after a failed pre-run setup.
1249    pub fn restore_initial_turn_pending(&mut self) {
1250        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
1251    }
1252
1253    /// Whether build-only first-turn overrides are still legal for this session.
1254    pub fn allows_initial_turn_overrides(&self) -> bool {
1255        matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending)
1256    }
1257
1258    /// Stage the create-time prompt for a later first turn.
1259    pub fn stage_initial_prompt(&mut self, prompt: ContentInput, accepted_at: SystemTime) {
1260        if !prompt.has_images() && prompt.text_content().trim().is_empty() {
1261            self.pending_initial_prompt = None;
1262            return;
1263        }
1264
1265        self.pending_initial_prompt = Some(PendingDeferredPrompt {
1266            prompt,
1267            accepted_at,
1268        });
1269    }
1270
1271    /// Stage one callback tool-results message for the next turn.
1272    pub fn stage_tool_results(
1273        &mut self,
1274        results: Vec<ToolResult>,
1275        accepted_at: SystemTime,
1276    ) -> usize {
1277        if results.is_empty() {
1278            return 0;
1279        }
1280
1281        let accepted = results.len();
1282        self.pending_tool_results.push(PendingToolResultsMessage {
1283            results,
1284            accepted_at,
1285        });
1286        accepted
1287    }
1288
1289    /// Consume the staged initial prompt, if any.
1290    pub fn take_initial_prompt(&mut self) -> Option<ContentInput> {
1291        self.pending_initial_prompt
1292            .take()
1293            .map(|pending| pending.prompt)
1294    }
1295
1296    /// Consume all staged callback tool-results messages.
1297    pub fn take_tool_results(&mut self) -> Vec<PendingToolResultsMessage> {
1298        std::mem::take(&mut self.pending_tool_results)
1299    }
1300
1301    /// Whether any callback tool results are currently staged.
1302    pub fn has_pending_tool_results(&self) -> bool {
1303        !self.pending_tool_results.is_empty()
1304    }
1305}
1306
/// Failure when staging a system-context append request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SystemContextStageError {
    /// The request was rejected outright; the payload is a human-readable reason.
    InvalidRequest(String),
    /// The idempotency key was already used with a different text or source.
    Conflict {
        /// The idempotency key that was reused.
        key: String,
        /// Text previously recorded for the key.
        existing_text: String,
        /// Source previously recorded for the key, if any.
        existing_source: Option<String>,
    },
}
1317
1318fn render_system_context_block(append: &PendingSystemContextAppend) -> String {
1319    let mut rendered = String::from("[Runtime System Context]");
1320    if let Some(source) = &append.source {
1321        rendered.push_str("\nsource: ");
1322        rendered.push_str(source);
1323    }
1324    rendered.push_str("\n\n");
1325    rendered.push_str(&append.text);
1326    rendered
1327}
1328
/// Whether `value` begins with the `runtime:steer:` prefix.
fn is_runtime_steer_key(value: &str) -> bool {
    value.strip_prefix("runtime:steer:").is_some()
}
1332
1333fn is_runtime_steer_append(append: &PendingSystemContextAppend) -> bool {
1334    append.source.as_deref().is_some_and(is_runtime_steer_key)
1335        || append
1336            .idempotency_key
1337            .as_deref()
1338            .is_some_and(is_runtime_steer_key)
1339}
1340
1341fn seen_system_context_matches(
1342    seen: &SeenSystemContextKey,
1343    append: &PendingSystemContextAppend,
1344) -> bool {
1345    seen.text == append.text && seen.source.as_deref() == append.source.as_deref()
1346}
1347
1348fn pending_system_context_matches(
1349    existing: &PendingSystemContextAppend,
1350    append: &PendingSystemContextAppend,
1351) -> bool {
1352    existing.text == append.text && existing.source.as_deref() == append.source.as_deref()
1353}
1354
1355impl Session {
1356    /// Create a new empty session
1357    pub fn new() -> Self {
1358        let now = SystemTime::now();
1359        Self {
1360            version: SESSION_VERSION,
1361            id: SessionId::new(),
1362            messages: Arc::new(Vec::new()),
1363            created_at: now,
1364            updated_at: now,
1365            metadata: serde_json::Map::new(),
1366            usage: Usage::default(),
1367        }
1368    }
1369
1370    /// Create a session with a specific ID (for loading)
1371    pub fn with_id(id: SessionId) -> Self {
1372        let mut session = Self::new();
1373        session.id = id;
1374        session
1375    }
1376
    /// Borrow the session's identifier.
    pub fn id(&self) -> &SessionId {
        &self.id
    }
1381
    /// Get the format version stored on this session.
    pub fn version(&self) -> u32 {
        self.version
    }
1386
1387    /// Get all messages.
1388    pub fn messages(&self) -> &[Message] {
1389        &self.messages
1390    }
1391
1392    /// Replace the message buffer for core-owned internal transcript rewrites.
1393    ///
1394    /// Intentionally `pub(crate)`: cross-crate consumers must route same-session
1395    /// rewrites through transcript-edit APIs so the revision graph remains the
1396    /// semantic owner of message history.
1397    pub(crate) fn replace_messages_internal(
1398        &mut self,
1399        messages: Vec<Message>,
1400        reason: TranscriptRewriteReason,
1401    ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError> {
1402        if transcript_messages_digest(self.messages()).ok()
1403            == transcript_messages_digest(&messages).ok()
1404        {
1405            return Ok(None);
1406        }
1407        let commit = self.commit_transcript_rewrite(
1408            TranscriptRewriteSelection::MessageRange {
1409                start: 0,
1410                end: self.messages.len(),
1411            },
1412            messages,
1413            reason,
1414            Some("meerkat-core".to_string()),
1415            None,
1416        )?;
1417        Ok(Some(commit))
1418    }
1419
1420    /// Retain messages for core-owned synthetic-notice projection cleanup.
1421    pub(crate) fn retain_messages_internal<F>(
1422        &mut self,
1423        mut retain: F,
1424        reason: TranscriptRewriteReason,
1425    ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError>
1426    where
1427        F: FnMut(&Message) -> bool,
1428    {
1429        let retained = self
1430            .messages
1431            .iter()
1432            .filter(|message| retain(message))
1433            .cloned()
1434            .collect::<Vec<_>>();
1435        if retained.len() == self.messages.len()
1436            && transcript_messages_digest(self.messages()).ok()
1437                == transcript_messages_digest(&retained).ok()
1438        {
1439            return Ok(None);
1440        }
1441        self.replace_messages_internal(retained, reason)
1442    }
1443
    /// Get the time this session was created.
    pub fn created_at(&self) -> SystemTime {
        self.created_at
    }
1448
    /// Get the time of the last recorded update to this session.
    pub fn updated_at(&self) -> SystemTime {
        self.updated_at
    }
1453
1454    /// Add a message to the session
1455    ///
1456    /// Updates the timestamp. For adding multiple messages, prefer `push_batch`.
1457    pub fn push(&mut self, message: Message) {
1458        Arc::make_mut(&mut self.messages).push(message);
1459        self.updated_at = SystemTime::now();
1460        self.refresh_transcript_head_after_message_mutation();
1461    }
1462
1463    /// Add multiple messages in one operation (single timestamp update)
1464    ///
1465    /// More efficient than multiple `push` calls when adding many messages.
1466    pub fn push_batch(&mut self, messages: Vec<Message>) {
1467        if messages.is_empty() {
1468            return;
1469        }
1470        let inner = Arc::make_mut(&mut self.messages);
1471        inner.extend(messages);
1472        self.updated_at = SystemTime::now();
1473        self.refresh_transcript_head_after_message_mutation();
1474    }
1475
1476    /// Rewrite inline media payloads in-place as `BlobRef` pointers.
1477    ///
1478    /// Message count is invariant across this operation — `externalize`
1479    /// only swaps inline image/media bytes for opaque blob references.
1480    /// This is the cross-crate-legitimate rewrite operation that used
1481    /// to require public `messages_mut()`; post-C-H1 callers in
1482    /// `meerkat-session` go through this typed method.
1483    ///
1484    /// Does not touch `updated_at` — externalization is bookkeeping, not
1485    /// a semantic session mutation.
1486    pub async fn externalize_media(
1487        &mut self,
1488        blob_store: &dyn crate::BlobStore,
1489        start: usize,
1490    ) -> Result<(), crate::blob::BlobStoreError> {
1491        let previous_digest = if self
1492            .metadata
1493            .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
1494        {
1495            transcript_messages_digest(self.messages()).ok()
1496        } else {
1497            None
1498        };
1499        let messages = Arc::make_mut(&mut self.messages);
1500        crate::image_content::externalize_messages_from(blob_store, messages, start).await?;
1501        if let Some(previous_digest) = previous_digest
1502            && transcript_messages_digest(self.messages()).ok().as_ref() != Some(&previous_digest)
1503        {
1504            self.refresh_transcript_head_after_message_mutation();
1505        }
1506        Ok(())
1507    }
1508
    /// Explicitly set `updated_at` to the current time.
    ///
    /// Call this after bulk operations that don't update timestamps automatically.
    pub fn touch(&mut self) {
        self.updated_at = SystemTime::now();
    }
1515
1516    /// Whether the conversation has a pending turn boundary.
1517    ///
1518    /// Returns `true` if the last message is `User` or `ToolResults`, meaning
1519    /// the conversation is waiting for an assistant turn and `run_pending` can
1520    /// resume without a new user message.
1521    pub fn has_pending_boundary(&self) -> bool {
1522        self.messages
1523            .last()
1524            .is_some_and(|m| matches!(m, Message::User(_) | Message::ToolResults { .. }))
1525    }
1526
1527    /// Get the last N messages
1528    pub fn last_n(&self, n: usize) -> &[Message] {
1529        let start = self.messages.len().saturating_sub(n);
1530        &self.messages[start..]
1531    }
1532
    /// Total token count, as reported by the session's cumulative usage.
    pub fn total_tokens(&self) -> u64 {
        self.usage.total_tokens()
    }
1537
    /// Get a copy of the cumulative usage statistics for the session.
    pub fn total_usage(&self) -> Usage {
        self.usage.clone()
    }
1542
1543    /// Update cumulative usage after an LLM call.
1544    pub fn record_usage(&mut self, turn_usage: Usage) {
1545        self.usage.add(&turn_usage);
1546        self.updated_at = SystemTime::now();
1547    }
1548
1549    /// Append externally-produced user content to the canonical transcript.
1550    pub fn append_external_user_content(&mut self, content: ContentInput) {
1551        self.push(Message::User(UserMessage::with_blocks(
1552            content.into_blocks(),
1553        )));
1554    }
1555
1556    /// Append externally-produced assistant output to the canonical transcript.
1557    pub fn append_external_assistant_blocks(
1558        &mut self,
1559        blocks: Vec<AssistantBlock>,
1560        stop_reason: StopReason,
1561        usage: Usage,
1562    ) {
1563        if !blocks.is_empty() {
1564            self.push(Message::BlockAssistant(BlockAssistantMessage::new(
1565                blocks,
1566                stop_reason,
1567            )));
1568        }
1569        if usage != Usage::default() {
1570            self.record_usage(usage);
1571        }
1572    }
1573
    /// Apply an identity-bearing provider realtime transcript event.
    ///
    /// This is the canonical append authority for provider-managed realtime
    /// turns: provider item ids, predecessor links, and content segment ids are
    /// persisted in session metadata so duplicate websocket delivery,
    /// reconnect replay, and causally equivalent event ordering cannot create
    /// duplicate or misordered canonical messages.
    ///
    /// # Flow
    ///
    /// 1. The staged state is loaded from session metadata.
    /// 2. The event is folded into that state (the `match` below).
    /// 3. Every item that became eligible is materialized into canonical
    ///    messages and the updated state is written back to metadata.
    ///
    /// # Returns
    ///
    /// The outcome of the materialization pass. A default (empty) outcome is
    /// returned early — without running the materializer — when
    /// `normalize_realtime_response_id` yields `None`, when a delta with a
    /// non-blank `delta_id` was already seen, or when an
    /// `AssistantTranscriptFinalText` arrives for an already-materialized item.
    pub fn append_realtime_transcript_event(
        &mut self,
        event: RealtimeTranscriptEvent,
    ) -> RealtimeTranscriptApplyOutcome {
        // Work on an owned copy of the staged state; it only becomes visible
        // to later calls once `store_realtime_transcript_state` is invoked.
        let mut state = self.realtime_transcript_state();
        match event {
            RealtimeTranscriptEvent::ItemObserved {
                item_id,
                previous_item_id,
                role,
                response_id,
            } => {
                let response_id = normalize_realtime_optional_response_id(response_id);
                // Assistant items that belong to an already-discarded response
                // are registered as skipped instead of as live items.
                if role == RealtimeTranscriptRole::Assistant
                    && response_id
                        .as_ref()
                        .is_some_and(|id| state.discarded_assistant_response_ids.contains(id))
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                } else {
                    observe_realtime_item(&mut state, item_id, previous_item_id, role, response_id);
                }
            }
            RealtimeTranscriptEvent::ItemSkipped {
                item_id,
                previous_item_id,
            } => {
                observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
            }
            RealtimeTranscriptEvent::UserTranscriptFinal {
                item_id,
                previous_item_id,
                content_index,
                text,
            } => {
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::User,
                    None,
                ) {
                    // First non-empty text for a segment wins. A replay that
                    // carries different non-empty text is ignored with a
                    // warning; an identical or empty replay is a no-op.
                    let segment = item.content_segments.entry(content_index).or_default();
                    if segment.is_empty() && !text.is_empty() {
                        *segment = text;
                    } else if !text.is_empty() && segment.as_str() != text {
                        tracing::warn!(
                            content_index,
                            "ignoring conflicting realtime user transcript segment replay"
                        );
                    }
                    // The item is marked ready in every case, including when
                    // the incoming text was empty or conflicting.
                    item.ready = true;
                }
            }
            RealtimeTranscriptEvent::AssistantTextDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                // No usable response id: ignore the event entirely (nothing
                // is materialized or stored).
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // Deltas for a discarded response only register the item as
                // skipped, then run the normal materialize/store tail.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                // Replay guard: a non-blank `delta_id` that was already
                // recorded means this delta was applied before. Blank ids are
                // never deduplicated. The state is not stored on this path.
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                // Captured before `observe_realtime_item` takes `&mut state`.
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Display) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        // A delta that arrives after the response's completion
                        // was recorded makes the item ready immediately.
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        // R5-6 sibling: this delta was routed at a
                        // Spoken-classified item (e.g. truncation arrived
                        // first and locked the lane). `promote_item_lane`
                        // already warned about the lane conflict; drop the
                        // delta to preserve the lane invariant rather than
                        // clobbering Spoken content with Display text.
                        tracing::warn!(
                            "AssistantTextDelta routed to a Spoken-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                // T9/T10: spoken-transcript lane. Identical staging shape to
                // `AssistantTextDelta` (same idempotency / ordering / dedup
                // logic owns both lanes); the only difference is that the
                // owning item is tagged `Spoken` so the materializer flushes
                // `AssistantBlock::Transcript` instead of `AssistantBlock::Text`.
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                // Replay guard, same as in the `AssistantTextDelta` arm.
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        // R5-6 sibling: this transcript delta arrived for a
                        // Display-classified item (a Display delta or other
                        // Display-lane event landed first). `promote_item_lane`
                        // already warned; drop the delta rather than appending
                        // Spoken text into a Display-locked content_segment.
                        tracing::warn!(
                            "AssistantTranscriptDelta routed to a Display-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // This event carries no predecessor link, hence `None` below.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                // Clones kept for the warning below, because the originals are
                // moved into `observe_realtime_item`.
                let item_id_for_log = item_id.clone();
                let response_id_for_log = response_id.clone();
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    // R5-7-sibling: a late truncation arriving after the
                    // canonical message has already committed cannot mutate
                    // history (append-only invariant). Same intent as the
                    // late-FinalText guard in the
                    // `AssistantTranscriptFinalText` arm — warn and skip.
                    // Unlike that arm, this one does not return early: it
                    // falls through to the shared materialize/store tail.
                    if item.materialized {
                        tracing::warn!(
                            target: "meerkat::session",
                            item_id = %item_id_for_log,
                            response_id = %response_id_for_log,
                            "AssistantTranscriptTruncated arrived after item already materialized; canonical message is locked, late truncation dropped",
                        );
                    } else if promote_item_lane(item, TranscriptLane::Spoken) {
                        // R5-6: truncation is a Spoken-lane-only semantic —
                        // it describes the audio output that was actually
                        // heard before barge-in cut it short. Promote to
                        // Spoken so the materializer commits as
                        // `AssistantBlock::Transcript`. If a Display delta
                        // arrived first, `promote_item_lane` keeps the
                        // existing lane and returns `false` — that's a
                        // provider bug; warn already emitted inside
                        // `promote_item_lane`.
                        //
                        // `insert` replaces the segment: the truncated text
                        // supersedes whatever deltas had accumulated.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptFinalText {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                // R5-7: authoritative final text overrides any incomplete
                // delta accumulation for this `(response_id, item_id,
                // content_index)`. If no item is staged yet (final-only
                // provider, or all deltas dropped), create one on the
                // Spoken lane so the materializer flushes it as
                // `AssistantBlock::Transcript`. Flush gating is unchanged:
                // `AssistantTurnCompleted` still drives readiness.
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    // R5-7: late `AssistantTranscriptFinalText` after the
                    // item has already been committed to canonical history.
                    // The staged item is `materialized = true` and the
                    // session's `Message::BlockAssistant` already carries
                    // the delta-accumulated text. Append-only history is a
                    // stronger invariant than typed text repair: rewriting
                    // the canonical message would violate it (consumers may
                    // already have observed the prior text via the event
                    // stream / projector). Warn-and-skip; the SDK or human
                    // can decide whether the provider-side ordering bug
                    // matters.
                    //
                    // NOTE: this path stores the state but returns a default
                    // outcome without running the materializer.
                    if item.materialized {
                        tracing::warn!(
                            "AssistantTranscriptFinalText arrived after item already materialized; canonical message is locked, late repair dropped"
                        );
                        self.store_realtime_transcript_state(&state);
                        return RealtimeTranscriptApplyOutcome::default();
                    }
                    // Spoken lane: this variant is the authoritative
                    // transcript-final text path. Display-text finals come
                    // through a different seam. If the item is locked to
                    // Display (e.g. an earlier `AssistantTextDelta` staged
                    // it), drop the final to preserve the lane invariant —
                    // append-only history would otherwise silently switch
                    // block type at materialize time.
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        // Replace, not append: the final's text is
                        // authoritative and supersedes any (possibly
                        // partial) accumulated segment text.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        tracing::warn!(
                            "AssistantTranscriptFinalText routed to a Display-lane item; dropping authoritative final to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnCompleted {
                response_id,
                stop_reason,
                usage,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // Completion of an already-discarded response: run the
                // discard again, then the normal materialize/store tail.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    discard_realtime_assistant_response(&mut state, &response_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                match stop_reason {
                    // A cancelled response is discarded.
                    StopReason::Cancelled => {
                        discard_realtime_assistant_response(&mut state, &response_id);
                    }
                    // A tool-use stop records no completion and removes any
                    // previously recorded one, so the response's items are
                    // not marked ready here.
                    StopReason::ToolUse => {
                        state.assistant_completions.remove(&response_id);
                    }
                    // Any other stop reason: record the completion (the first
                    // recorded completion wins because of `or_insert`) and
                    // mark the response's items ready.
                    _ => {
                        state
                            .assistant_completions
                            .entry(response_id.clone())
                            .or_insert(RealtimeAssistantCompletion {
                                stop_reason,
                                usage,
                                usage_consumed: false,
                            });
                        mark_realtime_assistant_response_ready(&mut state, &response_id);
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnInterrupted { response_id } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // R5-5: barge-in invalidates the spoken/audio lane (the user
                // is speaking over what they heard) but preserves the
                // Display lane (sideband display text from the same response
                // is not "spoken over"). The interrupt is also terminal for
                // the response on the realtime-staging path: any later
                // `AssistantTurnCompleted` arrives with `StopReason::Cancelled`
                // and short-circuits via the discarded-set guard. Therefore
                // we must materialize retained Display items immediately here
                // by inserting a synthetic `assistant_completions` entry —
                // otherwise they would never commit.
                discard_realtime_assistant_response_by_lane(&mut state, &response_id);
                // The synthetic completion carries no usage; an existing
                // completion for this response is left untouched (`or_insert`).
                state
                    .assistant_completions
                    .entry(response_id.clone())
                    .or_insert(RealtimeAssistantCompletion {
                        stop_reason: StopReason::Cancelled,
                        usage: Usage::default(),
                        usage_consumed: false,
                    });
                mark_realtime_assistant_response_ready(&mut state, &response_id);
            }
        }

        // Shared tail: commit whatever became eligible, then persist the
        // updated staging state back into session metadata.
        let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
        self.store_realtime_transcript_state(&state);
        outcome
    }
1940
1941    /// Return every distinct provider `response_id` currently staged in the
1942    /// realtime-transcript metadata that has at least one **unmaterialized**
1943    /// assistant item and is **not already discarded**.
1944    ///
1945    /// CC4 (Round-4 architectural reconciliation): when the live boundary
1946    /// signals a barge-in (`TurnInterrupted`), the projection sink does not
1947    /// know which provider response_ids have streaming deltas staged in
1948    /// session metadata. This accessor lets the sink fan
1949    /// [`RealtimeTranscriptEvent::AssistantTurnInterrupted`] events out to
1950    /// each in-flight response so staged-but-not-yet-materialized transcript
1951    /// fragments are discarded — preventing them from silently committing
1952    /// when the *next* turn's `AssistantTurnCompleted` (synthesized by the
1953    /// CC2 fix in `signal_turn_completed`) sweeps the materializer.
1954    ///
1955    /// Order is the [`SessionRealtimeTranscriptState::first_seen_order`]
1956    /// projection so callers see deterministic iteration. Items already
1957    /// materialized or skipped are excluded — only response_ids with at
1958    /// least one live unmaterialized assistant item are returned.
1959    #[must_use]
1960    pub fn in_flight_realtime_assistant_response_ids(&self) -> Vec<String> {
1961        let state = self.realtime_transcript_state();
1962        let mut seen: BTreeSet<String> = BTreeSet::new();
1963        let mut out: Vec<String> = Vec::new();
1964        for item_id in &state.first_seen_order {
1965            let Some(item) = state.items.get(item_id) else {
1966                continue;
1967            };
1968            if item.role != RealtimeTranscriptRole::Assistant {
1969                continue;
1970            }
1971            if item.materialized || item.skipped {
1972                continue;
1973            }
1974            let Some(response_id) = item.response_id.as_ref() else {
1975                continue;
1976            };
1977            if state.discarded_assistant_response_ids.contains(response_id) {
1978                continue;
1979            }
1980            if seen.insert(response_id.clone()) {
1981                out.push(response_id.clone());
1982            }
1983        }
1984        out
1985    }
1986
1987    fn realtime_transcript_state(&self) -> SessionRealtimeTranscriptState {
1988        self.metadata
1989            .get(SESSION_REALTIME_TRANSCRIPT_STATE_KEY)
1990            .cloned()
1991            .and_then(|value| serde_json::from_value(value).ok())
1992            .unwrap_or_default()
1993    }
1994
1995    fn store_realtime_transcript_state(&mut self, state: &SessionRealtimeTranscriptState) {
1996        match serde_json::to_value(state) {
1997            Ok(value) => self.set_metadata(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, value),
1998            Err(error) => {
1999                tracing::warn!(error = %error, "failed to serialize realtime transcript state");
2000            }
2001        }
2002    }
2003
2004    fn materialize_realtime_transcript_ready_items(
2005        &mut self,
2006        state: &mut SessionRealtimeTranscriptState,
2007    ) -> RealtimeTranscriptApplyOutcome {
2008        let mut materialized = Vec::new();
2009
2010        // Round-4 CC7: when a single response_id produces both display-text
2011        // and spoken-transcript items (mixed-modality response), we emit a
2012        // SINGLE `Message::BlockAssistant` whose `blocks` interleave
2013        // `AssistantBlock::Text` and `AssistantBlock::Transcript` in
2014        // arrival order — not multiple messages. Pending state lives across
2015        // outer-loop batches: when chained items (`previous_item_id`) force
2016        // serial materialization, batch N may emit the display item and
2017        // batch N+1 may emit the spoken item under the same response_id —
2018        // we still want one combined message.
2019        //
2020        // The pending group is flushed when:
2021        //   - a User item lands (canonical-history ordering boundary)
2022        //   - an assistant item with a different response_id lands
2023        //   - the outer materialization loop terminates
2024        let mut pending_blocks: Vec<AssistantBlock> = Vec::new();
2025        let mut pending_response_id: Option<String> = None;
2026        let mut pending_stop_reason: StopReason = StopReason::EndTurn;
2027        let mut pending_usage: Usage = Usage::default();
2028
2029        loop {
2030            let order = realtime_transcript_order(state);
2031            let mut skipped_batch = Vec::new();
2032            let mut batch = Vec::new();
2033            for item_id in order {
2034                let Some(item) = state.items.get(&item_id) else {
2035                    continue;
2036                };
2037                if item.materialized {
2038                    continue;
2039                }
2040                if !realtime_predecessor_materialized(state, item.previous_item_id.as_deref()) {
2041                    continue;
2042                }
2043                if item.skipped {
2044                    skipped_batch.push(item_id.clone());
2045                    continue;
2046                }
2047                if !item.ready {
2048                    continue;
2049                }
2050                let text = item.text();
2051                if text.is_empty() {
2052                    continue;
2053                }
2054                match item.role {
2055                    RealtimeTranscriptRole::User => {
2056                        batch.push(RealtimeTranscriptMaterializedMessage::User {
2057                            item_id: item_id.clone(),
2058                            text,
2059                        });
2060                    }
2061                    RealtimeTranscriptRole::Assistant => {
2062                        let Some(response_id) = item.response_id.as_ref() else {
2063                            continue;
2064                        };
2065                        let Some(completion) = state.assistant_completions.get(response_id) else {
2066                            continue;
2067                        };
2068                        let usage = if completion.usage_consumed {
2069                            Usage::default()
2070                        } else {
2071                            completion.usage.clone()
2072                        };
2073                        batch.push(RealtimeTranscriptMaterializedMessage::Assistant {
2074                            item_id: item_id.clone(),
2075                            response_id: response_id.clone(),
2076                            text,
2077                            stop_reason: completion.stop_reason,
2078                            usage,
2079                            lane: item.lane,
2080                        });
2081                    }
2082                }
2083            }
2084            if skipped_batch.is_empty() && batch.is_empty() {
2085                break;
2086            }
2087            for item_id in skipped_batch {
2088                if let Some(item) = state.items.get_mut(&item_id) {
2089                    item.materialized = true;
2090                }
2091            }
2092
2093            for message in batch {
2094                match &message {
2095                    RealtimeTranscriptMaterializedMessage::User { item_id, text } => {
2096                        if !pending_blocks.is_empty() {
2097                            let drained = std::mem::take(&mut pending_blocks);
2098                            self.append_external_assistant_blocks(
2099                                drained,
2100                                pending_stop_reason,
2101                                std::mem::take(&mut pending_usage),
2102                            );
2103                            pending_response_id = None;
2104                        }
2105                        if let Some(item) = state.items.get_mut(item_id) {
2106                            item.materialized = true;
2107                        }
2108                        self.append_external_user_content(ContentInput::Text(text.clone()));
2109                    }
2110                    RealtimeTranscriptMaterializedMessage::Assistant {
2111                        item_id,
2112                        response_id,
2113                        text,
2114                        stop_reason,
2115                        usage,
2116                        lane,
2117                    } => {
2118                        // Flush if this assistant item belongs to a different
2119                        // response than the pending group. (Same response_id
2120                        // → accumulate; different → emit prior message.)
2121                        if pending_response_id
2122                            .as_ref()
2123                            .is_some_and(|existing| existing != response_id)
2124                            && !pending_blocks.is_empty()
2125                        {
2126                            let drained = std::mem::take(&mut pending_blocks);
2127                            self.append_external_assistant_blocks(
2128                                drained,
2129                                pending_stop_reason,
2130                                std::mem::take(&mut pending_usage),
2131                            );
2132                            pending_response_id = None;
2133                        }
2134                        if let Some(item) = state.items.get_mut(item_id) {
2135                            item.materialized = true;
2136                        }
2137                        if let Some(completion) = state.assistant_completions.get_mut(response_id) {
2138                            completion.usage_consumed = true;
2139                        }
2140                        // T9/T10: route to the correct canonical block by
2141                        // lane. `Display` keeps the legacy `AssistantBlock::Text`
2142                        // shape; `Spoken` flushes
2143                        // `AssistantBlock::Transcript { source: Spoken }` so
2144                        // OpenAI realtime audio transcripts stop being
2145                        // persisted as authored display text.
2146                        let block = match lane {
2147                            TranscriptLane::Display => AssistantBlock::Text {
2148                                text: text.clone(),
2149                                meta: None,
2150                            },
2151                            TranscriptLane::Spoken => AssistantBlock::Transcript {
2152                                text: text.clone(),
2153                                source: crate::types::TranscriptSource::Spoken,
2154                                meta: None,
2155                            },
2156                        };
2157                        // First item in a group seeds the response_id /
2158                        // stop_reason / usage. `usage_consumed` is flipped
2159                        // to true above as the first item is processed, so
2160                        // every subsequent item sees `Usage::default()` from
2161                        // the materializer's per-item builder — accumulating
2162                        // across items in one group is naturally
2163                        // single-counted.
2164                        if pending_response_id.is_none() {
2165                            pending_response_id = Some(response_id.clone());
2166                            pending_stop_reason = *stop_reason;
2167                            pending_usage = usage.clone();
2168                        }
2169                        pending_blocks.push(block);
2170                    }
2171                }
2172                materialized.push(message);
2173            }
2174        }
2175
2176        // Final flush after the outer materialization loop has drained.
2177        if !pending_blocks.is_empty() {
2178            self.append_external_assistant_blocks(
2179                pending_blocks,
2180                pending_stop_reason,
2181                pending_usage,
2182            );
2183        }
2184
2185        RealtimeTranscriptApplyOutcome {
2186            materialized_messages: materialized,
2187        }
2188    }
2189
2190    /// Set a system prompt (adds or replaces System message at start)
2191    pub fn set_system_prompt(&mut self, prompt: String) {
2192        use crate::types::SystemMessage;
2193
2194        let inner = Arc::make_mut(&mut self.messages);
2195        // Check if first message is system
2196        if let Some(Message::System(_)) = inner.first() {
2197            inner[0] = Message::System(SystemMessage::new(prompt));
2198        } else {
2199            inner.insert(0, Message::System(SystemMessage::new(prompt)));
2200        }
2201        self.updated_at = SystemTime::now();
2202        self.refresh_transcript_head_after_message_mutation();
2203    }
2204
2205    /// Remove transient active-turn steer context from persisted session state.
2206    ///
2207    /// Operator steers accepted into an already-running turn are request-local:
2208    /// they should be visible to that turn's next model boundary, then vanish
2209    /// instead of replaying into later turns after persistence or resume.
2210    pub fn discard_transient_runtime_steer_context(&mut self) -> usize {
2211        let mut removed = 0usize;
2212
2213        if let Some(Message::System(system)) = self.messages.first() {
2214            let parts = system
2215                .content
2216                .split(SYSTEM_CONTEXT_SEPARATOR)
2217                .map(str::to_string)
2218                .collect::<Vec<_>>();
2219            let original_len = parts.len();
2220            let retained = parts
2221                .into_iter()
2222                .filter(|part| {
2223                    !(part.starts_with("[Runtime System Context]")
2224                        && part.contains("\nsource: runtime:steer:"))
2225                })
2226                .collect::<Vec<_>>();
2227            let removed_blocks = original_len.saturating_sub(retained.len());
2228            if removed_blocks > 0 {
2229                removed += removed_blocks;
2230                self.set_system_prompt(retained.join(SYSTEM_CONTEXT_SEPARATOR));
2231            }
2232        }
2233
2234        let mut state = self.system_context_state().unwrap_or_default();
2235
2236        let before_pending = state.pending.len();
2237        state
2238            .pending
2239            .retain(|append| !is_runtime_steer_append(append));
2240        removed += before_pending.saturating_sub(state.pending.len());
2241
2242        let before_applied = state.applied.len();
2243        state
2244            .applied
2245            .retain(|append| !is_runtime_steer_append(append));
2246        removed += before_applied.saturating_sub(state.applied.len());
2247
2248        let before_seen = state.seen.len();
2249        state.seen.retain(|key, seen| {
2250            !is_runtime_steer_key(key) && !seen.source.as_deref().is_some_and(is_runtime_steer_key)
2251        });
2252        removed += before_seen.saturating_sub(state.seen.len());
2253
2254        let before_active = state.active_turn_pending_keys.len();
2255        state
2256            .active_turn_pending_keys
2257            .retain(|key| !is_runtime_steer_key(key));
2258        removed += before_active.saturating_sub(state.active_turn_pending_keys.len());
2259
2260        if removed > 0
2261            && let Err(err) = self.set_system_context_state(state)
2262        {
2263            tracing::warn!(
2264                error = %err,
2265                "failed to persist runtime steer context cleanup"
2266            );
2267        }
2268
2269        removed
2270    }
2271
    /// Append one or more runtime system-context blocks to the canonical system prompt.
    ///
    /// Processing per append, in order:
    /// - appends whose text is blank after trimming are skipped;
    /// - an append with an idempotency key is skipped (with a warning) when the
    ///   key was already recorded in `state.seen` or `state.applied` with
    ///   non-matching content, and skipped when the same key already occurs
    ///   earlier in this batch (warning only if the content differs);
    /// - an append whose rendered block is already contained in the current
    ///   system prompt is not appended again; for keyed appends the control
    ///   state is updated to record it as applied;
    /// - everything else is queued and appended to the system prompt, joined by
    ///   `SYSTEM_CONTEXT_SEPARATOR`.
    ///
    /// The control state is written back through `set_system_context_state`;
    /// a serialization failure there is logged rather than returned.
    pub fn append_system_context_blocks(&mut self, appends: &[PendingSystemContextAppend]) {
        if appends.is_empty() {
            return;
        }

        // Content of the leading system message, or "" when there is none.
        let current_system_prompt = self
            .messages
            .first()
            .and_then(|message| match message {
                Message::System(system) => Some(system.content.as_str()),
                _ => None,
            })
            .unwrap_or_default();
        // Missing or unreadable state is treated as the default state.
        let mut state = self.system_context_state().unwrap_or_default();
        // Tracks state changes made while no new text ends up being appended.
        let mut state_dirty = false;
        let mut new_appends: Vec<PendingSystemContextAppend> = Vec::new();
        for append in appends {
            if append.text.trim().is_empty() {
                continue;
            }
            let rendered = render_system_context_block(append);
            if let Some(key) = append.idempotency_key.as_ref() {
                // Same key previously seen with different content: reject.
                if let Some(existing) = state.seen.get(key)
                    && !seen_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Same key previously applied with different content: reject.
                if let Some(existing) = state
                    .applied
                    .iter()
                    .find(|applied| applied.idempotency_key.as_ref() == Some(key))
                    && !pending_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Same key earlier in this batch: the first occurrence wins;
                // warn only when the duplicate carries different content.
                if let Some(existing) = new_appends
                    .iter()
                    .find(|pending| pending.idempotency_key.as_ref() == Some(key))
                {
                    if !pending_system_context_matches(existing, append) {
                        tracing::warn!(
                            idempotency_key = %key,
                            "skipping conflicting runtime system-context append"
                        );
                    }
                    continue;
                }
                // Block text is already in the prompt: do not append it again,
                // but make sure the control state records it as applied.
                if current_system_prompt.contains(&rendered) {
                    if !state
                        .applied
                        .iter()
                        .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                    {
                        state.applied.push(append.clone());
                        state_dirty = true;
                    }
                    if state
                        .seen
                        .get(key)
                        .is_none_or(|seen| seen.state != SeenSystemContextState::Applied)
                    {
                        state.seen.insert(
                            key.clone(),
                            SeenSystemContextKey {
                                text: append.text.clone(),
                                source: append.source.clone(),
                                state: SeenSystemContextState::Applied,
                            },
                        );
                        state_dirty = true;
                    }
                    continue;
                }
            } else if new_appends.contains(append) || current_system_prompt.contains(&rendered) {
                // Unkeyed appends are de-duplicated by value within the batch
                // and by rendered text against the current prompt.
                continue;
            }
            new_appends.push(append.clone());
        }
        if new_appends.is_empty() {
            // Nothing to append; persist bookkeeping changes only if any were made.
            if state_dirty && let Err(err) = self.set_system_context_state(state) {
                tracing::warn!(error = %err, "failed to persist applied system-context state");
            }
            return;
        }

        let rendered = new_appends
            .iter()
            .map(render_system_context_block)
            .collect::<Vec<_>>()
            .join(SYSTEM_CONTEXT_SEPARATOR);

        // Extend a non-empty existing prompt; otherwise the rendered blocks
        // become the whole prompt.
        let next = match self.messages.first() {
            Some(Message::System(sys)) if !sys.content.is_empty() => {
                format!("{}{}{}", sys.content, SYSTEM_CONTEXT_SEPARATOR, rendered)
            }
            _ => rendered,
        };
        self.set_system_prompt(next);

        // Record every newly appended block as applied, without duplicating
        // entries that are already present in `state.applied`.
        for append in new_appends {
            if let Some(key) = append.idempotency_key.as_ref() {
                state.seen.insert(
                    key.clone(),
                    SeenSystemContextKey {
                        text: append.text.clone(),
                        source: append.source.clone(),
                        state: SeenSystemContextState::Applied,
                    },
                );
                if state
                    .applied
                    .iter()
                    .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                {
                    continue;
                }
            } else if state.applied.contains(&append) {
                continue;
            }
            state.applied.push(append);
        }
        if let Err(err) = self.set_system_context_state(state) {
            tracing::warn!(error = %err, "failed to persist applied system-context state");
        }
    }
2406
2407    /// Get the last assistant message text content.
2408    ///
2409    /// Concatenates both `Text` (display) and `Transcript` (spoken) blocks
2410    /// in document order, since both lanes project to the same human-readable
2411    /// stream. Lane provenance is preserved on the underlying `AssistantBlock`
2412    /// for callers that need it.
2413    pub fn last_assistant_text(&self) -> Option<String> {
2414        self.messages.iter().rev().find_map(|m| match m {
2415            Message::BlockAssistant(a) => {
2416                let mut buf = String::new();
2417                for block in &a.blocks {
2418                    match block {
2419                        crate::types::AssistantBlock::Text { text, .. }
2420                        | crate::types::AssistantBlock::Transcript { text, .. } => {
2421                            buf.push_str(text);
2422                        }
2423                        _ => {}
2424                    }
2425                }
2426                if buf.is_empty() { None } else { Some(buf) }
2427            }
2428            Message::Assistant(a) if !a.content.is_empty() => Some(a.content.clone()),
2429            _ => None,
2430        })
2431    }
2432
2433    /// Count tool calls made
2434    pub fn tool_call_count(&self) -> usize {
2435        self.messages
2436            .iter()
2437            .filter_map(|m| match m {
2438                Message::BlockAssistant(a) => Some(
2439                    a.blocks
2440                        .iter()
2441                        .filter(|b| matches!(b, crate::types::AssistantBlock::ToolUse { .. }))
2442                        .count(),
2443                ),
2444                Message::Assistant(a) => Some(a.tool_calls.len()),
2445                _ => None,
2446            })
2447            .sum()
2448    }
2449
2450    /// Get metadata
2451    pub fn metadata(&self) -> &serde_json::Map<String, serde_json::Value> {
2452        &self.metadata
2453    }
2454
2455    /// Set a metadata value
2456    pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
2457        self.metadata.insert(key.to_string(), value);
2458        self.updated_at = SystemTime::now();
2459    }
2460
2461    /// Backfill a missing metadata value without changing `updated_at`.
2462    ///
2463    /// This is only for compatibility reads that need to hydrate metadata from
2464    /// an older projection. Semantic metadata mutations must use
2465    /// [`Session::set_metadata`] so the session timestamp advances.
2466    pub fn backfill_metadata_if_absent(&mut self, key: &str, value: serde_json::Value) -> bool {
2467        if self.metadata.contains_key(key) {
2468            false
2469        } else {
2470            self.metadata.insert(key.to_string(), value);
2471            true
2472        }
2473    }
2474
2475    /// Remove a metadata value.
2476    pub fn remove_metadata(&mut self, key: &str) {
2477        self.metadata.remove(key);
2478        self.updated_at = SystemTime::now();
2479    }
2480
2481    /// Store SessionMetadata in the session metadata map.
2482    pub fn set_session_metadata(
2483        &mut self,
2484        metadata: SessionMetadata,
2485    ) -> Result<(), serde_json::Error> {
2486        let value = serde_json::to_value(metadata)?;
2487        self.set_metadata(SESSION_METADATA_KEY, value);
2488        Ok(())
2489    }
2490
2491    /// Load SessionMetadata from the session metadata map.
2492    pub fn session_metadata(&self) -> Option<SessionMetadata> {
2493        self.metadata
2494            .get(SESSION_METADATA_KEY)
2495            .and_then(|value| serde_json::from_value(value.clone()).ok())
2496    }
2497
2498    /// Store durable system-context control state in the session metadata map.
2499    pub fn set_system_context_state(
2500        &mut self,
2501        state: SessionSystemContextState,
2502    ) -> Result<(), serde_json::Error> {
2503        let value = serde_json::to_value(state)?;
2504        self.set_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY, value);
2505        Ok(())
2506    }
2507
2508    /// Load durable system-context control state from the session metadata map.
2509    pub fn system_context_state(&self) -> Option<SessionSystemContextState> {
2510        self.metadata
2511            .get(SESSION_SYSTEM_CONTEXT_STATE_KEY)
2512            .and_then(|value| serde_json::from_value(value.clone()).ok())
2513    }
2514
2515    /// Store durable deferred-turn control state in the session metadata map.
2516    pub fn set_deferred_turn_state(
2517        &mut self,
2518        state: SessionDeferredTurnState,
2519    ) -> Result<(), serde_json::Error> {
2520        let value = serde_json::to_value(state)?;
2521        self.set_metadata(SESSION_DEFERRED_TURN_STATE_KEY, value);
2522        Ok(())
2523    }
2524
2525    /// Load durable deferred-turn control state from the session metadata map.
2526    pub fn deferred_turn_state(&self) -> Option<SessionDeferredTurnState> {
2527        self.metadata
2528            .get(SESSION_DEFERRED_TURN_STATE_KEY)
2529            .and_then(|value| serde_json::from_value(value.clone()).ok())
2530    }
2531
2532    /// Store recoverable build-only session state in the session metadata map.
2533    pub fn set_build_state(&mut self, state: SessionBuildState) -> Result<(), serde_json::Error> {
2534        let value = serde_json::to_value(state)?;
2535        self.set_metadata(SESSION_BUILD_STATE_KEY, value);
2536        Ok(())
2537    }
2538
2539    /// Load recoverable build-only session state from the session metadata map.
2540    pub fn build_state(&self) -> Option<SessionBuildState> {
2541        self.metadata
2542            .get(SESSION_BUILD_STATE_KEY)
2543            .and_then(|value| serde_json::from_value(value.clone()).ok())
2544    }
2545
2546    /// Store durable tool-visibility control state in the session metadata map.
2547    pub fn set_tool_visibility_state(
2548        &mut self,
2549        state: SessionToolVisibilityState,
2550    ) -> Result<(), serde_json::Error> {
2551        let value = serde_json::to_value(state)?;
2552        self.set_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY, value);
2553        Ok(())
2554    }
2555
2556    /// Load durable tool-visibility control state from the session metadata map.
2557    pub fn tool_visibility_state(
2558        &self,
2559    ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
2560        self.try_tool_visibility_state()
2561    }
2562
2563    /// Load durable tool-visibility control state while distinguishing absent
2564    /// metadata from malformed canonical metadata.
2565    pub fn try_tool_visibility_state(
2566        &self,
2567    ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
2568        self.metadata
2569            .get(SESSION_TOOL_VISIBILITY_STATE_KEY)
2570            .map(|value| serde_json::from_value(value.clone()))
2571            .transpose()
2572    }
2573
2574    /// Load typed transcript revision state from metadata.
2575    pub fn transcript_history_state(
2576        &self,
2577    ) -> Result<Option<TranscriptHistoryState>, serde_json::Error> {
2578        self.metadata
2579            .get(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
2580            .map(|value| serde_json::from_value(value.clone()))
2581            .transpose()
2582    }
2583
2584    /// Validate the retained transcript revision graph, when present.
2585    pub fn validate_transcript_history_state(&self) -> Result<(), TranscriptEditError> {
2586        let Some(state) = self
2587            .transcript_history_state()
2588            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
2589        else {
2590            return Ok(());
2591        };
2592        validate_transcript_history_state(&state)
2593    }
2594
2595    /// Return the retained immutable body for a transcript revision.
2596    pub fn transcript_revision_body(
2597        &self,
2598        revision: &str,
2599    ) -> Result<Option<TranscriptRevisionBody>, serde_json::Error> {
2600        Ok(self.transcript_history_state()?.and_then(|state| {
2601            state
2602                .revisions
2603                .into_iter()
2604                .find(|body| body.revision == revision)
2605        }))
2606    }
2607
2608    /// Return the ordered messages for a retained transcript revision.
2609    pub fn transcript_revision_messages(
2610        &self,
2611        revision: &str,
2612    ) -> Result<Option<Vec<Message>>, serde_json::Error> {
2613        Ok(self
2614            .transcript_revision_body(revision)?
2615            .map(|body| body.messages))
2616    }
2617
2618    /// Materialize this session projection from a typed transcript history graph.
2619    pub fn apply_transcript_history_state(
2620        &mut self,
2621        state: TranscriptHistoryState,
2622    ) -> Result<(), TranscriptEditError> {
2623        validate_transcript_history_state(&state)?;
2624        let head_body = state
2625            .revisions
2626            .iter()
2627            .find(|body| body.revision == state.head)
2628            .ok_or_else(|| {
2629                TranscriptEditError::HistoryStateMalformed(format!(
2630                    "missing transcript head body {}",
2631                    state.head
2632                ))
2633            })?
2634            .clone();
2635        let value = serde_json::to_value(&state)
2636            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
2637        self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);
2638        let mut updated_at = head_body.created_at;
2639        for commit in &state.commits {
2640            if commit.committed_at > updated_at {
2641                updated_at = commit.committed_at;
2642            }
2643        }
2644        self.messages = Arc::new(head_body.messages);
2645        self.updated_at = updated_at;
2646        Ok(())
2647    }
2648
2649    /// Current transcript head revision. Rows written before transcript
2650    /// revisions derive their implicit head from the current message snapshot.
2651    pub fn transcript_revision(&self) -> Result<String, serde_json::Error> {
2652        if let Some(state) = self.transcript_history_state()? {
2653            Ok(state.head)
2654        } else {
2655            transcript_messages_digest(self.messages())
2656        }
2657    }
2658
    /// Commit a same-session transcript rewrite and advance the transcript head.
    ///
    /// The messages in the half-open range given by `selection.bounds()` are
    /// replaced with `replacement`. On success the new message list becomes the
    /// session transcript, the history state gains the new revision body (and
    /// the parent body, if it was not retained yet) plus the commit record, and
    /// the commit record is returned.
    ///
    /// # Errors
    ///
    /// - `HistoryStateMalformed` when the stored history state cannot be read,
    ///   or when digesting / serializing fails;
    /// - `RevisionConflict` when `expected_parent_revision` is given and
    ///   differs from the current head;
    /// - `InvalidRewriteRange` when the selection is inverted or extends past
    ///   the end of the transcript;
    /// - whatever `validate_transcript_tool_result_shape` reports for the
    ///   rewritten transcript;
    /// - `NoOpRewrite` when the rewritten transcript digests to the current
    ///   head revision.
    ///
    /// The session is left unmodified on error.
    pub fn commit_transcript_rewrite(
        &mut self,
        selection: TranscriptRewriteSelection,
        replacement: Vec<Message>,
        reason: TranscriptRewriteReason,
        actor: Option<String>,
        expected_parent_revision: Option<String>,
    ) -> Result<TranscriptRewriteCommit, TranscriptEditError> {
        let parent_revision = self
            .transcript_revision()
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        // Optimistic concurrency check: the caller may pin the head it edited.
        if let Some(expected) = expected_parent_revision
            && expected != parent_revision
        {
            return Err(TranscriptEditError::RevisionConflict {
                expected,
                actual: parent_revision,
            });
        }

        let (start, end) = selection.bounds();
        let message_count = self.messages.len();
        if start > end || end > message_count {
            return Err(TranscriptEditError::InvalidRewriteRange {
                start,
                end,
                message_count,
            });
        }

        // Build prefix + replacement + suffix.
        let replacement_len = replacement.len();
        let mut rewritten = Vec::with_capacity(
            start
                .saturating_add(replacement_len)
                .saturating_add(message_count.saturating_sub(end)),
        );
        rewritten.extend_from_slice(&self.messages[..start]);
        rewritten.extend(replacement);
        rewritten.extend_from_slice(&self.messages[end..]);
        validate_transcript_tool_result_shape(&rewritten)?;

        let original_span_digest = sha256_json_digest(&self.messages[start..end])
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        // The replacement was moved into `rewritten`, so digest it from there.
        let replacement_digest = sha256_json_digest(&rewritten[start..start + replacement_len])
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        let revision = transcript_messages_digest(&rewritten)
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        if revision == parent_revision {
            return Err(TranscriptEditError::NoOpRewrite { revision });
        }

        let commit = TranscriptRewriteCommit {
            parent_revision,
            revision: revision.clone(),
            selection,
            original_span_digest,
            replacement_digest,
            messages_before: message_count,
            messages_after: rewritten.len(),
            reason,
            actor,
            committed_at: SystemTime::now(),
        };

        // Start from the stored history, or from an empty one rooted at the
        // parent revision when this is the first rewrite.
        let mut state = self
            .transcript_history_state()
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
            .unwrap_or_else(|| TranscriptHistoryState {
                head: commit.parent_revision.clone(),
                commits: Vec::new(),
                revisions: Vec::new(),
            });
        // Retain the pre-rewrite transcript as the parent body if it is not
        // already stored.
        if !state
            .revisions
            .iter()
            .any(|body| body.revision == commit.parent_revision)
        {
            state.revisions.push(TranscriptRevisionBody {
                revision: commit.parent_revision.clone(),
                parent_revision: None,
                messages: self.messages().to_vec(),
                created_at: self.updated_at,
            });
        }
        // Retain the rewritten transcript unless a body with the same
        // revision id already exists.
        if !state
            .revisions
            .iter()
            .any(|body| body.revision == commit.revision)
        {
            state.revisions.push(TranscriptRevisionBody {
                revision: commit.revision.clone(),
                parent_revision: Some(commit.parent_revision.clone()),
                messages: rewritten.clone(),
                created_at: commit.committed_at,
            });
        }
        state.head = revision;
        state.commits.push(commit.clone());
        let value = serde_json::to_value(state)
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);

        self.messages = Arc::new(rewritten);
        self.updated_at = SystemTime::now();
        Ok(commit)
    }
2766
2767    fn refresh_transcript_head_after_message_mutation(&mut self) {
2768        if !self
2769            .metadata
2770            .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
2771        {
2772            return;
2773        }
2774        let Ok(Some(mut state)) = self.transcript_history_state() else {
2775            tracing::warn!(
2776                session_id = %self.id,
2777                "transcript history state is malformed; leaving head unchanged after message mutation"
2778            );
2779            return;
2780        };
2781        let Ok(head) = transcript_messages_digest(self.messages()) else {
2782            tracing::warn!(
2783                session_id = %self.id,
2784                "failed to digest transcript after message mutation; leaving head unchanged"
2785            );
2786            return;
2787        };
2788        let previous_head = state.head.clone();
2789        if !state.revisions.iter().any(|body| body.revision == head) {
2790            state.revisions.push(TranscriptRevisionBody {
2791                revision: head.clone(),
2792                parent_revision: Some(previous_head),
2793                messages: self.messages().to_vec(),
2794                created_at: SystemTime::now(),
2795            });
2796        }
2797        state.head = head;
2798        match serde_json::to_value(state) {
2799            Ok(value) => self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value),
2800            Err(error) => {
2801                tracing::warn!(
2802                    session_id = %self.id,
2803                    error = %error,
2804                    "failed to serialize transcript history state after message mutation"
2805                );
2806            }
2807        }
2808    }
2809
2810    /// Store typed mob operator authority inside canonical build-state metadata.
2811    pub fn set_mob_tool_authority_context(
2812        &mut self,
2813        authority_context: Option<MobToolAuthorityContext>,
2814    ) -> Result<(), serde_json::Error> {
2815        let mut build_state = self.build_state().unwrap_or_default();
2816        build_state.mob_tool_authority_context = authority_context;
2817        self.set_build_state(build_state)
2818    }
2819
2820    /// Load typed mob operator authority from canonical build-state metadata.
2821    pub fn mob_tool_authority_context(&self) -> Option<MobToolAuthorityContext> {
2822        self.build_state()
2823            .and_then(|state| state.mob_tool_authority_context)
2824    }
2825
2826    /// Fork the session at a specific message index
2827    ///
2828    /// Creates a new session with a subset of messages. The messages are copied
2829    /// (not shared) since the new session has a different prefix.
2830    pub fn fork_at(&self, index: usize) -> Self {
2831        let now = SystemTime::now();
2832        let truncated = self.messages[..index.min(self.messages.len())].to_vec();
2833        Self {
2834            version: SESSION_VERSION,
2835            id: SessionId::new(),
2836            messages: Arc::new(truncated),
2837            created_at: now,
2838            updated_at: now,
2839            metadata: self.branch_metadata(),
2840            usage: self.usage.clone(),
2841        }
2842    }
2843
2844    /// Fork the session and replace the message at `message_index`.
2845    ///
2846    /// The returned session contains the original prefix before
2847    /// `message_index`, followed by the typed replacement. Later source
2848    /// messages are intentionally omitted so follow-up work continues from the
2849    /// edited branch rather than replaying stale descendants.
2850    pub fn fork_replacing(
2851        &self,
2852        message_index: usize,
2853        replacement: TranscriptReplacement,
2854    ) -> Result<Self, TranscriptEditError> {
2855        let Some(original) = self.messages.get(message_index) else {
2856            return Err(TranscriptEditError::MessageIndexOutOfBounds {
2857                message_index,
2858                message_count: self.messages.len(),
2859            });
2860        };
2861
2862        let replacement_message = match replacement {
2863            TranscriptReplacement::Message { message } => message,
2864            TranscriptReplacement::UserContentBlock { block_index, block } => {
2865                let Message::User(user) = original else {
2866                    return Err(TranscriptEditError::MessageRoleMismatch {
2867                        message_index,
2868                        expected: "user",
2869                        actual: message_role_name(original),
2870                    });
2871                };
2872                if block_index >= user.content.len() {
2873                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
2874                        block_kind: "user content block",
2875                        block_index,
2876                        block_count: user.content.len(),
2877                    });
2878                }
2879                let mut edited = user.clone();
2880                edited.content[block_index] = block;
2881                Message::User(edited)
2882            }
2883            TranscriptReplacement::AssistantBlock { block_index, block } => {
2884                let Message::BlockAssistant(assistant) = original else {
2885                    return Err(TranscriptEditError::MessageRoleMismatch {
2886                        message_index,
2887                        expected: "block_assistant",
2888                        actual: message_role_name(original),
2889                    });
2890                };
2891                if block_index >= assistant.blocks.len() {
2892                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
2893                        block_kind: "assistant block",
2894                        block_index,
2895                        block_count: assistant.blocks.len(),
2896                    });
2897                }
2898                let mut edited = assistant.clone();
2899                edited.blocks[block_index] = block;
2900                Message::BlockAssistant(edited)
2901            }
2902            TranscriptReplacement::ToolResultContentBlock {
2903                result_index,
2904                block_index,
2905                block,
2906            } => {
2907                let Message::ToolResults {
2908                    results,
2909                    created_at,
2910                } = original
2911                else {
2912                    return Err(TranscriptEditError::MessageRoleMismatch {
2913                        message_index,
2914                        expected: "tool_results",
2915                        actual: message_role_name(original),
2916                    });
2917                };
2918                let Some(result) = results.get(result_index) else {
2919                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
2920                        block_kind: "tool result",
2921                        block_index: result_index,
2922                        block_count: results.len(),
2923                    });
2924                };
2925                if block_index >= result.content.len() {
2926                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
2927                        block_kind: "tool result content block",
2928                        block_index,
2929                        block_count: result.content.len(),
2930                    });
2931                }
2932                let mut edited_results = results.clone();
2933                edited_results[result_index].content[block_index] = block;
2934                Message::ToolResults {
2935                    results: edited_results,
2936                    created_at: *created_at,
2937                }
2938            }
2939        };
2940
2941        let mut forked = self.fork_at(message_index);
2942        forked.push(replacement_message);
2943        Ok(forked)
2944    }
2945
2946    /// Fork the entire session (full history)
2947    ///
2948    /// This is O(1) - the new session shares the message buffer via Arc.
2949    /// Copy-on-write occurs when either session mutates its messages.
2950    pub fn fork(&self) -> Self {
2951        let now = SystemTime::now();
2952        Self {
2953            version: SESSION_VERSION,
2954            id: SessionId::new(),
2955            messages: Arc::clone(&self.messages),
2956            created_at: now,
2957            updated_at: now,
2958            metadata: self.branch_metadata(),
2959            usage: self.usage.clone(),
2960        }
2961    }
2962
2963    fn branch_metadata(&self) -> serde_json::Map<String, serde_json::Value> {
2964        let mut metadata = self.metadata.clone();
2965        metadata.remove(SESSION_TRANSCRIPT_HISTORY_STATE_KEY);
2966        metadata
2967    }
2968}
2969
2970impl Default for Session {
2971    fn default() -> Self {
2972        Self::new()
2973    }
2974}
2975
/// Summary metadata for listing sessions.
///
/// Built from a [`Session`] through the `From<&Session>` conversion; it holds
/// counts and timestamps but not the message history itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMeta {
    /// Identifier of the summarized session.
    pub id: SessionId,
    /// Creation timestamp copied from the session.
    pub created_at: SystemTime,
    /// Last-update timestamp copied from the session.
    pub updated_at: SystemTime,
    /// Number of messages held by the session when the summary was taken.
    pub message_count: usize,
    /// Value of `Session::total_tokens()` when the summary was taken.
    pub total_tokens: u64,
    /// Copy of the session's metadata map. Falls back to an empty map when
    /// the field is missing from serialized input.
    #[serde(default)]
    pub metadata: serde_json::Map<String, serde_json::Value>,
}
2988
/// Metadata required to reliably resume a session across interfaces.
///
/// The `model`, `provider`, `self_hosted_server_id`, `provider_params` and
/// `auth_binding` fields together form the session's durable LLM identity
/// (see `llm_identity` / `apply_llm_identity`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMetadata {
    /// Per-entity schema version byte.
    ///
    /// Defaults to `1` on read so pre-wave-c rows without the field
    /// deserialize cleanly; rewritten as `SESSION_METADATA_SCHEMA_VERSION`
    /// on the next `save()` after a successful migration pass.
    #[serde(default = "default_session_metadata_schema_version")]
    pub schema_version: u32,
    /// Model name; part of the durable LLM identity.
    pub model: String,
    /// NOTE(review): presumably the per-response output token limit —
    /// confirm against the code that consumes this field.
    pub max_tokens: u32,
    /// Defaults to `2` when the field is missing from persisted data.
    /// NOTE(review): presumably the retry budget for structured-output
    /// validation — confirm against the consumer.
    #[serde(default = "default_structured_output_retries")]
    pub structured_output_retries: u32,
    /// LLM provider; part of the durable LLM identity.
    pub provider: Provider,
    /// Server id for a self-hosted provider; part of the durable LLM
    /// identity. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific parameters as free-form JSON; part of the durable
    /// LLM identity. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Tool-category intent recorded for the session (see [`SessionTooling`]).
    pub tooling: SessionTooling,
    /// Defaults to `false` when missing from persisted data.
    /// NOTE(review): the runtime meaning of this flag is not visible in this
    /// part of the file — confirm before relying on it.
    #[serde(default)]
    pub keep_alive: bool,
    /// NOTE(review): presumably the name this session uses on the comms
    /// layer — confirm. Unlike the optional fields below, this one has no
    /// `skip_serializing_if`, so it is written even when `None`.
    pub comms_name: Option<String>,
    /// Friendly metadata for peer discovery (populated when comms is enabled).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_meta: Option<PeerMeta>,
    /// Realm identity for cross-surface storage sharing/isolation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub realm_id: Option<String>,
    /// Optional process/agent instance identifier within a realm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub instance_id: Option<String>,
    /// Backend pinned by the realm manifest (e.g. "sqlite", "jsonl").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// Config generation used when this session was created/resumed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_generation: Option<u64>,
    /// Realm-scoped auth binding (Phase 3 provider-auth redesign).
    ///
    /// Persisted intent for the auth/backend binding this session resolved
    /// through. On resume, `apply_resumed_session_metadata` writes this
    /// back into `AgentBuildConfig.auth_binding` so the same realm
    /// binding is re-resolved. Never carries secret material — leases
    /// are rebuilt from the active realm connection set at resume time.
    /// Older persisted sessions without the field deserialize as `None`
    /// (backward compatible via `#[serde(default)]`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
3040
/// Serde default for `SessionMetadata::structured_output_retries`.
///
/// Supplies the value used when persisted metadata has no
/// `structured_output_retries` field.
fn default_structured_output_retries() -> u32 {
    const DEFAULT_STRUCTURED_OUTPUT_RETRIES: u32 = 2;
    DEFAULT_STRUCTURED_OUTPUT_RETRIES
}
3044
/// Serde default for `SessionMetadata::schema_version`.
///
/// Persisted metadata that lacks the field is read as schema version `1`.
fn default_session_metadata_schema_version() -> u32 {
    const LEGACY_SCHEMA_VERSION: u32 = 1;
    LEGACY_SCHEMA_VERSION
}
3048
/// Canonical durable LLM identity for a session.
///
/// Produced from [`SessionMetadata`] by `llm_identity()` and written back by
/// `apply_llm_identity()`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmIdentity {
    /// Model name.
    pub model: String,
    /// Provider serving `model`.
    pub provider: Provider,
    /// Server id for a self-hosted model. The override resolver sets this to
    /// `None` whenever the resolved provider is not `Provider::SelfHosted`.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific parameters as free-form JSON. Omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Realm-scoped auth binding this session resolves credentials
    /// through. Carried on the identity so mid-session hot-swaps
    /// (`apply_live_session_llm_identity`) re-resolve against the
    /// same realm the session was created with — preventing
    /// cross-realm credential bleed in multi-tenant setups. Dogma
    /// §12 (dynamic policy follows dynamic identity): on swap the
    /// factory re-enters `ProviderRuntimeRegistry::resolve` against
    /// this binding, not a new synthesized env-default realm.
    ///
    /// Projection (dogma §1/§13): canonical owner is
    /// `SessionMetadata.auth_binding`; this field is the
    /// read/write projection used by hot-swap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
3074
/// Typed per-turn override request for a session LLM identity.
///
/// Consumed by `resolve_session_llm_identity_override`, which validates the
/// combination of fields and merges it with the current identity.
pub struct SessionLlmIdentityOverride<'a> {
    /// Replacement model name; `None` keeps the current model.
    pub model: Option<&'a str>,
    /// Explicit provider. Only accepted together with `model`; a provider
    /// without a model is rejected as `ProviderRequiresModel`.
    pub provider: Option<Provider>,
    /// Replacement provider params; `None` keeps the current params unless
    /// `clear_provider_params` is set.
    pub provider_params: Option<&'a serde_json::Value>,
    /// Drop the provider params entirely. Cannot be combined with
    /// `provider_params`.
    pub clear_provider_params: bool,
    /// Replacement auth binding. When `None`, the current binding is kept
    /// only if the resolved provider is unchanged.
    pub auth_binding: Option<&'a crate::AuthBindingRef>,
    /// Drop the auth binding entirely. Cannot be combined with
    /// `auth_binding`.
    pub clear_auth_binding: bool,
}
3084
/// Reasons `resolve_session_llm_identity_override` rejects an override request.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionLlmIdentityOverrideError {
    /// A provider override was supplied without a model override.
    #[error("provider override requires model on an existing session")]
    ProviderRequiresModel,
    /// The request both sets new provider params and asks to clear them.
    #[error("clear_provider_params cannot be combined with provider_params")]
    SetAndClearProviderParams,
    /// The request both sets a new auth binding and asks to clear it.
    #[error("clear_auth_binding cannot be combined with auth_binding")]
    SetAndClearAuthBinding,
    /// The registry reported a mismatch for the resolved provider/model pair;
    /// the payload is the reason string it returned.
    #[error("{0}")]
    ProviderModelMismatch(String),
    /// A model override resolved to the self-hosted provider, but the
    /// registry has no self-hosted entry for that model.
    #[error("self-hosted provider requires a registered model alias; '{model}' is not configured")]
    MissingSelfHostedAlias { model: String },
}
3098
3099/// Resolve a turn-time model/provider/auth override against the current
3100/// durable session identity.
3101///
3102/// The model registry is the authority for catalog ownership. A model-only
3103/// override follows catalog ownership when the target model is registered;
3104/// uncatalogued models keep the current provider so custom aliases remain
3105/// possible.
3106pub fn resolve_session_llm_identity_override(
3107    current: &SessionLlmIdentity,
3108    registry: &crate::ModelRegistry,
3109    overrides: SessionLlmIdentityOverride<'_>,
3110) -> Result<SessionLlmIdentity, SessionLlmIdentityOverrideError> {
3111    if overrides.provider.is_some() && overrides.model.is_none() {
3112        return Err(SessionLlmIdentityOverrideError::ProviderRequiresModel);
3113    }
3114    if overrides.clear_provider_params && overrides.provider_params.is_some() {
3115        return Err(SessionLlmIdentityOverrideError::SetAndClearProviderParams);
3116    }
3117    if overrides.clear_auth_binding && overrides.auth_binding.is_some() {
3118        return Err(SessionLlmIdentityOverrideError::SetAndClearAuthBinding);
3119    }
3120
3121    let model = overrides
3122        .model
3123        .map(str::to_string)
3124        .unwrap_or_else(|| current.model.clone());
3125    let provider = if let Some(provider) = overrides.provider {
3126        provider
3127    } else if overrides.model.is_some() {
3128        registry
3129            .entry(&model)
3130            .map_or(current.provider, |entry| entry.provider)
3131    } else {
3132        current.provider
3133    };
3134
3135    if (overrides.model.is_some() || overrides.provider.is_some())
3136        && let Some(reason) = registry.provider_override_mismatch_reason(provider, &model)
3137    {
3138        return Err(SessionLlmIdentityOverrideError::ProviderModelMismatch(
3139            reason,
3140        ));
3141    }
3142
3143    let provider_params = if overrides.clear_provider_params {
3144        None
3145    } else {
3146        overrides
3147            .provider_params
3148            .cloned()
3149            .or_else(|| current.provider_params.clone())
3150    };
3151    let self_hosted_server_id = if provider == Provider::SelfHosted {
3152        if overrides.model.is_none() {
3153            current.self_hosted_server_id.clone().or_else(|| {
3154                registry
3155                    .entry_for_provider(Provider::SelfHosted, &model)
3156                    .and_then(|entry| entry.self_hosted.as_ref())
3157                    .map(|server| server.server_id.clone())
3158            })
3159        } else {
3160            let entry = registry
3161                .entry_for_provider(Provider::SelfHosted, &model)
3162                .ok_or_else(|| SessionLlmIdentityOverrideError::MissingSelfHostedAlias {
3163                    model: model.clone(),
3164                })?;
3165            entry
3166                .self_hosted
3167                .as_ref()
3168                .map(|server| server.server_id.clone())
3169        }
3170    } else {
3171        None
3172    };
3173
3174    let auth_binding = if overrides.clear_auth_binding
3175        || (provider != current.provider && overrides.auth_binding.is_none())
3176    {
3177        None
3178    } else {
3179        overrides
3180            .auth_binding
3181            .cloned()
3182            .or_else(|| current.auth_binding.clone())
3183    };
3184
3185    Ok(SessionLlmIdentity {
3186        model,
3187        provider,
3188        self_hosted_server_id,
3189        provider_params,
3190        auth_binding,
3191    })
3192}
3193
/// Live request policy paired with a session LLM identity hot-swap.
///
/// `SessionLlmIdentity` is the durable semantic identity. This projection is
/// the per-turn request policy the live agent must use for the next LLM call,
/// including provider params and provider-native tool defaults resolved for
/// the same target model/provider.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmRequestPolicy {
    /// Target model for the next LLM call.
    pub model: String,
    /// Provider params to send with the call, as free-form JSON. Omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Provider-native tool defaults resolved for the target model/provider,
    /// as free-form JSON. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_tool_defaults: Option<serde_json::Value>,
}
3209
3210impl SessionMetadata {
3211    /// Return the current durable LLM identity for this session.
3212    pub fn llm_identity(&self) -> SessionLlmIdentity {
3213        SessionLlmIdentity {
3214            model: self.model.clone(),
3215            provider: self.provider,
3216            self_hosted_server_id: self.self_hosted_server_id.clone(),
3217            provider_params: self.provider_params.clone(),
3218            auth_binding: self.auth_binding.clone(),
3219        }
3220    }
3221
3222    /// Overwrite the durable LLM identity while preserving unrelated session metadata.
3223    pub fn apply_llm_identity(&mut self, identity: &SessionLlmIdentity) {
3224        self.model = identity.model.clone();
3225        self.provider = identity.provider;
3226        self.self_hosted_server_id = identity.self_hosted_server_id.clone();
3227        self.provider_params = identity.provider_params.clone();
3228        self.auth_binding = identity.auth_binding.clone();
3229    }
3230}
3231
/// Key under which [`SessionMetadata`] is stored in a session's metadata map.
pub const SESSION_METADATA_KEY: &str = "session_metadata";
3234
/// Caller intent for a tool category.
///
/// Distinguishes "no opinion / didn't exist" (`Inherit`) from explicit
/// `Enable` / `Disable` so that resumed sessions don't freeze tool
/// availability at the capabilities of the Meerkat version that created them.
///
/// **Dogma §10:** Inherit, disable, and set are different facts.
///
/// Variants serialize as the snake_case strings `"inherit"`, `"enable"` and
/// `"disable"`; `Inherit` is the `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCategoryOverride {
    /// No explicit intent — inherit runtime/factory default.
    #[default]
    Inherit,
    /// Explicitly enabled by caller.
    Enable,
    /// Explicitly disabled by caller.
    Disable,
}
3253
3254impl ToolCategoryOverride {
3255    /// Resolve this override against a runtime default.
3256    ///
3257    /// - `Enable` → `true`
3258    /// - `Disable` → `false`
3259    /// - `Inherit` → `runtime_default`
3260    #[must_use]
3261    pub fn resolve(self, runtime_default: bool) -> bool {
3262        match self {
3263            Self::Enable => true,
3264            Self::Disable => false,
3265            Self::Inherit => runtime_default,
3266        }
3267    }
3268
3269    /// Convert to `Option<bool>` for feeding `AgentBuildConfig` override fields.
3270    ///
3271    /// - `Enable` → `Some(true)`
3272    /// - `Disable` → `Some(false)`
3273    /// - `Inherit` → `None` (factory default wins)
3274    #[must_use]
3275    pub fn to_override(self) -> Option<bool> {
3276        match self {
3277            Self::Enable => Some(true),
3278            Self::Disable => Some(false),
3279            Self::Inherit => None,
3280        }
3281    }
3282
3283    /// Construct from a resolved effective bool.
3284    ///
3285    /// **Warning:** this collapses `Inherit` into `Enable`/`Disable`. Prefer
3286    /// [`from_override`] when persisting session metadata so that `Inherit`
3287    /// survives across save/resume cycles. Only use `from_effective` in test
3288    /// helpers or when constructing metadata from external sources that only
3289    /// provide a resolved bool.
3290    #[must_use]
3291    pub fn from_effective(enabled: bool) -> Self {
3292        if enabled { Self::Enable } else { Self::Disable }
3293    }
3294
3295    /// Construct from an `Option<bool>` override field, preserving `Inherit`.
3296    ///
3297    /// - `Some(true)` → `Enable`
3298    /// - `Some(false)` → `Disable`
3299    /// - `None` → `Inherit` (factory default was used, no explicit intent)
3300    ///
3301    /// This is the inverse of [`to_override`] and should be used when persisting
3302    /// session tooling metadata so that `Inherit` survives across save/resume
3303    /// cycles.
3304    #[must_use]
3305    pub fn from_override(value: Option<bool>) -> Self {
3306        match value {
3307            Some(true) => Self::Enable,
3308            Some(false) => Self::Disable,
3309            None => Self::Inherit,
3310        }
3311    }
3312}
3313
3314/// Backward-compatible deserializer: accepts both old `bool` JSON and new
3315/// tri-state `"inherit"` / `"enable"` / `"disable"` strings.
3316///
3317/// Old persisted sessions have `"mob": false` or `"builtins": true`.
3318/// - `true`  → `Enable`  (user explicitly had it on)
3319/// - `false` → `Inherit` (can't distinguish "disabled" from "didn't exist")
3320/// - string  → normal enum deserialization
3321fn deserialize_tool_category_compat<'de, D>(
3322    deserializer: D,
3323) -> Result<ToolCategoryOverride, D::Error>
3324where
3325    D: serde::Deserializer<'de>,
3326{
3327    use serde::de;
3328
3329    struct ToolCategoryVisitor;
3330
3331    impl de::Visitor<'_> for ToolCategoryVisitor {
3332        type Value = ToolCategoryOverride;
3333
3334        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3335            formatter.write_str("a boolean or one of \"inherit\", \"enable\", \"disable\"")
3336        }
3337
3338        fn visit_bool<E: de::Error>(self, v: bool) -> Result<Self::Value, E> {
3339            Ok(if v {
3340                ToolCategoryOverride::Enable
3341            } else {
3342                ToolCategoryOverride::Inherit
3343            })
3344        }
3345
3346        fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
3347            match v {
3348                "inherit" => Ok(ToolCategoryOverride::Inherit),
3349                "enable" => Ok(ToolCategoryOverride::Enable),
3350                "disable" => Ok(ToolCategoryOverride::Disable),
3351                _ => Err(de::Error::unknown_variant(
3352                    v,
3353                    &["inherit", "enable", "disable"],
3354                )),
3355            }
3356        }
3357    }
3358
3359    deserializer.deserialize_any(ToolCategoryVisitor)
3360}
3361
/// Trim surrounding whitespace from a realtime item id.
///
/// Returns `None` when nothing is left after trimming, otherwise the trimmed
/// id as a new `String`.
fn normalize_realtime_item_id(item_id: String) -> Option<String> {
    match item_id.trim() {
        "" => None,
        trimmed => Some(trimmed.to_owned()),
    }
}
3366
3367fn normalize_realtime_previous_item_id(previous_item_id: Option<String>) -> Option<String> {
3368    previous_item_id.and_then(normalize_realtime_item_id)
3369}
3370
/// Normalize a realtime response id.
///
/// Response ids follow the same rule as item ids: the call is forwarded to
/// `normalize_realtime_item_id` unchanged.
fn normalize_realtime_response_id(response_id: String) -> Option<String> {
    normalize_realtime_item_id(response_id)
}
3374
3375fn normalize_realtime_optional_response_id(response_id: Option<String>) -> Option<String> {
3376    response_id.and_then(normalize_realtime_response_id)
3377}
3378
3379/// T9/T10: tag the assistant item with its output lane (Display vs Spoken).
3380///
3381/// First lane wins: if a later delta tries to promote an item already
3382/// classified as the other lane, we keep the existing lane and emit a
3383/// `tracing::warn!`. The materializer can only flush one block-type per
3384/// item; mixed-lane content on the same `item_id` is not expected from
3385/// any provider and would be a provider-side bug. Items with empty
3386/// content (no deltas yet observed) accept any lane.
3387///
3388/// Returns `true` when the item now carries the requested `lane` (either
3389/// the lane already matched, or the item was empty so the lane was
3390/// promoted). Returns `false` when the item already carried the *other*
3391/// lane with staged content — in that case the caller MUST skip any
3392/// content insert it was about to perform on `lane`, otherwise it would
3393/// clobber the locked-in lane content (R5-6 sibling: silent-clobber bug).
3394#[must_use]
3395fn promote_item_lane(item: &mut RealtimeTranscriptItemState, lane: TranscriptLane) -> bool {
3396    if item.lane == lane {
3397        return true;
3398    }
3399    let has_content = item.content_segments.values().any(|s| !s.is_empty());
3400    if !has_content {
3401        // No content has been staged on the original lane yet; safe to
3402        // re-classify.
3403        item.lane = lane;
3404        return true;
3405    }
3406    tracing::warn!(
3407        existing_lane = ?item.lane,
3408        observed_lane = ?lane,
3409        "ignoring realtime transcript lane conflict on item with staged content"
3410    );
3411    false
3412}
3413
3414fn observe_realtime_item(
3415    state: &mut SessionRealtimeTranscriptState,
3416    item_id: String,
3417    previous_item_id: Option<String>,
3418    role: RealtimeTranscriptRole,
3419    response_id: Option<String>,
3420) -> Option<&mut RealtimeTranscriptItemState> {
3421    let item_id = normalize_realtime_item_id(item_id)?;
3422    let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
3423    let response_id = normalize_realtime_optional_response_id(response_id);
3424    if !state
3425        .first_seen_order
3426        .iter()
3427        .any(|existing| existing == &item_id)
3428    {
3429        state.first_seen_order.push(item_id.clone());
3430    }
3431    let item = state.items.entry(item_id.clone()).or_insert_with(|| {
3432        RealtimeTranscriptItemState::new(role, previous_item_id.clone(), response_id.clone())
3433    });
3434    if item.skipped {
3435        if item.previous_item_id.is_none() && previous_item_id.is_some() {
3436            item.previous_item_id = previous_item_id;
3437        }
3438        tracing::warn!(
3439            item_id = %item_id,
3440            observed_role = ?role,
3441            "ignoring realtime transcript content for item already marked as a contentless causal anchor"
3442        );
3443        return None;
3444    }
3445    if item.role != role {
3446        tracing::warn!(
3447            item_id = %item_id,
3448            existing_role = ?item.role,
3449            observed_role = ?role,
3450            "ignoring realtime transcript item role conflict"
3451        );
3452        return None;
3453    }
3454    if item.previous_item_id.is_none() && previous_item_id.is_some() {
3455        item.previous_item_id = previous_item_id;
3456    }
3457    if let Some(response_id) = response_id {
3458        match item.response_id.as_ref() {
3459            Some(existing) if existing != &response_id => {
3460                tracing::warn!(
3461                    item_id = %item_id,
3462                    existing_response_id = %existing,
3463                    observed_response_id = %response_id,
3464                    "ignoring realtime transcript item response conflict"
3465                );
3466                return None;
3467            }
3468            Some(_) => {}
3469            None => item.response_id = Some(response_id),
3470        }
3471    }
3472    Some(item)
3473}
3474
3475fn observe_realtime_skipped_item(
3476    state: &mut SessionRealtimeTranscriptState,
3477    item_id: String,
3478    previous_item_id: Option<String>,
3479) {
3480    let Some(item_id) = normalize_realtime_item_id(item_id) else {
3481        return;
3482    };
3483    let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
3484    if !state
3485        .first_seen_order
3486        .iter()
3487        .any(|existing| existing == &item_id)
3488    {
3489        state.first_seen_order.push(item_id.clone());
3490    }
3491    let item = state
3492        .items
3493        .entry(item_id)
3494        .or_insert_with(|| RealtimeTranscriptItemState::skipped(previous_item_id.clone()));
3495    if item.previous_item_id.is_none() && previous_item_id.is_some() {
3496        item.previous_item_id = previous_item_id;
3497    }
3498    if item.materialized || item.skipped {
3499        return;
3500    }
3501    if item.role != RealtimeTranscriptRole::Assistant {
3502        tracing::warn!(
3503            existing_role = ?item.role,
3504            "ignoring realtime skipped-item observation for non-assistant item"
3505        );
3506        return;
3507    }
3508    if !item.content_segments.is_empty() {
3509        tracing::warn!("ignoring realtime skipped-item observation for content-bearing item");
3510        return;
3511    }
3512    item.skipped = true;
3513    item.ready = true;
3514}
3515
3516fn mark_realtime_assistant_response_ready(
3517    state: &mut SessionRealtimeTranscriptState,
3518    response_id: &str,
3519) {
3520    for item in state.items.values_mut() {
3521        if item.role == RealtimeTranscriptRole::Assistant
3522            && item.response_id.as_deref() == Some(response_id)
3523            && !item.materialized
3524            && !item.text().is_empty()
3525        {
3526            item.ready = true;
3527        }
3528    }
3529}
3530
3531fn discard_realtime_assistant_response(
3532    state: &mut SessionRealtimeTranscriptState,
3533    response_id: &str,
3534) {
3535    state
3536        .discarded_assistant_response_ids
3537        .insert(response_id.to_string());
3538    for item in state.items.values_mut() {
3539        if item.role == RealtimeTranscriptRole::Assistant
3540            && item.response_id.as_deref() == Some(response_id)
3541            && !item.materialized
3542        {
3543            item.content_segments.clear();
3544            item.skipped = true;
3545            item.ready = true;
3546        }
3547    }
3548    state.assistant_completions.remove(response_id);
3549}
3550
3551/// R5-5: lane-scoped discard for barge-in (`AssistantTurnInterrupted`).
3552///
3553/// Drops Spoken-lane staged items (the heard audio + transcript the user
3554/// spoke over) and preserves Display-lane items as committable content.
3555/// Items with empty content (no deltas observed yet, lane still defaulting
3556/// to `Display`) are also discarded — they carry no preserveable Display
3557/// text. The caller is responsible for inserting a synthetic
3558/// `assistant_completions` entry so the materializer can flush retained
3559/// Display items immediately.
3560///
3561/// Unlike [`discard_realtime_assistant_response`], this helper does NOT
3562/// remove the response from `assistant_completions`: barge-in must seed a
3563/// completion entry so retained Display items can materialize before any
3564/// late `AssistantTurnCompleted` (which would short-circuit on the
3565/// discarded-set membership) arrives.
3566fn discard_realtime_assistant_response_by_lane(
3567    state: &mut SessionRealtimeTranscriptState,
3568    response_id: &str,
3569) {
3570    state
3571        .discarded_assistant_response_ids
3572        .insert(response_id.to_string());
3573    for item in state.items.values_mut() {
3574        if item.role != RealtimeTranscriptRole::Assistant
3575            || item.response_id.as_deref() != Some(response_id)
3576            || item.materialized
3577        {
3578            continue;
3579        }
3580        let has_content = item.content_segments.values().any(|s| !s.is_empty());
3581        if item.lane == TranscriptLane::Display && has_content {
3582            // Retain the Display content; the caller's synthetic
3583            // completion entry + `mark_realtime_assistant_response_ready`
3584            // will flag it ready for the next materializer pass.
3585            continue;
3586        }
3587        // Spoken (or empty Display, which carries no preserveable text):
3588        // drop content and mark as a contentless predecessor so chained
3589        // items downstream of it can still materialize.
3590        item.content_segments.clear();
3591        item.skipped = true;
3592        item.ready = true;
3593    }
3594}
3595
3596fn realtime_transcript_order(state: &SessionRealtimeTranscriptState) -> Vec<String> {
3597    let mut roots = Vec::new();
3598    let mut children: BTreeMap<String, Vec<String>> = BTreeMap::new();
3599    for item_id in &state.first_seen_order {
3600        let Some(item) = state.items.get(item_id) else {
3601            continue;
3602        };
3603        if let Some(previous) = item.previous_item_id.as_ref()
3604            && state.items.contains_key(previous)
3605        {
3606            children
3607                .entry(previous.clone())
3608                .or_default()
3609                .push(item_id.clone());
3610        } else {
3611            roots.push(item_id.clone());
3612        }
3613    }
3614    roots.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
3615    for child_ids in children.values_mut() {
3616        child_ids.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
3617    }
3618
3619    let mut ordered = Vec::new();
3620    let mut visited = BTreeSet::new();
3621    for root in roots {
3622        visit_realtime_transcript_item(&root, &children, &mut visited, &mut ordered);
3623    }
3624    for item_id in &state.first_seen_order {
3625        visit_realtime_transcript_item(item_id, &children, &mut visited, &mut ordered);
3626    }
3627    ordered
3628}
3629
3630fn realtime_first_seen_index(state: &SessionRealtimeTranscriptState, item_id: &str) -> usize {
3631    state
3632        .first_seen_order
3633        .iter()
3634        .position(|existing| existing == item_id)
3635        .unwrap_or(usize::MAX)
3636}
3637
/// Append `item_id` and all of its not-yet-visited descendants to `ordered`
/// in pre-order: an item first, then each child subtree in the order the
/// children are listed.
///
/// * `children` — maps an item id to the ids chained after it.
/// * `visited` — ids already emitted; anything in it is skipped, so repeated
///   calls are no-ops and cycles in `children` terminate.
/// * `ordered` — output list, extended in place.
///
/// The walk uses an explicit stack instead of recursion, so its depth is not
/// bounded by the thread's call stack. A long chain of items that each point
/// at the previous one therefore cannot overflow the stack.
fn visit_realtime_transcript_item(
    item_id: &str,
    children: &BTreeMap<String, Vec<String>>,
    visited: &mut BTreeSet<String>,
    ordered: &mut Vec<String>,
) {
    let mut pending = vec![item_id];
    while let Some(current) = pending.pop() {
        if !visited.insert(current.to_string()) {
            continue;
        }
        ordered.push(current.to_string());
        if let Some(child_ids) = children.get(current) {
            // Push in reverse so the first child is popped (visited) first.
            pending.extend(child_ids.iter().rev().map(String::as_str));
        }
    }
}
3654
3655fn realtime_predecessor_materialized(
3656    state: &SessionRealtimeTranscriptState,
3657    previous_item_id: Option<&str>,
3658) -> bool {
3659    match previous_item_id {
3660        None => true,
3661        Some(previous_item_id) => state
3662            .items
3663            .get(previous_item_id)
3664            .is_some_and(|item| item.materialized),
3665    }
3666}
3667
/// Tooling intent captured at session creation time.
///
/// Fields use [`ToolCategoryOverride`] to distinguish "no opinion" from
/// explicit enable/disable (Dogma §10). On resume, `Inherit` falls through
/// to the factory's current runtime default, allowing new tool categories
/// to become available without re-creating the session.
///
/// Every category field defaults to `Inherit` when missing and is read
/// through `deserialize_tool_category_compat`, so legacy boolean values are
/// accepted as well as the tri-state strings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionTooling {
    /// Built-in tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub builtins: ToolCategoryOverride,
    /// Shell tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub shell: ToolCategoryOverride,
    /// Comms tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub comms: ToolCategoryOverride,
    /// Mob (multi-agent orchestration) tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub mob: ToolCategoryOverride,
    /// Semantic memory.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub memory: ToolCategoryOverride,
    /// Scheduler tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub schedule: ToolCategoryOverride,
    /// WorkGraph durable work tools.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub workgraph: ToolCategoryOverride,
    /// Assistant image generation.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub image_generation: ToolCategoryOverride,
    /// Meerkat-owned fallback web search.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub web_search: ToolCategoryOverride,
    /// Active skills at session creation time (for deterministic resume).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub active_skills: Option<Vec<crate::skills::SkillKey>>,
}
3705
3706impl From<&Session> for SessionMeta {
3707    fn from(session: &Session) -> Self {
3708        Self {
3709            id: session.id.clone(),
3710            created_at: session.created_at,
3711            updated_at: session.updated_at,
3712            message_count: session.messages.len(),
3713            total_tokens: session.total_tokens(),
3714            metadata: session.metadata.clone(),
3715        }
3716    }
3717}
3718
3719#[cfg(test)]
3720#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
3721mod tests {
3722    use super::*;
3723    use crate::types::{
3724        AssistantMessage, BlockAssistantMessage, ContentBlock, StopReason, SystemMessage, Usage,
3725        UserMessage,
3726    };
3727    use std::sync::Arc;
3728
3729    fn block_assistant_text(message: &BlockAssistantMessage) -> String {
3730        message
3731            .blocks
3732            .iter()
3733            .filter_map(|block| match block {
3734                AssistantBlock::Text { text, .. } => Some(text.as_str()),
3735                _ => None,
3736            })
3737            .collect()
3738    }
3739
3740    #[test]
3741    fn transcript_rewrite_preserves_full_assistant_block_trace() {
3742        let mut session = Session::new();
3743        session.push(Message::User(UserMessage::text(
3744            "run the trace".to_string(),
3745        )));
3746        session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3747            vec![AssistantBlock::Text {
3748                text: "original assistant trace".to_string(),
3749                meta: None,
3750            }],
3751            StopReason::EndTurn,
3752        )));
3753
3754        let parent_revision = session.transcript_revision().expect("parent revision");
3755        let replacement = vec![
3756            Message::BlockAssistant(BlockAssistantMessage::new(
3757                vec![
3758                    AssistantBlock::Text {
3759                        text: "compacted assistant trace".to_string(),
3760                        meta: None,
3761                    },
3762                    AssistantBlock::ToolUse {
3763                        id: "toolu_trace".to_string(),
3764                        name: "trace_probe".to_string(),
3765                        args: serde_json::value::RawValue::from_string(
3766                            r#"{"path":"N-3"}"#.to_string(),
3767                        )
3768                        .expect("valid tool args"),
3769                        meta: None,
3770                    },
3771                ],
3772                StopReason::ToolUse,
3773            )),
3774            Message::tool_results(vec![ToolResult::new(
3775                "toolu_trace".to_string(),
3776                "trace complete".to_string(),
3777                false,
3778            )]),
3779        ];
3780
3781        let commit = session
3782            .commit_transcript_rewrite(
3783                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3784                replacement,
3785                TranscriptRewriteReason::new("compaction"),
3786                Some("unit-test".to_string()),
3787                Some(parent_revision.clone()),
3788            )
3789            .expect("rewrite should commit");
3790
3791        assert_eq!(commit.parent_revision, parent_revision);
3792        let current = session
3793            .transcript_revision_messages(&commit.revision)
3794            .expect("history state should decode")
3795            .expect("current revision should be retained");
3796        let Message::BlockAssistant(assistant) = &current[1] else {
3797            panic!("replacement should remain a block assistant message");
3798        };
3799        assert!(assistant.blocks.iter().any(|block| matches!(
3800            block,
3801            AssistantBlock::ToolUse { name, args, .. }
3802                if name == "trace_probe" && args.get().contains("\"N-3\"")
3803        )));
3804
3805        let parent = session
3806            .transcript_revision_messages(&parent_revision)
3807            .expect("history state should decode")
3808            .expect("parent revision should remain retained");
3809        assert!(matches!(
3810            &parent[1],
3811            Message::BlockAssistant(assistant)
3812                if block_assistant_text(assistant).contains("original assistant trace")
3813        ));
3814    }
3815
3816    #[test]
3817    fn transcript_rewrite_rejects_trailing_block_assistant_tool_call() {
3818        let mut session = Session::new();
3819        session.push(Message::User(UserMessage::text("question".to_string())));
3820        session.push(Message::Assistant(AssistantMessage {
3821            content: "plain answer".to_string(),
3822            tool_calls: Vec::new(),
3823            stop_reason: StopReason::EndTurn,
3824            usage: Usage::default(),
3825            created_at: crate::types::message_timestamp_now(),
3826        }));
3827        let parent_revision = session.transcript_revision().expect("parent revision");
3828
3829        let err = session
3830            .commit_transcript_rewrite(
3831                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3832                vec![Message::BlockAssistant(BlockAssistantMessage::new(
3833                    vec![AssistantBlock::ToolUse {
3834                        id: "toolu_1".to_string(),
3835                        name: "lookup".to_string(),
3836                        args: serde_json::value::RawValue::from_string("{}".to_string())
3837                            .expect("valid args"),
3838                        meta: None,
3839                    }],
3840                    StopReason::ToolUse,
3841                ))],
3842                TranscriptRewriteReason::new("compaction"),
3843                Some("unit-test".to_string()),
3844                Some(parent_revision),
3845            )
3846            .expect_err("rewrite should reject trailing unresolved block-assistant tool call");
3847        assert!(matches!(
3848            err,
3849            TranscriptEditError::InvalidTranscriptShape(_)
3850        ));
3851    }
3852
3853    #[test]
3854    fn transcript_rewrite_rejects_no_op_self_edge() {
3855        let mut session = Session::new();
3856        session.push(Message::User(UserMessage::text(
3857            "keep this exact transcript".to_string(),
3858        )));
3859        session.push(Message::Assistant(AssistantMessage {
3860            content: "unchanged".to_string(),
3861            tool_calls: Vec::new(),
3862            stop_reason: StopReason::EndTurn,
3863            usage: Usage::default(),
3864            created_at: crate::types::message_timestamp_now(),
3865        }));
3866
3867        let parent_revision = session.transcript_revision().expect("parent revision");
3868        let err = session
3869            .commit_transcript_rewrite(
3870                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3871                vec![session.messages()[1].clone()],
3872                TranscriptRewriteReason::new("retry"),
3873                Some("unit-test".to_string()),
3874                Some(parent_revision.clone()),
3875            )
3876            .expect_err("same-content rewrite should not emit a self-edge commit");
3877
3878        assert!(matches!(
3879            err,
3880            TranscriptEditError::NoOpRewrite { revision } if revision == parent_revision
3881        ));
3882        assert!(
3883            session
3884                .transcript_history_state()
3885                .expect("history state should decode")
3886                .is_none()
3887        );
3888    }
3889
3890    #[test]
3891    fn transcript_rewrite_run_boundary_guard_accepts_rewrite_then_append() {
3892        let mut original = Session::new();
3893        original.push(Message::User(UserMessage::text("question".to_string())));
3894        original.push(Message::Assistant(AssistantMessage {
3895            content: "verbose answer".to_string(),
3896            tool_calls: Vec::new(),
3897            stop_reason: StopReason::EndTurn,
3898            usage: Usage::default(),
3899            created_at: crate::types::message_timestamp_now(),
3900        }));
3901
3902        let parent_revision = original.transcript_revision().expect("parent revision");
3903        let mut incoming = original.clone();
3904        incoming
3905            .commit_transcript_rewrite(
3906                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3907                vec![Message::Assistant(AssistantMessage {
3908                    content: "compact answer".to_string(),
3909                    tool_calls: Vec::new(),
3910                    stop_reason: StopReason::EndTurn,
3911                    usage: Usage::default(),
3912                    created_at: crate::types::message_timestamp_now(),
3913                })],
3914                TranscriptRewriteReason::new("compaction"),
3915                Some("unit-test".to_string()),
3916                Some(parent_revision),
3917            )
3918            .expect("rewrite should commit");
3919        incoming.push(Message::User(UserMessage::text("follow-up".to_string())));
3920        incoming.push(Message::Assistant(AssistantMessage {
3921            content: "follow-up answer".to_string(),
3922            tool_calls: Vec::new(),
3923            stop_reason: StopReason::EndTurn,
3924            usage: Usage::default(),
3925            created_at: crate::types::message_timestamp_now(),
3926        }));
3927
3928        crate::session_store::run_boundary_snapshot_save_guard(&incoming, Some(&original))
3929            .expect("rewrite plus appended turn should be a valid run-boundary commit");
3930    }
3931
3932    #[test]
3933    fn transcript_rewrite_rejects_orphaned_tool_results() {
3934        let mut session = Session::new();
3935        session.push(Message::User(UserMessage::text("use a tool".to_string())));
3936        session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3937            vec![AssistantBlock::ToolUse {
3938                id: "toolu_1".to_string(),
3939                name: "lookup".to_string(),
3940                args: serde_json::value::RawValue::from_string("{}".to_string())
3941                    .expect("valid args"),
3942                meta: None,
3943            }],
3944            StopReason::ToolUse,
3945        )));
3946        session.push(Message::tool_results(vec![ToolResult::new(
3947            "toolu_1".to_string(),
3948            "done".to_string(),
3949            false,
3950        )]));
3951        let parent_revision = session.transcript_revision().expect("parent revision");
3952
3953        let err = session
3954            .commit_transcript_rewrite(
3955                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3956                vec![Message::Assistant(AssistantMessage {
3957                    content: "no tool after all".to_string(),
3958                    tool_calls: Vec::new(),
3959                    stop_reason: StopReason::EndTurn,
3960                    usage: Usage::default(),
3961                    created_at: crate::types::message_timestamp_now(),
3962                })],
3963                TranscriptRewriteReason::new("compaction"),
3964                Some("unit-test".to_string()),
3965                Some(parent_revision),
3966            )
3967            .expect_err("rewrite should reject stranded tool results");
3968        assert!(matches!(
3969            err,
3970            TranscriptEditError::InvalidTranscriptShape(_)
3971        ));
3972    }
3973
3974    #[test]
3975    fn transcript_rewrite_rejects_trailing_assistant_tool_call() {
3976        let mut session = Session::new();
3977        session.push(Message::User(UserMessage::text("question".to_string())));
3978        session.push(Message::Assistant(AssistantMessage {
3979            content: "plain answer".to_string(),
3980            tool_calls: Vec::new(),
3981            stop_reason: StopReason::EndTurn,
3982            usage: Usage::default(),
3983            created_at: crate::types::message_timestamp_now(),
3984        }));
3985        let parent_revision = session.transcript_revision().expect("parent revision");
3986
3987        let err = session
3988            .commit_transcript_rewrite(
3989                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3990                vec![Message::Assistant(AssistantMessage {
3991                    content: String::new(),
3992                    tool_calls: vec![crate::types::ToolCall::new(
3993                        "toolu_1".to_string(),
3994                        "lookup".to_string(),
3995                        serde_json::json!({}),
3996                    )],
3997                    stop_reason: StopReason::ToolUse,
3998                    usage: Usage::default(),
3999                    created_at: crate::types::message_timestamp_now(),
4000                })],
4001                TranscriptRewriteReason::new("compaction"),
4002                Some("unit-test".to_string()),
4003                Some(parent_revision),
4004            )
4005            .expect_err("rewrite should reject trailing unresolved tool call");
4006        assert!(matches!(
4007            err,
4008            TranscriptEditError::InvalidTranscriptShape(_)
4009        ));
4010    }
4011
4012    #[test]
4013    fn transcript_rewrite_rejects_duplicate_tool_results() {
4014        let mut session = Session::new();
4015        session.push(Message::User(UserMessage::text("use a tool".to_string())));
4016        session.push(Message::Assistant(AssistantMessage {
4017            content: "plain answer".to_string(),
4018            tool_calls: Vec::new(),
4019            stop_reason: StopReason::EndTurn,
4020            usage: Usage::default(),
4021            created_at: crate::types::message_timestamp_now(),
4022        }));
4023        let parent_revision = session.transcript_revision().expect("parent revision");
4024
4025        let err = session
4026            .commit_transcript_rewrite(
4027                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4028                vec![
4029                    Message::BlockAssistant(BlockAssistantMessage::new(
4030                        vec![AssistantBlock::ToolUse {
4031                            id: "toolu_1".to_string(),
4032                            name: "lookup".to_string(),
4033                            args: serde_json::value::RawValue::from_string("{}".to_string())
4034                                .expect("valid args"),
4035                            meta: None,
4036                        }],
4037                        StopReason::ToolUse,
4038                    )),
4039                    Message::tool_results(vec![
4040                        ToolResult::new("toolu_1".to_string(), "one".to_string(), false),
4041                        ToolResult::new("toolu_1".to_string(), "two".to_string(), false),
4042                    ]),
4043                ],
4044                TranscriptRewriteReason::new("compaction"),
4045                Some("unit-test".to_string()),
4046                Some(parent_revision),
4047            )
4048            .expect_err("rewrite should reject duplicate tool results");
4049        assert!(matches!(
4050            err,
4051            TranscriptEditError::InvalidTranscriptShape(_)
4052        ));
4053    }
4054
4055    #[test]
4056    fn transcript_rewrite_record_rejects_prefix_or_suffix_tampering() {
4057        let mut session = Session::new();
4058        session.push(Message::System(SystemMessage::new("keep prefix")));
4059        session.push(Message::Assistant(AssistantMessage {
4060            content: "verbose answer".to_string(),
4061            tool_calls: Vec::new(),
4062            stop_reason: StopReason::EndTurn,
4063            usage: Usage::default(),
4064            created_at: crate::types::message_timestamp_now(),
4065        }));
4066        session.push(Message::User(UserMessage::text("keep suffix".to_string())));
4067
4068        let parent_revision = session.transcript_revision().expect("parent revision");
4069        let commit = session
4070            .commit_transcript_rewrite(
4071                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4072                vec![Message::Assistant(AssistantMessage {
4073                    content: "compact answer".to_string(),
4074                    tool_calls: Vec::new(),
4075                    stop_reason: StopReason::EndTurn,
4076                    usage: Usage::default(),
4077                    created_at: crate::types::message_timestamp_now(),
4078                })],
4079                TranscriptRewriteReason::new("compaction"),
4080                Some("unit-test".to_string()),
4081                Some(parent_revision),
4082            )
4083            .expect("rewrite should commit");
4084        let state = session
4085            .transcript_history_state()
4086            .expect("history state should decode")
4087            .expect("history state should exist");
4088        let parent_body = state
4089            .revisions
4090            .iter()
4091            .find(|body| body.revision == commit.parent_revision)
4092            .expect("parent body retained")
4093            .clone();
4094        let revision_body = state
4095            .revisions
4096            .iter()
4097            .find(|body| body.revision == commit.revision)
4098            .expect("revision body retained")
4099            .clone();
4100
4101        let mut forged_body = revision_body;
4102        forged_body.messages[0] = Message::System(SystemMessage::new("tampered prefix"));
4103        forged_body.revision =
4104            transcript_messages_digest(&forged_body.messages).expect("forged digest");
4105        let mut forged_commit = commit;
4106        forged_commit.revision = forged_body.revision.clone();
4107        let err = TranscriptRewriteRecord::new(forged_commit, parent_body, forged_body)
4108            .expect_err("record validation must reject changes outside selected span");
4109        assert!(
4110            err.to_string().contains("before the selected span"),
4111            "unexpected error: {err}"
4112        );
4113    }
4114
4115    #[test]
4116    fn transcript_rewrite_replay_allows_normal_turn_revisions_between_rewrites() {
4117        let mut session = Session::new();
4118        session.push(Message::User(UserMessage::text("first".to_string())));
4119        session.push(Message::Assistant(AssistantMessage {
4120            content: "verbose first answer".to_string(),
4121            tool_calls: Vec::new(),
4122            stop_reason: StopReason::EndTurn,
4123            usage: crate::types::Usage::default(),
4124            created_at: crate::types::message_timestamp_now(),
4125        }));
4126
4127        let first_parent = session.transcript_revision().expect("first parent");
4128        let first_commit = session
4129            .commit_transcript_rewrite(
4130                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4131                vec![Message::Assistant(AssistantMessage {
4132                    content: "compact first answer".to_string(),
4133                    tool_calls: Vec::new(),
4134                    stop_reason: StopReason::EndTurn,
4135                    usage: crate::types::Usage::default(),
4136                    created_at: crate::types::message_timestamp_now(),
4137                })],
4138                TranscriptRewriteReason::new("compaction"),
4139                Some("unit-test".to_string()),
4140                Some(first_parent),
4141            )
4142            .expect("first rewrite");
4143
4144        session.push(Message::User(UserMessage::text("normal turn".to_string())));
4145        session.push(Message::Assistant(AssistantMessage {
4146            content: "verbose second answer".to_string(),
4147            tool_calls: Vec::new(),
4148            stop_reason: StopReason::EndTurn,
4149            usage: crate::types::Usage::default(),
4150            created_at: crate::types::message_timestamp_now(),
4151        }));
4152        let bridge_parent = session
4153            .transcript_revision()
4154            .expect("normal turn should advance transcript head");
4155        assert_ne!(bridge_parent, first_commit.revision);
4156        validate_transcript_history_state(
4157            &session
4158                .transcript_history_state()
4159                .expect("history state should decode")
4160                .expect("history state should exist"),
4161        )
4162        .expect("normal turn head may legitimately differ from last rewrite commit");
4163
4164        let second_commit = session
4165            .commit_transcript_rewrite(
4166                TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
4167                vec![Message::Assistant(AssistantMessage {
4168                    content: "compact second answer".to_string(),
4169                    tool_calls: Vec::new(),
4170                    stop_reason: StopReason::EndTurn,
4171                    usage: crate::types::Usage::default(),
4172                    created_at: crate::types::message_timestamp_now(),
4173                })],
4174                TranscriptRewriteReason::new("compaction"),
4175                Some("unit-test".to_string()),
4176                Some(bridge_parent.clone()),
4177            )
4178            .expect("second rewrite");
4179
4180        let state = session
4181            .transcript_history_state()
4182            .expect("history state should decode")
4183            .expect("history state should exist");
4184        let records = state.commits.iter().map(|commit| {
4185            let parent_body = state
4186                .revisions
4187                .iter()
4188                .find(|body| body.revision == commit.parent_revision)
4189                .expect("parent body retained")
4190                .clone();
4191            let revision_body = state
4192                .revisions
4193                .iter()
4194                .find(|body| body.revision == commit.revision)
4195                .expect("revision body retained")
4196                .clone();
4197            TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4198                .expect("record should validate")
4199        });
4200
4201        let replayed = TranscriptHistoryState::from_rewrite_records(records)
4202            .expect("rewrite replay should accept normal-turn bridge revisions")
4203            .expect("rewrite records should exist");
4204        assert_eq!(replayed.head, second_commit.revision);
4205        assert!(
4206            replayed
4207                .revisions
4208                .iter()
4209                .any(|body| body.revision == bridge_parent)
4210        );
4211    }
4212
4213    #[test]
4214    fn transcript_rewrite_replay_rejects_branched_rewrite_records() {
4215        let mut base = Session::new();
4216        base.push(Message::User(UserMessage::text("question".to_string())));
4217        base.push(Message::Assistant(AssistantMessage {
4218            content: "verbose answer".to_string(),
4219            tool_calls: Vec::new(),
4220            stop_reason: StopReason::EndTurn,
4221            usage: crate::types::Usage::default(),
4222            created_at: crate::types::message_timestamp_now(),
4223        }));
4224        let parent = base.transcript_revision().expect("parent revision");
4225
4226        let mut first = base.clone();
4227        let first_commit = first
4228            .commit_transcript_rewrite(
4229                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4230                vec![Message::Assistant(AssistantMessage {
4231                    content: "first compact answer".to_string(),
4232                    tool_calls: Vec::new(),
4233                    stop_reason: StopReason::EndTurn,
4234                    usage: crate::types::Usage::default(),
4235                    created_at: crate::types::message_timestamp_now(),
4236                })],
4237                TranscriptRewriteReason::new("compaction"),
4238                Some("unit-test".to_string()),
4239                Some(parent.clone()),
4240            )
4241            .expect("first rewrite");
4242        let first_state = first
4243            .transcript_history_state()
4244            .expect("first state decodes")
4245            .expect("first state exists");
4246
4247        let mut second = base;
4248        let second_commit = second
4249            .commit_transcript_rewrite(
4250                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4251                vec![Message::Assistant(AssistantMessage {
4252                    content: "second compact answer".to_string(),
4253                    tool_calls: Vec::new(),
4254                    stop_reason: StopReason::EndTurn,
4255                    usage: crate::types::Usage::default(),
4256                    created_at: crate::types::message_timestamp_now(),
4257                })],
4258                TranscriptRewriteReason::new("compaction"),
4259                Some("unit-test".to_string()),
4260                Some(parent),
4261            )
4262            .expect("second rewrite");
4263        let second_state = second
4264            .transcript_history_state()
4265            .expect("second state decodes")
4266            .expect("second state exists");
4267
4268        let record = |state: &TranscriptHistoryState, commit: &TranscriptRewriteCommit| {
4269            let parent_body = state
4270                .revisions
4271                .iter()
4272                .find(|body| body.revision == commit.parent_revision)
4273                .expect("parent body retained")
4274                .clone();
4275            let revision_body = state
4276                .revisions
4277                .iter()
4278                .find(|body| body.revision == commit.revision)
4279                .expect("revision body retained")
4280                .clone();
4281            TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4282                .expect("record should validate")
4283        };
4284
4285        let err = TranscriptHistoryState::from_rewrite_records(vec![
4286            record(&first_state, &first_commit),
4287            record(&second_state, &second_commit),
4288        ])
4289        .expect_err("branched rewrite records must not replay as a linear source history");
4290        assert!(
4291            err.to_string().contains("does not extend transcript head"),
4292            "unexpected error: {err}"
4293        );
4294    }
4295
4296    #[test]
4297    fn internal_message_rewrites_refresh_transcript_history_head() {
4298        let mut session = Session::new();
4299        session.push(Message::User(UserMessage::text("question".to_string())));
4300        session.push(Message::Assistant(AssistantMessage {
4301            content: "verbose answer".to_string(),
4302            tool_calls: Vec::new(),
4303            stop_reason: StopReason::EndTurn,
4304            usage: crate::types::Usage::default(),
4305            created_at: crate::types::message_timestamp_now(),
4306        }));
4307
4308        let parent = session.transcript_revision().expect("parent revision");
4309        session
4310            .commit_transcript_rewrite(
4311                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4312                vec![Message::Assistant(AssistantMessage {
4313                    content: "compact answer".to_string(),
4314                    tool_calls: Vec::new(),
4315                    stop_reason: StopReason::EndTurn,
4316                    usage: crate::types::Usage::default(),
4317                    created_at: crate::types::message_timestamp_now(),
4318                })],
4319                TranscriptRewriteReason::new("compaction"),
4320                Some("unit-test".to_string()),
4321                Some(parent),
4322            )
4323            .expect("rewrite should commit");
4324
4325        session.push(Message::User(UserMessage::text(
4326            "notice-bearing turn".to_string(),
4327        )));
4328        session
4329            .retain_messages_internal(
4330                |message| {
4331                    !matches!(
4332                        message,
4333                        Message::User(user)
4334                            if user.content.iter().any(|block| matches!(
4335                                block,
4336                                ContentBlock::Text { text } if text.contains("notice-bearing")
4337                            ))
4338                    )
4339                },
4340                TranscriptRewriteReason::new("synthetic_notice_cleanup"),
4341            )
4342            .expect("retain should commit internal rewrite");
4343        let retained_digest =
4344            transcript_messages_digest(session.messages()).expect("retained digest");
4345        assert_eq!(
4346            session.transcript_revision().expect("retained head"),
4347            retained_digest
4348        );
4349
4350        session
4351            .replace_messages_internal(
4352                vec![
4353                    Message::User(UserMessage::text("compacted question".to_string())),
4354                    Message::Assistant(AssistantMessage {
4355                        content: "compacted answer".to_string(),
4356                        tool_calls: Vec::new(),
4357                        stop_reason: StopReason::EndTurn,
4358                        usage: crate::types::Usage::default(),
4359                        created_at: crate::types::message_timestamp_now(),
4360                    }),
4361                ],
4362                TranscriptRewriteReason::new("compaction"),
4363            )
4364            .expect("replace should commit internal rewrite");
4365        let replaced_digest =
4366            transcript_messages_digest(session.messages()).expect("replaced digest");
4367        assert_eq!(
4368            session.transcript_revision().expect("replaced head"),
4369            replaced_digest
4370        );
4371        let state = session
4372            .transcript_history_state()
4373            .expect("history state should decode")
4374            .expect("history state should exist");
4375        assert!(
4376            state
4377                .revisions
4378                .iter()
4379                .any(|body| body.revision == replaced_digest)
4380        );
4381        validate_transcript_history_state(&state).expect("history state remains valid");
4382    }
4383
4384    #[test]
4385    fn set_system_prompt_refreshes_transcript_history_head_after_rewrite() {
4386        let mut session = Session::new();
4387        session.push(Message::User(UserMessage::text("question".to_string())));
4388        session.push(Message::Assistant(AssistantMessage {
4389            content: "verbose answer".to_string(),
4390            tool_calls: Vec::new(),
4391            stop_reason: StopReason::EndTurn,
4392            usage: crate::types::Usage::default(),
4393            created_at: crate::types::message_timestamp_now(),
4394        }));
4395
4396        let parent = session.transcript_revision().expect("parent revision");
4397        let rewrite = session
4398            .commit_transcript_rewrite(
4399                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4400                vec![Message::Assistant(AssistantMessage {
4401                    content: "compact answer".to_string(),
4402                    tool_calls: Vec::new(),
4403                    stop_reason: StopReason::EndTurn,
4404                    usage: crate::types::Usage::default(),
4405                    created_at: crate::types::message_timestamp_now(),
4406                })],
4407                TranscriptRewriteReason::new("compaction"),
4408                Some("unit-test".to_string()),
4409                Some(parent),
4410            )
4411            .expect("rewrite should commit");
4412
4413        session.set_system_prompt("durable system prompt".to_string());
4414
4415        let head = session
4416            .transcript_revision()
4417            .expect("system prompt should refresh transcript head");
4418        assert_ne!(head, rewrite.revision);
4419        assert_eq!(
4420            head,
4421            transcript_messages_digest(session.messages()).expect("current digest")
4422        );
4423        let head_messages = session
4424            .transcript_revision_messages(&head)
4425            .expect("history state should decode")
4426            .expect("refreshed head body should be retained");
4427        assert_eq!(
4428            serde_json::to_value(&head_messages).expect("head serializes"),
4429            serde_json::to_value(session.messages()).expect("session serializes")
4430        );
4431        validate_transcript_history_state(
4432            &session
4433                .transcript_history_state()
4434                .expect("history state should decode")
4435                .expect("history state should exist"),
4436        )
4437        .expect("history state remains valid after system prompt update");
4438    }
4439
4440    #[test]
4441    fn apply_transcript_history_state_uses_latest_commit_time_for_restored_head() {
4442        let mut session = Session::new();
4443        session.push(Message::User(UserMessage::text("question".to_string())));
4444        session.push(Message::Assistant(AssistantMessage {
4445            content: "verbose answer".to_string(),
4446            tool_calls: Vec::new(),
4447            stop_reason: StopReason::EndTurn,
4448            usage: crate::types::Usage::default(),
4449            created_at: crate::types::message_timestamp_now(),
4450        }));
4451        let original_messages = session.messages().to_vec();
4452        let parent = session.transcript_revision().expect("parent revision");
4453        let compact = session
4454            .commit_transcript_rewrite(
4455                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4456                vec![Message::Assistant(AssistantMessage {
4457                    content: "compact answer".to_string(),
4458                    tool_calls: Vec::new(),
4459                    stop_reason: StopReason::EndTurn,
4460                    usage: crate::types::Usage::default(),
4461                    created_at: crate::types::message_timestamp_now(),
4462                })],
4463                TranscriptRewriteReason::new("compaction"),
4464                Some("unit-test".to_string()),
4465                Some(parent.clone()),
4466            )
4467            .expect("rewrite should commit");
4468
4469        std::thread::sleep(std::time::Duration::from_millis(2));
4470        let restore = session
4471            .commit_transcript_rewrite(
4472                TranscriptRewriteSelection::MessageRange {
4473                    start: 0,
4474                    end: session.messages().len(),
4475                },
4476                original_messages.clone(),
4477                TranscriptRewriteReason::new("restore"),
4478                Some("unit-test".to_string()),
4479                Some(compact.revision),
4480            )
4481            .expect("restore should commit");
4482        assert_eq!(restore.revision, parent);
4483
4484        let state = session
4485            .transcript_history_state()
4486            .expect("history state should decode")
4487            .expect("history state should exist");
4488        let restored_body_created_at = state
4489            .revisions
4490            .iter()
4491            .find(|body| body.revision == restore.revision)
4492            .expect("restored body should be retained")
4493            .created_at;
4494        assert!(
4495            restored_body_created_at < restore.committed_at,
4496            "test requires restore commit to be newer than retained body"
4497        );
4498
4499        let mut replayed = Session::new();
4500        replayed
4501            .apply_transcript_history_state(state)
4502            .expect("replay should materialize restored head");
4503        assert_eq!(
4504            serde_json::to_value(replayed.messages()).expect("replayed serializes"),
4505            serde_json::to_value(&original_messages).expect("original serializes")
4506        );
4507        assert_eq!(replayed.updated_at(), restore.committed_at);
4508    }
4509
4510    #[test]
4511    fn test_session_new() {
4512        let session = Session::new();
4513        assert_eq!(session.version(), SESSION_VERSION);
4514        assert!(session.messages().is_empty());
4515        assert!(session.created_at() <= session.updated_at());
4516    }
4517
4518    #[test]
4519    fn llm_identity_model_override_switches_to_catalog_provider() {
4520        let registry = crate::Config::default().model_registry().unwrap();
4521        let current = SessionLlmIdentity {
4522            model: "claude-sonnet-4-5".to_string(),
4523            provider: Provider::Anthropic,
4524            self_hosted_server_id: None,
4525            provider_params: None,
4526            auth_binding: Some(crate::AuthBindingRef {
4527                realm: crate::RealmId::parse("tenant_a").unwrap(),
4528                binding: crate::BindingId::parse("anthropic_default").unwrap(),
4529                profile: None,
4530            }),
4531        };
4532
4533        let resolved = resolve_session_llm_identity_override(
4534            &current,
4535            &registry,
4536            SessionLlmIdentityOverride {
4537                model: Some("gpt-5.5"),
4538                provider: None,
4539                provider_params: None,
4540                clear_provider_params: false,
4541                auth_binding: None,
4542                clear_auth_binding: false,
4543            },
4544        )
4545        .unwrap();
4546
4547        assert_eq!(resolved.model, "gpt-5.5");
4548        assert_eq!(resolved.provider, Provider::OpenAI);
4549        assert!(
4550            resolved.auth_binding.is_none(),
4551            "provider switches must not inherit a binding from the previous provider"
4552        );
4553    }
4554
4555    #[test]
4556    fn llm_identity_model_override_keeps_uncatalogued_model_on_current_provider() {
4557        let registry = crate::Config::default().model_registry().unwrap();
4558        let current = SessionLlmIdentity {
4559            model: "custom-model".to_string(),
4560            provider: Provider::Anthropic,
4561            self_hosted_server_id: None,
4562            provider_params: None,
4563            auth_binding: None,
4564        };
4565
4566        let resolved = resolve_session_llm_identity_override(
4567            &current,
4568            &registry,
4569            SessionLlmIdentityOverride {
4570                model: Some("uncatalogued-custom-model"),
4571                provider: None,
4572                provider_params: None,
4573                clear_provider_params: false,
4574                auth_binding: None,
4575                clear_auth_binding: false,
4576            },
4577        )
4578        .unwrap();
4579
4580        assert_eq!(resolved.model, "uncatalogued-custom-model");
4581        assert_eq!(resolved.provider, Provider::Anthropic);
4582    }
4583
4584    #[test]
4585    fn realtime_transcript_append_is_idempotent_by_provider_item_and_delta_id() {
4586        let mut session = Session::new();
4587
4588        let user = RealtimeTranscriptEvent::UserTranscriptFinal {
4589            item_id: "item_user".to_string(),
4590            previous_item_id: None,
4591            content_index: 0,
4592            text: "hello".to_string(),
4593        };
4594        assert!(
4595            !session
4596                .append_realtime_transcript_event(user.clone())
4597                .is_inert()
4598        );
4599        assert!(session.append_realtime_transcript_event(user).is_inert());
4600
4601        let delta = RealtimeTranscriptEvent::AssistantTextDelta {
4602            response_id: "resp_assistant".to_string(),
4603            delta_id: "evt_delta_1".to_string(),
4604            item_id: "item_assistant".to_string(),
4605            previous_item_id: Some("item_user".to_string()),
4606            content_index: 0,
4607            delta: "hi".to_string(),
4608        };
4609        assert!(
4610            session
4611                .append_realtime_transcript_event(delta.clone())
4612                .is_inert()
4613        );
4614        assert!(session.append_realtime_transcript_event(delta).is_inert());
4615
4616        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
4617            response_id: "resp_assistant".to_string(),
4618            stop_reason: StopReason::EndTurn,
4619            usage: Usage::default(),
4620        };
4621        assert!(
4622            !session
4623                .append_realtime_transcript_event(terminal.clone())
4624                .is_inert()
4625        );
4626        assert!(
4627            session
4628                .append_realtime_transcript_event(terminal)
4629                .is_inert()
4630        );
4631
4632        assert_eq!(session.messages().len(), 2);
4633        assert!(matches!(
4634            &session.messages()[0],
4635            Message::User(user) if user.text_content() == "hello"
4636        ));
4637        assert!(matches!(
4638            &session.messages()[1],
4639            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "hi"
4640        ));
4641    }
4642
4643    /// R5-7: `AssistantTranscriptFinalText` injects authoritative final text
4644    /// into the staged item. Verifies the override semantics: a partial
4645    /// delta is replaced, not concatenated, and the item promotes to the
4646    /// Spoken lane so flush emits `AssistantBlock::Transcript`.
4647    #[test]
4648    fn realtime_transcript_final_text_overrides_partial_delta_and_promotes_to_spoken_lane() {
4649        let mut session = Session::new();
4650
4651        // Partial delta accumulates "incom" — simulating delta loss before
4652        // the final arrives.
4653        assert!(
4654            session
4655                .append_realtime_transcript_event(
4656                    RealtimeTranscriptEvent::AssistantTranscriptDelta {
4657                        response_id: "resp_a".to_string(),
4658                        delta_id: "evt_1".to_string(),
4659                        item_id: "item_a".to_string(),
4660                        previous_item_id: None,
4661                        content_index: 0,
4662                        delta: "incom".to_string(),
4663                    }
4664                )
4665                .is_inert()
4666        );
4667
4668        // Authoritative final text overrides the staged content.
4669        assert!(
4670            session
4671                .append_realtime_transcript_event(
4672                    RealtimeTranscriptEvent::AssistantTranscriptFinalText {
4673                        response_id: "resp_a".to_string(),
4674                        item_id: "item_a".to_string(),
4675                        content_index: 0,
4676                        text: "complete answer".to_string(),
4677                    }
4678                )
4679                .is_inert()
4680        );
4681
4682        // Turn completion drives the flush.
4683        let outcome = session.append_realtime_transcript_event(
4684            RealtimeTranscriptEvent::AssistantTurnCompleted {
4685                response_id: "resp_a".to_string(),
4686                stop_reason: StopReason::EndTurn,
4687                usage: Usage::default(),
4688            },
4689        );
4690        assert!(!outcome.is_inert());
4691
4692        // Verify the materialized block has the final's authoritative text
4693        // (not the partial "incom") and the Spoken lane.
4694        assert_eq!(session.messages().len(), 1);
4695        match &session.messages()[0] {
4696            Message::BlockAssistant(assistant) => {
4697                let mut found_transcript = false;
4698                for block in &assistant.blocks {
4699                    if let AssistantBlock::Transcript { text, .. } = block {
4700                        assert_eq!(text, "complete answer");
4701                        found_transcript = true;
4702                    }
4703                }
4704                assert!(
4705                    found_transcript,
4706                    "AssistantTranscriptFinalText must promote to the Spoken lane and \
4707                     materialize as AssistantBlock::Transcript"
4708                );
4709            }
4710            other => unreachable!("expected BlockAssistant, got {other:?}"),
4711        }
4712    }
4713
4714    /// R5-7: `AssistantTranscriptFinalText` works for final-only providers
4715    /// where no prior delta has staged an item.
4716    #[test]
4717    fn realtime_transcript_final_text_creates_item_when_no_delta_staged() {
4718        let mut session = Session::new();
4719
4720        assert!(
4721            session
4722                .append_realtime_transcript_event(
4723                    RealtimeTranscriptEvent::AssistantTranscriptFinalText {
4724                        response_id: "resp_a".to_string(),
4725                        item_id: "item_a".to_string(),
4726                        content_index: 0,
4727                        text: "spoken-final-only".to_string(),
4728                    }
4729                )
4730                .is_inert()
4731        );
4732
4733        let outcome = session.append_realtime_transcript_event(
4734            RealtimeTranscriptEvent::AssistantTurnCompleted {
4735                response_id: "resp_a".to_string(),
4736                stop_reason: StopReason::EndTurn,
4737                usage: Usage::default(),
4738            },
4739        );
4740        assert!(!outcome.is_inert());
4741
4742        assert_eq!(session.messages().len(), 1);
4743        match &session.messages()[0] {
4744            Message::BlockAssistant(assistant) => {
4745                let has_transcript = assistant.blocks.iter().any(|b| {
4746                    matches!(b, AssistantBlock::Transcript { text, .. } if text == "spoken-final-only")
4747                });
4748                assert!(
4749                    has_transcript,
4750                    "final-only provider path must materialize as Transcript on the Spoken lane"
4751                );
4752            }
4753            other => unreachable!("expected BlockAssistant, got {other:?}"),
4754        }
4755    }
4756
4757    #[test]
4758    fn realtime_transcript_append_orders_causally_equivalent_out_of_order_items() {
4759        let mut session = Session::new();
4760
4761        assert!(
4762            session
4763                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4764                    response_id: "resp_assistant".to_string(),
4765                    delta_id: "evt_delta_1".to_string(),
4766                    item_id: "item_assistant".to_string(),
4767                    previous_item_id: Some("item_user".to_string()),
4768                    content_index: 0,
4769                    delta: "answer".to_string(),
4770                })
4771                .is_inert()
4772        );
4773        assert!(
4774            session
4775                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
4776                    response_id: "resp_assistant".to_string(),
4777                    stop_reason: StopReason::EndTurn,
4778                    usage: Usage::default(),
4779                })
4780                .is_inert()
4781        );
4782
4783        let outcome = session.append_realtime_transcript_event(
4784            RealtimeTranscriptEvent::UserTranscriptFinal {
4785                item_id: "item_user".to_string(),
4786                previous_item_id: None,
4787                content_index: 0,
4788                text: "question".to_string(),
4789            },
4790        );
4791
4792        assert_eq!(outcome.materialized_messages.len(), 2);
4793        assert_eq!(session.messages().len(), 2);
4794        assert!(matches!(
4795            &session.messages()[0],
4796            Message::User(user) if user.text_content() == "question"
4797        ));
4798        assert!(matches!(
4799            &session.messages()[1],
4800            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer"
4801        ));
4802    }
4803
4804    #[test]
4805    fn realtime_transcript_replay_of_seen_provider_items_is_inert() {
4806        let mut session = Session::new();
4807        let events = vec![
4808            RealtimeTranscriptEvent::UserTranscriptFinal {
4809                item_id: "item_user".to_string(),
4810                previous_item_id: None,
4811                content_index: 0,
4812                text: "hello".to_string(),
4813            },
4814            RealtimeTranscriptEvent::AssistantTextDelta {
4815                response_id: "resp_assistant".to_string(),
4816                delta_id: "evt_delta_1".to_string(),
4817                item_id: "item_assistant".to_string(),
4818                previous_item_id: Some("item_user".to_string()),
4819                content_index: 0,
4820                delta: "world".to_string(),
4821            },
4822            RealtimeTranscriptEvent::AssistantTurnCompleted {
4823                response_id: "resp_assistant".to_string(),
4824                stop_reason: StopReason::EndTurn,
4825                usage: Usage::default(),
4826            },
4827        ];
4828
4829        for event in events.iter().cloned() {
4830            let _ = session.append_realtime_transcript_event(event);
4831        }
4832        let first_messages = serde_json::to_value(session.messages()).unwrap();
4833
4834        for event in events {
4835            assert!(session.append_realtime_transcript_event(event).is_inert());
4836        }
4837
4838        assert_eq!(
4839            serde_json::to_value(session.messages()).unwrap(),
4840            first_messages
4841        );
4842    }
4843
4844    #[test]
4845    fn realtime_transcript_user_final_replay_cannot_erase_existing_segment() {
4846        let mut session = Session::new();
4847
4848        let user = RealtimeTranscriptEvent::UserTranscriptFinal {
4849            item_id: "item_user".to_string(),
4850            previous_item_id: None,
4851            content_index: 0,
4852            text: "remember amber lantern".to_string(),
4853        };
4854        assert!(
4855            !session
4856                .append_realtime_transcript_event(user.clone())
4857                .is_inert()
4858        );
4859        let first_messages = serde_json::to_value(session.messages()).unwrap();
4860
4861        assert!(
4862            session
4863                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
4864                    item_id: "item_user".to_string(),
4865                    previous_item_id: None,
4866                    content_index: 0,
4867                    text: String::new(),
4868                })
4869                .is_inert()
4870        );
4871        assert!(session.append_realtime_transcript_event(user).is_inert());
4872        assert_eq!(
4873            serde_json::to_value(session.messages()).unwrap(),
4874            first_messages
4875        );
4876    }
4877
4878    #[test]
4879    fn realtime_transcript_empty_user_final_can_be_filled_by_later_nonempty_replay() {
4880        let mut session = Session::new();
4881
4882        assert!(
4883            session
4884                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
4885                    item_id: "item_user".to_string(),
4886                    previous_item_id: None,
4887                    content_index: 0,
4888                    text: String::new(),
4889                })
4890                .is_inert()
4891        );
4892        assert!(session.messages().is_empty());
4893
4894        let outcome = session.append_realtime_transcript_event(
4895            RealtimeTranscriptEvent::UserTranscriptFinal {
4896                item_id: "item_user".to_string(),
4897                previous_item_id: None,
4898                content_index: 0,
4899                text: "remember amber lantern".to_string(),
4900            },
4901        );
4902        assert_eq!(outcome.materialized_messages.len(), 1);
4903        assert_eq!(session.messages().len(), 1);
4904        assert!(matches!(
4905            &session.messages()[0],
4906            Message::User(user) if user.text_content() == "remember amber lantern"
4907        ));
4908    }
4909
4910    #[test]
4911    fn realtime_transcript_skipped_provider_items_preserve_causal_order_without_content() {
4912        let mut session = Session::new();
4913
4914        let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
4915            response_id: "resp_assistant".to_string(),
4916            delta_id: "evt_delta_1".to_string(),
4917            item_id: "item_assistant".to_string(),
4918            previous_item_id: Some("item_tool".to_string()),
4919            content_index: 0,
4920            delta: "done".to_string(),
4921        };
4922        assert!(
4923            session
4924                .append_realtime_transcript_event(assistant_delta.clone())
4925                .is_inert()
4926        );
4927        let assistant_complete = RealtimeTranscriptEvent::AssistantTurnCompleted {
4928            response_id: "resp_assistant".to_string(),
4929            stop_reason: StopReason::EndTurn,
4930            usage: Usage::default(),
4931        };
4932        assert!(
4933            session
4934                .append_realtime_transcript_event(assistant_complete.clone())
4935                .is_inert()
4936        );
4937
4938        let skipped = RealtimeTranscriptEvent::ItemSkipped {
4939            item_id: "item_tool".to_string(),
4940            previous_item_id: Some("item_user".to_string()),
4941        };
4942        assert!(
4943            session
4944                .append_realtime_transcript_event(skipped.clone())
4945                .is_inert(),
4946            "a skipped provider item must not append transcript content"
4947        );
4948        assert!(session.messages().is_empty());
4949
4950        let outcome = session.append_realtime_transcript_event(
4951            RealtimeTranscriptEvent::UserTranscriptFinal {
4952                item_id: "item_user".to_string(),
4953                previous_item_id: None,
4954                content_index: 0,
4955                text: "please use the tool".to_string(),
4956            },
4957        );
4958        assert_eq!(outcome.materialized_messages.len(), 2);
4959        assert_eq!(session.messages().len(), 2);
4960        assert!(matches!(
4961            &session.messages()[0],
4962            Message::User(user) if user.text_content() == "please use the tool"
4963        ));
4964        assert!(matches!(
4965            &session.messages()[1],
4966            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "done"
4967        ));
4968
4969        let first_messages = serde_json::to_value(session.messages()).unwrap();
4970        assert!(session.append_realtime_transcript_event(skipped).is_inert());
4971        assert!(
4972            session
4973                .append_realtime_transcript_event(assistant_delta)
4974                .is_inert()
4975        );
4976        assert!(
4977            session
4978                .append_realtime_transcript_event(assistant_complete)
4979                .is_inert()
4980        );
4981        assert_eq!(
4982            serde_json::to_value(session.messages()).unwrap(),
4983            first_messages
4984        );
4985    }
4986
4987    #[test]
4988    fn realtime_transcript_interrupted_assistant_item_unblocks_later_provider_items() {
4989        // R5-5 (Round-5): the staged assistant content is a Display-lane item
4990        // (`AssistantTextDelta`). Under the new lane-aware barge-in contract,
4991        // the Display lane survives interruption and materializes. The User
4992        // "Stop." item, gated on the chained Display item being materialized,
4993        // also unblocks. Round-4's "must stay non-canonical" assertion was
4994        // wrong — that contract was lane-blind.
4995        let mut session = Session::new();
4996
4997        let _ = session.append_realtime_transcript_event(
4998            RealtimeTranscriptEvent::UserTranscriptFinal {
4999                item_id: "item_repeat".to_string(),
5000                previous_item_id: None,
5001                content_index: 0,
5002                text: "repeat until stop".to_string(),
5003            },
5004        );
5005        assert!(
5006            session
5007                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5008                    response_id: "resp_loop".to_string(),
5009                    delta_id: "evt_loop_1".to_string(),
5010                    item_id: "item_loop".to_string(),
5011                    previous_item_id: Some("item_repeat".to_string()),
5012                    content_index: 0,
5013                    delta: "Looping now".to_string(),
5014                })
5015                .is_inert()
5016        );
5017        assert!(
5018            session
5019                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5020                    item_id: "item_stop".to_string(),
5021                    previous_item_id: Some("item_loop".to_string()),
5022                    content_index: 0,
5023                    text: "Stop.".to_string(),
5024                })
5025                .is_inert(),
5026            "the stop turn waits until the interrupted assistant provider item is resolved"
5027        );
5028
5029        let outcome = session.append_realtime_transcript_event(
5030            RealtimeTranscriptEvent::AssistantTurnInterrupted {
5031                response_id: "resp_loop".to_string(),
5032            },
5033        );
5034
5035        // R5-5: materializer commits 2 messages (the retained Display item +
5036        // the unblocked "Stop." User message).
5037        assert_eq!(outcome.materialized_messages.len(), 2);
5038        // Canonical history: User-repeat, BlockAssistant(Display "Looping now"), User-Stop.
5039        assert_eq!(session.messages().len(), 3);
5040        assert!(matches!(
5041            &session.messages()[0],
5042            Message::User(user) if user.text_content() == "repeat until stop"
5043        ));
5044        match &session.messages()[1] {
5045            Message::BlockAssistant(assistant) => {
5046                let text = block_assistant_text(assistant);
5047                assert_eq!(text, "Looping now");
5048            }
5049            other => unreachable!(
5050                "Display lane assistant item must be retained on Interrupted, got {other:?}"
5051            ),
5052        }
5053        assert!(matches!(
5054            &session.messages()[2],
5055            Message::User(user) if user.text_content() == "Stop."
5056        ));
5057    }
5058
5059    #[test]
5060    fn realtime_transcript_late_interrupted_assistant_delta_stays_noncanonical() {
5061        let mut session = Session::new();
5062
5063        let _ = session.append_realtime_transcript_event(
5064            RealtimeTranscriptEvent::UserTranscriptFinal {
5065                item_id: "item_repeat".to_string(),
5066                previous_item_id: None,
5067                content_index: 0,
5068                text: "repeat until stop".to_string(),
5069            },
5070        );
5071        assert!(
5072            session
5073                .append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
5074                    item_id: "item_loop".to_string(),
5075                    previous_item_id: Some("item_repeat".to_string()),
5076                    role: RealtimeTranscriptRole::Assistant,
5077                    response_id: None,
5078                })
5079                .is_inert(),
5080            "provider can observe an assistant item before the adapter learns its response id"
5081        );
5082        assert!(
5083            session
5084                .append_realtime_transcript_event(
5085                    RealtimeTranscriptEvent::AssistantTurnInterrupted {
5086                        response_id: "resp_loop".to_string(),
5087                    }
5088                )
5089                .is_inert(),
5090            "an interruption can arrive before delayed transcript deltas for the response"
5091        );
5092        assert!(
5093            session
5094                .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5095                    item_id: "item_stop".to_string(),
5096                    previous_item_id: Some("item_loop".to_string()),
5097                    content_index: 0,
5098                    text: "Stop.".to_string(),
5099                })
5100                .is_inert(),
5101            "the stop turn waits for the provider's interrupted assistant item anchor"
5102        );
5103
5104        let late_delta_outcome =
5105            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5106                response_id: "resp_loop".to_string(),
5107                delta_id: "evt_loop_late".to_string(),
5108                item_id: "item_loop".to_string(),
5109                previous_item_id: Some("item_repeat".to_string()),
5110                content_index: 0,
5111                delta: "Looping now".to_string(),
5112            });
5113        assert_eq!(late_delta_outcome.materialized_messages.len(), 1);
5114        assert!(matches!(
5115            &session.messages()[1],
5116            Message::User(user) if user.text_content() == "Stop."
5117        ));
5118        assert!(
5119            session
5120                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5121                    response_id: "resp_loop".to_string(),
5122                    stop_reason: StopReason::EndTurn,
5123                    usage: Usage::default(),
5124                })
5125                .is_inert(),
5126            "late completion for an interrupted response must not resurrect its deltas"
5127        );
5128        assert!(
5129            session
5130                .messages()
5131                .iter()
5132                .filter_map(|message| match message {
5133                    Message::BlockAssistant(assistant) => Some(block_assistant_text(assistant)),
5134                    _ => None,
5135                })
5136                .all(|text| !text.contains("Looping now")),
5137            "late interrupted assistant text must remain non-canonical"
5138        );
5139    }
5140
5141    #[test]
5142    fn realtime_transcript_completion_only_finalizes_matching_response() {
5143        let mut session = Session::new();
5144
5145        let _ = session.append_realtime_transcript_event(
5146            RealtimeTranscriptEvent::UserTranscriptFinal {
5147                item_id: "item_user".to_string(),
5148                previous_item_id: None,
5149                content_index: 0,
5150                text: "question".to_string(),
5151            },
5152        );
5153        assert!(
5154            session
5155                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5156                    response_id: "resp_a".to_string(),
5157                    delta_id: "evt_a".to_string(),
5158                    item_id: "item_a".to_string(),
5159                    previous_item_id: Some("item_user".to_string()),
5160                    content_index: 0,
5161                    delta: "answer a".to_string(),
5162                })
5163                .is_inert()
5164        );
5165
5166        assert!(
5167            session
5168                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5169                    response_id: "resp_b".to_string(),
5170                    stop_reason: StopReason::EndTurn,
5171                    usage: Usage::default(),
5172                })
5173                .is_inert(),
5174            "a completion for another response must not finalize buffered assistant text"
5175        );
5176        assert_eq!(session.messages().len(), 1);
5177
5178        let outcome = session.append_realtime_transcript_event(
5179            RealtimeTranscriptEvent::AssistantTurnCompleted {
5180                response_id: "resp_a".to_string(),
5181                stop_reason: StopReason::EndTurn,
5182                usage: Usage::default(),
5183            },
5184        );
5185        assert_eq!(outcome.materialized_messages.len(), 1);
5186        assert_eq!(session.messages().len(), 2);
5187        assert!(matches!(
5188            &session.messages()[1],
5189            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer a"
5190        ));
5191    }
5192
5193    #[test]
5194    fn realtime_transcript_completion_before_later_delta_is_response_scoped() {
5195        let mut session = Session::new();
5196
5197        let _ = session.append_realtime_transcript_event(
5198            RealtimeTranscriptEvent::UserTranscriptFinal {
5199                item_id: "item_user".to_string(),
5200                previous_item_id: None,
5201                content_index: 0,
5202                text: "question".to_string(),
5203            },
5204        );
5205        assert!(
5206            session
5207                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5208                    response_id: "resp_a".to_string(),
5209                    stop_reason: StopReason::EndTurn,
5210                    usage: Usage::default(),
5211                })
5212                .is_inert()
5213        );
5214        assert!(
5215            session
5216                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5217                    response_id: "resp_b".to_string(),
5218                    delta_id: "evt_b".to_string(),
5219                    item_id: "item_b".to_string(),
5220                    previous_item_id: Some("item_user".to_string()),
5221                    content_index: 0,
5222                    delta: "wrong response".to_string(),
5223                })
5224                .is_inert(),
5225            "a later delta for another response must not be finalized by resp_a's pending completion"
5226        );
5227
5228        let outcome =
5229            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5230                response_id: "resp_a".to_string(),
5231                delta_id: "evt_a".to_string(),
5232                item_id: "item_a".to_string(),
5233                previous_item_id: Some("item_user".to_string()),
5234                content_index: 0,
5235                delta: "right response".to_string(),
5236            });
5237
5238        assert_eq!(outcome.materialized_messages.len(), 1);
5239        assert_eq!(session.messages().len(), 2);
5240        assert!(matches!(
5241            &session.messages()[1],
5242            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "right response"
5243        ));
5244    }
5245
5246    #[test]
5247    fn realtime_transcript_late_duplicate_completion_cannot_finalize_unrelated_response() {
5248        let mut session = Session::new();
5249
5250        let _ = session.append_realtime_transcript_event(
5251            RealtimeTranscriptEvent::UserTranscriptFinal {
5252                item_id: "item_user".to_string(),
5253                previous_item_id: None,
5254                content_index: 0,
5255                text: "question".to_string(),
5256            },
5257        );
5258        let _ =
5259            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5260                response_id: "resp_a".to_string(),
5261                delta_id: "evt_a".to_string(),
5262                item_id: "item_a".to_string(),
5263                previous_item_id: Some("item_user".to_string()),
5264                content_index: 0,
5265                delta: "first".to_string(),
5266            });
5267        let _ = session.append_realtime_transcript_event(
5268            RealtimeTranscriptEvent::AssistantTurnCompleted {
5269                response_id: "resp_a".to_string(),
5270                stop_reason: StopReason::EndTurn,
5271                usage: Usage::default(),
5272            },
5273        );
5274        assert_eq!(session.messages().len(), 2);
5275
5276        assert!(
5277            session
5278                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5279                    response_id: "resp_b".to_string(),
5280                    delta_id: "evt_b".to_string(),
5281                    item_id: "item_b".to_string(),
5282                    previous_item_id: Some("item_a".to_string()),
5283                    content_index: 0,
5284                    delta: "second".to_string(),
5285                })
5286                .is_inert()
5287        );
5288        assert!(
5289            session
5290                .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5291                    response_id: "resp_a".to_string(),
5292                    stop_reason: StopReason::EndTurn,
5293                    usage: Usage::default(),
5294                })
5295                .is_inert(),
5296            "a duplicate late terminal for resp_a must not finalize resp_b"
5297        );
5298        assert_eq!(session.messages().len(), 2);
5299
5300        let outcome = session.append_realtime_transcript_event(
5301            RealtimeTranscriptEvent::AssistantTurnCompleted {
5302                response_id: "resp_b".to_string(),
5303                stop_reason: StopReason::EndTurn,
5304                usage: Usage::default(),
5305            },
5306        );
5307        assert_eq!(outcome.materialized_messages.len(), 1);
5308        assert_eq!(session.messages().len(), 3);
5309    }
5310
5311    #[test]
5312    fn realtime_transcript_interruption_discards_only_matching_response() {
5313        // R5-5: cross-response isolation invariant — Interrupted on resp_a
5314        // does NOT touch resp_b's staged content. Both responses use
5315        // `AssistantTextDelta` (Display lane); under R5-5 resp_a's Display
5316        // item is RETAINED at Interrupted time and resp_b's continues
5317        // unaffected, materializing on its later TurnCompleted.
5318        let mut session = Session::new();
5319
5320        let _ = session.append_realtime_transcript_event(
5321            RealtimeTranscriptEvent::UserTranscriptFinal {
5322                item_id: "item_user".to_string(),
5323                previous_item_id: None,
5324                content_index: 0,
5325                text: "question".to_string(),
5326            },
5327        );
5328        let _ =
5329            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5330                response_id: "resp_a".to_string(),
5331                delta_id: "evt_a".to_string(),
5332                item_id: "item_a".to_string(),
5333                previous_item_id: Some("item_user".to_string()),
5334                content_index: 0,
5335                delta: "interrupted display".to_string(),
5336            });
5337        let _ =
5338            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5339                response_id: "resp_b".to_string(),
5340                delta_id: "evt_b".to_string(),
5341                item_id: "item_b".to_string(),
5342                previous_item_id: Some("item_user".to_string()),
5343                content_index: 0,
5344                delta: "keep me".to_string(),
5345            });
5346
5347        // R5-5: Interrupted commits the resp_a Display item; resp_b
5348        // remains untouched.
5349        let interrupt_outcome = session.append_realtime_transcript_event(
5350            RealtimeTranscriptEvent::AssistantTurnInterrupted {
5351                response_id: "resp_a".to_string(),
5352            },
5353        );
5354        assert_eq!(
5355            interrupt_outcome.materialized_messages.len(),
5356            1,
5357            "resp_a's Display item commits on Interrupted"
5358        );
5359
5360        let outcome = session.append_realtime_transcript_event(
5361            RealtimeTranscriptEvent::AssistantTurnCompleted {
5362                response_id: "resp_b".to_string(),
5363                stop_reason: StopReason::EndTurn,
5364                usage: Usage::default(),
5365            },
5366        );
5367        assert_eq!(
5368            outcome.materialized_messages.len(),
5369            1,
5370            "resp_b commits on its TurnCompleted, untouched by resp_a's Interrupted"
5371        );
5372
5373        // 1 user + 2 assistant messages.
5374        assert_eq!(session.messages().len(), 3);
5375        assert!(matches!(
5376            &session.messages()[1],
5377            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "interrupted display"
5378        ));
5379        assert!(matches!(
5380            &session.messages()[2],
5381            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "keep me"
5382        ));
5383    }
5384
5385    // Performance tests for Arc-based CoW
5386
5387    #[test]
5388    fn test_fork_shares_arc_no_clone() {
5389        let mut session = Session::new();
5390        for i in 0..100 {
5391            session.push(Message::User(UserMessage::text(format!("Message {i}"))));
5392        }
5393
5394        // Fork should share the same Arc, not clone messages
5395        let forked = session.fork();
5396
5397        // Both should point to the same underlying data (Arc refcount > 1)
5398        assert!(Arc::ptr_eq(&session.messages, &forked.messages));
5399        assert_eq!(forked.messages().len(), 100);
5400    }
5401
5402    #[test]
5403    fn test_fork_at_shares_arc_prefix() {
5404        let mut session = Session::new();
5405        for i in 0..100 {
5406            session.push(Message::User(UserMessage::text(format!("Message {i}"))));
5407        }
5408
5409        // Fork at 50 should create new Arc with copied prefix
5410        let forked = session.fork_at(50);
5411        assert_eq!(forked.messages().len(), 50);
5412
5413        // Original should be unchanged
5414        assert_eq!(session.messages().len(), 100);
5415    }
5416
    /// `fork_at` must give the fork its own transcript-revision identity:
    /// no inherited history-state metadata, a head revision equal to the
    /// digest of the fork's own messages, and no knowledge of the source
    /// session's head revision.
    #[test]
    fn test_fork_at_resets_transcript_history_state_for_branch_identity() {
        // Source session: one user message plus one assistant message.
        let mut session = Session::new();
        session.push(Message::User(UserMessage::text(
            "summarize this".to_string(),
        )));
        session.push(Message::BlockAssistant(BlockAssistantMessage::new(
            vec![AssistantBlock::Text {
                text: "long assistant trace".to_string(),
                meta: None,
            }],
            StopReason::EndTurn,
        )));
        // Rewrite the assistant message (range 1..2) on top of the current
        // revision so the source carries transcript rewrite history.
        let parent_revision = session.transcript_revision().expect("parent revision");
        session
            .commit_transcript_rewrite(
                TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
                vec![Message::BlockAssistant(BlockAssistantMessage::new(
                    vec![AssistantBlock::Text {
                        text: "compact trace".to_string(),
                        meta: None,
                    }],
                    StopReason::EndTurn,
                ))],
                TranscriptRewriteReason::new("compaction"),
                Some("test".to_string()),
                Some(parent_revision),
            )
            .expect("rewrite should commit");

        // Capture the source head, then branch after the first message.
        let source_head = session.transcript_revision().expect("source head");
        let mut forked = session.fork_at(1);
        assert_ne!(forked.id(), session.id());
        // The fork must not carry the source's history-state metadata entry.
        assert!(
            !forked
                .metadata()
                .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
        );
        // The fork's head revision is derived from its own messages.
        assert_eq!(
            forked.transcript_revision().expect("fork head"),
            transcript_messages_digest(forked.messages()).expect("fork digest")
        );
        // The source's head revision is unknown to the fork.
        assert!(
            forked
                .transcript_revision_messages(&source_head)
                .expect("fork history lookup")
                .is_none()
        );

        // A rewrite on the fork is accepted against the fork-local parent and
        // records that parent on the resulting commit.
        let fork_parent = forked.transcript_revision().expect("fork parent");
        let commit = forked
            .commit_transcript_rewrite(
                TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
                vec![Message::User(UserMessage::text(
                    "branch prompt".to_string(),
                ))],
                TranscriptRewriteReason::new("branch_edit"),
                Some("test".to_string()),
                Some(fork_parent.clone()),
            )
            .expect("fork rewrite should use fork-local parent");
        assert_eq!(commit.parent_revision, fork_parent);
    }
5480
5481    #[test]
5482    fn test_push_cow_behavior() {
5483        let mut session = Session::new();
5484        session.push(Message::User(UserMessage::text("First".to_string())));
5485
5486        // Fork shares the Arc
5487        let forked = session.fork();
5488        assert!(Arc::ptr_eq(&session.messages, &forked.messages));
5489
5490        // Push on original triggers CoW - original gets new Arc
5491        session.push(Message::User(UserMessage::text("Second".to_string())));
5492
5493        // Now they should have different Arcs
5494        assert!(!Arc::ptr_eq(&session.messages, &forked.messages));
5495        assert_eq!(session.messages().len(), 2);
5496        assert_eq!(forked.messages().len(), 1);
5497    }
5498
5499    // Performance tests for lazy timestamp updates
5500
5501    #[test]
5502    fn test_push_batch_single_timestamp() {
5503        let mut session = Session::new();
5504        let initial_updated = session.updated_at();
5505
5506        // Use push_batch to add multiple messages without repeated syscalls
5507        session.push_batch(vec![
5508            Message::User(UserMessage::text("First".to_string())),
5509            Message::User(UserMessage::text("Second".to_string())),
5510            Message::User(UserMessage::text("Third".to_string())),
5511        ]);
5512
5513        assert_eq!(session.messages().len(), 3);
5514        // Timestamp should have been updated once
5515        assert!(session.updated_at() >= initial_updated);
5516    }
5517
5518    #[test]
5519    fn test_touch_updates_timestamp() {
5520        let mut session = Session::new();
5521        let initial = session.updated_at();
5522
5523        std::thread::sleep(std::time::Duration::from_millis(10));
5524
5525        // Explicit touch to update timestamp
5526        session.touch();
5527
5528        assert!(session.updated_at() > initial);
5529    }
5530
5531    #[test]
5532    fn test_session_push() {
5533        let mut session = Session::new();
5534        let initial_updated = session.updated_at();
5535
5536        // Small delay to ensure time changes
5537        std::thread::sleep(std::time::Duration::from_millis(10));
5538
5539        session.push(Message::User(UserMessage::text("Hello".to_string())));
5540
5541        assert_eq!(session.messages().len(), 1);
5542        assert!(session.updated_at() > initial_updated);
5543    }
5544
5545    #[test]
5546    fn test_session_fork() {
5547        let mut session = Session::new();
5548        session.push(Message::System(SystemMessage::new("System prompt")));
5549        session.push(Message::User(UserMessage::text("Hello".to_string())));
5550        session.push(Message::Assistant(AssistantMessage {
5551            content: "Hi!".to_string(),
5552            tool_calls: vec![],
5553            stop_reason: StopReason::EndTurn,
5554            usage: Usage::default(),
5555            created_at: crate::types::message_timestamp_now(),
5556        }));
5557
5558        // Fork at index 2 (system + user)
5559        let forked = session.fork_at(2);
5560        assert_eq!(forked.messages().len(), 2);
5561        assert_ne!(forked.id(), session.id());
5562
5563        // Full fork
5564        let full_fork = session.fork();
5565        assert_eq!(full_fork.messages().len(), 3);
5566    }
5567
5568    #[test]
5569    fn test_session_metadata() {
5570        let mut session = Session::new();
5571        session.set_metadata("key", serde_json::json!("value"));
5572
5573        assert_eq!(session.metadata().get("key").unwrap(), "value");
5574    }
5575
5576    #[test]
5577    fn test_session_metadata_backfill_preserves_timestamp() {
5578        let mut session = Session::new();
5579        let initial_updated = session.updated_at();
5580
5581        std::thread::sleep(std::time::Duration::from_millis(10));
5582
5583        assert!(session.backfill_metadata_if_absent("key", serde_json::json!("value")));
5584        assert_eq!(session.metadata().get("key").unwrap(), "value");
5585        assert_eq!(session.updated_at(), initial_updated);
5586        assert!(!session.backfill_metadata_if_absent("key", serde_json::json!("other")));
5587        assert_eq!(session.metadata().get("key").unwrap(), "value");
5588        assert_eq!(session.updated_at(), initial_updated);
5589    }
5590
5591    #[test]
5592    fn test_session_mob_tool_authority_context_roundtrip() {
5593        let mut session = Session::new();
5594        let authority = MobToolAuthorityContext::new(
5595            crate::service::OpaquePrincipalToken::new("opaque-principal"),
5596            false,
5597        )
5598        .with_managed_mob_scope(["mob-a"])
5599        .with_audit_invocation_id("audit-1");
5600
5601        session
5602            .set_mob_tool_authority_context(Some(authority.clone()))
5603            .expect("authority should serialize");
5604        assert_eq!(session.mob_tool_authority_context(), Some(authority));
5605
5606        session
5607            .set_mob_tool_authority_context(None)
5608            .expect("authority should clear");
5609        assert!(session.mob_tool_authority_context().is_none());
5610    }
5611
5612    #[test]
5613    fn test_session_tool_visibility_state_roundtrip() {
5614        let mut session = Session::new();
5615        let state = SessionToolVisibilityState {
5616            inherited_base_filter: ToolFilter::Allow(["visible".to_string()].into_iter().collect()),
5617            active_filter: ToolFilter::Allow(
5618                ["visible".to_string(), "missing".to_string()]
5619                    .into_iter()
5620                    .collect(),
5621            ),
5622            staged_filter: ToolFilter::Allow(
5623                ["visible".to_string(), "missing".to_string()]
5624                    .into_iter()
5625                    .collect(),
5626            ),
5627            active_revision: 1,
5628            staged_revision: 2,
5629            ..Default::default()
5630        };
5631
5632        session
5633            .set_tool_visibility_state(state.clone())
5634            .expect("tool visibility state should serialize");
5635        assert_eq!(session.tool_visibility_state().unwrap(), Some(state));
5636    }
5637
5638    #[test]
5639    fn test_session_tool_visibility_state_malformed_returns_error() {
5640        let mut session = Session::new();
5641        session.set_metadata(
5642            SESSION_TOOL_VISIBILITY_STATE_KEY,
5643            serde_json::json!({
5644                "active_filter": {
5645                    "unexpected_filter_kind": ["secret"]
5646                }
5647            }),
5648        );
5649
5650        assert!(
5651            session.tool_visibility_state().is_err(),
5652            "malformed canonical visibility metadata must not decode as absent/default"
5653        );
5654    }
5655
5656    #[test]
5657    fn test_session_serialization() {
5658        let mut session = Session::new();
5659        session.push(Message::User(UserMessage::text("Test".to_string())));
5660
5661        let json = serde_json::to_string(&session).unwrap();
5662        let parsed: Session = serde_json::from_str(&json).unwrap();
5663
5664        assert_eq!(parsed.id(), session.id());
5665        assert_eq!(parsed.messages().len(), 1);
5666        assert_eq!(parsed.version(), SESSION_VERSION);
5667    }
5668
5669    #[test]
5670    fn test_session_meta_from_session() {
5671        let mut session = Session::new();
5672        session.push(Message::User(UserMessage::text("Hello".to_string())));
5673        session.push(Message::Assistant(AssistantMessage {
5674            content: "Hi!".to_string(),
5675            tool_calls: vec![],
5676            stop_reason: StopReason::EndTurn,
5677            usage: Usage {
5678                input_tokens: 10,
5679                output_tokens: 5,
5680                cache_creation_tokens: None,
5681                cache_read_tokens: None,
5682            },
5683            created_at: crate::types::message_timestamp_now(),
5684        }));
5685        session.record_usage(Usage {
5686            input_tokens: 10,
5687            output_tokens: 5,
5688            cache_creation_tokens: None,
5689            cache_read_tokens: None,
5690        });
5691
5692        let meta = SessionMeta::from(&session);
5693        assert_eq!(meta.id, *session.id());
5694        assert_eq!(meta.message_count, 2);
5695        assert_eq!(meta.total_tokens, 15);
5696    }
5697
5698    #[test]
5699    fn has_pending_boundary_empty_session() {
5700        let session = Session::new();
5701        assert!(!session.has_pending_boundary());
5702    }
5703
5704    #[test]
5705    fn has_pending_boundary_after_user_message() {
5706        let mut session = Session::new();
5707        session.push(Message::User(UserMessage::text("hello")));
5708        assert!(session.has_pending_boundary());
5709    }
5710
5711    #[test]
5712    fn has_pending_boundary_after_assistant_message() {
5713        let mut session = Session::new();
5714        session.push(Message::User(UserMessage::text("hello")));
5715        session.push(Message::BlockAssistant(BlockAssistantMessage::new(
5716            vec![],
5717            StopReason::EndTurn,
5718        )));
5719        assert!(!session.has_pending_boundary());
5720    }
5721
5722    #[test]
5723    fn has_pending_boundary_after_tool_results() {
5724        let mut session = Session::new();
5725        session.push(Message::User(UserMessage::text("hello")));
5726        session.push(Message::tool_results(vec![]));
5727        assert!(session.has_pending_boundary());
5728    }
5729
5730    #[test]
5731    fn has_pending_boundary_after_system() {
5732        let mut session = Session::new();
5733        session.push(Message::System(SystemMessage::new("system")));
5734        assert!(!session.has_pending_boundary());
5735    }
5736
5737    #[test]
5738    fn system_context_state_preserves_applied_runtime_context() {
5739        let accepted_at = SystemTime::UNIX_EPOCH;
5740        let mut state = SessionSystemContextState::default();
5741        state
5742            .stage_append(
5743                &AppendSystemContextRequest {
5744                    text: "Authoritative peer token is birch seventeen.".to_string(),
5745                    source: Some(
5746                        "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
5747                            .to_string(),
5748                    ),
5749                    idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
5750                },
5751                accepted_at,
5752            )
5753            .expect("append should stage");
5754
5755        state.mark_pending_applied();
5756
5757        assert!(state.pending.is_empty());
5758        assert_eq!(state.applied.len(), 1);
5759        assert_eq!(
5760            state.applied[0].text,
5761            "Authoritative peer token is birch seventeen."
5762        );
5763        assert_eq!(
5764            state.applied[0].source.as_deref(),
5765            Some("peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
5766        );
5767
5768        let round_tripped: SessionSystemContextState =
5769            serde_json::from_value(serde_json::to_value(&state).expect("serialize state"))
5770                .expect("deserialize state");
5771        assert_eq!(round_tripped.applied, state.applied);
5772    }
5773
5774    #[test]
5775    fn active_turn_system_context_is_discarded_when_not_applied() {
5776        let mut state = SessionSystemContextState::default();
5777        state
5778            .stage_active_turn_append(
5779                &AppendSystemContextRequest {
5780                    text: "only for the active run".to_string(),
5781                    source: Some("runtime:steer:input-1".to_string()),
5782                    idempotency_key: Some("runtime:steer:input-1".to_string()),
5783                },
5784                SystemTime::UNIX_EPOCH,
5785            )
5786            .expect("active context should stage");
5787
5788        let discarded = state.discard_unapplied_active_turn_pending();
5789
5790        assert_eq!(discarded.len(), 1);
5791        assert!(state.pending.is_empty());
5792        assert!(state.applied.is_empty());
5793        assert!(state.active_turn_pending_keys.is_empty());
5794        assert!(
5795            state.seen.is_empty(),
5796            "discarded active-turn context should not block later idempotency keys"
5797        );
5798    }
5799
5800    #[test]
5801    fn active_turn_system_context_can_roll_back_targeted_keys() {
5802        let mut state = SessionSystemContextState::default();
5803        for key in ["runtime:steer:input-1", "runtime:steer:input-2"] {
5804            state
5805                .stage_active_turn_append(
5806                    &AppendSystemContextRequest {
5807                        text: format!("context for {key}"),
5808                        source: Some(key.to_string()),
5809                        idempotency_key: Some(key.to_string()),
5810                    },
5811                    SystemTime::UNIX_EPOCH,
5812                )
5813                .expect("active context should stage");
5814        }
5815
5816        let discarded =
5817            state.discard_active_turn_pending_by_keys(&["runtime:steer:input-1".to_string()]);
5818
5819        assert_eq!(discarded.len(), 1);
5820        assert_eq!(
5821            discarded[0].idempotency_key.as_deref(),
5822            Some("runtime:steer:input-1")
5823        );
5824        assert_eq!(state.pending.len(), 1);
5825        assert_eq!(
5826            state.pending[0].idempotency_key.as_deref(),
5827            Some("runtime:steer:input-2")
5828        );
5829        assert!(!state.seen.contains_key("runtime:steer:input-1"));
5830        assert!(state.seen.contains_key("runtime:steer:input-2"));
5831        assert!(
5832            !state
5833                .active_turn_pending_keys
5834                .contains("runtime:steer:input-1")
5835        );
5836        assert!(
5837            state
5838                .active_turn_pending_keys
5839                .contains("runtime:steer:input-2")
5840        );
5841    }
5842
5843    #[test]
5844    fn active_turn_system_context_is_transient_when_boundary_consumes_it() {
5845        let mut state = SessionSystemContextState::default();
5846        state
5847            .stage_active_turn_append(
5848                &AppendSystemContextRequest {
5849                    text: "visible to this run".to_string(),
5850                    source: Some("runtime:steer:input-2".to_string()),
5851                    idempotency_key: Some("runtime:steer:input-2".to_string()),
5852                },
5853                SystemTime::UNIX_EPOCH,
5854            )
5855            .expect("active context should stage");
5856
5857        state.mark_pending_applied();
5858        let discarded = state.discard_unapplied_active_turn_pending();
5859
5860        assert!(discarded.is_empty());
5861        assert!(state.pending.is_empty());
5862        assert!(state.applied.is_empty());
5863        assert!(state.active_turn_pending_keys.is_empty());
5864        assert_eq!(
5865            state.seen.get("runtime:steer:input-2"),
5866            None,
5867            "consumed active-turn steer context must not become durable state"
5868        );
5869    }
5870
    /// `discard_transient_runtime_steer_context` must strip every
    /// `runtime:steer:*` entry from both the rendered system prompt and the
    /// persisted system-context state, while keeping non-steer context (here
    /// the `peer_response_terminal` entry) in place.
    #[test]
    fn discard_transient_runtime_steer_context_removes_legacy_prompt_and_state() {
        let mut session = Session::new();
        // System prompt: "base" followed by two rendered context blocks, one
        // sourced from a runtime steer and one from a peer response.
        session.set_system_prompt(format!(
            "base{}{}{}{}",
            SYSTEM_CONTEXT_SEPARATOR,
            render_system_context_block(&PendingSystemContextAppend {
                text: "old steer".to_string(),
                source: Some("runtime:steer:old".to_string()),
                idempotency_key: Some("runtime:steer:old".to_string()),
                accepted_at: SystemTime::UNIX_EPOCH,
            }),
            SYSTEM_CONTEXT_SEPARATOR,
            render_system_context_block(&PendingSystemContextAppend {
                text: "durable peer fact".to_string(),
                source: Some("peer_response_terminal:analyst:req".to_string()),
                idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
                accepted_at: SystemTime::UNIX_EPOCH,
            })
        ));
        // Persisted state mirrors the prompt (`applied`) and additionally
        // holds a pending steer, a seen-key record for the old steer, and an
        // active-turn key for the pending steer.
        session
            .set_system_context_state(SessionSystemContextState {
                pending: vec![PendingSystemContextAppend {
                    text: "pending steer".to_string(),
                    source: Some("runtime:steer:pending".to_string()),
                    idempotency_key: Some("runtime:steer:pending".to_string()),
                    accepted_at: SystemTime::UNIX_EPOCH,
                }],
                applied: vec![
                    PendingSystemContextAppend {
                        text: "old steer".to_string(),
                        source: Some("runtime:steer:old".to_string()),
                        idempotency_key: Some("runtime:steer:old".to_string()),
                        accepted_at: SystemTime::UNIX_EPOCH,
                    },
                    PendingSystemContextAppend {
                        text: "durable peer fact".to_string(),
                        source: Some("peer_response_terminal:analyst:req".to_string()),
                        idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
                        accepted_at: SystemTime::UNIX_EPOCH,
                    },
                ],
                seen: BTreeMap::from([(
                    "runtime:steer:old".to_string(),
                    SeenSystemContextKey {
                        text: "old steer".to_string(),
                        source: Some("runtime:steer:old".to_string()),
                        state: SeenSystemContextState::Applied,
                    },
                )]),
                active_turn_pending_keys: BTreeSet::from(["runtime:steer:pending".to_string()]),
            })
            .expect("system context state should serialize");

        let removed = session.discard_transient_runtime_steer_context();

        // NOTE(review): the lower bound of 4 presumably counts the prompt
        // block, the pending entry, the applied entry, and the seen key —
        // confirm against the implementation.
        assert!(removed >= 4);
        // The first message must still be the system prompt, now steer-free.
        let system_prompt = match session.messages().first() {
            Some(Message::System(system)) => system.content.as_str(),
            other => panic!("expected system prompt, got {other:?}"),
        };
        assert!(!system_prompt.contains("runtime:steer:"));
        assert!(system_prompt.contains("durable peer fact"));
        // Only the durable peer entry remains in the persisted state.
        let state = session.system_context_state().unwrap_or_default();
        assert!(state.pending.is_empty());
        assert_eq!(state.applied.len(), 1);
        assert_eq!(state.applied[0].text, "durable peer fact");
        assert!(state.seen.is_empty());
        assert!(state.active_turn_pending_keys.is_empty());
    }
5941
5942    #[test]
5943    fn append_system_context_blocks_records_typed_applied_context() {
5944        let append = PendingSystemContextAppend {
5945            text: "Authoritative peer token is birch seventeen.".to_string(),
5946            source: Some(
5947                "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string(),
5948            ),
5949            idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
5950            accepted_at: SystemTime::UNIX_EPOCH,
5951        };
5952        let mut session = Session::new();
5953
5954        session.append_system_context_blocks(std::slice::from_ref(&append));
5955
5956        let state = session
5957            .system_context_state()
5958            .expect("append should persist typed context state");
5959        assert_eq!(state.applied, vec![append]);
5960    }
5961
5962    #[test]
5963    fn append_system_context_blocks_renders_pre_marked_pending_context() {
5964        let accepted_at = SystemTime::UNIX_EPOCH;
5965        let mut state = SessionSystemContextState::default();
5966        state
5967            .stage_append(
5968                &AppendSystemContextRequest {
5969                    text: "Apply this staged context at the request boundary.".to_string(),
5970                    source: Some("rpc/session_inject_context".to_string()),
5971                    idempotency_key: Some("ctx-boundary".to_string()),
5972                },
5973                accepted_at,
5974            )
5975            .expect("append should stage");
5976        let pending = state.pending.clone();
5977        state.mark_pending_applied();
5978        let mut session = Session::new();
5979        session
5980            .set_system_context_state(state)
5981            .expect("state should serialize");
5982
5983        session.append_system_context_blocks(&pending);
5984
5985        let system_prompt = session
5986            .messages()
5987            .first()
5988            .and_then(|message| match message {
5989                Message::System(system) => Some(system.content.as_str()),
5990                _ => None,
5991            })
5992            .unwrap_or_default();
5993        assert!(system_prompt.contains("Apply this staged context at the request boundary."));
5994        let state = session
5995            .system_context_state()
5996            .expect("append should persist typed context state");
5997        assert_eq!(state.applied.len(), 1);
5998        assert_eq!(
5999            state.seen["ctx-boundary"].state,
6000            SeenSystemContextState::Applied
6001        );
6002    }
6003
6004    #[test]
6005    fn append_system_context_blocks_renders_pre_marked_context_without_idempotency_key() {
6006        let accepted_at = SystemTime::UNIX_EPOCH;
6007        let mut state = SessionSystemContextState::default();
6008        state
6009            .stage_append(
6010                &AppendSystemContextRequest {
6011                    text: "Apply this unkeyed staged context at the request boundary.".to_string(),
6012                    source: Some("rpc/session_inject_context".to_string()),
6013                    idempotency_key: None,
6014                },
6015                accepted_at,
6016            )
6017            .expect("append should stage");
6018        let pending = state.pending.clone();
6019        state.mark_pending_applied();
6020        let mut session = Session::new();
6021        session
6022            .set_system_context_state(state)
6023            .expect("state should serialize");
6024
6025        session.append_system_context_blocks(&pending);
6026
6027        let system_prompt = session
6028            .messages()
6029            .first()
6030            .and_then(|message| match message {
6031                Message::System(system) => Some(system.content.as_str()),
6032                _ => None,
6033            })
6034            .unwrap_or_default();
6035        assert!(
6036            system_prompt.contains("Apply this unkeyed staged context at the request boundary.")
6037        );
6038    }
6039
6040    #[test]
6041    fn append_system_context_blocks_skips_duplicate_idempotency_key() {
6042        let first = PendingSystemContextAppend {
6043            text: "Authoritative peer token is birch seventeen.".to_string(),
6044            source: Some("peer_response_terminal:analyst:req-1".to_string()),
6045            idempotency_key: Some("req-1".to_string()),
6046            accepted_at: SystemTime::UNIX_EPOCH,
6047        };
6048        let duplicate = PendingSystemContextAppend {
6049            accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
6050            ..first.clone()
6051        };
6052        let mut session = Session::new();
6053
6054        session.append_system_context_blocks(std::slice::from_ref(&first));
6055        session.append_system_context_blocks(std::slice::from_ref(&duplicate));
6056
6057        let state = session
6058            .system_context_state()
6059            .expect("append should persist typed context state");
6060        assert_eq!(state.applied, vec![first]);
6061        let system_prompt = session
6062            .messages()
6063            .first()
6064            .and_then(|message| match message {
6065                Message::System(system) => Some(system.content.as_str()),
6066                _ => None,
6067            })
6068            .unwrap_or_default();
6069        assert_eq!(
6070            system_prompt
6071                .matches("Authoritative peer token is birch seventeen.")
6072                .count(),
6073            1
6074        );
6075    }
6076
6077    #[test]
6078    fn append_system_context_blocks_skips_conflicting_duplicate_idempotency_key() {
6079        let first = PendingSystemContextAppend {
6080            text: "Authoritative peer token is birch seventeen.".to_string(),
6081            source: Some("peer_response_terminal:analyst:req-1".to_string()),
6082            idempotency_key: Some("req-1".to_string()),
6083            accepted_at: SystemTime::UNIX_EPOCH,
6084        };
6085        let conflicting = PendingSystemContextAppend {
6086            text: "Conflicting peer token should not reach the prompt.".to_string(),
6087            accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
6088            ..first.clone()
6089        };
6090        let mut session = Session::new();
6091
6092        session.append_system_context_blocks(std::slice::from_ref(&first));
6093        session.append_system_context_blocks(std::slice::from_ref(&conflicting));
6094
6095        let state = session
6096            .system_context_state()
6097            .expect("append should persist typed context state");
6098        assert_eq!(state.applied, vec![first]);
6099        let system_prompt = session
6100            .messages()
6101            .first()
6102            .and_then(|message| match message {
6103                Message::System(system) => Some(system.content.as_str()),
6104                _ => None,
6105            })
6106            .unwrap_or_default();
6107        assert!(system_prompt.contains("Authoritative peer token is birch seventeen."));
6108        assert!(!system_prompt.contains("Conflicting peer token should not reach the prompt."));
6109    }
6110
6111    // ------------------------------------------------------------------
6112    // T9/T10: realtime transcript lane materialization.
6113    //
6114    // The display-text lane (`AssistantTextDelta`) materializes as
6115    // `AssistantBlock::Text`; the spoken-transcript lane
6116    // (`AssistantTranscriptDelta`) materializes as
6117    // `AssistantBlock::Transcript { source: TranscriptSource::Spoken }`.
6118    // These regressions pin both flushes and prove the materializer
6119    // dispatches on the per-item `TranscriptLane`.
6120    // ------------------------------------------------------------------
6121
6122    #[test]
6123    fn realtime_transcript_assistant_transcript_delta_materializes_transcript_block() {
6124        let mut session = Session::new();
6125
6126        let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6127            response_id: "resp_spoken".to_string(),
6128            delta_id: "evt_delta_spoken_1".to_string(),
6129            item_id: "item_spoken".to_string(),
6130            previous_item_id: None,
6131            content_index: 0,
6132            delta: "I said hi".to_string(),
6133        };
6134        assert!(
6135            session.append_realtime_transcript_event(delta).is_inert(),
6136            "delta alone is inert until turn-completed flushes"
6137        );
6138
6139        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
6140            response_id: "resp_spoken".to_string(),
6141            stop_reason: StopReason::EndTurn,
6142            usage: Usage::default(),
6143        };
6144        let outcome = session.append_realtime_transcript_event(terminal);
6145        assert_eq!(outcome.materialized_messages.len(), 1);
6146
6147        // T9/T10: must be a Transcript block, NOT Text.
6148        let messages = session.messages();
6149        assert_eq!(messages.len(), 1);
6150        match &messages[0] {
6151            Message::BlockAssistant(assistant) => {
6152                assert_eq!(assistant.blocks.len(), 1);
6153                match &assistant.blocks[0] {
6154                    AssistantBlock::Transcript { text, source, .. } => {
6155                        assert_eq!(text, "I said hi");
6156                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6157                    }
6158                    other => unreachable!(
6159                        "AssistantTranscriptDelta must materialize as AssistantBlock::Transcript, got {other:?}"
6160                    ),
6161                }
6162            }
6163            other => unreachable!("expected BlockAssistant message, got {other:?}"),
6164        }
6165    }
6166
6167    #[test]
6168    fn round4_cc4_in_flight_response_ids_lists_distinct_unmaterialized_responses() {
6169        // CC4 (Round-4 architectural reconciliation): the helper that
6170        // powers `signal_turn_interrupt`'s cross-layer fan-out must
6171        // return every distinct provider response_id that has at least
6172        // one unmaterialized assistant item, EXCLUDING already-discarded
6173        // responses and EXCLUDING the user role.
6174        let mut session = Session::new();
6175
6176        // Two transcript-delta items on resp_a (different content_index
6177        // ranges), one on resp_b. resp_c gets a delta and is then
6178        // discarded explicitly via AssistantTurnInterrupted.
6179        for (i, response_id) in [
6180            ("resp_a", "resp_a"),
6181            ("resp_a_extra", "resp_a"),
6182            ("resp_b", "resp_b"),
6183            ("resp_c", "resp_c"),
6184        ]
6185        .iter()
6186        .enumerate()
6187        {
6188            let event = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6189                response_id: response_id.1.to_string(),
6190                delta_id: format!("delta_{i}"),
6191                item_id: response_id.0.to_string(),
6192                previous_item_id: None,
6193                content_index: 0,
6194                delta: "x".to_string(),
6195            };
6196            let _ = session.append_realtime_transcript_event(event);
6197        }
6198
6199        // Discard resp_c — it should not appear in the in-flight list.
6200        let _ = session.append_realtime_transcript_event(
6201            RealtimeTranscriptEvent::AssistantTurnInterrupted {
6202                response_id: "resp_c".to_string(),
6203            },
6204        );
6205
6206        // User-role item should never appear (CC4 only fans interrupts
6207        // to assistant responses).
6208        let _ = session.append_realtime_transcript_event(
6209            RealtimeTranscriptEvent::UserTranscriptFinal {
6210                item_id: "u_item".to_string(),
6211                previous_item_id: None,
6212                content_index: 0,
6213                text: "hi".to_string(),
6214            },
6215        );
6216
6217        let in_flight = session.in_flight_realtime_assistant_response_ids();
6218        assert!(in_flight.contains(&"resp_a".to_string()), "{in_flight:?}");
6219        assert!(in_flight.contains(&"resp_b".to_string()), "{in_flight:?}");
6220        assert!(
6221            !in_flight.contains(&"resp_c".to_string()),
6222            "discarded response must not appear in in_flight: {in_flight:?}"
6223        );
6224        // resp_a appears exactly once even though two items reference it.
6225        assert_eq!(
6226            in_flight.iter().filter(|r| *r == "resp_a").count(),
6227            1,
6228            "distinct response_ids only: {in_flight:?}"
6229        );
6230    }
6231
6232    #[test]
6233    fn round4_cc2_assistant_turn_completed_after_transcript_deltas_materializes_transcript() {
6234        // CC2 (Round-4 architectural reconciliation): once
6235        // `signal_turn_completed` synthesizes
6236        // `RealtimeTranscriptEvent::AssistantTurnCompleted`, the staging
6237        // materializer commits every staged transcript-delta item for
6238        // that response_id as `AssistantBlock::Transcript { Spoken }`.
6239        // This pins the production end-to-end shape the sink relies on.
6240        let mut session = Session::new();
6241
6242        let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6243            response_id: "resp_cc2".to_string(),
6244            delta_id: "delta_cc2_1".to_string(),
6245            item_id: "item_cc2".to_string(),
6246            previous_item_id: None,
6247            content_index: 0,
6248            delta: "hello world".to_string(),
6249        };
6250        assert!(session.append_realtime_transcript_event(delta).is_inert());
6251
6252        // Pre-completion: in-flight list reports resp_cc2.
6253        assert_eq!(
6254            session.in_flight_realtime_assistant_response_ids(),
6255            vec!["resp_cc2".to_string()]
6256        );
6257
6258        let outcome = session.append_realtime_transcript_event(
6259            RealtimeTranscriptEvent::AssistantTurnCompleted {
6260                response_id: "resp_cc2".to_string(),
6261                stop_reason: StopReason::EndTurn,
6262                usage: Usage::default(),
6263            },
6264        );
6265        assert_eq!(outcome.materialized_messages.len(), 1);
6266
6267        // Post-completion: in-flight list is empty (item is materialized).
6268        assert!(
6269            session
6270                .in_flight_realtime_assistant_response_ids()
6271                .is_empty(),
6272            "materialized items must not appear in in_flight_realtime_assistant_response_ids"
6273        );
6274
6275        let messages = session.messages();
6276        let assistant = messages.iter().find_map(|m| match m {
6277            Message::BlockAssistant(a) => Some(a),
6278            _ => None,
6279        });
6280        let assistant = assistant.expect("assistant block message expected");
6281        assert_eq!(assistant.blocks.len(), 1);
6282        assert!(matches!(
6283            &assistant.blocks[0],
6284            AssistantBlock::Transcript {
6285                source: crate::types::TranscriptSource::Spoken,
6286                ..
6287            }
6288        ));
6289    }
6290
6291    #[test]
6292    fn realtime_transcript_assistant_text_delta_still_materializes_text_block() {
6293        // Counter-regression: the display-text lane must continue to
6294        // produce `AssistantBlock::Text` after T9/T10. Prevents an
6295        // accidental cross-lane flip.
6296        let mut session = Session::new();
6297
6298        let delta = RealtimeTranscriptEvent::AssistantTextDelta {
6299            response_id: "resp_display".to_string(),
6300            delta_id: "evt_delta_display_1".to_string(),
6301            item_id: "item_display".to_string(),
6302            previous_item_id: None,
6303            content_index: 0,
6304            delta: "I wrote".to_string(),
6305        };
6306        let _ = session.append_realtime_transcript_event(delta);
6307
6308        let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
6309            response_id: "resp_display".to_string(),
6310            stop_reason: StopReason::EndTurn,
6311            usage: Usage::default(),
6312        };
6313        let outcome = session.append_realtime_transcript_event(terminal);
6314        assert_eq!(outcome.materialized_messages.len(), 1);
6315
6316        let messages = session.messages();
6317        match &messages[0] {
6318            Message::BlockAssistant(assistant) => match &assistant.blocks[0] {
6319                AssistantBlock::Text { text, .. } => assert_eq!(text, "I wrote"),
6320                other => unreachable!(
6321                    "AssistantTextDelta must keep materializing AssistantBlock::Text, got {other:?}"
6322                ),
6323            },
6324            other => unreachable!("expected BlockAssistant message, got {other:?}"),
6325        }
6326    }
6327
6328    #[test]
6329    fn round4_cc7_mixed_response_persists_text_and_transcript_in_order() {
6330        // CC7 (Round-4 adversarial-verifier follow-up): a single mixed-modality
6331        // realtime response that emits BOTH display-text deltas
6332        // (`AssistantTextDelta`) AND spoken-transcript deltas
6333        // (`AssistantTranscriptDelta`) under the same response_id must
6334        // materialize as ONE `Message::BlockAssistant` whose `blocks` field
6335        // contains exactly two ordered entries:
6336        //   1. AssistantBlock::Text       (display-text lane)
6337        //   2. AssistantBlock::Transcript { source: Spoken } (spoken lane)
6338        // Pre-fix the materializer emitted one Message::BlockAssistant per
6339        // staged item, splitting the mixed response into two messages.
6340        //
6341        // This test drives the production materializer end-to-end: deltas
6342        // stage in `SessionRealtimeTranscriptState`; `AssistantTurnCompleted`
6343        // triggers the materializer; canonical history is the assertion
6344        // surface — exactly the same code path that
6345        // `SessionServiceProjectionSink::signal_turn_completed` invokes via
6346        // `runtime.append_realtime_transcript_event` in production.
6347        let mut session = Session::new();
6348
6349        // Provider-arrival order: display first, then spoken.
6350        let display_a = RealtimeTranscriptEvent::AssistantTextDelta {
6351            response_id: "resp_mixed_1".to_string(),
6352            delta_id: "delta_disp_1".to_string(),
6353            item_id: "item_display".to_string(),
6354            previous_item_id: None,
6355            content_index: 0,
6356            delta: "Here's the report:".to_string(),
6357        };
6358        assert!(
6359            session
6360                .append_realtime_transcript_event(display_a)
6361                .is_inert()
6362        );
6363
6364        let display_b = RealtimeTranscriptEvent::AssistantTextDelta {
6365            response_id: "resp_mixed_1".to_string(),
6366            delta_id: "delta_disp_2".to_string(),
6367            item_id: "item_display".to_string(),
6368            previous_item_id: None,
6369            content_index: 0,
6370            delta: " (still writing)".to_string(),
6371        };
6372        assert!(
6373            session
6374                .append_realtime_transcript_event(display_b)
6375                .is_inert()
6376        );
6377
6378        // Spoken items chain after the display item to mirror provider
6379        // arrival semantics — `previous_item_id` carries arrival ordering
6380        // that the materializer must preserve as block ordering inside the
6381        // single emitted message.
6382        let spoken_a = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6383            response_id: "resp_mixed_1".to_string(),
6384            delta_id: "delta_spoken_1".to_string(),
6385            item_id: "item_spoken".to_string(),
6386            previous_item_id: Some("item_display".to_string()),
6387            content_index: 0,
6388            delta: "I'm reading the report aloud:".to_string(),
6389        };
6390        assert!(
6391            session
6392                .append_realtime_transcript_event(spoken_a)
6393                .is_inert()
6394        );
6395
6396        let spoken_b = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6397            response_id: "resp_mixed_1".to_string(),
6398            delta_id: "delta_spoken_2".to_string(),
6399            item_id: "item_spoken".to_string(),
6400            previous_item_id: Some("item_display".to_string()),
6401            content_index: 0,
6402            delta: " sentence two.".to_string(),
6403        };
6404        assert!(
6405            session
6406                .append_realtime_transcript_event(spoken_b)
6407                .is_inert()
6408        );
6409
6410        // TurnCompleted triggers the materializer to flush all staged items
6411        // for this response_id into ONE BlockAssistant message.
6412        let outcome = session.append_realtime_transcript_event(
6413            RealtimeTranscriptEvent::AssistantTurnCompleted {
6414                response_id: "resp_mixed_1".to_string(),
6415                stop_reason: StopReason::EndTurn,
6416                usage: Usage {
6417                    input_tokens: 11,
6418                    output_tokens: 22,
6419                    cache_creation_tokens: None,
6420                    cache_read_tokens: None,
6421                },
6422            },
6423        );
6424        // Materializer reports two staged items got materialized.
6425        assert_eq!(outcome.materialized_messages.len(), 2);
6426
6427        // Canonical history MUST contain exactly ONE BlockAssistant message
6428        // (the CC7 fix: mixed lanes interleave into one message, not two).
6429        let messages = session.messages();
6430        let assistants: Vec<&BlockAssistantMessage> = messages
6431            .iter()
6432            .filter_map(|m| match m {
6433                Message::BlockAssistant(a) => Some(a),
6434                _ => None,
6435            })
6436            .collect();
6437        assert_eq!(
6438            assistants.len(),
6439            1,
6440            "mixed display+spoken response under one response_id must produce exactly ONE BlockAssistant message, got: {assistants:?}"
6441        );
6442        let assistant = assistants[0];
6443        assert_eq!(
6444            assistant.blocks.len(),
6445            2,
6446            "mixed response message must carry both blocks: {:?}",
6447            assistant.blocks
6448        );
6449
6450        // Block 0: display-text (concatenated deltas).
6451        match &assistant.blocks[0] {
6452            AssistantBlock::Text { text, .. } => {
6453                assert_eq!(text, "Here's the report: (still writing)");
6454            }
6455            other => unreachable!(
6456                "first block must be AssistantBlock::Text (display lane), got {other:?}"
6457            ),
6458        }
6459        // Block 1: spoken transcript (concatenated deltas), tagged Spoken.
6460        match &assistant.blocks[1] {
6461            AssistantBlock::Transcript { text, source, .. } => {
6462                assert_eq!(text, "I'm reading the report aloud: sentence two.");
6463                assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6464            }
6465            other => unreachable!(
6466                "second block must be AssistantBlock::Transcript {{ source: Spoken }}, got {other:?}"
6467            ),
6468        }
6469
6470        // Usage was recorded once for the turn.
6471        assert_eq!(session.usage.input_tokens, 11);
6472        assert_eq!(session.usage.output_tokens, 22);
6473    }
6474
6475    #[test]
6476    fn round5_r55_mixed_response_barge_in_preserves_display_drops_spoken() {
6477        // R5-5 (Round-5 contract update): barge-in MUST filter staged items
6478        // by lane — `Spoken` is invalidated (the user spoke over the audio
6479        // they were hearing) but `Display` survives as committed history
6480        // (sideband display text from the same response is not "spoken
6481        // over"). Round-4's `round4_cc7_mixed_response_barge_in_discards_*`
6482        // pinned the wrong invariant; this test replaces it.
6483        //
6484        // Architectural decision: `AssistantTurnInterrupted` is terminal for
6485        // the response on the realtime-staging path — any later
6486        // `AssistantTurnCompleted { stop_reason: Cancelled }` short-circuits
6487        // via the `discarded_assistant_response_ids` guard. So the
6488        // Interrupted handler must seed a synthetic
6489        // `assistant_completions` entry (`StopReason::Cancelled`,
6490        // `Usage::default()`) so retained Display items materialize
6491        // immediately rather than stranding forever.
6492        let mut session = Session::new();
6493
6494        let display = RealtimeTranscriptEvent::AssistantTextDelta {
6495            response_id: "resp_mixed_2".to_string(),
6496            delta_id: "delta_disp_1".to_string(),
6497            item_id: "item_display_2".to_string(),
6498            previous_item_id: None,
6499            content_index: 0,
6500            delta: "Working on the report...".to_string(),
6501        };
6502        let _ = session.append_realtime_transcript_event(display);
6503
6504        let spoken = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6505            response_id: "resp_mixed_2".to_string(),
6506            delta_id: "delta_spoken_1".to_string(),
6507            item_id: "item_spoken_2".to_string(),
6508            previous_item_id: Some("item_display_2".to_string()),
6509            content_index: 0,
6510            delta: "I'm reading the report".to_string(),
6511        };
6512        let _ = session.append_realtime_transcript_event(spoken);
6513
6514        // Barge-in arrives BEFORE TurnCompleted. The Display item with
6515        // staged content materializes immediately under the synthetic
6516        // Cancelled completion.
6517        let outcome = session.append_realtime_transcript_event(
6518            RealtimeTranscriptEvent::AssistantTurnInterrupted {
6519                response_id: "resp_mixed_2".to_string(),
6520            },
6521        );
6522        assert_eq!(
6523            outcome.materialized_messages.len(),
6524            1,
6525            "Display lane item must materialize on Interrupted: {outcome:?}"
6526        );
6527
6528        // A late `AssistantTurnCompleted` (the provider's response.done
6529        // emitted after cancel) must be a no-op: the Display item is
6530        // already materialized; the Spoken item was dropped at Interrupted.
6531        let late_completion = session.append_realtime_transcript_event(
6532            RealtimeTranscriptEvent::AssistantTurnCompleted {
6533                response_id: "resp_mixed_2".to_string(),
6534                stop_reason: StopReason::Cancelled,
6535                usage: Usage::default(),
6536            },
6537        );
6538        assert_eq!(
6539            late_completion.materialized_messages.len(),
6540            0,
6541            "post-barge-in TurnCompleted must not resurrect anything"
6542        );
6543
6544        // Canonical history: exactly one BlockAssistant carrying the
6545        // Display text (no Transcript block — Spoken was dropped).
6546        let messages = session.messages();
6547        let assistants: Vec<&BlockAssistantMessage> = messages
6548            .iter()
6549            .filter_map(|m| match m {
6550                Message::BlockAssistant(a) => Some(a),
6551                _ => None,
6552            })
6553            .collect();
6554        assert_eq!(
6555            assistants.len(),
6556            1,
6557            "barge-in must commit exactly one BlockAssistant containing the Display lane: {assistants:?}"
6558        );
6559        let assistant = assistants[0];
6560        assert_eq!(assistant.blocks.len(), 1, "blocks: {:?}", assistant.blocks);
6561        match &assistant.blocks[0] {
6562            AssistantBlock::Text { text, .. } => {
6563                assert_eq!(text, "Working on the report...");
6564            }
6565            other => {
6566                unreachable!("Display lane must materialize as AssistantBlock::Text, got {other:?}")
6567            }
6568        }
6569        // No Transcript block — Spoken lane was dropped.
6570        assert!(
6571            !assistant
6572                .blocks
6573                .iter()
6574                .any(|b| matches!(b, AssistantBlock::Transcript { .. })),
6575            "Spoken lane must be dropped on barge-in"
6576        );
6577
6578        // The in-flight tracker reports the response as no longer in flight
6579        // (the Display item is materialized; the Spoken item is skipped).
6580        assert!(
6581            !session
6582                .in_flight_realtime_assistant_response_ids()
6583                .contains(&"resp_mixed_2".to_string()),
6584            "barged-in response must not appear in in_flight_realtime_assistant_response_ids"
6585        );
6586    }
6587
6588    #[test]
6589    fn round5_r55_barge_in_preserves_display_lane_drops_spoken() {
6590        // R5-5 unit test: pin the lane-filter behavior at the staged-item
6591        // level (no chained predecessor). One Display item, one Spoken item,
6592        // both unchained, both staged before Interrupted.
6593        let mut session = Session::new();
6594
6595        let _ =
6596            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6597                response_id: "resp_a".to_string(),
6598                delta_id: "delta_d_1".to_string(),
6599                item_id: "item_display".to_string(),
6600                previous_item_id: None,
6601                content_index: 0,
6602                delta: "display-text".to_string(),
6603            });
6604        let _ = session.append_realtime_transcript_event(
6605            RealtimeTranscriptEvent::AssistantTranscriptDelta {
6606                response_id: "resp_a".to_string(),
6607                delta_id: "delta_s_1".to_string(),
6608                item_id: "item_spoken".to_string(),
6609                previous_item_id: None,
6610                content_index: 0,
6611                delta: "spoken-transcript".to_string(),
6612            },
6613        );
6614
6615        let outcome = session.append_realtime_transcript_event(
6616            RealtimeTranscriptEvent::AssistantTurnInterrupted {
6617                response_id: "resp_a".to_string(),
6618            },
6619        );
6620        // Display materializes, Spoken does not.
6621        assert_eq!(outcome.materialized_messages.len(), 1);
6622
6623        let messages = session.messages();
6624        let assistants: Vec<&BlockAssistantMessage> = messages
6625            .iter()
6626            .filter_map(|m| match m {
6627                Message::BlockAssistant(a) => Some(a),
6628                _ => None,
6629            })
6630            .collect();
6631        assert_eq!(assistants.len(), 1);
6632        // Single Text block (the Display lane) — no Transcript.
6633        assert_eq!(assistants[0].blocks.len(), 1);
6634        match &assistants[0].blocks[0] {
6635            AssistantBlock::Text { text, .. } => assert_eq!(text, "display-text"),
6636            other => unreachable!("expected Text, got {other:?}"),
6637        }
6638    }
6639
6640    #[test]
6641    fn round5_r55_barge_in_finalizes_retained_display_into_committed_block() {
6642        // R5-5: the architectural decision — Interrupted is terminal for the
6643        // response. Display lane must commit at Interrupted time, not wait
6644        // on a hypothetical AssistantTurnCompleted that may never arrive
6645        // (or arrives Cancelled and short-circuits).
6646        let mut session = Session::new();
6647
6648        let _ =
6649            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6650                response_id: "resp_a".to_string(),
6651                delta_id: "delta_d_1".to_string(),
6652                item_id: "item_display".to_string(),
6653                previous_item_id: None,
6654                content_index: 0,
6655                delta: "committed-display-text".to_string(),
6656            });
6657
6658        // Pre-condition: nothing committed yet.
6659        assert!(session.messages().is_empty());
6660
6661        let outcome = session.append_realtime_transcript_event(
6662            RealtimeTranscriptEvent::AssistantTurnInterrupted {
6663                response_id: "resp_a".to_string(),
6664            },
6665        );
6666        assert_eq!(
6667            outcome.materialized_messages.len(),
6668            1,
6669            "Interrupted must finalize retained Display lane immediately"
6670        );
6671
6672        // Post-condition: BlockAssistant in canonical history, no Transcript.
6673        let messages = session.messages();
6674        assert_eq!(messages.len(), 1);
6675        match &messages[0] {
6676            Message::BlockAssistant(assistant) => {
6677                assert_eq!(assistant.blocks.len(), 1);
6678                match &assistant.blocks[0] {
6679                    AssistantBlock::Text { text, .. } => {
6680                        assert_eq!(text, "committed-display-text");
6681                    }
6682                    other => unreachable!("expected Text, got {other:?}"),
6683                }
6684            }
6685            other => unreachable!("expected BlockAssistant, got {other:?}"),
6686        }
6687    }
6688
6689    #[test]
6690    fn round5_r56_truncation_promotes_default_lane_item_to_spoken() {
6691        // R5-6: when truncation is the first content-bearing event for an
6692        // item (no prior delta), the staged item's lane MUST be promoted to
6693        // Spoken so the materializer commits as `AssistantBlock::Transcript`.
6694        // Without the explicit promotion, the lane stays `Display` (the
6695        // default) and the heard audio transcript persists as
6696        // `AssistantBlock::Text`.
6697        let mut session = Session::new();
6698
6699        let _ = session.append_realtime_transcript_event(
6700            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6701                response_id: "resp_a".to_string(),
6702                item_id: "item_a".to_string(),
6703                content_index: 0,
6704                text: "what was actually heard".to_string(),
6705            },
6706        );
6707
6708        let outcome = session.append_realtime_transcript_event(
6709            RealtimeTranscriptEvent::AssistantTurnCompleted {
6710                response_id: "resp_a".to_string(),
6711                stop_reason: StopReason::EndTurn,
6712                usage: Usage::default(),
6713            },
6714        );
6715        assert_eq!(outcome.materialized_messages.len(), 1);
6716
6717        assert_eq!(session.messages().len(), 1);
6718        match &session.messages()[0] {
6719            Message::BlockAssistant(assistant) => {
6720                assert_eq!(assistant.blocks.len(), 1);
6721                match &assistant.blocks[0] {
6722                    AssistantBlock::Transcript { text, source, .. } => {
6723                        assert_eq!(text, "what was actually heard");
6724                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6725                    }
6726                    other => unreachable!(
6727                        "truncation-only path must materialize as AssistantBlock::Transcript, got {other:?}"
6728                    ),
6729                }
6730            }
6731            other => unreachable!("expected BlockAssistant, got {other:?}"),
6732        }
6733    }
6734
6735    #[test]
6736    fn round5_r56_truncation_after_display_delta_is_no_op_keeping_display_content() {
6737        // R5-6 edge case: a Display delta arrived first and staged Display
6738        // content; a truncation event arrives for the SAME item id
6739        // (provider bug — truncation only applies to spoken/audio output).
6740        // Contract: the staged Display content must NOT be clobbered by
6741        // the truncation text. `promote_item_lane` keeps the existing
6742        // Display lane and emits a `tracing::warn!`; the truncation arm
6743        // sees the lane stayed Display and skips the segment-write.
6744        let mut session = Session::new();
6745
6746        let _ =
6747            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6748                response_id: "resp_a".to_string(),
6749                delta_id: "delta_d_1".to_string(),
6750                item_id: "item_a".to_string(),
6751                previous_item_id: None,
6752                content_index: 0,
6753                delta: "display-text-from-delta".to_string(),
6754            });
6755
6756        let _ = session.append_realtime_transcript_event(
6757            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6758                response_id: "resp_a".to_string(),
6759                item_id: "item_a".to_string(),
6760                content_index: 0,
6761                text: "spoken-truncation-text".to_string(),
6762            },
6763        );
6764
6765        let _ = session.append_realtime_transcript_event(
6766            RealtimeTranscriptEvent::AssistantTurnCompleted {
6767                response_id: "resp_a".to_string(),
6768                stop_reason: StopReason::EndTurn,
6769                usage: Usage::default(),
6770            },
6771        );
6772
6773        // Display content survives unchanged — the truncation text was
6774        // refused. Materializes as `AssistantBlock::Text` (Display lane).
6775        assert_eq!(session.messages().len(), 1);
6776        match &session.messages()[0] {
6777            Message::BlockAssistant(assistant) => {
6778                assert_eq!(assistant.blocks.len(), 1);
6779                match &assistant.blocks[0] {
6780                    AssistantBlock::Text { text, .. } => {
6781                        assert_eq!(text, "display-text-from-delta");
6782                    }
6783                    other => unreachable!(
6784                        "Display content must survive misrouted truncation, got {other:?}"
6785                    ),
6786                }
6787            }
6788            other => unreachable!("expected BlockAssistant, got {other:?}"),
6789        }
6790    }
6791
6792    /// R5-6 sibling: a Spoken-classified item (transcript-truncation
6793    /// arrived first and locked the lane to Spoken) must reject a later
6794    /// `AssistantTextDelta` rather than silently appending the Display
6795    /// text into the Spoken-locked content_segment. Pre-fix the delta
6796    /// arm called `promote_item_lane` and unconditionally pushed the
6797    /// delta — clobbering the lane invariant. Post-fix the delta is
6798    /// dropped (warn fires) and the Spoken-truncation text survives.
6799    #[test]
6800    fn round5_r56_sibling_display_delta_skipped_on_spoken_item() {
6801        let mut session = Session::new();
6802
6803        // Truncation arrives first and locks the item to the Spoken lane.
6804        let _ = session.append_realtime_transcript_event(
6805            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6806                response_id: "resp_a".to_string(),
6807                item_id: "item_a".to_string(),
6808                content_index: 0,
6809                text: "what was actually heard".to_string(),
6810            },
6811        );
6812
6813        // A Display delta arrives later for the SAME item id (provider
6814        // lane-classification bug). It MUST be dropped.
6815        let _ =
6816            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6817                response_id: "resp_a".to_string(),
6818                delta_id: "delta_d_1".to_string(),
6819                item_id: "item_a".to_string(),
6820                previous_item_id: None,
6821                content_index: 0,
6822                delta: "should-not-appear".to_string(),
6823            });
6824
6825        let _ = session.append_realtime_transcript_event(
6826            RealtimeTranscriptEvent::AssistantTurnCompleted {
6827                response_id: "resp_a".to_string(),
6828                stop_reason: StopReason::EndTurn,
6829                usage: Usage::default(),
6830            },
6831        );
6832
6833        // The Spoken-truncation text survives intact; no Display text
6834        // leaked into the Spoken lane content.
6835        assert_eq!(session.messages().len(), 1);
6836        match &session.messages()[0] {
6837            Message::BlockAssistant(assistant) => {
6838                assert_eq!(assistant.blocks.len(), 1);
6839                match &assistant.blocks[0] {
6840                    AssistantBlock::Transcript { text, source, .. } => {
6841                        assert_eq!(text, "what was actually heard");
6842                        assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6843                    }
6844                    other => unreachable!(
6845                        "Spoken-locked item must materialize as Transcript, got {other:?}"
6846                    ),
6847                }
6848            }
6849            other => unreachable!("expected BlockAssistant, got {other:?}"),
6850        }
6851    }
6852
6853    /// R5-6 sibling: a Display-classified item (a Display delta arrived
6854    /// first and locked the lane to Display) must reject a later
6855    /// `AssistantTranscriptDelta` rather than appending the Spoken text
6856    /// into the Display-locked content_segment. Pre-fix the transcript
6857    /// delta arm called `promote_item_lane` and unconditionally pushed —
6858    /// silently mixing a Spoken stream into a Display block.
6859    #[test]
6860    fn round5_r56_sibling_spoken_delta_skipped_on_display_item() {
6861        let mut session = Session::new();
6862
6863        // Display delta arrives first and locks the item to the Display lane.
6864        let _ =
6865            session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6866                response_id: "resp_a".to_string(),
6867                delta_id: "delta_d_1".to_string(),
6868                item_id: "item_a".to_string(),
6869                previous_item_id: None,
6870                content_index: 0,
6871                delta: "display-locked-text".to_string(),
6872            });
6873
6874        // A spoken-transcript delta arrives later for the SAME item id
6875        // (provider lane-classification bug). It MUST be dropped.
6876        let _ = session.append_realtime_transcript_event(
6877            RealtimeTranscriptEvent::AssistantTranscriptDelta {
6878                response_id: "resp_a".to_string(),
6879                delta_id: "delta_s_1".to_string(),
6880                item_id: "item_a".to_string(),
6881                previous_item_id: None,
6882                content_index: 0,
6883                delta: "should-not-appear".to_string(),
6884            },
6885        );
6886
6887        let _ = session.append_realtime_transcript_event(
6888            RealtimeTranscriptEvent::AssistantTurnCompleted {
6889                response_id: "resp_a".to_string(),
6890                stop_reason: StopReason::EndTurn,
6891                usage: Usage::default(),
6892            },
6893        );
6894
6895        // The Display text survives intact; no Spoken text leaked in.
6896        assert_eq!(session.messages().len(), 1);
6897        match &session.messages()[0] {
6898            Message::BlockAssistant(assistant) => {
6899                assert_eq!(assistant.blocks.len(), 1);
6900                match &assistant.blocks[0] {
6901                    AssistantBlock::Text { text, .. } => {
6902                        assert_eq!(text, "display-locked-text");
6903                    }
6904                    other => {
6905                        unreachable!("Display-locked item must materialize as Text, got {other:?}")
6906                    }
6907                }
6908            }
6909            other => unreachable!("expected BlockAssistant, got {other:?}"),
6910        }
6911    }
6912
6913    /// R5-7: a late `AssistantTranscriptFinalText` arriving AFTER
6914    /// `AssistantTurnCompleted` already materialized the item must NOT
6915    /// mutate `content_segments` and must NOT rewrite the canonical
6916    /// `Message::BlockAssistant` (append-only history is a stronger
6917    /// invariant than typed text repair). The committed message keeps
6918    /// the delta-accumulated text; the late final is dropped with a
6919    /// warn; the materializer outcome is inert (no new messages).
6920    #[test]
6921    fn round5_r57_late_final_text_after_turn_completed_warns_and_skips() {
6922        let mut session = Session::new();
6923
6924        // Delta accumulates partial text on the Spoken lane.
6925        let _ = session.append_realtime_transcript_event(
6926            RealtimeTranscriptEvent::AssistantTranscriptDelta {
6927                response_id: "resp_a".to_string(),
6928                delta_id: "delta_s_1".to_string(),
6929                item_id: "item_a".to_string(),
6930                previous_item_id: None,
6931                content_index: 0,
6932                delta: "delta-accumulated".to_string(),
6933            },
6934        );
6935
6936        // TurnCompleted materializes the item with the delta-accumulated text.
6937        let commit_outcome = session.append_realtime_transcript_event(
6938            RealtimeTranscriptEvent::AssistantTurnCompleted {
6939                response_id: "resp_a".to_string(),
6940                stop_reason: StopReason::EndTurn,
6941                usage: Usage::default(),
6942            },
6943        );
6944        assert_eq!(commit_outcome.materialized_messages.len(), 1);
6945
6946        // Late FinalText arrives — provider-side ordering bug. It MUST
6947        // be dropped: no canonical message rewrite, no segment mutation,
6948        // outcome is inert.
6949        let late_outcome = session.append_realtime_transcript_event(
6950            RealtimeTranscriptEvent::AssistantTranscriptFinalText {
6951                response_id: "resp_a".to_string(),
6952                item_id: "item_a".to_string(),
6953                content_index: 0,
6954                text: "authoritative-final-that-must-not-land".to_string(),
6955            },
6956        );
6957        assert!(
6958            late_outcome.is_inert(),
6959            "late FinalText after materialization must produce inert outcome"
6960        );
6961
6962        // Canonical history: still one message with the original
6963        // delta-accumulated text — NOT the authoritative final.
6964        assert_eq!(session.messages().len(), 1);
6965        match &session.messages()[0] {
6966            Message::BlockAssistant(assistant) => {
6967                assert_eq!(assistant.blocks.len(), 1);
6968                match &assistant.blocks[0] {
6969                    AssistantBlock::Transcript { text, .. } => {
6970                        assert_eq!(
6971                            text, "delta-accumulated",
6972                            "canonical message must preserve delta-accumulated text; \
6973                             append-only history forbids late FinalText repair"
6974                        );
6975                    }
6976                    other => unreachable!("expected Transcript, got {other:?}"),
6977                }
6978            }
6979            other => unreachable!("expected BlockAssistant, got {other:?}"),
6980        }
6981    }
6982}