1use crate::Provider;
13use crate::peer_meta::PeerMeta;
14use crate::realtime_transcript::{
15 RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage,
16 RealtimeTranscriptRole, SESSION_REALTIME_TRANSCRIPT_STATE_KEY, TranscriptLane,
17};
18use crate::service::{AppendSystemContextRequest, MobToolAuthorityContext};
19use crate::time_compat::SystemTime;
20use crate::tool_scope::ToolFilter;
21use crate::types::{
22 AssistantBlock, BlockAssistantMessage, ContentBlock, ContentInput, Message, SessionId,
23 StopReason, ToolDef, ToolProvenance, ToolResult, Usage, UserMessage,
24};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use std::collections::{BTreeMap, BTreeSet, HashMap};
27use std::sync::Arc;
28
29pub const SESSION_VERSION: u32 = 2;
41
42pub const SESSION_METADATA_SCHEMA_VERSION: u32 = 2;
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(tag = "type", rename_all = "snake_case")]
58pub enum TranscriptReplacement {
59 Message { message: Message },
61 UserContentBlock {
63 block_index: usize,
64 block: ContentBlock,
65 },
66 AssistantBlock {
68 block_index: usize,
69 block: AssistantBlock,
70 },
71 ToolResultContentBlock {
73 result_index: usize,
74 block_index: usize,
75 block: ContentBlock,
76 },
77}
78
79#[derive(Debug, Clone, thiserror::Error)]
81pub enum TranscriptEditError {
82 #[error("message index {message_index} out of bounds for {message_count} messages")]
83 MessageIndexOutOfBounds {
84 message_index: usize,
85 message_count: usize,
86 },
87 #[error("{block_kind} index {block_index} out of bounds for {block_count} blocks")]
88 BlockIndexOutOfBounds {
89 block_kind: &'static str,
90 block_index: usize,
91 block_count: usize,
92 },
93 #[error("replacement expected {expected} at message index {message_index}, found {actual}")]
94 MessageRoleMismatch {
95 message_index: usize,
96 expected: &'static str,
97 actual: &'static str,
98 },
99}
100
101fn message_role_name(message: &Message) -> &'static str {
102 match message {
103 Message::System(_) => "system",
104 Message::SystemNotice(_) => "system_notice",
105 Message::User(_) => "user",
106 Message::Assistant(_) => "assistant",
107 Message::BlockAssistant(_) => "block_assistant",
108 Message::ToolResults { .. } => "tool_results",
109 }
110}
111
112#[derive(Debug, Clone, Default, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114struct SessionRealtimeTranscriptState {
115 #[serde(default)]
116 items: BTreeMap<String, RealtimeTranscriptItemState>,
117 #[serde(default)]
118 first_seen_order: Vec<String>,
119 #[serde(default)]
120 seen_delta_ids: BTreeSet<String>,
121 #[serde(default)]
122 assistant_completions: BTreeMap<String, RealtimeAssistantCompletion>,
123 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
124 discarded_assistant_response_ids: BTreeSet<String>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "snake_case")]
129struct RealtimeTranscriptItemState {
130 role: RealtimeTranscriptRole,
131 #[serde(default)]
132 previous_item_id: Option<String>,
133 #[serde(default)]
134 response_id: Option<String>,
135 #[serde(default)]
136 content_segments: BTreeMap<u32, String>,
137 #[serde(default)]
138 skipped: bool,
139 #[serde(default)]
140 ready: bool,
141 #[serde(default)]
142 materialized: bool,
143 #[serde(default)]
149 lane: TranscriptLane,
150}
151
152impl RealtimeTranscriptItemState {
153 fn new(
154 role: RealtimeTranscriptRole,
155 previous_item_id: Option<String>,
156 response_id: Option<String>,
157 ) -> Self {
158 Self {
159 role,
160 previous_item_id,
161 response_id,
162 content_segments: BTreeMap::new(),
163 skipped: false,
164 ready: false,
165 materialized: false,
166 lane: TranscriptLane::Display,
167 }
168 }
169
170 fn skipped(previous_item_id: Option<String>) -> Self {
171 Self {
172 role: RealtimeTranscriptRole::Assistant,
173 previous_item_id,
174 response_id: None,
175 content_segments: BTreeMap::new(),
176 skipped: true,
177 ready: true,
178 materialized: false,
179 lane: TranscriptLane::Display,
180 }
181 }
182
183 fn text(&self) -> String {
184 self.content_segments.values().cloned().collect()
185 }
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(rename_all = "snake_case")]
190struct RealtimeAssistantCompletion {
191 stop_reason: StopReason,
192 usage: Usage,
193 usage_consumed: bool,
194}
195
196#[derive(Debug, Clone)]
200pub struct Session {
201 version: u32,
203 id: SessionId,
205 pub(crate) messages: Arc<Vec<Message>>,
207 created_at: SystemTime,
209 updated_at: SystemTime,
211 metadata: serde_json::Map<String, serde_json::Value>,
213 usage: Usage,
215}
216
217#[derive(Serialize, Deserialize)]
219#[serde(rename_all = "snake_case")]
220struct SessionSerde {
221 #[serde(default = "default_version")]
222 version: u32,
223 id: SessionId,
224 messages: Vec<Message>,
225 created_at: SystemTime,
226 updated_at: SystemTime,
227 #[serde(default)]
228 metadata: serde_json::Map<String, serde_json::Value>,
229 #[serde(default)]
230 usage: Usage,
231}
232
233impl Serialize for Session {
234 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
235 where
236 S: Serializer,
237 {
238 let serde_repr = SessionSerde {
239 version: self.version,
240 id: self.id.clone(),
241 messages: (*self.messages).clone(),
242 created_at: self.created_at,
243 updated_at: self.updated_at,
244 metadata: self.metadata.clone(),
245 usage: self.usage.clone(),
246 };
247 serde_repr.serialize(serializer)
248 }
249}
250
251impl<'de> Deserialize<'de> for Session {
252 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
253 where
254 D: Deserializer<'de>,
255 {
256 let serde_repr = SessionSerde::deserialize(deserializer)?;
257 Ok(Session {
258 version: serde_repr.version,
259 id: serde_repr.id,
260 messages: Arc::new(serde_repr.messages),
261 created_at: serde_repr.created_at,
262 updated_at: serde_repr.updated_at,
263 metadata: serde_repr.metadata,
264 usage: serde_repr.usage,
265 })
266 }
267}
268
269fn default_version() -> u32 {
270 SESSION_VERSION
271}
272
273pub const SESSION_SYSTEM_CONTEXT_STATE_KEY: &str = "session_system_context_state";
275
276pub const SESSION_DEFERRED_TURN_STATE_KEY: &str = "session_deferred_turn_state";
278
279pub const SESSION_BUILD_STATE_KEY: &str = "session_build_state";
281
282pub const SESSION_TOOL_VISIBILITY_STATE_KEY: &str = "session_tool_visibility_state_v1";
284
285pub const VIEW_IMAGE_TOOL_NAME: &str = "view_image";
287
288pub const SYSTEM_CONTEXT_SEPARATOR: &str = "\n\n---\n\n";
290
291#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
293#[serde(rename_all = "snake_case")]
294pub struct SessionSystemContextState {
295 #[serde(default, skip_serializing_if = "Vec::is_empty")]
296 pub pending: Vec<PendingSystemContextAppend>,
297 #[serde(default, skip_serializing_if = "Vec::is_empty")]
298 pub applied: Vec<PendingSystemContextAppend>,
299 #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
300 pub seen: std::collections::BTreeMap<String, SeenSystemContextKey>,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
305#[serde(rename_all = "snake_case")]
306pub struct PendingSystemContextAppend {
307 pub text: String,
308 #[serde(default, skip_serializing_if = "Option::is_none")]
309 pub source: Option<String>,
310 #[serde(default, skip_serializing_if = "Option::is_none")]
311 pub idempotency_key: Option<String>,
312 pub accepted_at: SystemTime,
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
317#[serde(rename_all = "snake_case")]
318pub struct SessionDeferredTurnState {
319 #[serde(default, skip_serializing_if = "DeferredFirstTurnPhase::is_inactive")]
320 pub first_turn_phase: DeferredFirstTurnPhase,
321 #[serde(default, skip_serializing_if = "Option::is_none")]
322 pub pending_initial_prompt: Option<PendingDeferredPrompt>,
323 #[serde(default, skip_serializing_if = "Vec::is_empty")]
324 pub pending_tool_results: Vec<PendingToolResultsMessage>,
325}
326
327#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
329#[serde(rename_all = "snake_case")]
330pub enum DeferredFirstTurnPhase {
331 #[default]
333 Inactive,
334 Pending,
336 Consumed,
338}
339
340impl DeferredFirstTurnPhase {
341 pub fn is_inactive(&self) -> bool {
342 matches!(self, Self::Inactive)
343 }
344}
345
346fn is_default_hook_run_overrides(value: &crate::HookRunOverrides) -> bool {
347 value == &crate::HookRunOverrides::default()
348}
349
350fn is_default_call_timeout_override(value: &crate::CallTimeoutOverride) -> bool {
351 value == &crate::CallTimeoutOverride::default()
352}
353
354fn is_tool_filter_all(value: &ToolFilter) -> bool {
355 matches!(value, ToolFilter::All)
356}
357
358fn is_zero(value: &u64) -> bool {
359 *value == 0
360}
361
362pub fn capability_base_filter_for_image_tool_results(image_tool_results: bool) -> ToolFilter {
364 if image_tool_results {
365 ToolFilter::All
366 } else {
367 ToolFilter::Deny([VIEW_IMAGE_TOOL_NAME.to_string()].into_iter().collect())
368 }
369}
370
371#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
373#[serde(rename_all = "snake_case")]
374pub struct ToolVisibilityWitness {
375 #[serde(default, skip_serializing_if = "Option::is_none")]
376 pub stable_owner_key: Option<String>,
377 #[serde(default, skip_serializing_if = "Option::is_none")]
378 pub last_seen_provenance: Option<ToolProvenance>,
379}
380
381impl ToolVisibilityWitness {
382 pub fn has_identity_witness(&self) -> bool {
383 self.stable_owner_key.is_some() || self.last_seen_provenance.is_some()
384 }
385
386 pub fn has_provenance_identity_witness(&self) -> bool {
387 self.last_seen_provenance.is_some()
388 }
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
397#[serde(rename_all = "snake_case")]
398pub struct DeferredToolLoadAuthority {
399 pub name: String,
400 pub witness: ToolVisibilityWitness,
401}
402
403impl DeferredToolLoadAuthority {
404 pub fn new(name: impl Into<String>, witness: ToolVisibilityWitness) -> Self {
405 Self {
406 name: name.into(),
407 witness,
408 }
409 }
410
411 pub fn into_parts(self) -> (String, ToolVisibilityWitness) {
412 (self.name, self.witness)
413 }
414}
415
416#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
419#[serde(rename_all = "snake_case")]
420pub struct WitnessedToolFilter {
421 pub filter: ToolFilter,
422 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
423 pub witnesses: BTreeMap<String, ToolVisibilityWitness>,
424}
425
426impl WitnessedToolFilter {
427 pub fn new(filter: ToolFilter, witnesses: BTreeMap<String, ToolVisibilityWitness>) -> Self {
428 Self { filter, witnesses }
429 }
430
431 pub fn into_parts(self) -> (ToolFilter, BTreeMap<String, ToolVisibilityWitness>) {
432 (self.filter, self.witnesses)
433 }
434}
435
436#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
438#[serde(rename_all = "snake_case")]
439pub struct SessionToolVisibilityState {
440 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
441 pub capability_base_filter: ToolFilter,
442 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
443 pub inherited_base_filter: ToolFilter,
444 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
445 pub active_filter: ToolFilter,
446 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
447 pub staged_filter: ToolFilter,
448 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
449 pub active_requested_deferred_names: BTreeSet<String>,
450 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
451 pub staged_requested_deferred_names: BTreeSet<String>,
452 #[serde(default, skip_serializing_if = "is_zero")]
453 pub active_revision: u64,
454 #[serde(default, skip_serializing_if = "is_zero")]
455 pub staged_revision: u64,
456 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
457 pub requested_witnesses: BTreeMap<String, ToolVisibilityWitness>,
458 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
459 pub filter_witnesses: BTreeMap<String, ToolVisibilityWitness>,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize, Default)]
465#[serde(rename_all = "snake_case")]
466pub struct SessionBuildState {
467 #[serde(default, skip_serializing_if = "Option::is_none")]
468 pub system_prompt: Option<String>,
469 #[serde(default, skip_serializing_if = "Option::is_none")]
470 pub output_schema: Option<crate::OutputSchema>,
471 #[serde(default, skip_serializing_if = "is_default_hook_run_overrides")]
472 pub hooks_override: crate::HookRunOverrides,
473 #[serde(default, skip_serializing_if = "Option::is_none")]
474 pub budget_limits: Option<crate::BudgetLimits>,
475 #[serde(default, skip_serializing_if = "Vec::is_empty")]
476 pub recoverable_tool_defs: Vec<ToolDef>,
477 #[serde(default, skip_serializing_if = "Vec::is_empty")]
478 pub silent_comms_intents: Vec<String>,
479 #[serde(default, skip_serializing_if = "Option::is_none")]
480 pub max_inline_peer_notifications: Option<i32>,
481 #[serde(default, skip_serializing_if = "Option::is_none")]
482 pub app_context: Option<serde_json::Value>,
483 #[serde(default, skip_serializing_if = "Option::is_none")]
484 pub additional_instructions: Option<Vec<String>>,
485 #[serde(default, skip_serializing_if = "Option::is_none")]
486 pub shell_env: Option<HashMap<String, String>>,
487 #[serde(default, skip_serializing_if = "Option::is_none")]
488 pub mob_tool_authority_context: Option<MobToolAuthorityContext>,
489 #[serde(default, skip_serializing_if = "is_default_call_timeout_override")]
490 pub call_timeout_override: crate::CallTimeoutOverride,
491}
492
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
495#[serde(rename_all = "snake_case")]
496pub struct PendingDeferredPrompt {
497 pub prompt: ContentInput,
498 pub accepted_at: SystemTime,
499}
500
501#[derive(Debug, Clone, Serialize, Deserialize)]
503#[serde(rename_all = "snake_case")]
504pub struct PendingToolResultsMessage {
505 pub results: Vec<ToolResult>,
506 pub accepted_at: SystemTime,
507}
508
509impl PartialEq for PendingToolResultsMessage {
510 fn eq(&self, other: &Self) -> bool {
511 self.accepted_at == other.accepted_at
512 && serde_json::to_value(&self.results).ok() == serde_json::to_value(&other.results).ok()
513 }
514}
515
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
518#[serde(rename_all = "snake_case")]
519pub struct SeenSystemContextKey {
520 pub text: String,
521 #[serde(default, skip_serializing_if = "Option::is_none")]
522 pub source: Option<String>,
523 pub state: SeenSystemContextState,
524}
525
526#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
528#[serde(rename_all = "snake_case")]
529pub enum SeenSystemContextState {
530 Pending,
531 Applied,
532}
533
534impl SessionSystemContextState {
535 pub fn stage_append(
537 &mut self,
538 req: &AppendSystemContextRequest,
539 accepted_at: SystemTime,
540 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
541 let text = req.text.trim();
542 if text.is_empty() {
543 return Err(SystemContextStageError::InvalidRequest(
544 "system context text must not be empty".to_string(),
545 ));
546 }
547
548 if let Some(key) = req.idempotency_key.as_ref() {
549 match self.seen.get(key) {
550 Some(existing)
551 if existing.text == text
552 && existing.source.as_deref() == req.source.as_deref() =>
553 {
554 return Ok(crate::service::AppendSystemContextStatus::Duplicate);
555 }
556 Some(existing) => {
557 return Err(SystemContextStageError::Conflict {
558 key: key.clone(),
559 existing_text: existing.text.clone(),
560 existing_source: existing.source.clone(),
561 });
562 }
563 None => {}
564 }
565 }
566
567 let append = PendingSystemContextAppend {
568 text: text.to_string(),
569 source: req.source.clone(),
570 idempotency_key: req.idempotency_key.clone(),
571 accepted_at,
572 };
573 if let Some(key) = req.idempotency_key.as_ref() {
574 self.seen.insert(
575 key.clone(),
576 SeenSystemContextKey {
577 text: append.text.clone(),
578 source: append.source.clone(),
579 state: SeenSystemContextState::Pending,
580 },
581 );
582 }
583 self.pending.push(append);
584 Ok(crate::service::AppendSystemContextStatus::Staged)
585 }
586
587 pub fn mark_pending_applied(&mut self) {
589 for pending in &self.pending {
590 if !self.applied.contains(pending) {
591 self.applied.push(pending.clone());
592 }
593 }
594 for pending in &self.pending {
595 if let Some(key) = pending.idempotency_key.as_ref()
596 && let Some(seen) = self.seen.get_mut(key)
597 {
598 seen.state = SeenSystemContextState::Applied;
599 }
600 }
601 self.pending.clear();
602 }
603}
604
605impl SessionDeferredTurnState {
606 pub fn mark_initial_turn_pending(&mut self) {
608 self.first_turn_phase = DeferredFirstTurnPhase::Pending;
609 }
610
611 pub fn mark_initial_turn_started(&mut self) -> bool {
615 let was_pending = matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending);
616 if was_pending {
617 self.first_turn_phase = DeferredFirstTurnPhase::Consumed;
618 }
619 was_pending
620 }
621
622 pub fn restore_initial_turn_pending(&mut self) {
624 self.first_turn_phase = DeferredFirstTurnPhase::Pending;
625 }
626
627 pub fn allows_initial_turn_overrides(&self) -> bool {
629 matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending)
630 }
631
632 pub fn stage_initial_prompt(&mut self, prompt: ContentInput, accepted_at: SystemTime) {
634 if !prompt.has_images() && prompt.text_content().trim().is_empty() {
635 self.pending_initial_prompt = None;
636 return;
637 }
638
639 self.pending_initial_prompt = Some(PendingDeferredPrompt {
640 prompt,
641 accepted_at,
642 });
643 }
644
645 pub fn stage_tool_results(
647 &mut self,
648 results: Vec<ToolResult>,
649 accepted_at: SystemTime,
650 ) -> usize {
651 if results.is_empty() {
652 return 0;
653 }
654
655 let accepted = results.len();
656 self.pending_tool_results.push(PendingToolResultsMessage {
657 results,
658 accepted_at,
659 });
660 accepted
661 }
662
663 pub fn take_initial_prompt(&mut self) -> Option<ContentInput> {
665 self.pending_initial_prompt
666 .take()
667 .map(|pending| pending.prompt)
668 }
669
670 pub fn take_tool_results(&mut self) -> Vec<PendingToolResultsMessage> {
672 std::mem::take(&mut self.pending_tool_results)
673 }
674
675 pub fn has_pending_tool_results(&self) -> bool {
677 !self.pending_tool_results.is_empty()
678 }
679}
680
681#[derive(Debug, Clone, PartialEq, Eq)]
683pub enum SystemContextStageError {
684 InvalidRequest(String),
685 Conflict {
686 key: String,
687 existing_text: String,
688 existing_source: Option<String>,
689 },
690}
691
692fn render_system_context_block(append: &PendingSystemContextAppend) -> String {
693 let mut rendered = String::from("[Runtime System Context]");
694 if let Some(source) = &append.source {
695 rendered.push_str("\nsource: ");
696 rendered.push_str(source);
697 }
698 rendered.push_str("\n\n");
699 rendered.push_str(&append.text);
700 rendered
701}
702
703fn seen_system_context_matches(
704 seen: &SeenSystemContextKey,
705 append: &PendingSystemContextAppend,
706) -> bool {
707 seen.text == append.text && seen.source.as_deref() == append.source.as_deref()
708}
709
710fn pending_system_context_matches(
711 existing: &PendingSystemContextAppend,
712 append: &PendingSystemContextAppend,
713) -> bool {
714 existing.text == append.text && existing.source.as_deref() == append.source.as_deref()
715}
716
717impl Session {
718 pub fn new() -> Self {
720 let now = SystemTime::now();
721 Self {
722 version: SESSION_VERSION,
723 id: SessionId::new(),
724 messages: Arc::new(Vec::new()),
725 created_at: now,
726 updated_at: now,
727 metadata: serde_json::Map::new(),
728 usage: Usage::default(),
729 }
730 }
731
732 pub fn with_id(id: SessionId) -> Self {
734 let mut session = Self::new();
735 session.id = id;
736 session
737 }
738
739 pub fn id(&self) -> &SessionId {
741 &self.id
742 }
743
744 pub fn version(&self) -> u32 {
746 self.version
747 }
748
749 pub fn messages(&self) -> &[Message] {
751 &self.messages
752 }
753
754 pub(crate) fn messages_mut_internal(&mut self) -> &mut Vec<Message> {
765 Arc::make_mut(&mut self.messages)
766 }
767
768 pub fn created_at(&self) -> SystemTime {
770 self.created_at
771 }
772
773 pub fn updated_at(&self) -> SystemTime {
775 self.updated_at
776 }
777
778 pub fn push(&mut self, message: Message) {
782 Arc::make_mut(&mut self.messages).push(message);
783 self.updated_at = SystemTime::now();
784 }
785
786 pub fn push_batch(&mut self, messages: Vec<Message>) {
790 if messages.is_empty() {
791 return;
792 }
793 let inner = Arc::make_mut(&mut self.messages);
794 inner.extend(messages);
795 self.updated_at = SystemTime::now();
796 }
797
798 pub async fn externalize_media(
809 &mut self,
810 blob_store: &dyn crate::BlobStore,
811 start: usize,
812 ) -> Result<(), crate::blob::BlobStoreError> {
813 let messages = Arc::make_mut(&mut self.messages);
814 crate::image_content::externalize_messages_from(blob_store, messages, start).await
815 }
816
817 pub fn touch(&mut self) {
821 self.updated_at = SystemTime::now();
822 }
823
824 pub fn has_pending_boundary(&self) -> bool {
830 self.messages
831 .last()
832 .is_some_and(|m| matches!(m, Message::User(_) | Message::ToolResults { .. }))
833 }
834
835 pub fn last_n(&self, n: usize) -> &[Message] {
837 let start = self.messages.len().saturating_sub(n);
838 &self.messages[start..]
839 }
840
841 pub fn total_tokens(&self) -> u64 {
843 self.usage.total_tokens()
844 }
845
846 pub fn total_usage(&self) -> Usage {
848 self.usage.clone()
849 }
850
851 pub fn record_usage(&mut self, turn_usage: Usage) {
853 self.usage.add(&turn_usage);
854 self.updated_at = SystemTime::now();
855 }
856
857 pub fn append_external_user_content(&mut self, content: ContentInput) {
859 self.push(Message::User(UserMessage::with_blocks(
860 content.into_blocks(),
861 )));
862 }
863
864 pub fn append_external_assistant_blocks(
866 &mut self,
867 blocks: Vec<AssistantBlock>,
868 stop_reason: StopReason,
869 usage: Usage,
870 ) {
871 if !blocks.is_empty() {
872 self.push(Message::BlockAssistant(BlockAssistantMessage::new(
873 blocks,
874 stop_reason,
875 )));
876 }
877 if usage != Usage::default() {
878 self.record_usage(usage);
879 }
880 }
881
882 pub fn append_realtime_transcript_event(
890 &mut self,
891 event: RealtimeTranscriptEvent,
892 ) -> RealtimeTranscriptApplyOutcome {
893 let mut state = self.realtime_transcript_state();
894 match event {
895 RealtimeTranscriptEvent::ItemObserved {
896 item_id,
897 previous_item_id,
898 role,
899 response_id,
900 } => {
901 let response_id = normalize_realtime_optional_response_id(response_id);
902 if role == RealtimeTranscriptRole::Assistant
903 && response_id
904 .as_ref()
905 .is_some_and(|id| state.discarded_assistant_response_ids.contains(id))
906 {
907 observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
908 } else {
909 observe_realtime_item(&mut state, item_id, previous_item_id, role, response_id);
910 }
911 }
912 RealtimeTranscriptEvent::ItemSkipped {
913 item_id,
914 previous_item_id,
915 } => {
916 observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
917 }
918 RealtimeTranscriptEvent::UserTranscriptFinal {
919 item_id,
920 previous_item_id,
921 content_index,
922 text,
923 } => {
924 if let Some(item) = observe_realtime_item(
925 &mut state,
926 item_id,
927 previous_item_id,
928 RealtimeTranscriptRole::User,
929 None,
930 ) {
931 let segment = item.content_segments.entry(content_index).or_default();
932 if segment.is_empty() && !text.is_empty() {
933 *segment = text;
934 } else if !text.is_empty() && segment.as_str() != text {
935 tracing::warn!(
936 content_index,
937 "ignoring conflicting realtime user transcript segment replay"
938 );
939 }
940 item.ready = true;
941 }
942 }
943 RealtimeTranscriptEvent::AssistantTextDelta {
944 response_id,
945 delta_id,
946 item_id,
947 previous_item_id,
948 content_index,
949 delta,
950 } => {
951 let Some(response_id) = normalize_realtime_response_id(response_id) else {
952 return RealtimeTranscriptApplyOutcome::default();
953 };
954 if state
955 .discarded_assistant_response_ids
956 .contains(&response_id)
957 {
958 observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
959 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
960 self.store_realtime_transcript_state(&state);
961 return outcome;
962 }
963 if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
964 return RealtimeTranscriptApplyOutcome::default();
965 }
966 let response_completed = state.assistant_completions.contains_key(&response_id);
967 if let Some(item) = observe_realtime_item(
968 &mut state,
969 item_id,
970 previous_item_id,
971 RealtimeTranscriptRole::Assistant,
972 Some(response_id),
973 ) {
974 if promote_item_lane(item, TranscriptLane::Display) {
975 item.content_segments
976 .entry(content_index)
977 .or_default()
978 .push_str(&delta);
979 if response_completed && !item.text().is_empty() {
980 item.ready = true;
981 }
982 } else {
983 tracing::warn!(
990 "AssistantTextDelta routed to a Spoken-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
991 );
992 }
993 }
994 }
995 RealtimeTranscriptEvent::AssistantTranscriptDelta {
996 response_id,
997 delta_id,
998 item_id,
999 previous_item_id,
1000 content_index,
1001 delta,
1002 } => {
1003 let Some(response_id) = normalize_realtime_response_id(response_id) else {
1009 return RealtimeTranscriptApplyOutcome::default();
1010 };
1011 if state
1012 .discarded_assistant_response_ids
1013 .contains(&response_id)
1014 {
1015 observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
1016 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
1017 self.store_realtime_transcript_state(&state);
1018 return outcome;
1019 }
1020 if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
1021 return RealtimeTranscriptApplyOutcome::default();
1022 }
1023 let response_completed = state.assistant_completions.contains_key(&response_id);
1024 if let Some(item) = observe_realtime_item(
1025 &mut state,
1026 item_id,
1027 previous_item_id,
1028 RealtimeTranscriptRole::Assistant,
1029 Some(response_id),
1030 ) {
1031 if promote_item_lane(item, TranscriptLane::Spoken) {
1032 item.content_segments
1033 .entry(content_index)
1034 .or_default()
1035 .push_str(&delta);
1036 if response_completed && !item.text().is_empty() {
1037 item.ready = true;
1038 }
1039 } else {
1040 tracing::warn!(
1046 "AssistantTranscriptDelta routed to a Display-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
1047 );
1048 }
1049 }
1050 }
1051 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
1052 response_id,
1053 item_id,
1054 content_index,
1055 text,
1056 } => {
1057 let Some(response_id) = normalize_realtime_response_id(response_id) else {
1058 return RealtimeTranscriptApplyOutcome::default();
1059 };
1060 if state
1061 .discarded_assistant_response_ids
1062 .contains(&response_id)
1063 {
1064 observe_realtime_skipped_item(&mut state, item_id, None);
1065 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
1066 self.store_realtime_transcript_state(&state);
1067 return outcome;
1068 }
1069 let response_completed = state.assistant_completions.contains_key(&response_id);
1070 let item_id_for_log = item_id.clone();
1071 let response_id_for_log = response_id.clone();
1072 if let Some(item) = observe_realtime_item(
1073 &mut state,
1074 item_id,
1075 None,
1076 RealtimeTranscriptRole::Assistant,
1077 Some(response_id),
1078 ) {
1079 if item.materialized {
1084 tracing::warn!(
1085 target: "meerkat::session",
1086 item_id = %item_id_for_log,
1087 response_id = %response_id_for_log,
1088 "AssistantTranscriptTruncated arrived after item already materialized; canonical message is locked, late truncation dropped",
1089 );
1090 } else if promote_item_lane(item, TranscriptLane::Spoken) {
1091 item.content_segments.insert(content_index, text);
1101 if response_completed && !item.text().is_empty() {
1102 item.ready = true;
1103 }
1104 }
1105 }
1106 }
1107 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
1108 response_id,
1109 item_id,
1110 content_index,
1111 text,
1112 } => {
1113 let Some(response_id) = normalize_realtime_response_id(response_id) else {
1121 return RealtimeTranscriptApplyOutcome::default();
1122 };
1123 if state
1124 .discarded_assistant_response_ids
1125 .contains(&response_id)
1126 {
1127 observe_realtime_skipped_item(&mut state, item_id, None);
1128 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
1129 self.store_realtime_transcript_state(&state);
1130 return outcome;
1131 }
1132 let response_completed = state.assistant_completions.contains_key(&response_id);
1133 if let Some(item) = observe_realtime_item(
1134 &mut state,
1135 item_id,
1136 None,
1137 RealtimeTranscriptRole::Assistant,
1138 Some(response_id),
1139 ) {
1140 if item.materialized {
1152 tracing::warn!(
1153 "AssistantTranscriptFinalText arrived after item already materialized; canonical message is locked, late repair dropped"
1154 );
1155 self.store_realtime_transcript_state(&state);
1156 return RealtimeTranscriptApplyOutcome::default();
1157 }
1158 if promote_item_lane(item, TranscriptLane::Spoken) {
1166 item.content_segments.insert(content_index, text);
1170 if response_completed && !item.text().is_empty() {
1171 item.ready = true;
1172 }
1173 } else {
1174 tracing::warn!(
1175 "AssistantTranscriptFinalText routed to a Display-lane item; dropping authoritative final to preserve lane invariant — this indicates a provider lane-classification bug"
1176 );
1177 }
1178 }
1179 }
1180 RealtimeTranscriptEvent::AssistantTurnCompleted {
1181 response_id,
1182 stop_reason,
1183 usage,
1184 } => {
1185 let Some(response_id) = normalize_realtime_response_id(response_id) else {
1186 return RealtimeTranscriptApplyOutcome::default();
1187 };
1188 if state
1189 .discarded_assistant_response_ids
1190 .contains(&response_id)
1191 {
1192 discard_realtime_assistant_response(&mut state, &response_id);
1193 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
1194 self.store_realtime_transcript_state(&state);
1195 return outcome;
1196 }
1197 match stop_reason {
1198 StopReason::Cancelled => {
1199 discard_realtime_assistant_response(&mut state, &response_id);
1200 }
1201 StopReason::ToolUse => {
1202 state.assistant_completions.remove(&response_id);
1203 }
1204 _ => {
1205 state
1206 .assistant_completions
1207 .entry(response_id.clone())
1208 .or_insert(RealtimeAssistantCompletion {
1209 stop_reason,
1210 usage,
1211 usage_consumed: false,
1212 });
1213 mark_realtime_assistant_response_ready(&mut state, &response_id);
1214 }
1215 }
1216 }
1217 RealtimeTranscriptEvent::AssistantTurnInterrupted { response_id } => {
1218 let Some(response_id) = normalize_realtime_response_id(response_id) else {
1219 return RealtimeTranscriptApplyOutcome::default();
1220 };
1221 discard_realtime_assistant_response_by_lane(&mut state, &response_id);
1232 state
1233 .assistant_completions
1234 .entry(response_id.clone())
1235 .or_insert(RealtimeAssistantCompletion {
1236 stop_reason: StopReason::Cancelled,
1237 usage: Usage::default(),
1238 usage_consumed: false,
1239 });
1240 mark_realtime_assistant_response_ready(&mut state, &response_id);
1241 }
1242 }
1243
1244 let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
1245 self.store_realtime_transcript_state(&state);
1246 outcome
1247 }
1248
1249 #[must_use]
1268 pub fn in_flight_realtime_assistant_response_ids(&self) -> Vec<String> {
1269 let state = self.realtime_transcript_state();
1270 let mut seen: BTreeSet<String> = BTreeSet::new();
1271 let mut out: Vec<String> = Vec::new();
1272 for item_id in &state.first_seen_order {
1273 let Some(item) = state.items.get(item_id) else {
1274 continue;
1275 };
1276 if item.role != RealtimeTranscriptRole::Assistant {
1277 continue;
1278 }
1279 if item.materialized || item.skipped {
1280 continue;
1281 }
1282 let Some(response_id) = item.response_id.as_ref() else {
1283 continue;
1284 };
1285 if state.discarded_assistant_response_ids.contains(response_id) {
1286 continue;
1287 }
1288 if seen.insert(response_id.clone()) {
1289 out.push(response_id.clone());
1290 }
1291 }
1292 out
1293 }
1294
1295 fn realtime_transcript_state(&self) -> SessionRealtimeTranscriptState {
1296 self.metadata
1297 .get(SESSION_REALTIME_TRANSCRIPT_STATE_KEY)
1298 .cloned()
1299 .and_then(|value| serde_json::from_value(value).ok())
1300 .unwrap_or_default()
1301 }
1302
1303 fn store_realtime_transcript_state(&mut self, state: &SessionRealtimeTranscriptState) {
1304 match serde_json::to_value(state) {
1305 Ok(value) => self.set_metadata(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, value),
1306 Err(error) => {
1307 tracing::warn!(error = %error, "failed to serialize realtime transcript state");
1308 }
1309 }
1310 }
1311
1312 fn materialize_realtime_transcript_ready_items(
1313 &mut self,
1314 state: &mut SessionRealtimeTranscriptState,
1315 ) -> RealtimeTranscriptApplyOutcome {
1316 let mut materialized = Vec::new();
1317
1318 let mut pending_blocks: Vec<AssistantBlock> = Vec::new();
1333 let mut pending_response_id: Option<String> = None;
1334 let mut pending_stop_reason: StopReason = StopReason::EndTurn;
1335 let mut pending_usage: Usage = Usage::default();
1336
1337 loop {
1338 let order = realtime_transcript_order(state);
1339 let mut skipped_batch = Vec::new();
1340 let mut batch = Vec::new();
1341 for item_id in order {
1342 let Some(item) = state.items.get(&item_id) else {
1343 continue;
1344 };
1345 if item.materialized {
1346 continue;
1347 }
1348 if !realtime_predecessor_materialized(state, item.previous_item_id.as_deref()) {
1349 continue;
1350 }
1351 if item.skipped {
1352 skipped_batch.push(item_id.clone());
1353 continue;
1354 }
1355 if !item.ready {
1356 continue;
1357 }
1358 let text = item.text();
1359 if text.is_empty() {
1360 continue;
1361 }
1362 match item.role {
1363 RealtimeTranscriptRole::User => {
1364 batch.push(RealtimeTranscriptMaterializedMessage::User {
1365 item_id: item_id.clone(),
1366 text,
1367 });
1368 }
1369 RealtimeTranscriptRole::Assistant => {
1370 let Some(response_id) = item.response_id.as_ref() else {
1371 continue;
1372 };
1373 let Some(completion) = state.assistant_completions.get(response_id) else {
1374 continue;
1375 };
1376 let usage = if completion.usage_consumed {
1377 Usage::default()
1378 } else {
1379 completion.usage.clone()
1380 };
1381 batch.push(RealtimeTranscriptMaterializedMessage::Assistant {
1382 item_id: item_id.clone(),
1383 response_id: response_id.clone(),
1384 text,
1385 stop_reason: completion.stop_reason,
1386 usage,
1387 lane: item.lane,
1388 });
1389 }
1390 }
1391 }
1392 if skipped_batch.is_empty() && batch.is_empty() {
1393 break;
1394 }
1395 for item_id in skipped_batch {
1396 if let Some(item) = state.items.get_mut(&item_id) {
1397 item.materialized = true;
1398 }
1399 }
1400
1401 for message in batch {
1402 match &message {
1403 RealtimeTranscriptMaterializedMessage::User { item_id, text } => {
1404 if !pending_blocks.is_empty() {
1405 let drained = std::mem::take(&mut pending_blocks);
1406 self.append_external_assistant_blocks(
1407 drained,
1408 pending_stop_reason,
1409 std::mem::take(&mut pending_usage),
1410 );
1411 pending_response_id = None;
1412 }
1413 if let Some(item) = state.items.get_mut(item_id) {
1414 item.materialized = true;
1415 }
1416 self.append_external_user_content(ContentInput::Text(text.clone()));
1417 }
1418 RealtimeTranscriptMaterializedMessage::Assistant {
1419 item_id,
1420 response_id,
1421 text,
1422 stop_reason,
1423 usage,
1424 lane,
1425 } => {
1426 if pending_response_id
1430 .as_ref()
1431 .is_some_and(|existing| existing != response_id)
1432 && !pending_blocks.is_empty()
1433 {
1434 let drained = std::mem::take(&mut pending_blocks);
1435 self.append_external_assistant_blocks(
1436 drained,
1437 pending_stop_reason,
1438 std::mem::take(&mut pending_usage),
1439 );
1440 pending_response_id = None;
1441 }
1442 if let Some(item) = state.items.get_mut(item_id) {
1443 item.materialized = true;
1444 }
1445 if let Some(completion) = state.assistant_completions.get_mut(response_id) {
1446 completion.usage_consumed = true;
1447 }
1448 let block = match lane {
1455 TranscriptLane::Display => AssistantBlock::Text {
1456 text: text.clone(),
1457 meta: None,
1458 },
1459 TranscriptLane::Spoken => AssistantBlock::Transcript {
1460 text: text.clone(),
1461 source: crate::types::TranscriptSource::Spoken,
1462 meta: None,
1463 },
1464 };
1465 if pending_response_id.is_none() {
1473 pending_response_id = Some(response_id.clone());
1474 pending_stop_reason = *stop_reason;
1475 pending_usage = usage.clone();
1476 }
1477 pending_blocks.push(block);
1478 }
1479 }
1480 materialized.push(message);
1481 }
1482 }
1483
1484 if !pending_blocks.is_empty() {
1486 self.append_external_assistant_blocks(
1487 pending_blocks,
1488 pending_stop_reason,
1489 pending_usage,
1490 );
1491 }
1492
1493 RealtimeTranscriptApplyOutcome {
1494 materialized_messages: materialized,
1495 }
1496 }
1497
1498 pub fn set_system_prompt(&mut self, prompt: String) {
1500 use crate::types::SystemMessage;
1501
1502 let inner = Arc::make_mut(&mut self.messages);
1503 if let Some(Message::System(_)) = inner.first() {
1505 inner[0] = Message::System(SystemMessage::new(prompt));
1506 } else {
1507 inner.insert(0, Message::System(SystemMessage::new(prompt)));
1508 }
1509 self.updated_at = SystemTime::now();
1510 }
1511
1512 pub fn append_system_context_blocks(&mut self, appends: &[PendingSystemContextAppend]) {
1514 if appends.is_empty() {
1515 return;
1516 }
1517
1518 let current_system_prompt = self
1519 .messages
1520 .first()
1521 .and_then(|message| match message {
1522 Message::System(system) => Some(system.content.as_str()),
1523 _ => None,
1524 })
1525 .unwrap_or_default();
1526 let mut state = self.system_context_state().unwrap_or_default();
1527 let mut state_dirty = false;
1528 let mut new_appends: Vec<PendingSystemContextAppend> = Vec::new();
1529 for append in appends {
1530 if append.text.trim().is_empty() {
1531 continue;
1532 }
1533 let rendered = render_system_context_block(append);
1534 if let Some(key) = append.idempotency_key.as_ref() {
1535 if let Some(existing) = state.seen.get(key)
1536 && !seen_system_context_matches(existing, append)
1537 {
1538 tracing::warn!(
1539 idempotency_key = %key,
1540 "skipping conflicting runtime system-context append"
1541 );
1542 continue;
1543 }
1544 if let Some(existing) = state
1545 .applied
1546 .iter()
1547 .find(|applied| applied.idempotency_key.as_ref() == Some(key))
1548 && !pending_system_context_matches(existing, append)
1549 {
1550 tracing::warn!(
1551 idempotency_key = %key,
1552 "skipping conflicting runtime system-context append"
1553 );
1554 continue;
1555 }
1556 if let Some(existing) = new_appends
1557 .iter()
1558 .find(|pending| pending.idempotency_key.as_ref() == Some(key))
1559 {
1560 if !pending_system_context_matches(existing, append) {
1561 tracing::warn!(
1562 idempotency_key = %key,
1563 "skipping conflicting runtime system-context append"
1564 );
1565 }
1566 continue;
1567 }
1568 if current_system_prompt.contains(&rendered) {
1569 if !state
1570 .applied
1571 .iter()
1572 .any(|applied| applied.idempotency_key.as_ref() == Some(key))
1573 {
1574 state.applied.push(append.clone());
1575 state_dirty = true;
1576 }
1577 if state
1578 .seen
1579 .get(key)
1580 .is_none_or(|seen| seen.state != SeenSystemContextState::Applied)
1581 {
1582 state.seen.insert(
1583 key.clone(),
1584 SeenSystemContextKey {
1585 text: append.text.clone(),
1586 source: append.source.clone(),
1587 state: SeenSystemContextState::Applied,
1588 },
1589 );
1590 state_dirty = true;
1591 }
1592 continue;
1593 }
1594 } else if new_appends.contains(append) || current_system_prompt.contains(&rendered) {
1595 continue;
1596 }
1597 new_appends.push(append.clone());
1598 }
1599 if new_appends.is_empty() {
1600 if state_dirty && let Err(err) = self.set_system_context_state(state) {
1601 tracing::warn!(error = %err, "failed to persist applied system-context state");
1602 }
1603 return;
1604 }
1605
1606 let rendered = new_appends
1607 .iter()
1608 .map(render_system_context_block)
1609 .collect::<Vec<_>>()
1610 .join(SYSTEM_CONTEXT_SEPARATOR);
1611
1612 let next = match self.messages.first() {
1613 Some(Message::System(sys)) if !sys.content.is_empty() => {
1614 format!("{}{}{}", sys.content, SYSTEM_CONTEXT_SEPARATOR, rendered)
1615 }
1616 _ => rendered,
1617 };
1618 self.set_system_prompt(next);
1619
1620 for append in new_appends {
1621 if let Some(key) = append.idempotency_key.as_ref() {
1622 state.seen.insert(
1623 key.clone(),
1624 SeenSystemContextKey {
1625 text: append.text.clone(),
1626 source: append.source.clone(),
1627 state: SeenSystemContextState::Applied,
1628 },
1629 );
1630 if state
1631 .applied
1632 .iter()
1633 .any(|applied| applied.idempotency_key.as_ref() == Some(key))
1634 {
1635 continue;
1636 }
1637 } else if state.applied.contains(&append) {
1638 continue;
1639 }
1640 state.applied.push(append);
1641 }
1642 if let Err(err) = self.set_system_context_state(state) {
1643 tracing::warn!(error = %err, "failed to persist applied system-context state");
1644 }
1645 }
1646
1647 pub fn last_assistant_text(&self) -> Option<String> {
1654 self.messages.iter().rev().find_map(|m| match m {
1655 Message::BlockAssistant(a) => {
1656 let mut buf = String::new();
1657 for block in &a.blocks {
1658 match block {
1659 crate::types::AssistantBlock::Text { text, .. }
1660 | crate::types::AssistantBlock::Transcript { text, .. } => {
1661 buf.push_str(text);
1662 }
1663 _ => {}
1664 }
1665 }
1666 if buf.is_empty() { None } else { Some(buf) }
1667 }
1668 Message::Assistant(a) if !a.content.is_empty() => Some(a.content.clone()),
1669 _ => None,
1670 })
1671 }
1672
1673 pub fn tool_call_count(&self) -> usize {
1675 self.messages
1676 .iter()
1677 .filter_map(|m| match m {
1678 Message::BlockAssistant(a) => Some(
1679 a.blocks
1680 .iter()
1681 .filter(|b| matches!(b, crate::types::AssistantBlock::ToolUse { .. }))
1682 .count(),
1683 ),
1684 Message::Assistant(a) => Some(a.tool_calls.len()),
1685 _ => None,
1686 })
1687 .sum()
1688 }
1689
1690 pub fn metadata(&self) -> &serde_json::Map<String, serde_json::Value> {
1692 &self.metadata
1693 }
1694
1695 pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
1697 self.metadata.insert(key.to_string(), value);
1698 self.updated_at = SystemTime::now();
1699 }
1700
1701 pub fn backfill_metadata_if_absent(&mut self, key: &str, value: serde_json::Value) -> bool {
1707 if self.metadata.contains_key(key) {
1708 false
1709 } else {
1710 self.metadata.insert(key.to_string(), value);
1711 true
1712 }
1713 }
1714
1715 pub fn remove_metadata(&mut self, key: &str) {
1717 self.metadata.remove(key);
1718 self.updated_at = SystemTime::now();
1719 }
1720
1721 pub fn set_session_metadata(
1723 &mut self,
1724 metadata: SessionMetadata,
1725 ) -> Result<(), serde_json::Error> {
1726 let value = serde_json::to_value(metadata)?;
1727 self.set_metadata(SESSION_METADATA_KEY, value);
1728 Ok(())
1729 }
1730
1731 pub fn session_metadata(&self) -> Option<SessionMetadata> {
1733 self.metadata
1734 .get(SESSION_METADATA_KEY)
1735 .and_then(|value| serde_json::from_value(value.clone()).ok())
1736 }
1737
1738 pub fn set_system_context_state(
1740 &mut self,
1741 state: SessionSystemContextState,
1742 ) -> Result<(), serde_json::Error> {
1743 let value = serde_json::to_value(state)?;
1744 self.set_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY, value);
1745 Ok(())
1746 }
1747
1748 pub fn system_context_state(&self) -> Option<SessionSystemContextState> {
1750 self.metadata
1751 .get(SESSION_SYSTEM_CONTEXT_STATE_KEY)
1752 .and_then(|value| serde_json::from_value(value.clone()).ok())
1753 }
1754
1755 pub fn set_deferred_turn_state(
1757 &mut self,
1758 state: SessionDeferredTurnState,
1759 ) -> Result<(), serde_json::Error> {
1760 let value = serde_json::to_value(state)?;
1761 self.set_metadata(SESSION_DEFERRED_TURN_STATE_KEY, value);
1762 Ok(())
1763 }
1764
1765 pub fn deferred_turn_state(&self) -> Option<SessionDeferredTurnState> {
1767 self.metadata
1768 .get(SESSION_DEFERRED_TURN_STATE_KEY)
1769 .and_then(|value| serde_json::from_value(value.clone()).ok())
1770 }
1771
1772 pub fn set_build_state(&mut self, state: SessionBuildState) -> Result<(), serde_json::Error> {
1774 let value = serde_json::to_value(state)?;
1775 self.set_metadata(SESSION_BUILD_STATE_KEY, value);
1776 Ok(())
1777 }
1778
1779 pub fn build_state(&self) -> Option<SessionBuildState> {
1781 self.metadata
1782 .get(SESSION_BUILD_STATE_KEY)
1783 .and_then(|value| serde_json::from_value(value.clone()).ok())
1784 }
1785
1786 pub fn set_tool_visibility_state(
1788 &mut self,
1789 state: SessionToolVisibilityState,
1790 ) -> Result<(), serde_json::Error> {
1791 let value = serde_json::to_value(state)?;
1792 self.set_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY, value);
1793 Ok(())
1794 }
1795
1796 pub fn tool_visibility_state(
1798 &self,
1799 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
1800 self.try_tool_visibility_state()
1801 }
1802
1803 pub fn try_tool_visibility_state(
1806 &self,
1807 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
1808 self.metadata
1809 .get(SESSION_TOOL_VISIBILITY_STATE_KEY)
1810 .map(|value| serde_json::from_value(value.clone()))
1811 .transpose()
1812 }
1813
1814 pub fn set_mob_tool_authority_context(
1816 &mut self,
1817 authority_context: Option<MobToolAuthorityContext>,
1818 ) -> Result<(), serde_json::Error> {
1819 let mut build_state = self.build_state().unwrap_or_default();
1820 build_state.mob_tool_authority_context = authority_context;
1821 self.set_build_state(build_state)
1822 }
1823
1824 pub fn mob_tool_authority_context(&self) -> Option<MobToolAuthorityContext> {
1826 self.build_state()
1827 .and_then(|state| state.mob_tool_authority_context)
1828 }
1829
1830 pub fn fork_at(&self, index: usize) -> Self {
1835 let now = SystemTime::now();
1836 let truncated = self.messages[..index.min(self.messages.len())].to_vec();
1837 Self {
1838 version: SESSION_VERSION,
1839 id: SessionId::new(),
1840 messages: Arc::new(truncated),
1841 created_at: now,
1842 updated_at: now,
1843 metadata: self.metadata.clone(),
1844 usage: self.usage.clone(),
1845 }
1846 }
1847
1848 pub fn fork_replacing(
1855 &self,
1856 message_index: usize,
1857 replacement: TranscriptReplacement,
1858 ) -> Result<Self, TranscriptEditError> {
1859 let Some(original) = self.messages.get(message_index) else {
1860 return Err(TranscriptEditError::MessageIndexOutOfBounds {
1861 message_index,
1862 message_count: self.messages.len(),
1863 });
1864 };
1865
1866 let replacement_message = match replacement {
1867 TranscriptReplacement::Message { message } => message,
1868 TranscriptReplacement::UserContentBlock { block_index, block } => {
1869 let Message::User(user) = original else {
1870 return Err(TranscriptEditError::MessageRoleMismatch {
1871 message_index,
1872 expected: "user",
1873 actual: message_role_name(original),
1874 });
1875 };
1876 if block_index >= user.content.len() {
1877 return Err(TranscriptEditError::BlockIndexOutOfBounds {
1878 block_kind: "user content block",
1879 block_index,
1880 block_count: user.content.len(),
1881 });
1882 }
1883 let mut edited = user.clone();
1884 edited.content[block_index] = block;
1885 Message::User(edited)
1886 }
1887 TranscriptReplacement::AssistantBlock { block_index, block } => {
1888 let Message::BlockAssistant(assistant) = original else {
1889 return Err(TranscriptEditError::MessageRoleMismatch {
1890 message_index,
1891 expected: "block_assistant",
1892 actual: message_role_name(original),
1893 });
1894 };
1895 if block_index >= assistant.blocks.len() {
1896 return Err(TranscriptEditError::BlockIndexOutOfBounds {
1897 block_kind: "assistant block",
1898 block_index,
1899 block_count: assistant.blocks.len(),
1900 });
1901 }
1902 let mut edited = assistant.clone();
1903 edited.blocks[block_index] = block;
1904 Message::BlockAssistant(edited)
1905 }
1906 TranscriptReplacement::ToolResultContentBlock {
1907 result_index,
1908 block_index,
1909 block,
1910 } => {
1911 let Message::ToolResults {
1912 results,
1913 created_at,
1914 } = original
1915 else {
1916 return Err(TranscriptEditError::MessageRoleMismatch {
1917 message_index,
1918 expected: "tool_results",
1919 actual: message_role_name(original),
1920 });
1921 };
1922 let Some(result) = results.get(result_index) else {
1923 return Err(TranscriptEditError::BlockIndexOutOfBounds {
1924 block_kind: "tool result",
1925 block_index: result_index,
1926 block_count: results.len(),
1927 });
1928 };
1929 if block_index >= result.content.len() {
1930 return Err(TranscriptEditError::BlockIndexOutOfBounds {
1931 block_kind: "tool result content block",
1932 block_index,
1933 block_count: result.content.len(),
1934 });
1935 }
1936 let mut edited_results = results.clone();
1937 edited_results[result_index].content[block_index] = block;
1938 Message::ToolResults {
1939 results: edited_results,
1940 created_at: *created_at,
1941 }
1942 }
1943 };
1944
1945 let mut forked = self.fork_at(message_index);
1946 forked.push(replacement_message);
1947 Ok(forked)
1948 }
1949
1950 pub fn fork(&self) -> Self {
1955 let now = SystemTime::now();
1956 Self {
1957 version: SESSION_VERSION,
1958 id: SessionId::new(),
1959 messages: Arc::clone(&self.messages),
1960 created_at: now,
1961 updated_at: now,
1962 metadata: self.metadata.clone(),
1963 usage: self.usage.clone(),
1964 }
1965 }
1966}
1967
1968impl Default for Session {
1969 fn default() -> Self {
1970 Self::new()
1971 }
1972}
1973
1974#[derive(Debug, Clone, Serialize, Deserialize)]
1976#[serde(rename_all = "snake_case")]
1977pub struct SessionMeta {
1978 pub id: SessionId,
1979 pub created_at: SystemTime,
1980 pub updated_at: SystemTime,
1981 pub message_count: usize,
1982 pub total_tokens: u64,
1983 #[serde(default)]
1984 pub metadata: serde_json::Map<String, serde_json::Value>,
1985}
1986
1987#[derive(Debug, Clone, Serialize, Deserialize)]
1989#[serde(rename_all = "snake_case")]
1990pub struct SessionMetadata {
1991 #[serde(default = "default_session_metadata_schema_version")]
1997 pub schema_version: u32,
1998 pub model: String,
1999 pub max_tokens: u32,
2000 #[serde(default = "default_structured_output_retries")]
2001 pub structured_output_retries: u32,
2002 pub provider: Provider,
2003 #[serde(default, skip_serializing_if = "Option::is_none")]
2004 pub self_hosted_server_id: Option<String>,
2005 #[serde(default, skip_serializing_if = "Option::is_none")]
2006 pub provider_params: Option<serde_json::Value>,
2007 pub tooling: SessionTooling,
2008 #[serde(default)]
2009 pub keep_alive: bool,
2010 pub comms_name: Option<String>,
2011 #[serde(default, skip_serializing_if = "Option::is_none")]
2013 pub peer_meta: Option<PeerMeta>,
2014 #[serde(default, skip_serializing_if = "Option::is_none")]
2016 pub realm_id: Option<String>,
2017 #[serde(default, skip_serializing_if = "Option::is_none")]
2019 pub instance_id: Option<String>,
2020 #[serde(default, skip_serializing_if = "Option::is_none")]
2022 pub backend: Option<String>,
2023 #[serde(default, skip_serializing_if = "Option::is_none")]
2025 pub config_generation: Option<u64>,
2026 #[serde(default, skip_serializing_if = "Option::is_none")]
2036 pub auth_binding: Option<crate::AuthBindingRef>,
2037}
2038
2039fn default_structured_output_retries() -> u32 {
2040 2
2041}
2042
2043fn default_session_metadata_schema_version() -> u32 {
2044 1
2045}
2046
2047#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2049#[serde(rename_all = "snake_case")]
2050pub struct SessionLlmIdentity {
2051 pub model: String,
2052 pub provider: Provider,
2053 #[serde(default, skip_serializing_if = "Option::is_none")]
2054 pub self_hosted_server_id: Option<String>,
2055 #[serde(default, skip_serializing_if = "Option::is_none")]
2056 pub provider_params: Option<serde_json::Value>,
2057 #[serde(default, skip_serializing_if = "Option::is_none")]
2070 pub auth_binding: Option<crate::AuthBindingRef>,
2071}
2072
2073pub struct SessionLlmIdentityOverride<'a> {
2075 pub model: Option<&'a str>,
2076 pub provider: Option<Provider>,
2077 pub provider_params: Option<&'a serde_json::Value>,
2078 pub clear_provider_params: bool,
2079 pub auth_binding: Option<&'a crate::AuthBindingRef>,
2080 pub clear_auth_binding: bool,
2081}
2082
2083#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
2084pub enum SessionLlmIdentityOverrideError {
2085 #[error("provider override requires model on an existing session")]
2086 ProviderRequiresModel,
2087 #[error("clear_provider_params cannot be combined with provider_params")]
2088 SetAndClearProviderParams,
2089 #[error("clear_auth_binding cannot be combined with auth_binding")]
2090 SetAndClearAuthBinding,
2091 #[error("{0}")]
2092 ProviderModelMismatch(String),
2093 #[error("self-hosted provider requires a registered model alias; '{model}' is not configured")]
2094 MissingSelfHostedAlias { model: String },
2095}
2096
2097pub fn resolve_session_llm_identity_override(
2105 current: &SessionLlmIdentity,
2106 registry: &crate::ModelRegistry,
2107 overrides: SessionLlmIdentityOverride<'_>,
2108) -> Result<SessionLlmIdentity, SessionLlmIdentityOverrideError> {
2109 if overrides.provider.is_some() && overrides.model.is_none() {
2110 return Err(SessionLlmIdentityOverrideError::ProviderRequiresModel);
2111 }
2112 if overrides.clear_provider_params && overrides.provider_params.is_some() {
2113 return Err(SessionLlmIdentityOverrideError::SetAndClearProviderParams);
2114 }
2115 if overrides.clear_auth_binding && overrides.auth_binding.is_some() {
2116 return Err(SessionLlmIdentityOverrideError::SetAndClearAuthBinding);
2117 }
2118
2119 let model = overrides
2120 .model
2121 .map(str::to_string)
2122 .unwrap_or_else(|| current.model.clone());
2123 let provider = if let Some(provider) = overrides.provider {
2124 provider
2125 } else if overrides.model.is_some() {
2126 registry
2127 .entry(&model)
2128 .map_or(current.provider, |entry| entry.provider)
2129 } else {
2130 current.provider
2131 };
2132
2133 if (overrides.model.is_some() || overrides.provider.is_some())
2134 && let Some(reason) = registry.provider_override_mismatch_reason(provider, &model)
2135 {
2136 return Err(SessionLlmIdentityOverrideError::ProviderModelMismatch(
2137 reason,
2138 ));
2139 }
2140
2141 let provider_params = if overrides.clear_provider_params {
2142 None
2143 } else {
2144 overrides
2145 .provider_params
2146 .cloned()
2147 .or_else(|| current.provider_params.clone())
2148 };
2149 let self_hosted_server_id = if provider == Provider::SelfHosted {
2150 if overrides.model.is_none() {
2151 current.self_hosted_server_id.clone().or_else(|| {
2152 registry
2153 .entry_for_provider(Provider::SelfHosted, &model)
2154 .and_then(|entry| entry.self_hosted.as_ref())
2155 .map(|server| server.server_id.clone())
2156 })
2157 } else {
2158 let entry = registry
2159 .entry_for_provider(Provider::SelfHosted, &model)
2160 .ok_or_else(|| SessionLlmIdentityOverrideError::MissingSelfHostedAlias {
2161 model: model.clone(),
2162 })?;
2163 entry
2164 .self_hosted
2165 .as_ref()
2166 .map(|server| server.server_id.clone())
2167 }
2168 } else {
2169 None
2170 };
2171
2172 let auth_binding = if overrides.clear_auth_binding
2173 || (provider != current.provider && overrides.auth_binding.is_none())
2174 {
2175 None
2176 } else {
2177 overrides
2178 .auth_binding
2179 .cloned()
2180 .or_else(|| current.auth_binding.clone())
2181 };
2182
2183 Ok(SessionLlmIdentity {
2184 model,
2185 provider,
2186 self_hosted_server_id,
2187 provider_params,
2188 auth_binding,
2189 })
2190}
2191
2192#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2199#[serde(rename_all = "snake_case")]
2200pub struct SessionLlmRequestPolicy {
2201 pub model: String,
2202 #[serde(default, skip_serializing_if = "Option::is_none")]
2203 pub provider_params: Option<serde_json::Value>,
2204 #[serde(default, skip_serializing_if = "Option::is_none")]
2205 pub provider_tool_defaults: Option<serde_json::Value>,
2206}
2207
2208impl SessionMetadata {
2209 pub fn llm_identity(&self) -> SessionLlmIdentity {
2211 SessionLlmIdentity {
2212 model: self.model.clone(),
2213 provider: self.provider,
2214 self_hosted_server_id: self.self_hosted_server_id.clone(),
2215 provider_params: self.provider_params.clone(),
2216 auth_binding: self.auth_binding.clone(),
2217 }
2218 }
2219
2220 pub fn apply_llm_identity(&mut self, identity: &SessionLlmIdentity) {
2222 self.model = identity.model.clone();
2223 self.provider = identity.provider;
2224 self.self_hosted_server_id = identity.self_hosted_server_id.clone();
2225 self.provider_params = identity.provider_params.clone();
2226 self.auth_binding = identity.auth_binding.clone();
2227 }
2228}
2229
2230pub const SESSION_METADATA_KEY: &str = "session_metadata";
2232
2233#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
2241#[serde(rename_all = "snake_case")]
2242pub enum ToolCategoryOverride {
2243 #[default]
2245 Inherit,
2246 Enable,
2248 Disable,
2250}
2251
2252impl ToolCategoryOverride {
2253 #[must_use]
2259 pub fn resolve(self, runtime_default: bool) -> bool {
2260 match self {
2261 Self::Enable => true,
2262 Self::Disable => false,
2263 Self::Inherit => runtime_default,
2264 }
2265 }
2266
2267 #[must_use]
2273 pub fn to_override(self) -> Option<bool> {
2274 match self {
2275 Self::Enable => Some(true),
2276 Self::Disable => Some(false),
2277 Self::Inherit => None,
2278 }
2279 }
2280
2281 #[must_use]
2289 pub fn from_effective(enabled: bool) -> Self {
2290 if enabled { Self::Enable } else { Self::Disable }
2291 }
2292
2293 #[must_use]
2303 pub fn from_override(value: Option<bool>) -> Self {
2304 match value {
2305 Some(true) => Self::Enable,
2306 Some(false) => Self::Disable,
2307 None => Self::Inherit,
2308 }
2309 }
2310}
2311
2312fn deserialize_tool_category_compat<'de, D>(
2320 deserializer: D,
2321) -> Result<ToolCategoryOverride, D::Error>
2322where
2323 D: serde::Deserializer<'de>,
2324{
2325 use serde::de;
2326
2327 struct ToolCategoryVisitor;
2328
2329 impl de::Visitor<'_> for ToolCategoryVisitor {
2330 type Value = ToolCategoryOverride;
2331
2332 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2333 formatter.write_str("a boolean or one of \"inherit\", \"enable\", \"disable\"")
2334 }
2335
2336 fn visit_bool<E: de::Error>(self, v: bool) -> Result<Self::Value, E> {
2337 Ok(if v {
2338 ToolCategoryOverride::Enable
2339 } else {
2340 ToolCategoryOverride::Inherit
2341 })
2342 }
2343
2344 fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
2345 match v {
2346 "inherit" => Ok(ToolCategoryOverride::Inherit),
2347 "enable" => Ok(ToolCategoryOverride::Enable),
2348 "disable" => Ok(ToolCategoryOverride::Disable),
2349 _ => Err(de::Error::unknown_variant(
2350 v,
2351 &["inherit", "enable", "disable"],
2352 )),
2353 }
2354 }
2355 }
2356
2357 deserializer.deserialize_any(ToolCategoryVisitor)
2358}
2359
2360fn normalize_realtime_item_id(item_id: String) -> Option<String> {
2361 let trimmed = item_id.trim();
2362 (!trimmed.is_empty()).then(|| trimmed.to_string())
2363}
2364
2365fn normalize_realtime_previous_item_id(previous_item_id: Option<String>) -> Option<String> {
2366 previous_item_id.and_then(normalize_realtime_item_id)
2367}
2368
2369fn normalize_realtime_response_id(response_id: String) -> Option<String> {
2370 normalize_realtime_item_id(response_id)
2371}
2372
2373fn normalize_realtime_optional_response_id(response_id: Option<String>) -> Option<String> {
2374 response_id.and_then(normalize_realtime_response_id)
2375}
2376
2377#[must_use]
2393fn promote_item_lane(item: &mut RealtimeTranscriptItemState, lane: TranscriptLane) -> bool {
2394 if item.lane == lane {
2395 return true;
2396 }
2397 let has_content = item.content_segments.values().any(|s| !s.is_empty());
2398 if !has_content {
2399 item.lane = lane;
2402 return true;
2403 }
2404 tracing::warn!(
2405 existing_lane = ?item.lane,
2406 observed_lane = ?lane,
2407 "ignoring realtime transcript lane conflict on item with staged content"
2408 );
2409 false
2410}
2411
2412fn observe_realtime_item(
2413 state: &mut SessionRealtimeTranscriptState,
2414 item_id: String,
2415 previous_item_id: Option<String>,
2416 role: RealtimeTranscriptRole,
2417 response_id: Option<String>,
2418) -> Option<&mut RealtimeTranscriptItemState> {
2419 let item_id = normalize_realtime_item_id(item_id)?;
2420 let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
2421 let response_id = normalize_realtime_optional_response_id(response_id);
2422 if !state
2423 .first_seen_order
2424 .iter()
2425 .any(|existing| existing == &item_id)
2426 {
2427 state.first_seen_order.push(item_id.clone());
2428 }
2429 let item = state.items.entry(item_id.clone()).or_insert_with(|| {
2430 RealtimeTranscriptItemState::new(role, previous_item_id.clone(), response_id.clone())
2431 });
2432 if item.skipped {
2433 if item.previous_item_id.is_none() && previous_item_id.is_some() {
2434 item.previous_item_id = previous_item_id;
2435 }
2436 tracing::warn!(
2437 item_id = %item_id,
2438 observed_role = ?role,
2439 "ignoring realtime transcript content for item already marked as a contentless causal anchor"
2440 );
2441 return None;
2442 }
2443 if item.role != role {
2444 tracing::warn!(
2445 item_id = %item_id,
2446 existing_role = ?item.role,
2447 observed_role = ?role,
2448 "ignoring realtime transcript item role conflict"
2449 );
2450 return None;
2451 }
2452 if item.previous_item_id.is_none() && previous_item_id.is_some() {
2453 item.previous_item_id = previous_item_id;
2454 }
2455 if let Some(response_id) = response_id {
2456 match item.response_id.as_ref() {
2457 Some(existing) if existing != &response_id => {
2458 tracing::warn!(
2459 item_id = %item_id,
2460 existing_response_id = %existing,
2461 observed_response_id = %response_id,
2462 "ignoring realtime transcript item response conflict"
2463 );
2464 return None;
2465 }
2466 Some(_) => {}
2467 None => item.response_id = Some(response_id),
2468 }
2469 }
2470 Some(item)
2471}
2472
2473fn observe_realtime_skipped_item(
2474 state: &mut SessionRealtimeTranscriptState,
2475 item_id: String,
2476 previous_item_id: Option<String>,
2477) {
2478 let Some(item_id) = normalize_realtime_item_id(item_id) else {
2479 return;
2480 };
2481 let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
2482 if !state
2483 .first_seen_order
2484 .iter()
2485 .any(|existing| existing == &item_id)
2486 {
2487 state.first_seen_order.push(item_id.clone());
2488 }
2489 let item = state
2490 .items
2491 .entry(item_id)
2492 .or_insert_with(|| RealtimeTranscriptItemState::skipped(previous_item_id.clone()));
2493 if item.previous_item_id.is_none() && previous_item_id.is_some() {
2494 item.previous_item_id = previous_item_id;
2495 }
2496 if item.materialized || item.skipped {
2497 return;
2498 }
2499 if item.role != RealtimeTranscriptRole::Assistant {
2500 tracing::warn!(
2501 existing_role = ?item.role,
2502 "ignoring realtime skipped-item observation for non-assistant item"
2503 );
2504 return;
2505 }
2506 if !item.content_segments.is_empty() {
2507 tracing::warn!("ignoring realtime skipped-item observation for content-bearing item");
2508 return;
2509 }
2510 item.skipped = true;
2511 item.ready = true;
2512}
2513
2514fn mark_realtime_assistant_response_ready(
2515 state: &mut SessionRealtimeTranscriptState,
2516 response_id: &str,
2517) {
2518 for item in state.items.values_mut() {
2519 if item.role == RealtimeTranscriptRole::Assistant
2520 && item.response_id.as_deref() == Some(response_id)
2521 && !item.materialized
2522 && !item.text().is_empty()
2523 {
2524 item.ready = true;
2525 }
2526 }
2527}
2528
2529fn discard_realtime_assistant_response(
2530 state: &mut SessionRealtimeTranscriptState,
2531 response_id: &str,
2532) {
2533 state
2534 .discarded_assistant_response_ids
2535 .insert(response_id.to_string());
2536 for item in state.items.values_mut() {
2537 if item.role == RealtimeTranscriptRole::Assistant
2538 && item.response_id.as_deref() == Some(response_id)
2539 && !item.materialized
2540 {
2541 item.content_segments.clear();
2542 item.skipped = true;
2543 item.ready = true;
2544 }
2545 }
2546 state.assistant_completions.remove(response_id);
2547}
2548
2549fn discard_realtime_assistant_response_by_lane(
2565 state: &mut SessionRealtimeTranscriptState,
2566 response_id: &str,
2567) {
2568 state
2569 .discarded_assistant_response_ids
2570 .insert(response_id.to_string());
2571 for item in state.items.values_mut() {
2572 if item.role != RealtimeTranscriptRole::Assistant
2573 || item.response_id.as_deref() != Some(response_id)
2574 || item.materialized
2575 {
2576 continue;
2577 }
2578 let has_content = item.content_segments.values().any(|s| !s.is_empty());
2579 if item.lane == TranscriptLane::Display && has_content {
2580 continue;
2584 }
2585 item.content_segments.clear();
2589 item.skipped = true;
2590 item.ready = true;
2591 }
2592}
2593
2594fn realtime_transcript_order(state: &SessionRealtimeTranscriptState) -> Vec<String> {
2595 let mut roots = Vec::new();
2596 let mut children: BTreeMap<String, Vec<String>> = BTreeMap::new();
2597 for item_id in &state.first_seen_order {
2598 let Some(item) = state.items.get(item_id) else {
2599 continue;
2600 };
2601 if let Some(previous) = item.previous_item_id.as_ref()
2602 && state.items.contains_key(previous)
2603 {
2604 children
2605 .entry(previous.clone())
2606 .or_default()
2607 .push(item_id.clone());
2608 } else {
2609 roots.push(item_id.clone());
2610 }
2611 }
2612 roots.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
2613 for child_ids in children.values_mut() {
2614 child_ids.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
2615 }
2616
2617 let mut ordered = Vec::new();
2618 let mut visited = BTreeSet::new();
2619 for root in roots {
2620 visit_realtime_transcript_item(&root, &children, &mut visited, &mut ordered);
2621 }
2622 for item_id in &state.first_seen_order {
2623 visit_realtime_transcript_item(item_id, &children, &mut visited, &mut ordered);
2624 }
2625 ordered
2626}
2627
2628fn realtime_first_seen_index(state: &SessionRealtimeTranscriptState, item_id: &str) -> usize {
2629 state
2630 .first_seen_order
2631 .iter()
2632 .position(|existing| existing == item_id)
2633 .unwrap_or(usize::MAX)
2634}
2635
2636fn visit_realtime_transcript_item(
2637 item_id: &str,
2638 children: &BTreeMap<String, Vec<String>>,
2639 visited: &mut BTreeSet<String>,
2640 ordered: &mut Vec<String>,
2641) {
2642 if !visited.insert(item_id.to_string()) {
2643 return;
2644 }
2645 ordered.push(item_id.to_string());
2646 if let Some(child_ids) = children.get(item_id) {
2647 for child_id in child_ids {
2648 visit_realtime_transcript_item(child_id, children, visited, ordered);
2649 }
2650 }
2651}
2652
2653fn realtime_predecessor_materialized(
2654 state: &SessionRealtimeTranscriptState,
2655 previous_item_id: Option<&str>,
2656) -> bool {
2657 match previous_item_id {
2658 None => true,
2659 Some(previous_item_id) => state
2660 .items
2661 .get(previous_item_id)
2662 .is_some_and(|item| item.materialized),
2663 }
2664}
2665
2666#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
2673#[serde(rename_all = "snake_case")]
2674pub struct SessionTooling {
2675 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2676 pub builtins: ToolCategoryOverride,
2677 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2678 pub shell: ToolCategoryOverride,
2679 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2680 pub comms: ToolCategoryOverride,
2681 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2683 pub mob: ToolCategoryOverride,
2684 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2686 pub memory: ToolCategoryOverride,
2687 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2689 pub schedule: ToolCategoryOverride,
2690 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2692 pub workgraph: ToolCategoryOverride,
2693 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2695 pub image_generation: ToolCategoryOverride,
2696 #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
2698 pub web_search: ToolCategoryOverride,
2699 #[serde(default, skip_serializing_if = "Option::is_none")]
2701 pub active_skills: Option<Vec<crate::skills::SkillKey>>,
2702}
2703
2704impl From<&Session> for SessionMeta {
2705 fn from(session: &Session) -> Self {
2706 Self {
2707 id: session.id.clone(),
2708 created_at: session.created_at,
2709 updated_at: session.updated_at,
2710 message_count: session.messages.len(),
2711 total_tokens: session.total_tokens(),
2712 metadata: session.metadata.clone(),
2713 }
2714 }
2715}
2716
2717#[cfg(test)]
2718#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2719mod tests {
2720 use super::*;
2721 use crate::types::{
2722 AssistantMessage, BlockAssistantMessage, StopReason, SystemMessage, UserMessage,
2723 };
2724 use std::sync::Arc;
2725
2726 fn block_assistant_text(message: &BlockAssistantMessage) -> String {
2727 message
2728 .blocks
2729 .iter()
2730 .filter_map(|block| match block {
2731 AssistantBlock::Text { text, .. } => Some(text.as_str()),
2732 _ => None,
2733 })
2734 .collect()
2735 }
2736
2737 #[test]
2738 fn test_session_new() {
2739 let session = Session::new();
2740 assert_eq!(session.version(), SESSION_VERSION);
2741 assert!(session.messages().is_empty());
2742 assert!(session.created_at() <= session.updated_at());
2743 }
2744
2745 #[test]
2746 fn llm_identity_model_override_switches_to_catalog_provider() {
2747 let registry = crate::Config::default().model_registry().unwrap();
2748 let current = SessionLlmIdentity {
2749 model: "claude-sonnet-4-5".to_string(),
2750 provider: Provider::Anthropic,
2751 self_hosted_server_id: None,
2752 provider_params: None,
2753 auth_binding: Some(crate::AuthBindingRef {
2754 realm: crate::RealmId::parse("tenant_a").unwrap(),
2755 binding: crate::BindingId::parse("anthropic_default").unwrap(),
2756 profile: None,
2757 }),
2758 };
2759
2760 let resolved = resolve_session_llm_identity_override(
2761 ¤t,
2762 ®istry,
2763 SessionLlmIdentityOverride {
2764 model: Some("gpt-5.5"),
2765 provider: None,
2766 provider_params: None,
2767 clear_provider_params: false,
2768 auth_binding: None,
2769 clear_auth_binding: false,
2770 },
2771 )
2772 .unwrap();
2773
2774 assert_eq!(resolved.model, "gpt-5.5");
2775 assert_eq!(resolved.provider, Provider::OpenAI);
2776 assert!(
2777 resolved.auth_binding.is_none(),
2778 "provider switches must not inherit a binding from the previous provider"
2779 );
2780 }
2781
2782 #[test]
2783 fn llm_identity_model_override_keeps_uncatalogued_model_on_current_provider() {
2784 let registry = crate::Config::default().model_registry().unwrap();
2785 let current = SessionLlmIdentity {
2786 model: "custom-model".to_string(),
2787 provider: Provider::Anthropic,
2788 self_hosted_server_id: None,
2789 provider_params: None,
2790 auth_binding: None,
2791 };
2792
2793 let resolved = resolve_session_llm_identity_override(
2794 ¤t,
2795 ®istry,
2796 SessionLlmIdentityOverride {
2797 model: Some("uncatalogued-custom-model"),
2798 provider: None,
2799 provider_params: None,
2800 clear_provider_params: false,
2801 auth_binding: None,
2802 clear_auth_binding: false,
2803 },
2804 )
2805 .unwrap();
2806
2807 assert_eq!(resolved.model, "uncatalogued-custom-model");
2808 assert_eq!(resolved.provider, Provider::Anthropic);
2809 }
2810
2811 #[test]
2812 fn realtime_transcript_append_is_idempotent_by_provider_item_and_delta_id() {
2813 let mut session = Session::new();
2814
2815 let user = RealtimeTranscriptEvent::UserTranscriptFinal {
2816 item_id: "item_user".to_string(),
2817 previous_item_id: None,
2818 content_index: 0,
2819 text: "hello".to_string(),
2820 };
2821 assert!(
2822 !session
2823 .append_realtime_transcript_event(user.clone())
2824 .is_inert()
2825 );
2826 assert!(session.append_realtime_transcript_event(user).is_inert());
2827
2828 let delta = RealtimeTranscriptEvent::AssistantTextDelta {
2829 response_id: "resp_assistant".to_string(),
2830 delta_id: "evt_delta_1".to_string(),
2831 item_id: "item_assistant".to_string(),
2832 previous_item_id: Some("item_user".to_string()),
2833 content_index: 0,
2834 delta: "hi".to_string(),
2835 };
2836 assert!(
2837 session
2838 .append_realtime_transcript_event(delta.clone())
2839 .is_inert()
2840 );
2841 assert!(session.append_realtime_transcript_event(delta).is_inert());
2842
2843 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
2844 response_id: "resp_assistant".to_string(),
2845 stop_reason: StopReason::EndTurn,
2846 usage: Usage::default(),
2847 };
2848 assert!(
2849 !session
2850 .append_realtime_transcript_event(terminal.clone())
2851 .is_inert()
2852 );
2853 assert!(
2854 session
2855 .append_realtime_transcript_event(terminal)
2856 .is_inert()
2857 );
2858
2859 assert_eq!(session.messages().len(), 2);
2860 assert!(matches!(
2861 &session.messages()[0],
2862 Message::User(user) if user.text_content() == "hello"
2863 ));
2864 assert!(matches!(
2865 &session.messages()[1],
2866 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "hi"
2867 ));
2868 }
2869
2870 #[test]
2875 fn realtime_transcript_final_text_overrides_partial_delta_and_promotes_to_spoken_lane() {
2876 let mut session = Session::new();
2877
2878 assert!(
2881 session
2882 .append_realtime_transcript_event(
2883 RealtimeTranscriptEvent::AssistantTranscriptDelta {
2884 response_id: "resp_a".to_string(),
2885 delta_id: "evt_1".to_string(),
2886 item_id: "item_a".to_string(),
2887 previous_item_id: None,
2888 content_index: 0,
2889 delta: "incom".to_string(),
2890 }
2891 )
2892 .is_inert()
2893 );
2894
2895 assert!(
2897 session
2898 .append_realtime_transcript_event(
2899 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
2900 response_id: "resp_a".to_string(),
2901 item_id: "item_a".to_string(),
2902 content_index: 0,
2903 text: "complete answer".to_string(),
2904 }
2905 )
2906 .is_inert()
2907 );
2908
2909 let outcome = session.append_realtime_transcript_event(
2911 RealtimeTranscriptEvent::AssistantTurnCompleted {
2912 response_id: "resp_a".to_string(),
2913 stop_reason: StopReason::EndTurn,
2914 usage: Usage::default(),
2915 },
2916 );
2917 assert!(!outcome.is_inert());
2918
2919 assert_eq!(session.messages().len(), 1);
2922 match &session.messages()[0] {
2923 Message::BlockAssistant(assistant) => {
2924 let mut found_transcript = false;
2925 for block in &assistant.blocks {
2926 if let AssistantBlock::Transcript { text, .. } = block {
2927 assert_eq!(text, "complete answer");
2928 found_transcript = true;
2929 }
2930 }
2931 assert!(
2932 found_transcript,
2933 "AssistantTranscriptFinalText must promote to the Spoken lane and \
2934 materialize as AssistantBlock::Transcript"
2935 );
2936 }
2937 other => unreachable!("expected BlockAssistant, got {other:?}"),
2938 }
2939 }
2940
2941 #[test]
2944 fn realtime_transcript_final_text_creates_item_when_no_delta_staged() {
2945 let mut session = Session::new();
2946
2947 assert!(
2948 session
2949 .append_realtime_transcript_event(
2950 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
2951 response_id: "resp_a".to_string(),
2952 item_id: "item_a".to_string(),
2953 content_index: 0,
2954 text: "spoken-final-only".to_string(),
2955 }
2956 )
2957 .is_inert()
2958 );
2959
2960 let outcome = session.append_realtime_transcript_event(
2961 RealtimeTranscriptEvent::AssistantTurnCompleted {
2962 response_id: "resp_a".to_string(),
2963 stop_reason: StopReason::EndTurn,
2964 usage: Usage::default(),
2965 },
2966 );
2967 assert!(!outcome.is_inert());
2968
2969 assert_eq!(session.messages().len(), 1);
2970 match &session.messages()[0] {
2971 Message::BlockAssistant(assistant) => {
2972 let has_transcript = assistant.blocks.iter().any(|b| {
2973 matches!(b, AssistantBlock::Transcript { text, .. } if text == "spoken-final-only")
2974 });
2975 assert!(
2976 has_transcript,
2977 "final-only provider path must materialize as Transcript on the Spoken lane"
2978 );
2979 }
2980 other => unreachable!("expected BlockAssistant, got {other:?}"),
2981 }
2982 }
2983
2984 #[test]
2985 fn realtime_transcript_append_orders_causally_equivalent_out_of_order_items() {
2986 let mut session = Session::new();
2987
2988 assert!(
2989 session
2990 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
2991 response_id: "resp_assistant".to_string(),
2992 delta_id: "evt_delta_1".to_string(),
2993 item_id: "item_assistant".to_string(),
2994 previous_item_id: Some("item_user".to_string()),
2995 content_index: 0,
2996 delta: "answer".to_string(),
2997 })
2998 .is_inert()
2999 );
3000 assert!(
3001 session
3002 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3003 response_id: "resp_assistant".to_string(),
3004 stop_reason: StopReason::EndTurn,
3005 usage: Usage::default(),
3006 })
3007 .is_inert()
3008 );
3009
3010 let outcome = session.append_realtime_transcript_event(
3011 RealtimeTranscriptEvent::UserTranscriptFinal {
3012 item_id: "item_user".to_string(),
3013 previous_item_id: None,
3014 content_index: 0,
3015 text: "question".to_string(),
3016 },
3017 );
3018
3019 assert_eq!(outcome.materialized_messages.len(), 2);
3020 assert_eq!(session.messages().len(), 2);
3021 assert!(matches!(
3022 &session.messages()[0],
3023 Message::User(user) if user.text_content() == "question"
3024 ));
3025 assert!(matches!(
3026 &session.messages()[1],
3027 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer"
3028 ));
3029 }
3030
3031 #[test]
3032 fn realtime_transcript_replay_of_seen_provider_items_is_inert() {
3033 let mut session = Session::new();
3034 let events = vec![
3035 RealtimeTranscriptEvent::UserTranscriptFinal {
3036 item_id: "item_user".to_string(),
3037 previous_item_id: None,
3038 content_index: 0,
3039 text: "hello".to_string(),
3040 },
3041 RealtimeTranscriptEvent::AssistantTextDelta {
3042 response_id: "resp_assistant".to_string(),
3043 delta_id: "evt_delta_1".to_string(),
3044 item_id: "item_assistant".to_string(),
3045 previous_item_id: Some("item_user".to_string()),
3046 content_index: 0,
3047 delta: "world".to_string(),
3048 },
3049 RealtimeTranscriptEvent::AssistantTurnCompleted {
3050 response_id: "resp_assistant".to_string(),
3051 stop_reason: StopReason::EndTurn,
3052 usage: Usage::default(),
3053 },
3054 ];
3055
3056 for event in events.iter().cloned() {
3057 let _ = session.append_realtime_transcript_event(event);
3058 }
3059 let first_messages = serde_json::to_value(session.messages()).unwrap();
3060
3061 for event in events {
3062 assert!(session.append_realtime_transcript_event(event).is_inert());
3063 }
3064
3065 assert_eq!(
3066 serde_json::to_value(session.messages()).unwrap(),
3067 first_messages
3068 );
3069 }
3070
3071 #[test]
3072 fn realtime_transcript_user_final_replay_cannot_erase_existing_segment() {
3073 let mut session = Session::new();
3074
3075 let user = RealtimeTranscriptEvent::UserTranscriptFinal {
3076 item_id: "item_user".to_string(),
3077 previous_item_id: None,
3078 content_index: 0,
3079 text: "remember amber lantern".to_string(),
3080 };
3081 assert!(
3082 !session
3083 .append_realtime_transcript_event(user.clone())
3084 .is_inert()
3085 );
3086 let first_messages = serde_json::to_value(session.messages()).unwrap();
3087
3088 assert!(
3089 session
3090 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3091 item_id: "item_user".to_string(),
3092 previous_item_id: None,
3093 content_index: 0,
3094 text: String::new(),
3095 })
3096 .is_inert()
3097 );
3098 assert!(session.append_realtime_transcript_event(user).is_inert());
3099 assert_eq!(
3100 serde_json::to_value(session.messages()).unwrap(),
3101 first_messages
3102 );
3103 }
3104
3105 #[test]
3106 fn realtime_transcript_empty_user_final_can_be_filled_by_later_nonempty_replay() {
3107 let mut session = Session::new();
3108
3109 assert!(
3110 session
3111 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3112 item_id: "item_user".to_string(),
3113 previous_item_id: None,
3114 content_index: 0,
3115 text: String::new(),
3116 })
3117 .is_inert()
3118 );
3119 assert!(session.messages().is_empty());
3120
3121 let outcome = session.append_realtime_transcript_event(
3122 RealtimeTranscriptEvent::UserTranscriptFinal {
3123 item_id: "item_user".to_string(),
3124 previous_item_id: None,
3125 content_index: 0,
3126 text: "remember amber lantern".to_string(),
3127 },
3128 );
3129 assert_eq!(outcome.materialized_messages.len(), 1);
3130 assert_eq!(session.messages().len(), 1);
3131 assert!(matches!(
3132 &session.messages()[0],
3133 Message::User(user) if user.text_content() == "remember amber lantern"
3134 ));
3135 }
3136
3137 #[test]
3138 fn realtime_transcript_skipped_provider_items_preserve_causal_order_without_content() {
3139 let mut session = Session::new();
3140
3141 let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
3142 response_id: "resp_assistant".to_string(),
3143 delta_id: "evt_delta_1".to_string(),
3144 item_id: "item_assistant".to_string(),
3145 previous_item_id: Some("item_tool".to_string()),
3146 content_index: 0,
3147 delta: "done".to_string(),
3148 };
3149 assert!(
3150 session
3151 .append_realtime_transcript_event(assistant_delta.clone())
3152 .is_inert()
3153 );
3154 let assistant_complete = RealtimeTranscriptEvent::AssistantTurnCompleted {
3155 response_id: "resp_assistant".to_string(),
3156 stop_reason: StopReason::EndTurn,
3157 usage: Usage::default(),
3158 };
3159 assert!(
3160 session
3161 .append_realtime_transcript_event(assistant_complete.clone())
3162 .is_inert()
3163 );
3164
3165 let skipped = RealtimeTranscriptEvent::ItemSkipped {
3166 item_id: "item_tool".to_string(),
3167 previous_item_id: Some("item_user".to_string()),
3168 };
3169 assert!(
3170 session
3171 .append_realtime_transcript_event(skipped.clone())
3172 .is_inert(),
3173 "a skipped provider item must not append transcript content"
3174 );
3175 assert!(session.messages().is_empty());
3176
3177 let outcome = session.append_realtime_transcript_event(
3178 RealtimeTranscriptEvent::UserTranscriptFinal {
3179 item_id: "item_user".to_string(),
3180 previous_item_id: None,
3181 content_index: 0,
3182 text: "please use the tool".to_string(),
3183 },
3184 );
3185 assert_eq!(outcome.materialized_messages.len(), 2);
3186 assert_eq!(session.messages().len(), 2);
3187 assert!(matches!(
3188 &session.messages()[0],
3189 Message::User(user) if user.text_content() == "please use the tool"
3190 ));
3191 assert!(matches!(
3192 &session.messages()[1],
3193 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "done"
3194 ));
3195
3196 let first_messages = serde_json::to_value(session.messages()).unwrap();
3197 assert!(session.append_realtime_transcript_event(skipped).is_inert());
3198 assert!(
3199 session
3200 .append_realtime_transcript_event(assistant_delta)
3201 .is_inert()
3202 );
3203 assert!(
3204 session
3205 .append_realtime_transcript_event(assistant_complete)
3206 .is_inert()
3207 );
3208 assert_eq!(
3209 serde_json::to_value(session.messages()).unwrap(),
3210 first_messages
3211 );
3212 }
3213
3214 #[test]
3215 fn realtime_transcript_interrupted_assistant_item_unblocks_later_provider_items() {
3216 let mut session = Session::new();
3223
3224 let _ = session.append_realtime_transcript_event(
3225 RealtimeTranscriptEvent::UserTranscriptFinal {
3226 item_id: "item_repeat".to_string(),
3227 previous_item_id: None,
3228 content_index: 0,
3229 text: "repeat until stop".to_string(),
3230 },
3231 );
3232 assert!(
3233 session
3234 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3235 response_id: "resp_loop".to_string(),
3236 delta_id: "evt_loop_1".to_string(),
3237 item_id: "item_loop".to_string(),
3238 previous_item_id: Some("item_repeat".to_string()),
3239 content_index: 0,
3240 delta: "Looping now".to_string(),
3241 })
3242 .is_inert()
3243 );
3244 assert!(
3245 session
3246 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3247 item_id: "item_stop".to_string(),
3248 previous_item_id: Some("item_loop".to_string()),
3249 content_index: 0,
3250 text: "Stop.".to_string(),
3251 })
3252 .is_inert(),
3253 "the stop turn waits until the interrupted assistant provider item is resolved"
3254 );
3255
3256 let outcome = session.append_realtime_transcript_event(
3257 RealtimeTranscriptEvent::AssistantTurnInterrupted {
3258 response_id: "resp_loop".to_string(),
3259 },
3260 );
3261
3262 assert_eq!(outcome.materialized_messages.len(), 2);
3265 assert_eq!(session.messages().len(), 3);
3267 assert!(matches!(
3268 &session.messages()[0],
3269 Message::User(user) if user.text_content() == "repeat until stop"
3270 ));
3271 match &session.messages()[1] {
3272 Message::BlockAssistant(assistant) => {
3273 let text = block_assistant_text(assistant);
3274 assert_eq!(text, "Looping now");
3275 }
3276 other => unreachable!(
3277 "Display lane assistant item must be retained on Interrupted, got {other:?}"
3278 ),
3279 }
3280 assert!(matches!(
3281 &session.messages()[2],
3282 Message::User(user) if user.text_content() == "Stop."
3283 ));
3284 }
3285
3286 #[test]
3287 fn realtime_transcript_late_interrupted_assistant_delta_stays_noncanonical() {
3288 let mut session = Session::new();
3289
3290 let _ = session.append_realtime_transcript_event(
3291 RealtimeTranscriptEvent::UserTranscriptFinal {
3292 item_id: "item_repeat".to_string(),
3293 previous_item_id: None,
3294 content_index: 0,
3295 text: "repeat until stop".to_string(),
3296 },
3297 );
3298 assert!(
3299 session
3300 .append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
3301 item_id: "item_loop".to_string(),
3302 previous_item_id: Some("item_repeat".to_string()),
3303 role: RealtimeTranscriptRole::Assistant,
3304 response_id: None,
3305 })
3306 .is_inert(),
3307 "provider can observe an assistant item before the adapter learns its response id"
3308 );
3309 assert!(
3310 session
3311 .append_realtime_transcript_event(
3312 RealtimeTranscriptEvent::AssistantTurnInterrupted {
3313 response_id: "resp_loop".to_string(),
3314 }
3315 )
3316 .is_inert(),
3317 "an interruption can arrive before delayed transcript deltas for the response"
3318 );
3319 assert!(
3320 session
3321 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
3322 item_id: "item_stop".to_string(),
3323 previous_item_id: Some("item_loop".to_string()),
3324 content_index: 0,
3325 text: "Stop.".to_string(),
3326 })
3327 .is_inert(),
3328 "the stop turn waits for the provider's interrupted assistant item anchor"
3329 );
3330
3331 let late_delta_outcome =
3332 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3333 response_id: "resp_loop".to_string(),
3334 delta_id: "evt_loop_late".to_string(),
3335 item_id: "item_loop".to_string(),
3336 previous_item_id: Some("item_repeat".to_string()),
3337 content_index: 0,
3338 delta: "Looping now".to_string(),
3339 });
3340 assert_eq!(late_delta_outcome.materialized_messages.len(), 1);
3341 assert!(matches!(
3342 &session.messages()[1],
3343 Message::User(user) if user.text_content() == "Stop."
3344 ));
3345 assert!(
3346 session
3347 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3348 response_id: "resp_loop".to_string(),
3349 stop_reason: StopReason::EndTurn,
3350 usage: Usage::default(),
3351 })
3352 .is_inert(),
3353 "late completion for an interrupted response must not resurrect its deltas"
3354 );
3355 assert!(
3356 session
3357 .messages()
3358 .iter()
3359 .filter_map(|message| match message {
3360 Message::BlockAssistant(assistant) => Some(block_assistant_text(assistant)),
3361 _ => None,
3362 })
3363 .all(|text| !text.contains("Looping now")),
3364 "late interrupted assistant text must remain non-canonical"
3365 );
3366 }
3367
3368 #[test]
3369 fn realtime_transcript_completion_only_finalizes_matching_response() {
3370 let mut session = Session::new();
3371
3372 let _ = session.append_realtime_transcript_event(
3373 RealtimeTranscriptEvent::UserTranscriptFinal {
3374 item_id: "item_user".to_string(),
3375 previous_item_id: None,
3376 content_index: 0,
3377 text: "question".to_string(),
3378 },
3379 );
3380 assert!(
3381 session
3382 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3383 response_id: "resp_a".to_string(),
3384 delta_id: "evt_a".to_string(),
3385 item_id: "item_a".to_string(),
3386 previous_item_id: Some("item_user".to_string()),
3387 content_index: 0,
3388 delta: "answer a".to_string(),
3389 })
3390 .is_inert()
3391 );
3392
3393 assert!(
3394 session
3395 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3396 response_id: "resp_b".to_string(),
3397 stop_reason: StopReason::EndTurn,
3398 usage: Usage::default(),
3399 })
3400 .is_inert(),
3401 "a completion for another response must not finalize buffered assistant text"
3402 );
3403 assert_eq!(session.messages().len(), 1);
3404
3405 let outcome = session.append_realtime_transcript_event(
3406 RealtimeTranscriptEvent::AssistantTurnCompleted {
3407 response_id: "resp_a".to_string(),
3408 stop_reason: StopReason::EndTurn,
3409 usage: Usage::default(),
3410 },
3411 );
3412 assert_eq!(outcome.materialized_messages.len(), 1);
3413 assert_eq!(session.messages().len(), 2);
3414 assert!(matches!(
3415 &session.messages()[1],
3416 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer a"
3417 ));
3418 }
3419
3420 #[test]
3421 fn realtime_transcript_completion_before_later_delta_is_response_scoped() {
3422 let mut session = Session::new();
3423
3424 let _ = session.append_realtime_transcript_event(
3425 RealtimeTranscriptEvent::UserTranscriptFinal {
3426 item_id: "item_user".to_string(),
3427 previous_item_id: None,
3428 content_index: 0,
3429 text: "question".to_string(),
3430 },
3431 );
3432 assert!(
3433 session
3434 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3435 response_id: "resp_a".to_string(),
3436 stop_reason: StopReason::EndTurn,
3437 usage: Usage::default(),
3438 })
3439 .is_inert()
3440 );
3441 assert!(
3442 session
3443 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3444 response_id: "resp_b".to_string(),
3445 delta_id: "evt_b".to_string(),
3446 item_id: "item_b".to_string(),
3447 previous_item_id: Some("item_user".to_string()),
3448 content_index: 0,
3449 delta: "wrong response".to_string(),
3450 })
3451 .is_inert(),
3452 "a later delta for another response must not be finalized by resp_a's pending completion"
3453 );
3454
3455 let outcome =
3456 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3457 response_id: "resp_a".to_string(),
3458 delta_id: "evt_a".to_string(),
3459 item_id: "item_a".to_string(),
3460 previous_item_id: Some("item_user".to_string()),
3461 content_index: 0,
3462 delta: "right response".to_string(),
3463 });
3464
3465 assert_eq!(outcome.materialized_messages.len(), 1);
3466 assert_eq!(session.messages().len(), 2);
3467 assert!(matches!(
3468 &session.messages()[1],
3469 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "right response"
3470 ));
3471 }
3472
3473 #[test]
3474 fn realtime_transcript_late_duplicate_completion_cannot_finalize_unrelated_response() {
3475 let mut session = Session::new();
3476
3477 let _ = session.append_realtime_transcript_event(
3478 RealtimeTranscriptEvent::UserTranscriptFinal {
3479 item_id: "item_user".to_string(),
3480 previous_item_id: None,
3481 content_index: 0,
3482 text: "question".to_string(),
3483 },
3484 );
3485 let _ =
3486 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3487 response_id: "resp_a".to_string(),
3488 delta_id: "evt_a".to_string(),
3489 item_id: "item_a".to_string(),
3490 previous_item_id: Some("item_user".to_string()),
3491 content_index: 0,
3492 delta: "first".to_string(),
3493 });
3494 let _ = session.append_realtime_transcript_event(
3495 RealtimeTranscriptEvent::AssistantTurnCompleted {
3496 response_id: "resp_a".to_string(),
3497 stop_reason: StopReason::EndTurn,
3498 usage: Usage::default(),
3499 },
3500 );
3501 assert_eq!(session.messages().len(), 2);
3502
3503 assert!(
3504 session
3505 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3506 response_id: "resp_b".to_string(),
3507 delta_id: "evt_b".to_string(),
3508 item_id: "item_b".to_string(),
3509 previous_item_id: Some("item_a".to_string()),
3510 content_index: 0,
3511 delta: "second".to_string(),
3512 })
3513 .is_inert()
3514 );
3515 assert!(
3516 session
3517 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
3518 response_id: "resp_a".to_string(),
3519 stop_reason: StopReason::EndTurn,
3520 usage: Usage::default(),
3521 })
3522 .is_inert(),
3523 "a duplicate late terminal for resp_a must not finalize resp_b"
3524 );
3525 assert_eq!(session.messages().len(), 2);
3526
3527 let outcome = session.append_realtime_transcript_event(
3528 RealtimeTranscriptEvent::AssistantTurnCompleted {
3529 response_id: "resp_b".to_string(),
3530 stop_reason: StopReason::EndTurn,
3531 usage: Usage::default(),
3532 },
3533 );
3534 assert_eq!(outcome.materialized_messages.len(), 1);
3535 assert_eq!(session.messages().len(), 3);
3536 }
3537
3538 #[test]
3539 fn realtime_transcript_interruption_discards_only_matching_response() {
3540 let mut session = Session::new();
3546
3547 let _ = session.append_realtime_transcript_event(
3548 RealtimeTranscriptEvent::UserTranscriptFinal {
3549 item_id: "item_user".to_string(),
3550 previous_item_id: None,
3551 content_index: 0,
3552 text: "question".to_string(),
3553 },
3554 );
3555 let _ =
3556 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3557 response_id: "resp_a".to_string(),
3558 delta_id: "evt_a".to_string(),
3559 item_id: "item_a".to_string(),
3560 previous_item_id: Some("item_user".to_string()),
3561 content_index: 0,
3562 delta: "interrupted display".to_string(),
3563 });
3564 let _ =
3565 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
3566 response_id: "resp_b".to_string(),
3567 delta_id: "evt_b".to_string(),
3568 item_id: "item_b".to_string(),
3569 previous_item_id: Some("item_user".to_string()),
3570 content_index: 0,
3571 delta: "keep me".to_string(),
3572 });
3573
3574 let interrupt_outcome = session.append_realtime_transcript_event(
3577 RealtimeTranscriptEvent::AssistantTurnInterrupted {
3578 response_id: "resp_a".to_string(),
3579 },
3580 );
3581 assert_eq!(
3582 interrupt_outcome.materialized_messages.len(),
3583 1,
3584 "resp_a's Display item commits on Interrupted"
3585 );
3586
3587 let outcome = session.append_realtime_transcript_event(
3588 RealtimeTranscriptEvent::AssistantTurnCompleted {
3589 response_id: "resp_b".to_string(),
3590 stop_reason: StopReason::EndTurn,
3591 usage: Usage::default(),
3592 },
3593 );
3594 assert_eq!(
3595 outcome.materialized_messages.len(),
3596 1,
3597 "resp_b commits on its TurnCompleted, untouched by resp_a's Interrupted"
3598 );
3599
3600 assert_eq!(session.messages().len(), 3);
3602 assert!(matches!(
3603 &session.messages()[1],
3604 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "interrupted display"
3605 ));
3606 assert!(matches!(
3607 &session.messages()[2],
3608 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "keep me"
3609 ));
3610 }
3611
3612 #[test]
3615 fn test_fork_shares_arc_no_clone() {
3616 let mut session = Session::new();
3617 for i in 0..100 {
3618 session.push(Message::User(UserMessage::text(format!("Message {i}"))));
3619 }
3620
3621 let forked = session.fork();
3623
3624 assert!(Arc::ptr_eq(&session.messages, &forked.messages));
3626 assert_eq!(forked.messages().len(), 100);
3627 }
3628
3629 #[test]
3630 fn test_fork_at_shares_arc_prefix() {
3631 let mut session = Session::new();
3632 for i in 0..100 {
3633 session.push(Message::User(UserMessage::text(format!("Message {i}"))));
3634 }
3635
3636 let forked = session.fork_at(50);
3638 assert_eq!(forked.messages().len(), 50);
3639
3640 assert_eq!(session.messages().len(), 100);
3642 }
3643
3644 #[test]
3645 fn test_push_cow_behavior() {
3646 let mut session = Session::new();
3647 session.push(Message::User(UserMessage::text("First".to_string())));
3648
3649 let forked = session.fork();
3651 assert!(Arc::ptr_eq(&session.messages, &forked.messages));
3652
3653 session.push(Message::User(UserMessage::text("Second".to_string())));
3655
3656 assert!(!Arc::ptr_eq(&session.messages, &forked.messages));
3658 assert_eq!(session.messages().len(), 2);
3659 assert_eq!(forked.messages().len(), 1);
3660 }
3661
3662 #[test]
3665 fn test_push_batch_single_timestamp() {
3666 let mut session = Session::new();
3667 let initial_updated = session.updated_at();
3668
3669 session.push_batch(vec![
3671 Message::User(UserMessage::text("First".to_string())),
3672 Message::User(UserMessage::text("Second".to_string())),
3673 Message::User(UserMessage::text("Third".to_string())),
3674 ]);
3675
3676 assert_eq!(session.messages().len(), 3);
3677 assert!(session.updated_at() >= initial_updated);
3679 }
3680
3681 #[test]
3682 fn test_touch_updates_timestamp() {
3683 let mut session = Session::new();
3684 let initial = session.updated_at();
3685
3686 std::thread::sleep(std::time::Duration::from_millis(10));
3687
3688 session.touch();
3690
3691 assert!(session.updated_at() > initial);
3692 }
3693
3694 #[test]
3695 fn test_session_push() {
3696 let mut session = Session::new();
3697 let initial_updated = session.updated_at();
3698
3699 std::thread::sleep(std::time::Duration::from_millis(10));
3701
3702 session.push(Message::User(UserMessage::text("Hello".to_string())));
3703
3704 assert_eq!(session.messages().len(), 1);
3705 assert!(session.updated_at() > initial_updated);
3706 }
3707
3708 #[test]
3709 fn test_session_fork() {
3710 let mut session = Session::new();
3711 session.push(Message::System(SystemMessage::new("System prompt")));
3712 session.push(Message::User(UserMessage::text("Hello".to_string())));
3713 session.push(Message::Assistant(AssistantMessage {
3714 content: "Hi!".to_string(),
3715 tool_calls: vec![],
3716 stop_reason: StopReason::EndTurn,
3717 usage: Usage::default(),
3718 created_at: crate::types::message_timestamp_now(),
3719 }));
3720
3721 let forked = session.fork_at(2);
3723 assert_eq!(forked.messages().len(), 2);
3724 assert_ne!(forked.id(), session.id());
3725
3726 let full_fork = session.fork();
3728 assert_eq!(full_fork.messages().len(), 3);
3729 }
3730
3731 #[test]
3732 fn test_session_metadata() {
3733 let mut session = Session::new();
3734 session.set_metadata("key", serde_json::json!("value"));
3735
3736 assert_eq!(session.metadata().get("key").unwrap(), "value");
3737 }
3738
3739 #[test]
3740 fn test_session_metadata_backfill_preserves_timestamp() {
3741 let mut session = Session::new();
3742 let initial_updated = session.updated_at();
3743
3744 std::thread::sleep(std::time::Duration::from_millis(10));
3745
3746 assert!(session.backfill_metadata_if_absent("key", serde_json::json!("value")));
3747 assert_eq!(session.metadata().get("key").unwrap(), "value");
3748 assert_eq!(session.updated_at(), initial_updated);
3749 assert!(!session.backfill_metadata_if_absent("key", serde_json::json!("other")));
3750 assert_eq!(session.metadata().get("key").unwrap(), "value");
3751 assert_eq!(session.updated_at(), initial_updated);
3752 }
3753
3754 #[test]
3755 fn test_session_mob_tool_authority_context_roundtrip() {
3756 let mut session = Session::new();
3757 let authority = MobToolAuthorityContext::new(
3758 crate::service::OpaquePrincipalToken::new("opaque-principal"),
3759 false,
3760 )
3761 .with_managed_mob_scope(["mob-a"])
3762 .with_audit_invocation_id("audit-1");
3763
3764 session
3765 .set_mob_tool_authority_context(Some(authority.clone()))
3766 .expect("authority should serialize");
3767 assert_eq!(session.mob_tool_authority_context(), Some(authority));
3768
3769 session
3770 .set_mob_tool_authority_context(None)
3771 .expect("authority should clear");
3772 assert!(session.mob_tool_authority_context().is_none());
3773 }
3774
3775 #[test]
3776 fn test_session_tool_visibility_state_roundtrip() {
3777 let mut session = Session::new();
3778 let state = SessionToolVisibilityState {
3779 inherited_base_filter: ToolFilter::Allow(["visible".to_string()].into_iter().collect()),
3780 active_filter: ToolFilter::Allow(
3781 ["visible".to_string(), "missing".to_string()]
3782 .into_iter()
3783 .collect(),
3784 ),
3785 staged_filter: ToolFilter::Allow(
3786 ["visible".to_string(), "missing".to_string()]
3787 .into_iter()
3788 .collect(),
3789 ),
3790 active_revision: 1,
3791 staged_revision: 2,
3792 ..Default::default()
3793 };
3794
3795 session
3796 .set_tool_visibility_state(state.clone())
3797 .expect("tool visibility state should serialize");
3798 assert_eq!(session.tool_visibility_state().unwrap(), Some(state));
3799 }
3800
3801 #[test]
3802 fn test_session_tool_visibility_state_malformed_returns_error() {
3803 let mut session = Session::new();
3804 session.set_metadata(
3805 SESSION_TOOL_VISIBILITY_STATE_KEY,
3806 serde_json::json!({
3807 "active_filter": {
3808 "unexpected_filter_kind": ["secret"]
3809 }
3810 }),
3811 );
3812
3813 assert!(
3814 session.tool_visibility_state().is_err(),
3815 "malformed canonical visibility metadata must not decode as absent/default"
3816 );
3817 }
3818
3819 #[test]
3820 fn test_session_serialization() {
3821 let mut session = Session::new();
3822 session.push(Message::User(UserMessage::text("Test".to_string())));
3823
3824 let json = serde_json::to_string(&session).unwrap();
3825 let parsed: Session = serde_json::from_str(&json).unwrap();
3826
3827 assert_eq!(parsed.id(), session.id());
3828 assert_eq!(parsed.messages().len(), 1);
3829 assert_eq!(parsed.version(), SESSION_VERSION);
3830 }
3831
3832 #[test]
3833 fn test_session_meta_from_session() {
3834 let mut session = Session::new();
3835 session.push(Message::User(UserMessage::text("Hello".to_string())));
3836 session.push(Message::Assistant(AssistantMessage {
3837 content: "Hi!".to_string(),
3838 tool_calls: vec![],
3839 stop_reason: StopReason::EndTurn,
3840 usage: Usage {
3841 input_tokens: 10,
3842 output_tokens: 5,
3843 cache_creation_tokens: None,
3844 cache_read_tokens: None,
3845 },
3846 created_at: crate::types::message_timestamp_now(),
3847 }));
3848 session.record_usage(Usage {
3849 input_tokens: 10,
3850 output_tokens: 5,
3851 cache_creation_tokens: None,
3852 cache_read_tokens: None,
3853 });
3854
3855 let meta = SessionMeta::from(&session);
3856 assert_eq!(meta.id, *session.id());
3857 assert_eq!(meta.message_count, 2);
3858 assert_eq!(meta.total_tokens, 15);
3859 }
3860
3861 #[test]
3862 fn has_pending_boundary_empty_session() {
3863 let session = Session::new();
3864 assert!(!session.has_pending_boundary());
3865 }
3866
3867 #[test]
3868 fn has_pending_boundary_after_user_message() {
3869 let mut session = Session::new();
3870 session.push(Message::User(UserMessage::text("hello")));
3871 assert!(session.has_pending_boundary());
3872 }
3873
3874 #[test]
3875 fn has_pending_boundary_after_assistant_message() {
3876 let mut session = Session::new();
3877 session.push(Message::User(UserMessage::text("hello")));
3878 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3879 vec![],
3880 StopReason::EndTurn,
3881 )));
3882 assert!(!session.has_pending_boundary());
3883 }
3884
3885 #[test]
3886 fn has_pending_boundary_after_tool_results() {
3887 let mut session = Session::new();
3888 session.push(Message::User(UserMessage::text("hello")));
3889 session.push(Message::tool_results(vec![]));
3890 assert!(session.has_pending_boundary());
3891 }
3892
3893 #[test]
3894 fn has_pending_boundary_after_system() {
3895 let mut session = Session::new();
3896 session.push(Message::System(SystemMessage::new("system")));
3897 assert!(!session.has_pending_boundary());
3898 }
3899
3900 #[test]
3901 fn system_context_state_preserves_applied_runtime_context() {
3902 let accepted_at = SystemTime::UNIX_EPOCH;
3903 let mut state = SessionSystemContextState::default();
3904 state
3905 .stage_append(
3906 &AppendSystemContextRequest {
3907 text: "Authoritative peer token is birch seventeen.".to_string(),
3908 source: Some(
3909 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
3910 .to_string(),
3911 ),
3912 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
3913 },
3914 accepted_at,
3915 )
3916 .expect("append should stage");
3917
3918 state.mark_pending_applied();
3919
3920 assert!(state.pending.is_empty());
3921 assert_eq!(state.applied.len(), 1);
3922 assert_eq!(
3923 state.applied[0].text,
3924 "Authoritative peer token is birch seventeen."
3925 );
3926 assert_eq!(
3927 state.applied[0].source.as_deref(),
3928 Some("peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
3929 );
3930
3931 let round_tripped: SessionSystemContextState =
3932 serde_json::from_value(serde_json::to_value(&state).expect("serialize state"))
3933 .expect("deserialize state");
3934 assert_eq!(round_tripped.applied, state.applied);
3935 }
3936
3937 #[test]
3938 fn append_system_context_blocks_records_typed_applied_context() {
3939 let append = PendingSystemContextAppend {
3940 text: "Authoritative peer token is birch seventeen.".to_string(),
3941 source: Some(
3942 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string(),
3943 ),
3944 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
3945 accepted_at: SystemTime::UNIX_EPOCH,
3946 };
3947 let mut session = Session::new();
3948
3949 session.append_system_context_blocks(std::slice::from_ref(&append));
3950
3951 let state = session
3952 .system_context_state()
3953 .expect("append should persist typed context state");
3954 assert_eq!(state.applied, vec![append]);
3955 }
3956
3957 #[test]
3958 fn append_system_context_blocks_renders_pre_marked_pending_context() {
3959 let accepted_at = SystemTime::UNIX_EPOCH;
3960 let mut state = SessionSystemContextState::default();
3961 state
3962 .stage_append(
3963 &AppendSystemContextRequest {
3964 text: "Apply this staged context at the request boundary.".to_string(),
3965 source: Some("rpc/session_inject_context".to_string()),
3966 idempotency_key: Some("ctx-boundary".to_string()),
3967 },
3968 accepted_at,
3969 )
3970 .expect("append should stage");
3971 let pending = state.pending.clone();
3972 state.mark_pending_applied();
3973 let mut session = Session::new();
3974 session
3975 .set_system_context_state(state)
3976 .expect("state should serialize");
3977
3978 session.append_system_context_blocks(&pending);
3979
3980 let system_prompt = session
3981 .messages()
3982 .first()
3983 .and_then(|message| match message {
3984 Message::System(system) => Some(system.content.as_str()),
3985 _ => None,
3986 })
3987 .unwrap_or_default();
3988 assert!(system_prompt.contains("Apply this staged context at the request boundary."));
3989 let state = session
3990 .system_context_state()
3991 .expect("append should persist typed context state");
3992 assert_eq!(state.applied.len(), 1);
3993 assert_eq!(
3994 state.seen["ctx-boundary"].state,
3995 SeenSystemContextState::Applied
3996 );
3997 }
3998
3999 #[test]
4000 fn append_system_context_blocks_renders_pre_marked_context_without_idempotency_key() {
4001 let accepted_at = SystemTime::UNIX_EPOCH;
4002 let mut state = SessionSystemContextState::default();
4003 state
4004 .stage_append(
4005 &AppendSystemContextRequest {
4006 text: "Apply this unkeyed staged context at the request boundary.".to_string(),
4007 source: Some("rpc/session_inject_context".to_string()),
4008 idempotency_key: None,
4009 },
4010 accepted_at,
4011 )
4012 .expect("append should stage");
4013 let pending = state.pending.clone();
4014 state.mark_pending_applied();
4015 let mut session = Session::new();
4016 session
4017 .set_system_context_state(state)
4018 .expect("state should serialize");
4019
4020 session.append_system_context_blocks(&pending);
4021
4022 let system_prompt = session
4023 .messages()
4024 .first()
4025 .and_then(|message| match message {
4026 Message::System(system) => Some(system.content.as_str()),
4027 _ => None,
4028 })
4029 .unwrap_or_default();
4030 assert!(
4031 system_prompt.contains("Apply this unkeyed staged context at the request boundary.")
4032 );
4033 }
4034
4035 #[test]
4036 fn append_system_context_blocks_skips_duplicate_idempotency_key() {
4037 let first = PendingSystemContextAppend {
4038 text: "Authoritative peer token is birch seventeen.".to_string(),
4039 source: Some("peer_response_terminal:analyst:req-1".to_string()),
4040 idempotency_key: Some("req-1".to_string()),
4041 accepted_at: SystemTime::UNIX_EPOCH,
4042 };
4043 let duplicate = PendingSystemContextAppend {
4044 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
4045 ..first.clone()
4046 };
4047 let mut session = Session::new();
4048
4049 session.append_system_context_blocks(std::slice::from_ref(&first));
4050 session.append_system_context_blocks(std::slice::from_ref(&duplicate));
4051
4052 let state = session
4053 .system_context_state()
4054 .expect("append should persist typed context state");
4055 assert_eq!(state.applied, vec![first]);
4056 let system_prompt = session
4057 .messages()
4058 .first()
4059 .and_then(|message| match message {
4060 Message::System(system) => Some(system.content.as_str()),
4061 _ => None,
4062 })
4063 .unwrap_or_default();
4064 assert_eq!(
4065 system_prompt
4066 .matches("Authoritative peer token is birch seventeen.")
4067 .count(),
4068 1
4069 );
4070 }
4071
4072 #[test]
4073 fn append_system_context_blocks_skips_conflicting_duplicate_idempotency_key() {
4074 let first = PendingSystemContextAppend {
4075 text: "Authoritative peer token is birch seventeen.".to_string(),
4076 source: Some("peer_response_terminal:analyst:req-1".to_string()),
4077 idempotency_key: Some("req-1".to_string()),
4078 accepted_at: SystemTime::UNIX_EPOCH,
4079 };
4080 let conflicting = PendingSystemContextAppend {
4081 text: "Conflicting peer token should not reach the prompt.".to_string(),
4082 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
4083 ..first.clone()
4084 };
4085 let mut session = Session::new();
4086
4087 session.append_system_context_blocks(std::slice::from_ref(&first));
4088 session.append_system_context_blocks(std::slice::from_ref(&conflicting));
4089
4090 let state = session
4091 .system_context_state()
4092 .expect("append should persist typed context state");
4093 assert_eq!(state.applied, vec![first]);
4094 let system_prompt = session
4095 .messages()
4096 .first()
4097 .and_then(|message| match message {
4098 Message::System(system) => Some(system.content.as_str()),
4099 _ => None,
4100 })
4101 .unwrap_or_default();
4102 assert!(system_prompt.contains("Authoritative peer token is birch seventeen."));
4103 assert!(!system_prompt.contains("Conflicting peer token should not reach the prompt."));
4104 }
4105
4106 #[test]
4118 fn realtime_transcript_assistant_transcript_delta_materializes_transcript_block() {
4119 let mut session = Session::new();
4120
4121 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4122 response_id: "resp_spoken".to_string(),
4123 delta_id: "evt_delta_spoken_1".to_string(),
4124 item_id: "item_spoken".to_string(),
4125 previous_item_id: None,
4126 content_index: 0,
4127 delta: "I said hi".to_string(),
4128 };
4129 assert!(
4130 session.append_realtime_transcript_event(delta).is_inert(),
4131 "delta alone is inert until turn-completed flushes"
4132 );
4133
4134 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
4135 response_id: "resp_spoken".to_string(),
4136 stop_reason: StopReason::EndTurn,
4137 usage: Usage::default(),
4138 };
4139 let outcome = session.append_realtime_transcript_event(terminal);
4140 assert_eq!(outcome.materialized_messages.len(), 1);
4141
4142 let messages = session.messages();
4144 assert_eq!(messages.len(), 1);
4145 match &messages[0] {
4146 Message::BlockAssistant(assistant) => {
4147 assert_eq!(assistant.blocks.len(), 1);
4148 match &assistant.blocks[0] {
4149 AssistantBlock::Transcript { text, source, .. } => {
4150 assert_eq!(text, "I said hi");
4151 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4152 }
4153 other => unreachable!(
4154 "AssistantTranscriptDelta must materialize as AssistantBlock::Transcript, got {other:?}"
4155 ),
4156 }
4157 }
4158 other => unreachable!("expected BlockAssistant message, got {other:?}"),
4159 }
4160 }
4161
4162 #[test]
4163 fn round4_cc4_in_flight_response_ids_lists_distinct_unmaterialized_responses() {
4164 let mut session = Session::new();
4170
4171 for (i, response_id) in [
4175 ("resp_a", "resp_a"),
4176 ("resp_a_extra", "resp_a"),
4177 ("resp_b", "resp_b"),
4178 ("resp_c", "resp_c"),
4179 ]
4180 .iter()
4181 .enumerate()
4182 {
4183 let event = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4184 response_id: response_id.1.to_string(),
4185 delta_id: format!("delta_{i}"),
4186 item_id: response_id.0.to_string(),
4187 previous_item_id: None,
4188 content_index: 0,
4189 delta: "x".to_string(),
4190 };
4191 let _ = session.append_realtime_transcript_event(event);
4192 }
4193
4194 let _ = session.append_realtime_transcript_event(
4196 RealtimeTranscriptEvent::AssistantTurnInterrupted {
4197 response_id: "resp_c".to_string(),
4198 },
4199 );
4200
4201 let _ = session.append_realtime_transcript_event(
4204 RealtimeTranscriptEvent::UserTranscriptFinal {
4205 item_id: "u_item".to_string(),
4206 previous_item_id: None,
4207 content_index: 0,
4208 text: "hi".to_string(),
4209 },
4210 );
4211
4212 let in_flight = session.in_flight_realtime_assistant_response_ids();
4213 assert!(in_flight.contains(&"resp_a".to_string()), "{in_flight:?}");
4214 assert!(in_flight.contains(&"resp_b".to_string()), "{in_flight:?}");
4215 assert!(
4216 !in_flight.contains(&"resp_c".to_string()),
4217 "discarded response must not appear in in_flight: {in_flight:?}"
4218 );
4219 assert_eq!(
4221 in_flight.iter().filter(|r| *r == "resp_a").count(),
4222 1,
4223 "distinct response_ids only: {in_flight:?}"
4224 );
4225 }
4226
4227 #[test]
4228 fn round4_cc2_assistant_turn_completed_after_transcript_deltas_materializes_transcript() {
4229 let mut session = Session::new();
4236
4237 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4238 response_id: "resp_cc2".to_string(),
4239 delta_id: "delta_cc2_1".to_string(),
4240 item_id: "item_cc2".to_string(),
4241 previous_item_id: None,
4242 content_index: 0,
4243 delta: "hello world".to_string(),
4244 };
4245 assert!(session.append_realtime_transcript_event(delta).is_inert());
4246
4247 assert_eq!(
4249 session.in_flight_realtime_assistant_response_ids(),
4250 vec!["resp_cc2".to_string()]
4251 );
4252
4253 let outcome = session.append_realtime_transcript_event(
4254 RealtimeTranscriptEvent::AssistantTurnCompleted {
4255 response_id: "resp_cc2".to_string(),
4256 stop_reason: StopReason::EndTurn,
4257 usage: Usage::default(),
4258 },
4259 );
4260 assert_eq!(outcome.materialized_messages.len(), 1);
4261
4262 assert!(
4264 session
4265 .in_flight_realtime_assistant_response_ids()
4266 .is_empty(),
4267 "materialized items must not appear in in_flight_realtime_assistant_response_ids"
4268 );
4269
4270 let messages = session.messages();
4271 let assistant = messages.iter().find_map(|m| match m {
4272 Message::BlockAssistant(a) => Some(a),
4273 _ => None,
4274 });
4275 let assistant = assistant.expect("assistant block message expected");
4276 assert_eq!(assistant.blocks.len(), 1);
4277 assert!(matches!(
4278 &assistant.blocks[0],
4279 AssistantBlock::Transcript {
4280 source: crate::types::TranscriptSource::Spoken,
4281 ..
4282 }
4283 ));
4284 }
4285
4286 #[test]
4287 fn realtime_transcript_assistant_text_delta_still_materializes_text_block() {
4288 let mut session = Session::new();
4292
4293 let delta = RealtimeTranscriptEvent::AssistantTextDelta {
4294 response_id: "resp_display".to_string(),
4295 delta_id: "evt_delta_display_1".to_string(),
4296 item_id: "item_display".to_string(),
4297 previous_item_id: None,
4298 content_index: 0,
4299 delta: "I wrote".to_string(),
4300 };
4301 let _ = session.append_realtime_transcript_event(delta);
4302
4303 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
4304 response_id: "resp_display".to_string(),
4305 stop_reason: StopReason::EndTurn,
4306 usage: Usage::default(),
4307 };
4308 let outcome = session.append_realtime_transcript_event(terminal);
4309 assert_eq!(outcome.materialized_messages.len(), 1);
4310
4311 let messages = session.messages();
4312 match &messages[0] {
4313 Message::BlockAssistant(assistant) => match &assistant.blocks[0] {
4314 AssistantBlock::Text { text, .. } => assert_eq!(text, "I wrote"),
4315 other => unreachable!(
4316 "AssistantTextDelta must keep materializing AssistantBlock::Text, got {other:?}"
4317 ),
4318 },
4319 other => unreachable!("expected BlockAssistant message, got {other:?}"),
4320 }
4321 }
4322
4323 #[test]
4324 fn round4_cc7_mixed_response_persists_text_and_transcript_in_order() {
4325 let mut session = Session::new();
4343
4344 let display_a = RealtimeTranscriptEvent::AssistantTextDelta {
4346 response_id: "resp_mixed_1".to_string(),
4347 delta_id: "delta_disp_1".to_string(),
4348 item_id: "item_display".to_string(),
4349 previous_item_id: None,
4350 content_index: 0,
4351 delta: "Here's the report:".to_string(),
4352 };
4353 assert!(
4354 session
4355 .append_realtime_transcript_event(display_a)
4356 .is_inert()
4357 );
4358
4359 let display_b = RealtimeTranscriptEvent::AssistantTextDelta {
4360 response_id: "resp_mixed_1".to_string(),
4361 delta_id: "delta_disp_2".to_string(),
4362 item_id: "item_display".to_string(),
4363 previous_item_id: None,
4364 content_index: 0,
4365 delta: " (still writing)".to_string(),
4366 };
4367 assert!(
4368 session
4369 .append_realtime_transcript_event(display_b)
4370 .is_inert()
4371 );
4372
4373 let spoken_a = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4378 response_id: "resp_mixed_1".to_string(),
4379 delta_id: "delta_spoken_1".to_string(),
4380 item_id: "item_spoken".to_string(),
4381 previous_item_id: Some("item_display".to_string()),
4382 content_index: 0,
4383 delta: "I'm reading the report aloud:".to_string(),
4384 };
4385 assert!(
4386 session
4387 .append_realtime_transcript_event(spoken_a)
4388 .is_inert()
4389 );
4390
4391 let spoken_b = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4392 response_id: "resp_mixed_1".to_string(),
4393 delta_id: "delta_spoken_2".to_string(),
4394 item_id: "item_spoken".to_string(),
4395 previous_item_id: Some("item_display".to_string()),
4396 content_index: 0,
4397 delta: " sentence two.".to_string(),
4398 };
4399 assert!(
4400 session
4401 .append_realtime_transcript_event(spoken_b)
4402 .is_inert()
4403 );
4404
4405 let outcome = session.append_realtime_transcript_event(
4408 RealtimeTranscriptEvent::AssistantTurnCompleted {
4409 response_id: "resp_mixed_1".to_string(),
4410 stop_reason: StopReason::EndTurn,
4411 usage: Usage {
4412 input_tokens: 11,
4413 output_tokens: 22,
4414 cache_creation_tokens: None,
4415 cache_read_tokens: None,
4416 },
4417 },
4418 );
4419 assert_eq!(outcome.materialized_messages.len(), 2);
4421
4422 let messages = session.messages();
4425 let assistants: Vec<&BlockAssistantMessage> = messages
4426 .iter()
4427 .filter_map(|m| match m {
4428 Message::BlockAssistant(a) => Some(a),
4429 _ => None,
4430 })
4431 .collect();
4432 assert_eq!(
4433 assistants.len(),
4434 1,
4435 "mixed display+spoken response under one response_id must produce exactly ONE BlockAssistant message, got: {assistants:?}"
4436 );
4437 let assistant = assistants[0];
4438 assert_eq!(
4439 assistant.blocks.len(),
4440 2,
4441 "mixed response message must carry both blocks: {:?}",
4442 assistant.blocks
4443 );
4444
4445 match &assistant.blocks[0] {
4447 AssistantBlock::Text { text, .. } => {
4448 assert_eq!(text, "Here's the report: (still writing)");
4449 }
4450 other => unreachable!(
4451 "first block must be AssistantBlock::Text (display lane), got {other:?}"
4452 ),
4453 }
4454 match &assistant.blocks[1] {
4456 AssistantBlock::Transcript { text, source, .. } => {
4457 assert_eq!(text, "I'm reading the report aloud: sentence two.");
4458 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4459 }
4460 other => unreachable!(
4461 "second block must be AssistantBlock::Transcript {{ source: Spoken }}, got {other:?}"
4462 ),
4463 }
4464
4465 assert_eq!(session.usage.input_tokens, 11);
4467 assert_eq!(session.usage.output_tokens, 22);
4468 }
4469
4470 #[test]
4471 fn round5_r55_mixed_response_barge_in_preserves_display_drops_spoken() {
4472 let mut session = Session::new();
4488
4489 let display = RealtimeTranscriptEvent::AssistantTextDelta {
4490 response_id: "resp_mixed_2".to_string(),
4491 delta_id: "delta_disp_1".to_string(),
4492 item_id: "item_display_2".to_string(),
4493 previous_item_id: None,
4494 content_index: 0,
4495 delta: "Working on the report...".to_string(),
4496 };
4497 let _ = session.append_realtime_transcript_event(display);
4498
4499 let spoken = RealtimeTranscriptEvent::AssistantTranscriptDelta {
4500 response_id: "resp_mixed_2".to_string(),
4501 delta_id: "delta_spoken_1".to_string(),
4502 item_id: "item_spoken_2".to_string(),
4503 previous_item_id: Some("item_display_2".to_string()),
4504 content_index: 0,
4505 delta: "I'm reading the report".to_string(),
4506 };
4507 let _ = session.append_realtime_transcript_event(spoken);
4508
4509 let outcome = session.append_realtime_transcript_event(
4513 RealtimeTranscriptEvent::AssistantTurnInterrupted {
4514 response_id: "resp_mixed_2".to_string(),
4515 },
4516 );
4517 assert_eq!(
4518 outcome.materialized_messages.len(),
4519 1,
4520 "Display lane item must materialize on Interrupted: {outcome:?}"
4521 );
4522
4523 let late_completion = session.append_realtime_transcript_event(
4527 RealtimeTranscriptEvent::AssistantTurnCompleted {
4528 response_id: "resp_mixed_2".to_string(),
4529 stop_reason: StopReason::Cancelled,
4530 usage: Usage::default(),
4531 },
4532 );
4533 assert_eq!(
4534 late_completion.materialized_messages.len(),
4535 0,
4536 "post-barge-in TurnCompleted must not resurrect anything"
4537 );
4538
4539 let messages = session.messages();
4542 let assistants: Vec<&BlockAssistantMessage> = messages
4543 .iter()
4544 .filter_map(|m| match m {
4545 Message::BlockAssistant(a) => Some(a),
4546 _ => None,
4547 })
4548 .collect();
4549 assert_eq!(
4550 assistants.len(),
4551 1,
4552 "barge-in must commit exactly one BlockAssistant containing the Display lane: {assistants:?}"
4553 );
4554 let assistant = assistants[0];
4555 assert_eq!(assistant.blocks.len(), 1, "blocks: {:?}", assistant.blocks);
4556 match &assistant.blocks[0] {
4557 AssistantBlock::Text { text, .. } => {
4558 assert_eq!(text, "Working on the report...");
4559 }
4560 other => {
4561 unreachable!("Display lane must materialize as AssistantBlock::Text, got {other:?}")
4562 }
4563 }
4564 assert!(
4566 !assistant
4567 .blocks
4568 .iter()
4569 .any(|b| matches!(b, AssistantBlock::Transcript { .. })),
4570 "Spoken lane must be dropped on barge-in"
4571 );
4572
4573 assert!(
4576 !session
4577 .in_flight_realtime_assistant_response_ids()
4578 .contains(&"resp_mixed_2".to_string()),
4579 "barged-in response must not appear in in_flight_realtime_assistant_response_ids"
4580 );
4581 }
4582
4583 #[test]
4584 fn round5_r55_barge_in_preserves_display_lane_drops_spoken() {
4585 let mut session = Session::new();
4589
4590 let _ =
4591 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4592 response_id: "resp_a".to_string(),
4593 delta_id: "delta_d_1".to_string(),
4594 item_id: "item_display".to_string(),
4595 previous_item_id: None,
4596 content_index: 0,
4597 delta: "display-text".to_string(),
4598 });
4599 let _ = session.append_realtime_transcript_event(
4600 RealtimeTranscriptEvent::AssistantTranscriptDelta {
4601 response_id: "resp_a".to_string(),
4602 delta_id: "delta_s_1".to_string(),
4603 item_id: "item_spoken".to_string(),
4604 previous_item_id: None,
4605 content_index: 0,
4606 delta: "spoken-transcript".to_string(),
4607 },
4608 );
4609
4610 let outcome = session.append_realtime_transcript_event(
4611 RealtimeTranscriptEvent::AssistantTurnInterrupted {
4612 response_id: "resp_a".to_string(),
4613 },
4614 );
4615 assert_eq!(outcome.materialized_messages.len(), 1);
4617
4618 let messages = session.messages();
4619 let assistants: Vec<&BlockAssistantMessage> = messages
4620 .iter()
4621 .filter_map(|m| match m {
4622 Message::BlockAssistant(a) => Some(a),
4623 _ => None,
4624 })
4625 .collect();
4626 assert_eq!(assistants.len(), 1);
4627 assert_eq!(assistants[0].blocks.len(), 1);
4629 match &assistants[0].blocks[0] {
4630 AssistantBlock::Text { text, .. } => assert_eq!(text, "display-text"),
4631 other => unreachable!("expected Text, got {other:?}"),
4632 }
4633 }
4634
4635 #[test]
4636 fn round5_r55_barge_in_finalizes_retained_display_into_committed_block() {
4637 let mut session = Session::new();
4642
4643 let _ =
4644 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4645 response_id: "resp_a".to_string(),
4646 delta_id: "delta_d_1".to_string(),
4647 item_id: "item_display".to_string(),
4648 previous_item_id: None,
4649 content_index: 0,
4650 delta: "committed-display-text".to_string(),
4651 });
4652
4653 assert!(session.messages().is_empty());
4655
4656 let outcome = session.append_realtime_transcript_event(
4657 RealtimeTranscriptEvent::AssistantTurnInterrupted {
4658 response_id: "resp_a".to_string(),
4659 },
4660 );
4661 assert_eq!(
4662 outcome.materialized_messages.len(),
4663 1,
4664 "Interrupted must finalize retained Display lane immediately"
4665 );
4666
4667 let messages = session.messages();
4669 assert_eq!(messages.len(), 1);
4670 match &messages[0] {
4671 Message::BlockAssistant(assistant) => {
4672 assert_eq!(assistant.blocks.len(), 1);
4673 match &assistant.blocks[0] {
4674 AssistantBlock::Text { text, .. } => {
4675 assert_eq!(text, "committed-display-text");
4676 }
4677 other => unreachable!("expected Text, got {other:?}"),
4678 }
4679 }
4680 other => unreachable!("expected BlockAssistant, got {other:?}"),
4681 }
4682 }
4683
4684 #[test]
4685 fn round5_r56_truncation_promotes_default_lane_item_to_spoken() {
4686 let mut session = Session::new();
4693
4694 let _ = session.append_realtime_transcript_event(
4695 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4696 response_id: "resp_a".to_string(),
4697 item_id: "item_a".to_string(),
4698 content_index: 0,
4699 text: "what was actually heard".to_string(),
4700 },
4701 );
4702
4703 let outcome = session.append_realtime_transcript_event(
4704 RealtimeTranscriptEvent::AssistantTurnCompleted {
4705 response_id: "resp_a".to_string(),
4706 stop_reason: StopReason::EndTurn,
4707 usage: Usage::default(),
4708 },
4709 );
4710 assert_eq!(outcome.materialized_messages.len(), 1);
4711
4712 assert_eq!(session.messages().len(), 1);
4713 match &session.messages()[0] {
4714 Message::BlockAssistant(assistant) => {
4715 assert_eq!(assistant.blocks.len(), 1);
4716 match &assistant.blocks[0] {
4717 AssistantBlock::Transcript { text, source, .. } => {
4718 assert_eq!(text, "what was actually heard");
4719 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4720 }
4721 other => unreachable!(
4722 "truncation-only path must materialize as AssistantBlock::Transcript, got {other:?}"
4723 ),
4724 }
4725 }
4726 other => unreachable!("expected BlockAssistant, got {other:?}"),
4727 }
4728 }
4729
4730 #[test]
4731 fn round5_r56_truncation_after_display_delta_is_no_op_keeping_display_content() {
4732 let mut session = Session::new();
4740
4741 let _ =
4742 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4743 response_id: "resp_a".to_string(),
4744 delta_id: "delta_d_1".to_string(),
4745 item_id: "item_a".to_string(),
4746 previous_item_id: None,
4747 content_index: 0,
4748 delta: "display-text-from-delta".to_string(),
4749 });
4750
4751 let _ = session.append_realtime_transcript_event(
4752 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4753 response_id: "resp_a".to_string(),
4754 item_id: "item_a".to_string(),
4755 content_index: 0,
4756 text: "spoken-truncation-text".to_string(),
4757 },
4758 );
4759
4760 let _ = session.append_realtime_transcript_event(
4761 RealtimeTranscriptEvent::AssistantTurnCompleted {
4762 response_id: "resp_a".to_string(),
4763 stop_reason: StopReason::EndTurn,
4764 usage: Usage::default(),
4765 },
4766 );
4767
4768 assert_eq!(session.messages().len(), 1);
4771 match &session.messages()[0] {
4772 Message::BlockAssistant(assistant) => {
4773 assert_eq!(assistant.blocks.len(), 1);
4774 match &assistant.blocks[0] {
4775 AssistantBlock::Text { text, .. } => {
4776 assert_eq!(text, "display-text-from-delta");
4777 }
4778 other => unreachable!(
4779 "Display content must survive misrouted truncation, got {other:?}"
4780 ),
4781 }
4782 }
4783 other => unreachable!("expected BlockAssistant, got {other:?}"),
4784 }
4785 }
4786
4787 #[test]
4795 fn round5_r56_sibling_display_delta_skipped_on_spoken_item() {
4796 let mut session = Session::new();
4797
4798 let _ = session.append_realtime_transcript_event(
4800 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
4801 response_id: "resp_a".to_string(),
4802 item_id: "item_a".to_string(),
4803 content_index: 0,
4804 text: "what was actually heard".to_string(),
4805 },
4806 );
4807
4808 let _ =
4811 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4812 response_id: "resp_a".to_string(),
4813 delta_id: "delta_d_1".to_string(),
4814 item_id: "item_a".to_string(),
4815 previous_item_id: None,
4816 content_index: 0,
4817 delta: "should-not-appear".to_string(),
4818 });
4819
4820 let _ = session.append_realtime_transcript_event(
4821 RealtimeTranscriptEvent::AssistantTurnCompleted {
4822 response_id: "resp_a".to_string(),
4823 stop_reason: StopReason::EndTurn,
4824 usage: Usage::default(),
4825 },
4826 );
4827
4828 assert_eq!(session.messages().len(), 1);
4831 match &session.messages()[0] {
4832 Message::BlockAssistant(assistant) => {
4833 assert_eq!(assistant.blocks.len(), 1);
4834 match &assistant.blocks[0] {
4835 AssistantBlock::Transcript { text, source, .. } => {
4836 assert_eq!(text, "what was actually heard");
4837 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
4838 }
4839 other => unreachable!(
4840 "Spoken-locked item must materialize as Transcript, got {other:?}"
4841 ),
4842 }
4843 }
4844 other => unreachable!("expected BlockAssistant, got {other:?}"),
4845 }
4846 }
4847
4848 #[test]
4855 fn round5_r56_sibling_spoken_delta_skipped_on_display_item() {
4856 let mut session = Session::new();
4857
4858 let _ =
4860 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
4861 response_id: "resp_a".to_string(),
4862 delta_id: "delta_d_1".to_string(),
4863 item_id: "item_a".to_string(),
4864 previous_item_id: None,
4865 content_index: 0,
4866 delta: "display-locked-text".to_string(),
4867 });
4868
4869 let _ = session.append_realtime_transcript_event(
4872 RealtimeTranscriptEvent::AssistantTranscriptDelta {
4873 response_id: "resp_a".to_string(),
4874 delta_id: "delta_s_1".to_string(),
4875 item_id: "item_a".to_string(),
4876 previous_item_id: None,
4877 content_index: 0,
4878 delta: "should-not-appear".to_string(),
4879 },
4880 );
4881
4882 let _ = session.append_realtime_transcript_event(
4883 RealtimeTranscriptEvent::AssistantTurnCompleted {
4884 response_id: "resp_a".to_string(),
4885 stop_reason: StopReason::EndTurn,
4886 usage: Usage::default(),
4887 },
4888 );
4889
4890 assert_eq!(session.messages().len(), 1);
4892 match &session.messages()[0] {
4893 Message::BlockAssistant(assistant) => {
4894 assert_eq!(assistant.blocks.len(), 1);
4895 match &assistant.blocks[0] {
4896 AssistantBlock::Text { text, .. } => {
4897 assert_eq!(text, "display-locked-text");
4898 }
4899 other => {
4900 unreachable!("Display-locked item must materialize as Text, got {other:?}")
4901 }
4902 }
4903 }
4904 other => unreachable!("expected BlockAssistant, got {other:?}"),
4905 }
4906 }
4907
4908 #[test]
4916 fn round5_r57_late_final_text_after_turn_completed_warns_and_skips() {
4917 let mut session = Session::new();
4918
4919 let _ = session.append_realtime_transcript_event(
4921 RealtimeTranscriptEvent::AssistantTranscriptDelta {
4922 response_id: "resp_a".to_string(),
4923 delta_id: "delta_s_1".to_string(),
4924 item_id: "item_a".to_string(),
4925 previous_item_id: None,
4926 content_index: 0,
4927 delta: "delta-accumulated".to_string(),
4928 },
4929 );
4930
4931 let commit_outcome = session.append_realtime_transcript_event(
4933 RealtimeTranscriptEvent::AssistantTurnCompleted {
4934 response_id: "resp_a".to_string(),
4935 stop_reason: StopReason::EndTurn,
4936 usage: Usage::default(),
4937 },
4938 );
4939 assert_eq!(commit_outcome.materialized_messages.len(), 1);
4940
4941 let late_outcome = session.append_realtime_transcript_event(
4945 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
4946 response_id: "resp_a".to_string(),
4947 item_id: "item_a".to_string(),
4948 content_index: 0,
4949 text: "authoritative-final-that-must-not-land".to_string(),
4950 },
4951 );
4952 assert!(
4953 late_outcome.is_inert(),
4954 "late FinalText after materialization must produce inert outcome"
4955 );
4956
4957 assert_eq!(session.messages().len(), 1);
4960 match &session.messages()[0] {
4961 Message::BlockAssistant(assistant) => {
4962 assert_eq!(assistant.blocks.len(), 1);
4963 match &assistant.blocks[0] {
4964 AssistantBlock::Transcript { text, .. } => {
4965 assert_eq!(
4966 text, "delta-accumulated",
4967 "canonical message must preserve delta-accumulated text; \
4968 append-only history forbids late FinalText repair"
4969 );
4970 }
4971 other => unreachable!("expected Transcript, got {other:?}"),
4972 }
4973 }
4974 other => unreachable!("expected BlockAssistant, got {other:?}"),
4975 }
4976 }
4977}