1use crate::Provider;
13use crate::peer_meta::PeerMeta;
14use crate::realtime_transcript::{
15 RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage,
16 RealtimeTranscriptRole, SESSION_REALTIME_TRANSCRIPT_STATE_KEY, TranscriptLane,
17};
18use crate::service::{AppendSystemContextRequest, MobToolAuthorityContext};
19use crate::time_compat::SystemTime;
20use crate::tool_scope::ToolFilter;
21use crate::types::{
22 AssistantBlock, BlockAssistantMessage, ContentBlock, ContentInput, Message, SessionId,
23 StopReason, ToolDef, ToolProvenance, ToolResult, Usage, UserMessage,
24};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use sha2::{Digest, Sha256};
27use std::collections::{BTreeMap, BTreeSet, HashMap};
28use std::sync::Arc;
29
/// Session format version. `default_version` returns this value, so it is
/// what a deserialized session gets when its `version` field is absent.
pub const SESSION_VERSION: u32 = 2;

// NOTE(review): presumably the schema version of the session metadata map;
// nothing in the visible part of this file reads it — confirm with its users.
pub const SESSION_METADATA_SCHEMA_VERSION: u32 = 2;
52
/// A replacement payload for a transcript message or for one block inside it.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored under the `type` key next to the variant's fields.
// NOTE(review): the code that applies these replacements (and therefore the
// exact meaning of each index) is outside the visible part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TranscriptReplacement {
    /// A whole replacement message.
    Message { message: Message },
    /// A replacement content block, addressed by `block_index`.
    UserContentBlock {
        block_index: usize,
        block: ContentBlock,
    },
    /// A replacement assistant block, addressed by `block_index`.
    AssistantBlock {
        block_index: usize,
        block: AssistantBlock,
    },
    /// A replacement content block addressed by `result_index` and then
    /// `block_index`.
    ToolResultContentBlock {
        result_index: usize,
        block_index: usize,
        block: ContentBlock,
    },
}
79
// NOTE(review): presumably the session-metadata key under which a
// `TranscriptHistoryState` is stored; the reader/writer is not visible here.
pub const SESSION_TRANSCRIPT_HISTORY_STATE_KEY: &str = "session_transcript_history_state_v1";
82
// Selects the part of the parent transcript that a rewrite replaces.
//
// Serialized as an internally tagged enum (`type` key, snake_case variant
// name). Plain `//` comments are used on purpose: `///` docs would be copied
// into the JSON schema derived under the `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TranscriptRewriteSelection {
    // Message index range. Rewrite-record validation slices the parent
    // transcript as `start..end` (so `end` is exclusive) and requires
    // `start <= end <= parent message count`.
    MessageRange { start: usize, end: usize },
}
91
92impl TranscriptRewriteSelection {
93 fn bounds(&self) -> (usize, usize) {
94 match self {
95 Self::MessageRange { start, end } => (*start, *end),
96 }
97 }
98}
99
// Why a transcript rewrite was made.
//
// Plain `//` comments are used on purpose: `///` docs would be copied into the
// JSON schema derived under the `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteReason {
    // Free-form reason category (any string is accepted here).
    pub kind: String,
    // Optional free-text note; omitted from the serialized form when `None`
    // and defaulted to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
}
113
114impl TranscriptRewriteReason {
115 pub fn new(kind: impl Into<String>) -> Self {
116 Self {
117 kind: kind.into(),
118 note: None,
119 }
120 }
121}
122
// Metadata describing one transcript rewrite: which revision it started from,
// which revision it produced, and digests of the affected spans.
//
// The relationships documented on the fields are the ones enforced by
// `validate_transcript_rewrite_record`. Plain `//` comments are used on
// purpose: `///` docs would be copied into the JSON schema derived under the
// `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteCommit {
    // Digest of the full parent message list (see `transcript_messages_digest`).
    pub parent_revision: String,
    // Digest of the full message list after the rewrite; must differ from
    // `parent_revision`.
    pub revision: String,
    // The span of the parent transcript that was replaced.
    pub selection: TranscriptRewriteSelection,
    // Digest of the selected span of the parent transcript.
    pub original_span_digest: String,
    // Digest of the span that took its place in the new revision.
    pub replacement_digest: String,
    // Message count of the parent transcript.
    pub messages_before: usize,
    // Message count of the rewritten transcript.
    pub messages_after: usize,
    pub reason: TranscriptRewriteReason,
    // Optional actor identifier; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actor: Option<String>,
    // The schema describes this field via the `SchemaSystemTime` stand-in.
    #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
    pub committed_at: SystemTime,
}
141
// The full message list of one transcript revision.
//
// Plain `//` comments are used on purpose: `///` docs would be copied into the
// JSON schema derived under the `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRevisionBody {
    // Revision id; the validators in this file require it to equal the digest
    // of `messages`.
    pub revision: String,
    // Revision this body follows, if any. Not covered by the digest; it is
    // consulted when checking that one revision extends another.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_revision: Option<String>,
    // Described as an array of arbitrary JSON values in the generated schema.
    #[cfg_attr(feature = "schema", schemars(with = "Vec<serde_json::Value>"))]
    pub messages: Vec<Message>,
    #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
    pub created_at: SystemTime,
}
155
// Schema-only stand-in referenced through `schemars(with = "SchemaSystemTime")`
// on the `SystemTime` fields above; it appears in the generated schema under
// the name `SystemTime`. It exists only when the `schema` feature is enabled
// and is never constructed, hence `allow(dead_code)`.
// NOTE(review): the two fields presumably mirror how `SystemTime` serializes —
// confirm against `crate::time_compat::SystemTime`.
#[cfg(feature = "schema")]
#[allow(dead_code)]
#[derive(schemars::JsonSchema)]
#[schemars(rename = "SystemTime")]
struct SchemaSystemTime {
    secs_since_epoch: u64,
    nanos_since_epoch: u32,
}
164
// A rewrite commit bundled with the full bodies of its parent revision and of
// the revision it produced.
//
// `TranscriptRewriteRecord::new` validates the three parts against each other;
// the fields are public, so values built or deserialized directly are not
// validated. Plain `//` comments are used on purpose: `///` docs would be
// copied into the JSON schema derived under the `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptRewriteRecord {
    pub commit: TranscriptRewriteCommit,
    // Body of `commit.parent_revision`.
    pub parent_body: TranscriptRevisionBody,
    // Body of `commit.revision`.
    pub revision_body: TranscriptRevisionBody,
}
174
175impl TranscriptRewriteRecord {
176 pub fn new(
177 commit: TranscriptRewriteCommit,
178 parent_body: TranscriptRevisionBody,
179 revision_body: TranscriptRevisionBody,
180 ) -> Result<Self, TranscriptEditError> {
181 validate_transcript_rewrite_record(&commit, &parent_body, &revision_body)?;
182 Ok(Self {
183 commit,
184 parent_body,
185 revision_body,
186 })
187 }
188}
189
// Accumulated transcript rewrite history: the current head revision, the
// rewrite commits, and the revision bodies they refer to.
//
// Plain `//` comments are used on purpose: `///` docs would be copied into the
// JSON schema derived under the `schema` feature.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub struct TranscriptHistoryState {
    // Revision id of the current transcript head.
    pub head: String,
    // Rewrite commits in the order they were recorded; omitted from the
    // serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub commits: Vec<TranscriptRewriteCommit>,
    // Revision bodies referenced by `head` and `commits`; omitted from the
    // serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub revisions: Vec<TranscriptRevisionBody>,
}
201
202impl TranscriptHistoryState {
203 pub fn from_rewrite_records<I>(records: I) -> Result<Option<Self>, TranscriptEditError>
205 where
206 I: IntoIterator<Item = TranscriptRewriteRecord>,
207 {
208 let mut state: Option<Self> = None;
209 for record in records {
210 validate_transcript_rewrite_record(
211 &record.commit,
212 &record.parent_body,
213 &record.revision_body,
214 )?;
215 let state = state.get_or_insert_with(|| Self {
216 head: record.commit.parent_revision.clone(),
217 commits: Vec::new(),
218 revisions: Vec::new(),
219 });
220 if record.commit.parent_revision != state.head {
221 if revision_body_extends_head(&record.parent_body, &state.revisions, &state.head)? {
222 state.head = record.commit.parent_revision.clone();
223 } else {
224 return Err(TranscriptEditError::HistoryStateMalformed(format!(
225 "rewrite record parent {} does not extend transcript head {}",
226 record.commit.parent_revision, state.head
227 )));
228 }
229 }
230 if !state
231 .revisions
232 .iter()
233 .any(|body| body.revision == record.parent_body.revision)
234 {
235 state.revisions.push(record.parent_body);
236 }
237 if !state
238 .revisions
239 .iter()
240 .any(|body| body.revision == record.revision_body.revision)
241 {
242 state.revisions.push(record.revision_body);
243 }
244 state.head = record.commit.revision.clone();
245 state.commits.push(record.commit);
246 }
247 Ok(state)
248 }
249}
250
/// Errors reported when editing or validating a transcript and its rewrite
/// history. The `#[error]` strings are the user-visible messages.
#[derive(Debug, Clone, thiserror::Error)]
pub enum TranscriptEditError {
    /// A message index was outside the transcript.
    #[error("message index {message_index} out of bounds for {message_count} messages")]
    MessageIndexOutOfBounds {
        message_index: usize,
        message_count: usize,
    },
    /// A block index was outside the addressed block list; `block_kind` names
    /// the kind of block in the message.
    #[error("{block_kind} index {block_index} out of bounds for {block_count} blocks")]
    BlockIndexOutOfBounds {
        block_kind: &'static str,
        block_index: usize,
        block_count: usize,
    },
    /// The message at `message_index` did not have the role the replacement
    /// expected.
    #[error("replacement expected {expected} at message index {message_index}, found {actual}")]
    MessageRoleMismatch {
        message_index: usize,
        expected: &'static str,
        actual: &'static str,
    },
    /// A rewrite range had `start > end` or reached past `message_count`.
    #[error("invalid transcript rewrite range {start}..{end} for {message_count} messages")]
    InvalidRewriteRange {
        start: usize,
        end: usize,
        message_count: usize,
    },
    /// The rewrite's parent and resulting revisions are identical.
    #[error("transcript rewrite does not change transcript revision {revision}")]
    NoOpRewrite { revision: String },
    /// The expected parent revision differs from the actual one.
    #[error("transcript rewrite parent revision mismatch: expected {expected}, actual {actual}")]
    RevisionConflict { expected: String, actual: String },
    /// The rewrite history (commits, bodies, digests or chain) is inconsistent.
    #[error("transcript history state is malformed: {0}")]
    HistoryStateMalformed(String),
    /// Tool-use / tool-results pairing in the transcript is broken.
    #[error("invalid transcript shape after rewrite: {0}")]
    InvalidTranscriptShape(String),
}
286
287fn message_role_name(message: &Message) -> &'static str {
288 match message {
289 Message::System(_) => "system",
290 Message::SystemNotice(_) => "system_notice",
291 Message::User(_) => "user",
292 Message::Assistant(_) => "assistant",
293 Message::BlockAssistant(_) => "block_assistant",
294 Message::ToolResults { .. } => "tool_results",
295 }
296}
297
298fn assistant_tool_use_ids(message: &Message) -> Vec<&str> {
299 match message {
300 Message::Assistant(assistant) => assistant
301 .tool_calls
302 .iter()
303 .map(|tool_call| tool_call.id.as_str())
304 .collect(),
305 Message::BlockAssistant(assistant) => assistant
306 .blocks
307 .iter()
308 .filter_map(|block| match block {
309 AssistantBlock::ToolUse { id, .. } => Some(id.as_str()),
310 _ => None,
311 })
312 .collect(),
313 _ => Vec::new(),
314 }
315}
316
317fn validate_transcript_tool_result_shape(messages: &[Message]) -> Result<(), TranscriptEditError> {
318 for (index, message) in messages.iter().enumerate() {
319 if let Message::ToolResults { results, .. } = message {
320 let Some(previous) = index
321 .checked_sub(1)
322 .and_then(|previous| messages.get(previous))
323 else {
324 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
325 "tool_results at message {index} has no preceding assistant tool-use message"
326 )));
327 };
328 let expected = assistant_tool_use_ids(previous);
329 if expected.is_empty() {
330 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
331 "tool_results at message {index} follows {}, not an assistant tool-use message",
332 message_role_name(previous)
333 )));
334 }
335 let actual = results
336 .iter()
337 .map(|result| result.tool_use_id.as_str())
338 .collect::<Vec<_>>();
339 let actual_set = actual.iter().copied().collect::<BTreeSet<_>>();
340 let expected_set = expected.iter().copied().collect::<BTreeSet<_>>();
341 if actual.len() != actual_set.len() {
342 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
343 "tool_results at message {index} contains duplicate tool ids"
344 )));
345 }
346 if expected.len() != expected_set.len() {
347 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
348 "assistant tool-use message before tool_results at message {index} contains duplicate tool ids"
349 )));
350 }
351 if actual_set != expected_set {
352 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
353 "tool_results at message {index} resolve tool ids {actual_set:?}, expected {expected_set:?}"
354 )));
355 }
356 }
357
358 let tool_use_ids = assistant_tool_use_ids(message);
359 if tool_use_ids.is_empty() {
360 continue;
361 }
362 let Some(next) = messages.get(index + 1) else {
363 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
364 "assistant tool-use message {index} has no following tool_results"
365 )));
366 };
367 if !matches!(next, Message::ToolResults { .. }) {
368 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
369 "assistant tool-use message {index} is followed by {}, not tool_results",
370 message_role_name(next)
371 )));
372 }
373 }
374 Ok(())
375}
376
/// Computes the revision digest of a message list: the SHA-256 of its JSON
/// serialization, rendered by `sha256_json_digest` as `sha256:` followed by
/// lowercase hex.
///
/// # Errors
///
/// Returns the `serde_json` error if the messages cannot be serialized.
pub fn transcript_messages_digest(messages: &[Message]) -> Result<String, serde_json::Error> {
    sha256_json_digest(messages)
}
380
381fn validate_transcript_rewrite_record(
382 commit: &TranscriptRewriteCommit,
383 parent_body: &TranscriptRevisionBody,
384 revision_body: &TranscriptRevisionBody,
385) -> Result<(), TranscriptEditError> {
386 if parent_body.revision != commit.parent_revision {
387 return Err(TranscriptEditError::HistoryStateMalformed(format!(
388 "parent body revision {} does not match commit parent {}",
389 parent_body.revision, commit.parent_revision
390 )));
391 }
392 if revision_body.revision != commit.revision {
393 return Err(TranscriptEditError::HistoryStateMalformed(format!(
394 "revision body {} does not match commit revision {}",
395 revision_body.revision, commit.revision
396 )));
397 }
398 if commit.parent_revision == commit.revision {
399 return Err(TranscriptEditError::NoOpRewrite {
400 revision: commit.revision.clone(),
401 });
402 }
403 let parent_digest = transcript_messages_digest(&parent_body.messages)
404 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
405 if parent_digest != commit.parent_revision {
406 return Err(TranscriptEditError::HistoryStateMalformed(format!(
407 "parent body digest {parent_digest} does not match commit parent {}",
408 commit.parent_revision
409 )));
410 }
411 let revision_digest = transcript_messages_digest(&revision_body.messages)
412 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
413 if revision_digest != commit.revision {
414 return Err(TranscriptEditError::HistoryStateMalformed(format!(
415 "revision body digest {revision_digest} does not match commit revision {}",
416 commit.revision
417 )));
418 }
419 let (start, end) = commit.selection.bounds();
420 if start > end || end > parent_body.messages.len() {
421 return Err(TranscriptEditError::InvalidRewriteRange {
422 start,
423 end,
424 message_count: parent_body.messages.len(),
425 });
426 }
427 if commit.messages_before != parent_body.messages.len()
428 || commit.messages_after != revision_body.messages.len()
429 {
430 return Err(TranscriptEditError::HistoryStateMalformed(format!(
431 "commit message counts {} -> {} do not match revision bodies {} -> {}",
432 commit.messages_before,
433 commit.messages_after,
434 parent_body.messages.len(),
435 revision_body.messages.len()
436 )));
437 }
438 let original_span_digest = sha256_json_digest(&parent_body.messages[start..end])
439 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
440 if original_span_digest != commit.original_span_digest {
441 return Err(TranscriptEditError::HistoryStateMalformed(format!(
442 "original span digest {original_span_digest} does not match commit digest {}",
443 commit.original_span_digest
444 )));
445 }
446 let removed_len = end - start;
447 let retained_len = commit
448 .messages_before
449 .checked_sub(removed_len)
450 .ok_or_else(|| {
451 TranscriptEditError::HistoryStateMalformed(
452 "commit removed more messages than it recorded before rewrite".to_string(),
453 )
454 })?;
455 let replacement_len = commit
456 .messages_after
457 .checked_sub(retained_len)
458 .ok_or_else(|| {
459 TranscriptEditError::HistoryStateMalformed(
460 "commit message counts cannot describe a replacement span".to_string(),
461 )
462 })?;
463 let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
464 TranscriptEditError::HistoryStateMalformed("replacement span end overflowed".to_string())
465 })?;
466 if replacement_end > revision_body.messages.len() {
467 return Err(TranscriptEditError::InvalidRewriteRange {
468 start,
469 end: replacement_end,
470 message_count: revision_body.messages.len(),
471 });
472 }
473 let parent_prefix_digest = transcript_messages_digest(&parent_body.messages[..start])
474 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
475 let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
476 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
477 if parent_prefix_digest != revision_prefix_digest {
478 return Err(TranscriptEditError::HistoryStateMalformed(
479 "rewrite revision changed messages before the selected span".to_string(),
480 ));
481 }
482 let parent_suffix_digest = transcript_messages_digest(&parent_body.messages[end..])
483 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
484 let revision_suffix_digest =
485 transcript_messages_digest(&revision_body.messages[replacement_end..])
486 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
487 if parent_suffix_digest != revision_suffix_digest {
488 return Err(TranscriptEditError::HistoryStateMalformed(
489 "rewrite revision changed messages after the selected span".to_string(),
490 ));
491 }
492 let replacement_digest = sha256_json_digest(&revision_body.messages[start..replacement_end])
493 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
494 if replacement_digest != commit.replacement_digest {
495 return Err(TranscriptEditError::HistoryStateMalformed(format!(
496 "replacement span digest {replacement_digest} does not match commit digest {}",
497 commit.replacement_digest
498 )));
499 }
500 Ok(())
501}
502
503fn validate_transcript_history_state(
504 state: &TranscriptHistoryState,
505) -> Result<(), TranscriptEditError> {
506 if state
507 .revisions
508 .iter()
509 .all(|body| body.revision != state.head)
510 {
511 return Err(TranscriptEditError::HistoryStateMalformed(format!(
512 "missing transcript head body {}",
513 state.head
514 )));
515 }
516 for body in &state.revisions {
517 let digest = transcript_messages_digest(&body.messages)
518 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
519 if digest != body.revision {
520 return Err(TranscriptEditError::HistoryStateMalformed(format!(
521 "transcript revision body {} has digest {digest}",
522 body.revision
523 )));
524 }
525 }
526 for commit in &state.commits {
527 let parent_body = state
528 .revisions
529 .iter()
530 .find(|body| body.revision == commit.parent_revision)
531 .ok_or_else(|| {
532 TranscriptEditError::HistoryStateMalformed(format!(
533 "missing parent transcript body {}",
534 commit.parent_revision
535 ))
536 })?;
537 let revision_body = state
538 .revisions
539 .iter()
540 .find(|body| body.revision == commit.revision)
541 .ok_or_else(|| {
542 TranscriptEditError::HistoryStateMalformed(format!(
543 "missing transcript revision body {}",
544 commit.revision
545 ))
546 })?;
547 validate_transcript_rewrite_record(commit, parent_body, revision_body)?;
548 }
549 let Some(first_commit) = state.commits.first() else {
550 return Ok(());
551 };
552 let mut expected_head = first_commit.parent_revision.clone();
553 for commit in &state.commits {
554 let parent_body = state
555 .revisions
556 .iter()
557 .find(|body| body.revision == commit.parent_revision)
558 .ok_or_else(|| {
559 TranscriptEditError::HistoryStateMalformed(format!(
560 "missing parent transcript body {}",
561 commit.parent_revision
562 ))
563 })?;
564 if commit.parent_revision != expected_head
565 && !revision_body_extends_head(parent_body, &state.revisions, &expected_head)?
566 {
567 return Err(TranscriptEditError::HistoryStateMalformed(format!(
568 "rewrite commit parent {} does not extend transcript head {}",
569 commit.parent_revision, expected_head
570 )));
571 }
572 expected_head = commit.revision.clone();
573 }
574 let mut cursor = state.head.clone();
575 while cursor != expected_head {
576 let Some(head_body) = state.revisions.iter().find(|body| body.revision == cursor) else {
577 break;
578 };
579 match head_body.parent_revision.as_deref() {
580 Some(parent) => cursor = parent.to_string(),
581 None => break,
582 }
583 }
584 if cursor != expected_head {
585 return Err(TranscriptEditError::HistoryStateMalformed(format!(
586 "transcript head {} does not extend the rewrite chain",
587 state.head
588 )));
589 }
590 Ok(())
591}
592
593fn revision_body_extends_head(
594 candidate: &TranscriptRevisionBody,
595 revisions: &[TranscriptRevisionBody],
596 head: &str,
597) -> Result<bool, TranscriptEditError> {
598 if candidate.parent_revision.as_deref() == Some(head) {
599 return Ok(true);
600 }
601 let Some(head_body) = revisions.iter().find(|body| body.revision == head) else {
602 return Ok(false);
603 };
604 if candidate.messages.len() < head_body.messages.len() {
605 return Ok(false);
606 }
607 let prefix_digest = transcript_messages_digest(&candidate.messages[..head_body.messages.len()])
608 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
609 Ok(prefix_digest == head)
610}
611
612fn sha256_json_digest<T: Serialize + ?Sized>(value: &T) -> Result<String, serde_json::Error> {
613 let bytes = serde_json::to_vec(value)?;
614 let digest = Sha256::digest(bytes);
615 let mut out = String::with_capacity(digest.len() * 2);
616 const HEX: &[u8; 16] = b"0123456789abcdef";
617 for byte in digest {
618 out.push(HEX[(byte >> 4) as usize] as char);
619 out.push(HEX[(byte & 0x0f) as usize] as char);
620 }
621 Ok(format!("sha256:{out}"))
622}
623
/// Persisted bookkeeping for realtime transcript items.
///
/// Every field falls back to its `Default` when missing from the input.
// NOTE(review): the code that reads and updates this state is outside the
// visible part of the file; the field notes below are inferred from names and
// types only — confirm against the apply logic.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionRealtimeTranscriptState {
    /// Per-item state, keyed by a string id (presumably the item id).
    #[serde(default)]
    items: BTreeMap<String, RealtimeTranscriptItemState>,
    /// String ids in a recorded order (presumably order of first sighting).
    #[serde(default)]
    first_seen_order: Vec<String>,
    /// Set of delta ids (presumably used to ignore repeated deltas).
    #[serde(default)]
    seen_delta_ids: BTreeSet<String>,
    /// Completion records, keyed by a string id.
    #[serde(default)]
    assistant_completions: BTreeMap<String, RealtimeAssistantCompletion>,
    /// Set of response ids; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    discarded_assistant_response_ids: BTreeSet<String>,
}
638
/// State tracked for a single realtime transcript item.
///
/// All fields except `role` fall back to their `Default` when missing from
/// the input.
// NOTE(review): the exact meaning of the flags is defined by code outside the
// visible part of the file — confirm before relying on the notes below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeTranscriptItemState {
    role: RealtimeTranscriptRole,
    /// Id of the item this one follows, if known.
    #[serde(default)]
    previous_item_id: Option<String>,
    #[serde(default)]
    response_id: Option<String>,
    /// Text segments keyed by a `u32`; `text()` concatenates them in
    /// ascending key order.
    #[serde(default)]
    content_segments: BTreeMap<u32, String>,
    /// Set by the `skipped` constructor.
    #[serde(default)]
    skipped: bool,
    #[serde(default)]
    ready: bool,
    #[serde(default)]
    materialized: bool,
    /// Both constructors set `TranscriptLane::Display`; on input a missing
    /// value uses `TranscriptLane::default()`.
    #[serde(default)]
    lane: TranscriptLane,
}
663
664impl RealtimeTranscriptItemState {
665 fn new(
666 role: RealtimeTranscriptRole,
667 previous_item_id: Option<String>,
668 response_id: Option<String>,
669 ) -> Self {
670 Self {
671 role,
672 previous_item_id,
673 response_id,
674 content_segments: BTreeMap::new(),
675 skipped: false,
676 ready: false,
677 materialized: false,
678 lane: TranscriptLane::Display,
679 }
680 }
681
682 fn skipped(previous_item_id: Option<String>) -> Self {
683 Self {
684 role: RealtimeTranscriptRole::Assistant,
685 previous_item_id,
686 response_id: None,
687 content_segments: BTreeMap::new(),
688 skipped: true,
689 ready: true,
690 materialized: false,
691 lane: TranscriptLane::Display,
692 }
693 }
694
695 fn text(&self) -> String {
696 self.content_segments.values().cloned().collect()
697 }
698}
699
/// Completion data recorded for a realtime assistant response: how it
/// stopped and the usage it reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RealtimeAssistantCompletion {
    stop_reason: StopReason,
    usage: Usage,
    // NOTE(review): presumably set once `usage` has been accounted for, to
    // avoid counting it twice — confirm where it is flipped.
    usage_consumed: bool,
}
707
/// A conversation session: identity, messages, timestamps, metadata and
/// usage.
///
/// `Serialize` / `Deserialize` are implemented by hand through the
/// `SessionSerde` mirror struct.
#[derive(Debug, Clone)]
pub struct Session {
    /// Format version (see `SESSION_VERSION`).
    version: u32,
    id: SessionId,
    /// Messages behind an `Arc`, so cloning a `Session` shares the list
    /// instead of copying it.
    pub(crate) messages: Arc<Vec<Message>>,
    created_at: SystemTime,
    updated_at: SystemTime,
    /// JSON object of metadata values.
    metadata: serde_json::Map<String, serde_json::Value>,
    usage: Usage,
}
728
/// Serde mirror of `Session` used by its hand-written `Serialize` and
/// `Deserialize` impls; it holds the messages as a plain `Vec` rather than an
/// `Arc<Vec<_>>`.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SessionSerde {
    /// Defaults to `SESSION_VERSION` (via `default_version`) when absent.
    #[serde(default = "default_version")]
    version: u32,
    id: SessionId,
    messages: Vec<Message>,
    created_at: SystemTime,
    updated_at: SystemTime,
    /// Defaults to an empty map when absent.
    #[serde(default)]
    metadata: serde_json::Map<String, serde_json::Value>,
    /// Defaults to `Usage::default()` when absent.
    #[serde(default)]
    usage: Usage,
}
744
745impl Serialize for Session {
746 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
747 where
748 S: Serializer,
749 {
750 let serde_repr = SessionSerde {
751 version: self.version,
752 id: self.id.clone(),
753 messages: (*self.messages).clone(),
754 created_at: self.created_at,
755 updated_at: self.updated_at,
756 metadata: self.metadata.clone(),
757 usage: self.usage.clone(),
758 };
759 serde_repr.serialize(serializer)
760 }
761}
762
763impl<'de> Deserialize<'de> for Session {
764 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
765 where
766 D: Deserializer<'de>,
767 {
768 let serde_repr = SessionSerde::deserialize(deserializer)?;
769 Ok(Session {
770 version: serde_repr.version,
771 id: serde_repr.id,
772 messages: Arc::new(serde_repr.messages),
773 created_at: serde_repr.created_at,
774 updated_at: serde_repr.updated_at,
775 metadata: serde_repr.metadata,
776 usage: serde_repr.usage,
777 })
778 }
779}
780
/// Serde default for `SessionSerde::version`: input without a `version`
/// field is treated as `SESSION_VERSION`.
fn default_version() -> u32 {
    SESSION_VERSION
}
784
// NOTE(review): the `*_STATE_KEY` constants below look like keys into the
// session metadata map for the matching `Session*State` structs; their
// readers/writers are outside the visible part of this file — confirm.

/// Key for the session system-context state.
pub const SESSION_SYSTEM_CONTEXT_STATE_KEY: &str = "session_system_context_state";

/// Key for the session deferred-turn state.
pub const SESSION_DEFERRED_TURN_STATE_KEY: &str = "session_deferred_turn_state";

/// Key for the session build state.
pub const SESSION_BUILD_STATE_KEY: &str = "session_build_state";

/// Key for the session tool-visibility state (note the `_v1` suffix).
pub const SESSION_TOOL_VISIBILITY_STATE_KEY: &str = "session_tool_visibility_state_v1";

/// Name of the image-viewing tool. It is the tool denied by
/// `capability_base_filter_for_image_tool_results` when image tool results
/// are not available.
pub const VIEW_IMAGE_TOOL_NAME: &str = "view_image";

/// Separator text: a `---` rule surrounded by blank lines.
// NOTE(review): presumably placed between system-context sections; its use is
// not visible in this part of the file.
pub const SYSTEM_CONTEXT_SEPARATOR: &str = "\n\n---\n\n";
802
/// Bookkeeping for system-context appends on a session.
///
/// Every collection is omitted from the serialized form when empty and
/// defaults to empty when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SessionSystemContextState {
    /// Appends in the pending list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending: Vec<PendingSystemContextAppend>,
    /// Appends in the applied list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub applied: Vec<PendingSystemContextAppend>,
    /// Requests already seen, keyed by idempotency key; `stage_append`
    /// consults this to tell duplicates from conflicting reuse of a key.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub seen: std::collections::BTreeMap<String, SeenSystemContextKey>,
    // NOTE(review): presumably the keys of pending appends claimed by the
    // turn in progress — confirm where this set is filled and drained.
    #[serde(default, skip_serializing_if = "std::collections::BTreeSet::is_empty")]
    pub active_turn_pending_keys: std::collections::BTreeSet<String>,
}
816
/// One accepted system-context append.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct PendingSystemContextAppend {
    /// The context text; `stage_append` stores it trimmed and rejects text
    /// that is empty after trimming.
    pub text: String,
    /// Optional source label copied from the request; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Optional idempotency key copied from the request; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Acceptance time supplied by the caller of `stage_append`.
    pub accepted_at: SystemTime,
}
828
/// Deferred-turn bookkeeping for a session.
// NOTE(review): the consumers of this state are outside the visible part of
// the file; field notes describe serialization behavior only.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionDeferredTurnState {
    /// Phase of the deferred first turn; omitted from the serialized form
    /// while `Inactive` (which is also the default).
    #[serde(default, skip_serializing_if = "DeferredFirstTurnPhase::is_inactive")]
    pub first_turn_phase: DeferredFirstTurnPhase,
    /// Initial prompt held for later, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pending_initial_prompt: Option<PendingDeferredPrompt>,
    /// Tool-result messages held for later; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending_tool_results: Vec<PendingToolResultsMessage>,
}
840
/// Phase of a session's deferred first turn, serialized as a snake_case
/// string.
// NOTE(review): transitions between phases happen outside the visible part of
// the file.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DeferredFirstTurnPhase {
    /// Default phase; not written out by `SessionDeferredTurnState`.
    #[default]
    Inactive,
    Pending,
    Consumed,
}
853
854impl DeferredFirstTurnPhase {
855 pub fn is_inactive(&self) -> bool {
856 matches!(self, Self::Inactive)
857 }
858}
859
860fn is_default_hook_run_overrides(value: &crate::HookRunOverrides) -> bool {
861 value == &crate::HookRunOverrides::default()
862}
863
864fn is_default_call_timeout_override(value: &crate::CallTimeoutOverride) -> bool {
865 value == &crate::CallTimeoutOverride::default()
866}
867
/// Serde `skip_serializing_if` predicate: `true` when the filter is
/// `ToolFilter::All`.
fn is_tool_filter_all(value: &ToolFilter) -> bool {
    matches!(value, ToolFilter::All)
}
871
/// Serde `skip_serializing_if` predicate: `true` when the counter is zero.
/// The parameter is a reference because serde passes the field by reference.
fn is_zero(value: &u64) -> bool {
    matches!(*value, 0)
}
875
876pub fn capability_base_filter_for_image_tool_results(image_tool_results: bool) -> ToolFilter {
878 if image_tool_results {
879 ToolFilter::All
880 } else {
881 ToolFilter::Deny([VIEW_IMAGE_TOOL_NAME.to_string()].into_iter().collect())
882 }
883}
884
/// Identity evidence recorded for a tool name: an optional stable owner key
/// and the optional provenance last seen for the tool.
///
/// Both fields are omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct ToolVisibilityWitness {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stable_owner_key: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_provenance: Option<ToolProvenance>,
}
894
895impl ToolVisibilityWitness {
896 pub fn has_identity_witness(&self) -> bool {
897 self.stable_owner_key.is_some() || self.last_seen_provenance.is_some()
898 }
899
900 pub fn has_provenance_identity_witness(&self) -> bool {
901 self.last_seen_provenance.is_some()
902 }
903}
904
/// A tool name paired with the visibility witness recorded for it.
// NOTE(review): presumably the authority under which a deferred tool may be
// loaded — the consumer is not visible in this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct DeferredToolLoadAuthority {
    pub name: String,
    pub witness: ToolVisibilityWitness,
}
916
917impl DeferredToolLoadAuthority {
918 pub fn new(name: impl Into<String>, witness: ToolVisibilityWitness) -> Self {
919 Self {
920 name: name.into(),
921 witness,
922 }
923 }
924
925 pub fn into_parts(self) -> (String, ToolVisibilityWitness) {
926 (self.name, self.witness)
927 }
928}
929
/// A tool filter together with witnesses keyed by a string (presumably the
/// tool name — confirm against the code that fills the map).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct WitnessedToolFilter {
    pub filter: ToolFilter,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
939
940impl WitnessedToolFilter {
941 pub fn new(filter: ToolFilter, witnesses: BTreeMap<String, ToolVisibilityWitness>) -> Self {
942 Self { filter, witnesses }
943 }
944
945 pub fn into_parts(self) -> (ToolFilter, BTreeMap<String, ToolVisibilityWitness>) {
946 (self.filter, self.witnesses)
947 }
948}
949
/// Persisted tool-visibility state of a session: base filters, an active and
/// a staged filter with their revisions, requested deferred tool names, and
/// the witnesses recorded for them.
///
/// Every field is optional on input. On output, filters are skipped when they
/// are `ToolFilter::All`, sets and maps when empty, and revisions when zero.
// NOTE(review): how staged values become active is implemented outside the
// visible part of this file.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SessionToolVisibilityState {
    /// See `capability_base_filter_for_image_tool_results` for one producer
    /// of such a filter.
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub capability_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub inherited_base_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub active_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "is_tool_filter_all")]
    pub staged_filter: ToolFilter,
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub active_requested_deferred_names: BTreeSet<String>,
    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
    pub staged_requested_deferred_names: BTreeSet<String>,
    #[serde(default, skip_serializing_if = "is_zero")]
    pub active_revision: u64,
    #[serde(default, skip_serializing_if = "is_zero")]
    pub staged_revision: u64,
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub requested_witnesses: BTreeMap<String, ToolVisibilityWitness>,
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub filter_witnesses: BTreeMap<String, ToolVisibilityWitness>,
}
975
/// Build-time options persisted with a session.
///
/// Every field is optional on input and left out of the serialized form when
/// it holds its empty/default value (`None`, an empty `Vec`, or the type's
/// `Default`).
// NOTE(review): the meaning of the individual options is defined by their
// consumers, which are outside the visible part of this file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionBuildState {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_schema: Option<crate::OutputSchema>,
    #[serde(default, skip_serializing_if = "is_default_hook_run_overrides")]
    pub hooks_override: crate::HookRunOverrides,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub budget_limits: Option<crate::BudgetLimits>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub recoverable_tool_defs: Vec<ToolDef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub silent_comms_intents: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_inline_peer_notifications: Option<i32>,
    /// Arbitrary JSON value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub app_context: Option<serde_json::Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub additional_instructions: Option<Vec<String>>,
    /// String-to-string map (presumably environment variables for shell
    /// tools — confirm).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell_env: Option<HashMap<String, String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mob_tool_authority_context: Option<MobToolAuthorityContext>,
    #[serde(default, skip_serializing_if = "is_default_call_timeout_override")]
    pub call_timeout_override: crate::CallTimeoutOverride,
}
1006
/// A prompt held for a deferred turn, together with an acceptance timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct PendingDeferredPrompt {
    pub prompt: ContentInput,
    pub accepted_at: SystemTime,
}
1014
/// A batch of tool results held for later, together with an acceptance
/// timestamp.
///
/// `PartialEq` is implemented by hand (not derived): it compares the results
/// through their JSON serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PendingToolResultsMessage {
    pub results: Vec<ToolResult>,
    pub accepted_at: SystemTime,
}
1022
1023impl PartialEq for PendingToolResultsMessage {
1024 fn eq(&self, other: &Self) -> bool {
1025 self.accepted_at == other.accepted_at
1026 && serde_json::to_value(&self.results).ok() == serde_json::to_value(&other.results).ok()
1027 }
1028}
1029
/// Record of a system-context append that was submitted with an idempotency
/// key: the payload that was seen for that key and how far it has progressed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct SeenSystemContextKey {
    /// Text of the append recorded for the key.
    pub text: String,
    /// Source of the append recorded for the key, if any; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Whether the append is still pending or has been applied.
    pub state: SeenSystemContextState,
}
1039
/// Progress of a keyed system-context append.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SeenSystemContextState {
    /// Staged but not yet marked applied.
    Pending,
    /// Marked applied.
    Applied,
}
1047
1048impl SessionSystemContextState {
1049 pub fn stage_append(
1051 &mut self,
1052 req: &AppendSystemContextRequest,
1053 accepted_at: SystemTime,
1054 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1055 let text = req.text.trim();
1056 if text.is_empty() {
1057 return Err(SystemContextStageError::InvalidRequest(
1058 "system context text must not be empty".to_string(),
1059 ));
1060 }
1061
1062 if let Some(key) = req.idempotency_key.as_ref() {
1063 match self.seen.get(key) {
1064 Some(existing)
1065 if existing.text == text
1066 && existing.source.as_deref() == req.source.as_deref() =>
1067 {
1068 return Ok(crate::service::AppendSystemContextStatus::Duplicate);
1069 }
1070 Some(existing) => {
1071 return Err(SystemContextStageError::Conflict {
1072 key: key.clone(),
1073 existing_text: existing.text.clone(),
1074 existing_source: existing.source.clone(),
1075 });
1076 }
1077 None => {}
1078 }
1079 }
1080
1081 let append = PendingSystemContextAppend {
1082 text: text.to_string(),
1083 source: req.source.clone(),
1084 idempotency_key: req.idempotency_key.clone(),
1085 accepted_at,
1086 };
1087 if let Some(key) = req.idempotency_key.as_ref() {
1088 self.seen.insert(
1089 key.clone(),
1090 SeenSystemContextKey {
1091 text: append.text.clone(),
1092 source: append.source.clone(),
1093 state: SeenSystemContextState::Pending,
1094 },
1095 );
1096 }
1097 self.pending.push(append);
1098 Ok(crate::service::AppendSystemContextStatus::Staged)
1099 }
1100
1101 pub fn stage_active_turn_append(
1108 &mut self,
1109 req: &AppendSystemContextRequest,
1110 accepted_at: SystemTime,
1111 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1112 let idempotency_key = req.idempotency_key.clone();
1113 let status = self.stage_append(req, accepted_at)?;
1114 if matches!(status, crate::service::AppendSystemContextStatus::Staged)
1115 && let Some(key) = idempotency_key
1116 {
1117 self.active_turn_pending_keys.insert(key);
1118 }
1119 Ok(status)
1120 }
1121
1122 pub fn mark_pending_applied(&mut self) {
1124 for pending in &self.pending {
1125 if is_runtime_steer_append(pending) {
1126 continue;
1127 }
1128 if !self.applied.contains(pending) {
1129 self.applied.push(pending.clone());
1130 }
1131 }
1132 let mut seen_to_remove = Vec::new();
1133 for pending in &self.pending {
1134 if let Some(key) = pending.idempotency_key.as_ref()
1135 && let Some(seen) = self.seen.get_mut(key)
1136 {
1137 if is_runtime_steer_append(pending) {
1138 seen_to_remove.push(key.clone());
1139 } else {
1140 seen.state = SeenSystemContextState::Applied;
1141 }
1142 }
1143 }
1144 for key in seen_to_remove {
1145 self.seen.remove(&key);
1146 }
1147 self.pending.clear();
1148 self.active_turn_pending_keys.clear();
1149 }
1150
1151 pub fn discard_unapplied_active_turn_pending(&mut self) -> Vec<PendingSystemContextAppend> {
1154 if self.active_turn_pending_keys.is_empty() {
1155 return Vec::new();
1156 }
1157
1158 let active_keys = std::mem::take(&mut self.active_turn_pending_keys);
1159 let mut discarded = Vec::new();
1160 self.pending.retain(|append| {
1161 let should_discard = append
1162 .idempotency_key
1163 .as_ref()
1164 .is_some_and(|key| active_keys.contains(key));
1165 if should_discard {
1166 discarded.push(append.clone());
1167 }
1168 !should_discard
1169 });
1170
1171 for append in &discarded {
1172 if let Some(key) = append.idempotency_key.as_ref()
1173 && self
1174 .seen
1175 .get(key)
1176 .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
1177 {
1178 self.seen.remove(key);
1179 }
1180 }
1181
1182 discarded
1183 }
1184
1185 pub fn discard_active_turn_pending_by_keys(
1192 &mut self,
1193 idempotency_keys: &[String],
1194 ) -> Vec<PendingSystemContextAppend> {
1195 if idempotency_keys.is_empty() || self.active_turn_pending_keys.is_empty() {
1196 return Vec::new();
1197 }
1198
1199 let requested_keys: std::collections::BTreeSet<&str> =
1200 idempotency_keys.iter().map(String::as_str).collect();
1201 let mut discarded = Vec::new();
1202 let mut discarded_keys = Vec::new();
1203 self.pending.retain(|append| {
1204 let should_discard = append.idempotency_key.as_ref().is_some_and(|key| {
1205 requested_keys.contains(key.as_str()) && self.active_turn_pending_keys.contains(key)
1206 });
1207 if should_discard {
1208 if let Some(key) = append.idempotency_key.as_ref() {
1209 discarded_keys.push(key.clone());
1210 }
1211 discarded.push(append.clone());
1212 }
1213 !should_discard
1214 });
1215
1216 for key in discarded_keys {
1217 self.active_turn_pending_keys.remove(&key);
1218 if self
1219 .seen
1220 .get(&key)
1221 .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
1222 {
1223 self.seen.remove(&key);
1224 }
1225 }
1226
1227 discarded
1228 }
1229}
1230
1231impl SessionDeferredTurnState {
    /// Sets the first-turn phase to `Pending`.
    pub fn mark_initial_turn_pending(&mut self) {
        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
    }
1236
1237 pub fn mark_initial_turn_started(&mut self) -> bool {
1241 let was_pending = matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending);
1242 if was_pending {
1243 self.first_turn_phase = DeferredFirstTurnPhase::Consumed;
1244 }
1245 was_pending
1246 }
1247
    /// Resets the first-turn phase back to `Pending`.
    // NOTE(review): body is identical to `mark_initial_turn_pending`; presumably
    // kept separate to express intent at call sites — confirm.
    pub fn restore_initial_turn_pending(&mut self) {
        self.first_turn_phase = DeferredFirstTurnPhase::Pending;
    }
1252
    /// Returns `true` while the first-turn phase is `Pending`.
    pub fn allows_initial_turn_overrides(&self) -> bool {
        matches!(self.first_turn_phase, DeferredFirstTurnPhase::Pending)
    }
1257
1258 pub fn stage_initial_prompt(&mut self, prompt: ContentInput, accepted_at: SystemTime) {
1260 if !prompt.has_images() && prompt.text_content().trim().is_empty() {
1261 self.pending_initial_prompt = None;
1262 return;
1263 }
1264
1265 self.pending_initial_prompt = Some(PendingDeferredPrompt {
1266 prompt,
1267 accepted_at,
1268 });
1269 }
1270
1271 pub fn stage_tool_results(
1273 &mut self,
1274 results: Vec<ToolResult>,
1275 accepted_at: SystemTime,
1276 ) -> usize {
1277 if results.is_empty() {
1278 return 0;
1279 }
1280
1281 let accepted = results.len();
1282 self.pending_tool_results.push(PendingToolResultsMessage {
1283 results,
1284 accepted_at,
1285 });
1286 accepted
1287 }
1288
1289 pub fn take_initial_prompt(&mut self) -> Option<ContentInput> {
1291 self.pending_initial_prompt
1292 .take()
1293 .map(|pending| pending.prompt)
1294 }
1295
    /// Removes and returns all pending tool-result batches, leaving the
    /// pending list empty.
    pub fn take_tool_results(&mut self) -> Vec<PendingToolResultsMessage> {
        std::mem::take(&mut self.pending_tool_results)
    }
1300
    /// Returns `true` when at least one tool-result batch is pending.
    pub fn has_pending_tool_results(&self) -> bool {
        !self.pending_tool_results.is_empty()
    }
1305}
1306
/// Reasons staging a system-context append can be rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SystemContextStageError {
    /// The request itself is unusable; the string explains why
    /// (e.g. the text was empty after trimming).
    InvalidRequest(String),
    /// The idempotency key was already seen with a different text or source.
    Conflict {
        /// The idempotency key that collided.
        key: String,
        /// Text previously recorded for the key.
        existing_text: String,
        /// Source previously recorded for the key, if any.
        existing_source: Option<String>,
    },
}
1317
1318fn render_system_context_block(append: &PendingSystemContextAppend) -> String {
1319 let mut rendered = String::from("[Runtime System Context]");
1320 if let Some(source) = &append.source {
1321 rendered.push_str("\nsource: ");
1322 rendered.push_str(source);
1323 }
1324 rendered.push_str("\n\n");
1325 rendered.push_str(&append.text);
1326 rendered
1327}
1328
/// Returns `true` when `value` begins with the `runtime:steer:` prefix.
fn is_runtime_steer_key(value: &str) -> bool {
    value.strip_prefix("runtime:steer:").is_some()
}
1332
1333fn is_runtime_steer_append(append: &PendingSystemContextAppend) -> bool {
1334 append.source.as_deref().is_some_and(is_runtime_steer_key)
1335 || append
1336 .idempotency_key
1337 .as_deref()
1338 .is_some_and(is_runtime_steer_key)
1339}
1340
1341fn seen_system_context_matches(
1342 seen: &SeenSystemContextKey,
1343 append: &PendingSystemContextAppend,
1344) -> bool {
1345 seen.text == append.text && seen.source.as_deref() == append.source.as_deref()
1346}
1347
1348fn pending_system_context_matches(
1349 existing: &PendingSystemContextAppend,
1350 append: &PendingSystemContextAppend,
1351) -> bool {
1352 existing.text == append.text && existing.source.as_deref() == append.source.as_deref()
1353}
1354
1355impl Session {
1356 pub fn new() -> Self {
1358 let now = SystemTime::now();
1359 Self {
1360 version: SESSION_VERSION,
1361 id: SessionId::new(),
1362 messages: Arc::new(Vec::new()),
1363 created_at: now,
1364 updated_at: now,
1365 metadata: serde_json::Map::new(),
1366 usage: Usage::default(),
1367 }
1368 }
1369
    /// Creates an empty session (see [`Self::new`]) and assigns it the given id
    /// in place of the generated one.
    pub fn with_id(id: SessionId) -> Self {
        let mut session = Self::new();
        session.id = id;
        session
    }
1376
    /// Returns the session id.
    pub fn id(&self) -> &SessionId {
        &self.id
    }
1381
    /// Returns the session format version stored on this session.
    pub fn version(&self) -> u32 {
        self.version
    }
1386
1387 pub fn messages(&self) -> &[Message] {
1389 &self.messages
1390 }
1391
1392 pub(crate) fn replace_messages_internal(
1398 &mut self,
1399 messages: Vec<Message>,
1400 reason: TranscriptRewriteReason,
1401 ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError> {
1402 if transcript_messages_digest(self.messages()).ok()
1403 == transcript_messages_digest(&messages).ok()
1404 {
1405 return Ok(None);
1406 }
1407 let commit = self.commit_transcript_rewrite(
1408 TranscriptRewriteSelection::MessageRange {
1409 start: 0,
1410 end: self.messages.len(),
1411 },
1412 messages,
1413 reason,
1414 Some("meerkat-core".to_string()),
1415 None,
1416 )?;
1417 Ok(Some(commit))
1418 }
1419
1420 pub(crate) fn retain_messages_internal<F>(
1422 &mut self,
1423 mut retain: F,
1424 reason: TranscriptRewriteReason,
1425 ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError>
1426 where
1427 F: FnMut(&Message) -> bool,
1428 {
1429 let retained = self
1430 .messages
1431 .iter()
1432 .filter(|message| retain(message))
1433 .cloned()
1434 .collect::<Vec<_>>();
1435 if retained.len() == self.messages.len()
1436 && transcript_messages_digest(self.messages()).ok()
1437 == transcript_messages_digest(&retained).ok()
1438 {
1439 return Ok(None);
1440 }
1441 self.replace_messages_internal(retained, reason)
1442 }
1443
    /// Returns the time the session was created.
    pub fn created_at(&self) -> SystemTime {
        self.created_at
    }
1448
    /// Returns the time the session was last updated.
    pub fn updated_at(&self) -> SystemTime {
        self.updated_at
    }
1453
1454 pub fn push(&mut self, message: Message) {
1458 Arc::make_mut(&mut self.messages).push(message);
1459 self.updated_at = SystemTime::now();
1460 self.refresh_transcript_head_after_message_mutation();
1461 }
1462
1463 pub fn push_batch(&mut self, messages: Vec<Message>) {
1467 if messages.is_empty() {
1468 return;
1469 }
1470 let inner = Arc::make_mut(&mut self.messages);
1471 inner.extend(messages);
1472 self.updated_at = SystemTime::now();
1473 self.refresh_transcript_head_after_message_mutation();
1474 }
1475
1476 pub async fn externalize_media(
1487 &mut self,
1488 blob_store: &dyn crate::BlobStore,
1489 start: usize,
1490 ) -> Result<(), crate::blob::BlobStoreError> {
1491 let previous_digest = if self
1492 .metadata
1493 .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
1494 {
1495 transcript_messages_digest(self.messages()).ok()
1496 } else {
1497 None
1498 };
1499 let messages = Arc::make_mut(&mut self.messages);
1500 crate::image_content::externalize_messages_from(blob_store, messages, start).await?;
1501 if let Some(previous_digest) = previous_digest
1502 && transcript_messages_digest(self.messages()).ok().as_ref() != Some(&previous_digest)
1503 {
1504 self.refresh_transcript_head_after_message_mutation();
1505 }
1506 Ok(())
1507 }
1508
    /// Sets `updated_at` to the current time without changing anything else.
    pub fn touch(&mut self) {
        self.updated_at = SystemTime::now();
    }
1515
1516 pub fn has_pending_boundary(&self) -> bool {
1522 self.messages
1523 .last()
1524 .is_some_and(|m| matches!(m, Message::User(_) | Message::ToolResults { .. }))
1525 }
1526
1527 pub fn last_n(&self, n: usize) -> &[Message] {
1529 let start = self.messages.len().saturating_sub(n);
1530 &self.messages[start..]
1531 }
1532
    /// Returns the total token count reported by the accumulated usage.
    pub fn total_tokens(&self) -> u64 {
        self.usage.total_tokens()
    }
1537
    /// Returns a copy of the usage accumulated on this session.
    pub fn total_usage(&self) -> Usage {
        self.usage.clone()
    }
1542
    /// Adds `turn_usage` to the session's accumulated usage and bumps
    /// `updated_at`.
    pub fn record_usage(&mut self, turn_usage: Usage) {
        self.usage.add(&turn_usage);
        self.updated_at = SystemTime::now();
    }
1548
1549 pub fn append_external_user_content(&mut self, content: ContentInput) {
1551 self.push(Message::User(UserMessage::with_blocks(
1552 content.into_blocks(),
1553 )));
1554 }
1555
1556 pub fn append_external_assistant_blocks(
1558 &mut self,
1559 blocks: Vec<AssistantBlock>,
1560 stop_reason: StopReason,
1561 usage: Usage,
1562 ) {
1563 if !blocks.is_empty() {
1564 self.push(Message::BlockAssistant(BlockAssistantMessage::new(
1565 blocks,
1566 stop_reason,
1567 )));
1568 }
1569 if usage != Usage::default() {
1570 self.record_usage(usage);
1571 }
1572 }
1573
    /// Applies one realtime transcript event to the session.
    ///
    /// The realtime transcript state is loaded from session metadata, updated
    /// according to `event`, and then any items that became eligible are
    /// materialized into the message list before the state is stored back.
    ///
    /// Returns the outcome of the materialization step. A default outcome is
    /// returned early, without storing state, when the event's response id
    /// normalizes to `None` or when an assistant delta repeats a non-blank
    /// `delta_id` that was already recorded.
    pub fn append_realtime_transcript_event(
        &mut self,
        event: RealtimeTranscriptEvent,
    ) -> RealtimeTranscriptApplyOutcome {
        let mut state = self.realtime_transcript_state();
        match event {
            RealtimeTranscriptEvent::ItemObserved {
                item_id,
                previous_item_id,
                role,
                response_id,
            } => {
                let response_id = normalize_realtime_optional_response_id(response_id);
                // Assistant items that belong to a discarded response are
                // recorded as skipped instead of as regular items.
                if role == RealtimeTranscriptRole::Assistant
                    && response_id
                        .as_ref()
                        .is_some_and(|id| state.discarded_assistant_response_ids.contains(id))
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                } else {
                    observe_realtime_item(&mut state, item_id, previous_item_id, role, response_id);
                }
            }
            RealtimeTranscriptEvent::ItemSkipped {
                item_id,
                previous_item_id,
            } => {
                observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
            }
            RealtimeTranscriptEvent::UserTranscriptFinal {
                item_id,
                previous_item_id,
                content_index,
                text,
            } => {
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::User,
                    None,
                ) {
                    let segment = item.content_segments.entry(content_index).or_default();
                    // First non-empty text for a segment wins; a later, different
                    // non-empty text is ignored with a warning.
                    if segment.is_empty() && !text.is_empty() {
                        *segment = text;
                    } else if !text.is_empty() && segment.as_str() != text {
                        tracing::warn!(
                            content_index,
                            "ignoring conflicting realtime user transcript segment replay"
                        );
                    }
                    // The item is marked ready regardless of which branch ran.
                    item.ready = true;
                }
            }
            RealtimeTranscriptEvent::AssistantTextDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // Deltas for a discarded response only mark the item as skipped.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                // Replay guard: a non-blank delta id that is already recorded
                // means this delta was applied before. Blank ids are not tracked.
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                // Captured before `observe_realtime_item` mutably borrows `state`.
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Display) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        // A delta arriving after the response's completion was
                        // recorded makes the item ready once it has text.
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        tracing::warn!(
                            "AssistantTextDelta routed to a Spoken-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptDelta {
                response_id,
                delta_id,
                item_id,
                previous_item_id,
                content_index,
                delta,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // Same handling as `AssistantTextDelta`, but targeting the
                // Spoken lane instead of the Display lane.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, previous_item_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                if !delta_id.trim().is_empty() && !state.seen_delta_ids.insert(delta_id) {
                    return RealtimeTranscriptApplyOutcome::default();
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    previous_item_id,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        item.content_segments
                            .entry(content_index)
                            .or_default()
                            .push_str(&delta);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        tracing::warn!(
                            "AssistantTranscriptDelta routed to a Display-lane item; dropping delta to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptTruncated {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                // This event carries no predecessor id, so `None` is passed on.
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                // Copies kept for logging because the originals are moved below.
                let item_id_for_log = item_id.clone();
                let response_id_for_log = response_id.clone();
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if item.materialized {
                        tracing::warn!(
                            target: "meerkat::session",
                            item_id = %item_id_for_log,
                            response_id = %response_id_for_log,
                            "AssistantTranscriptTruncated arrived after item already materialized; canonical message is locked, late truncation dropped",
                        );
                    } else if promote_item_lane(item, TranscriptLane::Spoken) {
                        // Replaces (rather than appends to) the segment text.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    }
                    // NOTE(review): unlike the delta arms, a failed lane
                    // promotion is not logged here — confirm that is intended.
                }
            }
            RealtimeTranscriptEvent::AssistantTranscriptFinalText {
                response_id,
                item_id,
                content_index,
                text,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    observe_realtime_skipped_item(&mut state, item_id, None);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                let response_completed = state.assistant_completions.contains_key(&response_id);
                if let Some(item) = observe_realtime_item(
                    &mut state,
                    item_id,
                    None,
                    RealtimeTranscriptRole::Assistant,
                    Some(response_id),
                ) {
                    if item.materialized {
                        tracing::warn!(
                            "AssistantTranscriptFinalText arrived after item already materialized; canonical message is locked, late repair dropped"
                        );
                        // State is stored, but the materialization step is skipped.
                        self.store_realtime_transcript_state(&state);
                        return RealtimeTranscriptApplyOutcome::default();
                    }
                    if promote_item_lane(item, TranscriptLane::Spoken) {
                        // Replaces (rather than appends to) the segment text.
                        item.content_segments.insert(content_index, text);
                        if response_completed && !item.text().is_empty() {
                            item.ready = true;
                        }
                    } else {
                        tracing::warn!(
                            "AssistantTranscriptFinalText routed to a Display-lane item; dropping authoritative final to preserve lane invariant — this indicates a provider lane-classification bug"
                        );
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnCompleted {
                response_id,
                stop_reason,
                usage,
            } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                if state
                    .discarded_assistant_response_ids
                    .contains(&response_id)
                {
                    discard_realtime_assistant_response(&mut state, &response_id);
                    let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
                    self.store_realtime_transcript_state(&state);
                    return outcome;
                }
                match stop_reason {
                    // A cancelled response is discarded.
                    StopReason::Cancelled => {
                        discard_realtime_assistant_response(&mut state, &response_id);
                    }
                    // A tool-use stop drops any recorded completion for the response.
                    StopReason::ToolUse => {
                        state.assistant_completions.remove(&response_id);
                    }
                    // Any other stop reason records the completion (keeping an
                    // existing entry if one is present) and marks the
                    // response's items ready.
                    _ => {
                        state
                            .assistant_completions
                            .entry(response_id.clone())
                            .or_insert(RealtimeAssistantCompletion {
                                stop_reason,
                                usage,
                                usage_consumed: false,
                            });
                        mark_realtime_assistant_response_ready(&mut state, &response_id);
                    }
                }
            }
            RealtimeTranscriptEvent::AssistantTurnInterrupted { response_id } => {
                let Some(response_id) = normalize_realtime_response_id(response_id) else {
                    return RealtimeTranscriptApplyOutcome::default();
                };
                discard_realtime_assistant_response_by_lane(&mut state, &response_id);
                // Record a `Cancelled` completion with default usage unless a
                // completion already exists, then mark the response ready.
                state
                    .assistant_completions
                    .entry(response_id.clone())
                    .or_insert(RealtimeAssistantCompletion {
                        stop_reason: StopReason::Cancelled,
                        usage: Usage::default(),
                        usage_consumed: false,
                    });
                mark_realtime_assistant_response_ready(&mut state, &response_id);
            }
        }

        // Common tail: materialize whatever became eligible, then store state.
        let outcome = self.materialize_realtime_transcript_ready_items(&mut state);
        self.store_realtime_transcript_state(&state);
        outcome
    }
1940
1941 #[must_use]
1960 pub fn in_flight_realtime_assistant_response_ids(&self) -> Vec<String> {
1961 let state = self.realtime_transcript_state();
1962 let mut seen: BTreeSet<String> = BTreeSet::new();
1963 let mut out: Vec<String> = Vec::new();
1964 for item_id in &state.first_seen_order {
1965 let Some(item) = state.items.get(item_id) else {
1966 continue;
1967 };
1968 if item.role != RealtimeTranscriptRole::Assistant {
1969 continue;
1970 }
1971 if item.materialized || item.skipped {
1972 continue;
1973 }
1974 let Some(response_id) = item.response_id.as_ref() else {
1975 continue;
1976 };
1977 if state.discarded_assistant_response_ids.contains(response_id) {
1978 continue;
1979 }
1980 if seen.insert(response_id.clone()) {
1981 out.push(response_id.clone());
1982 }
1983 }
1984 out
1985 }
1986
1987 fn realtime_transcript_state(&self) -> SessionRealtimeTranscriptState {
1988 self.metadata
1989 .get(SESSION_REALTIME_TRANSCRIPT_STATE_KEY)
1990 .cloned()
1991 .and_then(|value| serde_json::from_value(value).ok())
1992 .unwrap_or_default()
1993 }
1994
1995 fn store_realtime_transcript_state(&mut self, state: &SessionRealtimeTranscriptState) {
1996 match serde_json::to_value(state) {
1997 Ok(value) => self.set_metadata(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, value),
1998 Err(error) => {
1999 tracing::warn!(error = %error, "failed to serialize realtime transcript state");
2000 }
2001 }
2002 }
2003
    /// Materializes every realtime transcript item that is currently eligible
    /// and returns the list of messages produced.
    ///
    /// Items are visited in the order given by `realtime_transcript_order`.
    /// An item is eligible when it is not yet materialized and
    /// `realtime_predecessor_materialized` accepts its predecessor. Skipped
    /// items are simply marked materialized. Other items must be ready and
    /// have non-empty text; assistant items additionally need a response id
    /// with a recorded completion.
    ///
    /// User items are appended as user messages. Consecutive assistant items
    /// that share a response id are coalesced into one assistant message whose
    /// stop reason and usage come from the first item of the run. The scan
    /// repeats until a pass produces nothing, because materializing one item
    /// can make its successor eligible.
    fn materialize_realtime_transcript_ready_items(
        &mut self,
        state: &mut SessionRealtimeTranscriptState,
    ) -> RealtimeTranscriptApplyOutcome {
        let mut materialized = Vec::new();

        // Accumulator for the assistant message currently being coalesced. It
        // lives outside the loop so a run can continue across scan passes.
        let mut pending_blocks: Vec<AssistantBlock> = Vec::new();
        let mut pending_response_id: Option<String> = None;
        let mut pending_stop_reason: StopReason = StopReason::EndTurn;
        let mut pending_usage: Usage = Usage::default();

        loop {
            let order = realtime_transcript_order(state);
            let mut skipped_batch = Vec::new();
            let mut batch = Vec::new();
            // Pass 1: collect what is eligible right now without mutating state.
            for item_id in order {
                let Some(item) = state.items.get(&item_id) else {
                    continue;
                };
                if item.materialized {
                    continue;
                }
                if !realtime_predecessor_materialized(state, item.previous_item_id.as_deref()) {
                    continue;
                }
                if item.skipped {
                    skipped_batch.push(item_id.clone());
                    continue;
                }
                if !item.ready {
                    continue;
                }
                let text = item.text();
                if text.is_empty() {
                    continue;
                }
                match item.role {
                    RealtimeTranscriptRole::User => {
                        batch.push(RealtimeTranscriptMaterializedMessage::User {
                            item_id: item_id.clone(),
                            text,
                        });
                    }
                    RealtimeTranscriptRole::Assistant => {
                        let Some(response_id) = item.response_id.as_ref() else {
                            continue;
                        };
                        let Some(completion) = state.assistant_completions.get(response_id) else {
                            continue;
                        };
                        // Usage is attached only while the completion's usage
                        // has not been consumed by an earlier materialization.
                        let usage = if completion.usage_consumed {
                            Usage::default()
                        } else {
                            completion.usage.clone()
                        };
                        batch.push(RealtimeTranscriptMaterializedMessage::Assistant {
                            item_id: item_id.clone(),
                            response_id: response_id.clone(),
                            text,
                            stop_reason: completion.stop_reason,
                            usage,
                            lane: item.lane,
                        });
                    }
                }
            }
            // Nothing eligible in this pass: the scan has reached a fixed point.
            if skipped_batch.is_empty() && batch.is_empty() {
                break;
            }
            for item_id in skipped_batch {
                if let Some(item) = state.items.get_mut(&item_id) {
                    item.materialized = true;
                }
            }

            // Pass 2: apply the collected messages in order.
            for message in batch {
                match &message {
                    RealtimeTranscriptMaterializedMessage::User { item_id, text } => {
                        // A user message ends any assistant run in progress, so
                        // flush the accumulated assistant blocks first.
                        if !pending_blocks.is_empty() {
                            let drained = std::mem::take(&mut pending_blocks);
                            self.append_external_assistant_blocks(
                                drained,
                                pending_stop_reason,
                                std::mem::take(&mut pending_usage),
                            );
                            pending_response_id = None;
                        }
                        if let Some(item) = state.items.get_mut(item_id) {
                            item.materialized = true;
                        }
                        self.append_external_user_content(ContentInput::Text(text.clone()));
                    }
                    RealtimeTranscriptMaterializedMessage::Assistant {
                        item_id,
                        response_id,
                        text,
                        stop_reason,
                        usage,
                        lane,
                    } => {
                        // A different response id ends the current run.
                        if pending_response_id
                            .as_ref()
                            .is_some_and(|existing| existing != response_id)
                            && !pending_blocks.is_empty()
                        {
                            let drained = std::mem::take(&mut pending_blocks);
                            self.append_external_assistant_blocks(
                                drained,
                                pending_stop_reason,
                                std::mem::take(&mut pending_usage),
                            );
                            pending_response_id = None;
                        }
                        if let Some(item) = state.items.get_mut(item_id) {
                            item.materialized = true;
                        }
                        if let Some(completion) = state.assistant_completions.get_mut(response_id) {
                            completion.usage_consumed = true;
                        }
                        // The lane decides which assistant block variant is emitted.
                        let block = match lane {
                            TranscriptLane::Display => AssistantBlock::Text {
                                text: text.clone(),
                                meta: None,
                            },
                            TranscriptLane::Spoken => AssistantBlock::Transcript {
                                text: text.clone(),
                                source: crate::types::TranscriptSource::Spoken,
                                meta: None,
                            },
                        };
                        // Only the first item of a run sets the run's stop
                        // reason and usage; later items just add blocks.
                        if pending_response_id.is_none() {
                            pending_response_id = Some(response_id.clone());
                            pending_stop_reason = *stop_reason;
                            pending_usage = usage.clone();
                        }
                        pending_blocks.push(block);
                    }
                }
                materialized.push(message);
            }
        }

        // Flush the assistant run that was still open when the scan ended.
        if !pending_blocks.is_empty() {
            self.append_external_assistant_blocks(
                pending_blocks,
                pending_stop_reason,
                pending_usage,
            );
        }

        RealtimeTranscriptApplyOutcome {
            materialized_messages: materialized,
        }
    }
2189
2190 pub fn set_system_prompt(&mut self, prompt: String) {
2192 use crate::types::SystemMessage;
2193
2194 let inner = Arc::make_mut(&mut self.messages);
2195 if let Some(Message::System(_)) = inner.first() {
2197 inner[0] = Message::System(SystemMessage::new(prompt));
2198 } else {
2199 inner.insert(0, Message::System(SystemMessage::new(prompt)));
2200 }
2201 self.updated_at = SystemTime::now();
2202 self.refresh_transcript_head_after_message_mutation();
2203 }
2204
2205 pub fn discard_transient_runtime_steer_context(&mut self) -> usize {
2211 let mut removed = 0usize;
2212
2213 if let Some(Message::System(system)) = self.messages.first() {
2214 let parts = system
2215 .content
2216 .split(SYSTEM_CONTEXT_SEPARATOR)
2217 .map(str::to_string)
2218 .collect::<Vec<_>>();
2219 let original_len = parts.len();
2220 let retained = parts
2221 .into_iter()
2222 .filter(|part| {
2223 !(part.starts_with("[Runtime System Context]")
2224 && part.contains("\nsource: runtime:steer:"))
2225 })
2226 .collect::<Vec<_>>();
2227 let removed_blocks = original_len.saturating_sub(retained.len());
2228 if removed_blocks > 0 {
2229 removed += removed_blocks;
2230 self.set_system_prompt(retained.join(SYSTEM_CONTEXT_SEPARATOR));
2231 }
2232 }
2233
2234 let mut state = self.system_context_state().unwrap_or_default();
2235
2236 let before_pending = state.pending.len();
2237 state
2238 .pending
2239 .retain(|append| !is_runtime_steer_append(append));
2240 removed += before_pending.saturating_sub(state.pending.len());
2241
2242 let before_applied = state.applied.len();
2243 state
2244 .applied
2245 .retain(|append| !is_runtime_steer_append(append));
2246 removed += before_applied.saturating_sub(state.applied.len());
2247
2248 let before_seen = state.seen.len();
2249 state.seen.retain(|key, seen| {
2250 !is_runtime_steer_key(key) && !seen.source.as_deref().is_some_and(is_runtime_steer_key)
2251 });
2252 removed += before_seen.saturating_sub(state.seen.len());
2253
2254 let before_active = state.active_turn_pending_keys.len();
2255 state
2256 .active_turn_pending_keys
2257 .retain(|key| !is_runtime_steer_key(key));
2258 removed += before_active.saturating_sub(state.active_turn_pending_keys.len());
2259
2260 if removed > 0
2261 && let Err(err) = self.set_system_context_state(state)
2262 {
2263 tracing::warn!(
2264 error = %err,
2265 "failed to persist runtime steer context cleanup"
2266 );
2267 }
2268
2269 removed
2270 }
2271
    /// Appends runtime system-context blocks to the session's system prompt.
    ///
    /// Each append with non-blank text is rendered with
    /// `render_system_context_block` and deduplicated against the recorded
    /// system-context state, the current system prompt text, and appends
    /// already accepted earlier in the same call. The surviving blocks are
    /// joined with `SYSTEM_CONTEXT_SEPARATOR`, added after the existing system
    /// prompt (or used as the prompt when there is none), and recorded as
    /// applied in the system-context state.
    ///
    /// Conflicting reuse of an idempotency key is logged and skipped. Failure
    /// to persist the updated state is logged, not returned.
    pub fn append_system_context_blocks(&mut self, appends: &[PendingSystemContextAppend]) {
        if appends.is_empty() {
            return;
        }

        // Only a leading `Message::System` counts as the current prompt; any
        // other first message (or no messages) yields the empty string.
        let current_system_prompt = self
            .messages
            .first()
            .and_then(|message| match message {
                Message::System(system) => Some(system.content.as_str()),
                _ => None,
            })
            .unwrap_or_default();
        // Missing or undecodable state falls back to the default state.
        let mut state = self.system_context_state().unwrap_or_default();
        // Tracks bookkeeping-only changes so they are persisted even when
        // nothing new ends up being appended to the prompt.
        let mut state_dirty = false;
        let mut new_appends: Vec<PendingSystemContextAppend> = Vec::new();
        for append in appends {
            if append.text.trim().is_empty() {
                continue;
            }
            let rendered = render_system_context_block(append);
            if let Some(key) = append.idempotency_key.as_ref() {
                // Same key previously seen with different content: conflict.
                if let Some(existing) = state.seen.get(key)
                    && !seen_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Same key already applied with different content: conflict.
                if let Some(existing) = state
                    .applied
                    .iter()
                    .find(|applied| applied.idempotency_key.as_ref() == Some(key))
                    && !pending_system_context_matches(existing, append)
                {
                    tracing::warn!(
                        idempotency_key = %key,
                        "skipping conflicting runtime system-context append"
                    );
                    continue;
                }
                // Same key already accepted in this call: skip either way,
                // warning only when the content differs.
                if let Some(existing) = new_appends
                    .iter()
                    .find(|pending| pending.idempotency_key.as_ref() == Some(key))
                {
                    if !pending_system_context_matches(existing, append) {
                        tracing::warn!(
                            idempotency_key = %key,
                            "skipping conflicting runtime system-context append"
                        );
                    }
                    continue;
                }
                // The rendered block is already present in the prompt text:
                // do not append again, but make sure the state records it.
                if current_system_prompt.contains(&rendered) {
                    if !state
                        .applied
                        .iter()
                        .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                    {
                        state.applied.push(append.clone());
                        state_dirty = true;
                    }
                    if state
                        .seen
                        .get(key)
                        .is_none_or(|seen| seen.state != SeenSystemContextState::Applied)
                    {
                        state.seen.insert(
                            key.clone(),
                            SeenSystemContextKey {
                                text: append.text.clone(),
                                source: append.source.clone(),
                                state: SeenSystemContextState::Applied,
                            },
                        );
                        state_dirty = true;
                    }
                    continue;
                }
            } else if new_appends.contains(append) || current_system_prompt.contains(&rendered) {
                // Keyless appends are deduplicated by equality within this
                // call and by rendered text against the current prompt.
                continue;
            }
            new_appends.push(append.clone());
        }
        if new_appends.is_empty() {
            if state_dirty && let Err(err) = self.set_system_context_state(state) {
                tracing::warn!(error = %err, "failed to persist applied system-context state");
            }
            return;
        }

        let rendered = new_appends
            .iter()
            .map(render_system_context_block)
            .collect::<Vec<_>>()
            .join(SYSTEM_CONTEXT_SEPARATOR);

        // Append after a non-empty existing system prompt; otherwise the
        // rendered blocks become the whole prompt.
        let next = match self.messages.first() {
            Some(Message::System(sys)) if !sys.content.is_empty() => {
                format!("{}{}{}", sys.content, SYSTEM_CONTEXT_SEPARATOR, rendered)
            }
            _ => rendered,
        };
        self.set_system_prompt(next);

        // Record every newly appended block as applied, without duplicating
        // entries already present in `state.applied`.
        for append in new_appends {
            if let Some(key) = append.idempotency_key.as_ref() {
                state.seen.insert(
                    key.clone(),
                    SeenSystemContextKey {
                        text: append.text.clone(),
                        source: append.source.clone(),
                        state: SeenSystemContextState::Applied,
                    },
                );
                if state
                    .applied
                    .iter()
                    .any(|applied| applied.idempotency_key.as_ref() == Some(key))
                {
                    continue;
                }
            } else if state.applied.contains(&append) {
                continue;
            }
            state.applied.push(append);
        }
        if let Err(err) = self.set_system_context_state(state) {
            tracing::warn!(error = %err, "failed to persist applied system-context state");
        }
    }
2406
2407 pub fn last_assistant_text(&self) -> Option<String> {
2414 self.messages.iter().rev().find_map(|m| match m {
2415 Message::BlockAssistant(a) => {
2416 let mut buf = String::new();
2417 for block in &a.blocks {
2418 match block {
2419 crate::types::AssistantBlock::Text { text, .. }
2420 | crate::types::AssistantBlock::Transcript { text, .. } => {
2421 buf.push_str(text);
2422 }
2423 _ => {}
2424 }
2425 }
2426 if buf.is_empty() { None } else { Some(buf) }
2427 }
2428 Message::Assistant(a) if !a.content.is_empty() => Some(a.content.clone()),
2429 _ => None,
2430 })
2431 }
2432
2433 pub fn tool_call_count(&self) -> usize {
2435 self.messages
2436 .iter()
2437 .filter_map(|m| match m {
2438 Message::BlockAssistant(a) => Some(
2439 a.blocks
2440 .iter()
2441 .filter(|b| matches!(b, crate::types::AssistantBlock::ToolUse { .. }))
2442 .count(),
2443 ),
2444 Message::Assistant(a) => Some(a.tool_calls.len()),
2445 _ => None,
2446 })
2447 .sum()
2448 }
2449
    /// Returns a shared reference to the session's raw metadata map.
    pub fn metadata(&self) -> &serde_json::Map<String, serde_json::Value> {
        &self.metadata
    }
2454
2455 pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
2457 self.metadata.insert(key.to_string(), value);
2458 self.updated_at = SystemTime::now();
2459 }
2460
2461 pub fn backfill_metadata_if_absent(&mut self, key: &str, value: serde_json::Value) -> bool {
2467 if self.metadata.contains_key(key) {
2468 false
2469 } else {
2470 self.metadata.insert(key.to_string(), value);
2471 true
2472 }
2473 }
2474
2475 pub fn remove_metadata(&mut self, key: &str) {
2477 self.metadata.remove(key);
2478 self.updated_at = SystemTime::now();
2479 }
2480
2481 pub fn set_session_metadata(
2483 &mut self,
2484 metadata: SessionMetadata,
2485 ) -> Result<(), serde_json::Error> {
2486 let value = serde_json::to_value(metadata)?;
2487 self.set_metadata(SESSION_METADATA_KEY, value);
2488 Ok(())
2489 }
2490
2491 pub fn session_metadata(&self) -> Option<SessionMetadata> {
2493 self.metadata
2494 .get(SESSION_METADATA_KEY)
2495 .and_then(|value| serde_json::from_value(value.clone()).ok())
2496 }
2497
2498 pub fn set_system_context_state(
2500 &mut self,
2501 state: SessionSystemContextState,
2502 ) -> Result<(), serde_json::Error> {
2503 let value = serde_json::to_value(state)?;
2504 self.set_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY, value);
2505 Ok(())
2506 }
2507
2508 pub fn system_context_state(&self) -> Option<SessionSystemContextState> {
2510 self.metadata
2511 .get(SESSION_SYSTEM_CONTEXT_STATE_KEY)
2512 .and_then(|value| serde_json::from_value(value.clone()).ok())
2513 }
2514
2515 pub fn set_deferred_turn_state(
2517 &mut self,
2518 state: SessionDeferredTurnState,
2519 ) -> Result<(), serde_json::Error> {
2520 let value = serde_json::to_value(state)?;
2521 self.set_metadata(SESSION_DEFERRED_TURN_STATE_KEY, value);
2522 Ok(())
2523 }
2524
2525 pub fn deferred_turn_state(&self) -> Option<SessionDeferredTurnState> {
2527 self.metadata
2528 .get(SESSION_DEFERRED_TURN_STATE_KEY)
2529 .and_then(|value| serde_json::from_value(value.clone()).ok())
2530 }
2531
2532 pub fn set_build_state(&mut self, state: SessionBuildState) -> Result<(), serde_json::Error> {
2534 let value = serde_json::to_value(state)?;
2535 self.set_metadata(SESSION_BUILD_STATE_KEY, value);
2536 Ok(())
2537 }
2538
2539 pub fn build_state(&self) -> Option<SessionBuildState> {
2541 self.metadata
2542 .get(SESSION_BUILD_STATE_KEY)
2543 .and_then(|value| serde_json::from_value(value.clone()).ok())
2544 }
2545
2546 pub fn set_tool_visibility_state(
2548 &mut self,
2549 state: SessionToolVisibilityState,
2550 ) -> Result<(), serde_json::Error> {
2551 let value = serde_json::to_value(state)?;
2552 self.set_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY, value);
2553 Ok(())
2554 }
2555
2556 pub fn tool_visibility_state(
2558 &self,
2559 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
2560 self.try_tool_visibility_state()
2561 }
2562
2563 pub fn try_tool_visibility_state(
2566 &self,
2567 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
2568 self.metadata
2569 .get(SESSION_TOOL_VISIBILITY_STATE_KEY)
2570 .map(|value| serde_json::from_value(value.clone()))
2571 .transpose()
2572 }
2573
2574 pub fn transcript_history_state(
2576 &self,
2577 ) -> Result<Option<TranscriptHistoryState>, serde_json::Error> {
2578 self.metadata
2579 .get(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
2580 .map(|value| serde_json::from_value(value.clone()))
2581 .transpose()
2582 }
2583
2584 pub fn validate_transcript_history_state(&self) -> Result<(), TranscriptEditError> {
2586 let Some(state) = self
2587 .transcript_history_state()
2588 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
2589 else {
2590 return Ok(());
2591 };
2592 validate_transcript_history_state(&state)
2593 }
2594
2595 pub fn transcript_revision_body(
2597 &self,
2598 revision: &str,
2599 ) -> Result<Option<TranscriptRevisionBody>, serde_json::Error> {
2600 Ok(self.transcript_history_state()?.and_then(|state| {
2601 state
2602 .revisions
2603 .into_iter()
2604 .find(|body| body.revision == revision)
2605 }))
2606 }
2607
2608 pub fn transcript_revision_messages(
2610 &self,
2611 revision: &str,
2612 ) -> Result<Option<Vec<Message>>, serde_json::Error> {
2613 Ok(self
2614 .transcript_revision_body(revision)?
2615 .map(|body| body.messages))
2616 }
2617
2618 pub fn apply_transcript_history_state(
2620 &mut self,
2621 state: TranscriptHistoryState,
2622 ) -> Result<(), TranscriptEditError> {
2623 validate_transcript_history_state(&state)?;
2624 let head_body = state
2625 .revisions
2626 .iter()
2627 .find(|body| body.revision == state.head)
2628 .ok_or_else(|| {
2629 TranscriptEditError::HistoryStateMalformed(format!(
2630 "missing transcript head body {}",
2631 state.head
2632 ))
2633 })?
2634 .clone();
2635 let value = serde_json::to_value(&state)
2636 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
2637 self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);
2638 let mut updated_at = head_body.created_at;
2639 for commit in &state.commits {
2640 if commit.committed_at > updated_at {
2641 updated_at = commit.committed_at;
2642 }
2643 }
2644 self.messages = Arc::new(head_body.messages);
2645 self.updated_at = updated_at;
2646 Ok(())
2647 }
2648
2649 pub fn transcript_revision(&self) -> Result<String, serde_json::Error> {
2652 if let Some(state) = self.transcript_history_state()? {
2653 Ok(state.head)
2654 } else {
2655 transcript_messages_digest(self.messages())
2656 }
2657 }
2658
    /// Replaces a contiguous range of messages and records the change as a
    /// commit in the transcript history state.
    ///
    /// `selection` is treated as the half-open range `start..end` of message
    /// indices; the messages in that range are replaced by `replacement`.
    /// When `expected_parent_revision` is given it must equal the current
    /// revision. On success the history state (created on first use) gains
    /// bodies for the parent and new revisions as needed plus the new commit,
    /// its head moves to the new revision, and the live messages are replaced.
    ///
    /// # Errors
    ///
    /// - `TranscriptEditError::HistoryStateMalformed` when the stored history
    ///   cannot be decoded or a digest/serialization step fails.
    /// - `TranscriptEditError::RevisionConflict` when the expected parent
    ///   revision does not match.
    /// - `TranscriptEditError::InvalidRewriteRange` when `start > end` or
    ///   `end` exceeds the message count.
    /// - `TranscriptEditError::NoOpRewrite` when the rewritten transcript has
    ///   the same revision as the parent.
    /// - Any error from `validate_transcript_tool_result_shape` on the
    ///   rewritten transcript.
    ///
    /// The session is left unchanged when an error is returned.
    pub fn commit_transcript_rewrite(
        &mut self,
        selection: TranscriptRewriteSelection,
        replacement: Vec<Message>,
        reason: TranscriptRewriteReason,
        actor: Option<String>,
        expected_parent_revision: Option<String>,
    ) -> Result<TranscriptRewriteCommit, TranscriptEditError> {
        let parent_revision = self
            .transcript_revision()
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        // Optimistic concurrency check against the caller's view of the head.
        if let Some(expected) = expected_parent_revision
            && expected != parent_revision
        {
            return Err(TranscriptEditError::RevisionConflict {
                expected,
                actual: parent_revision,
            });
        }

        let (start, end) = selection.bounds();
        let message_count = self.messages.len();
        if start > end || end > message_count {
            return Err(TranscriptEditError::InvalidRewriteRange {
                start,
                end,
                message_count,
            });
        }

        // Build prefix + replacement + suffix as the candidate transcript.
        let replacement_len = replacement.len();
        let mut rewritten = Vec::with_capacity(
            start
                .saturating_add(replacement_len)
                .saturating_add(message_count.saturating_sub(end)),
        );
        rewritten.extend_from_slice(&self.messages[..start]);
        rewritten.extend(replacement);
        rewritten.extend_from_slice(&self.messages[end..]);
        validate_transcript_tool_result_shape(&rewritten)?;

        let original_span_digest = sha256_json_digest(&self.messages[start..end])
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        // The replacement now lives inside `rewritten`, so digest it from there.
        let replacement_digest = sha256_json_digest(&rewritten[start..start + replacement_len])
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        let revision = transcript_messages_digest(&rewritten)
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        if revision == parent_revision {
            return Err(TranscriptEditError::NoOpRewrite { revision });
        }

        let commit = TranscriptRewriteCommit {
            parent_revision,
            revision: revision.clone(),
            selection,
            original_span_digest,
            replacement_digest,
            messages_before: message_count,
            messages_after: rewritten.len(),
            reason,
            actor,
            committed_at: SystemTime::now(),
        };

        // First rewrite on this session: start a history rooted at the parent.
        let mut state = self
            .transcript_history_state()
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
            .unwrap_or_else(|| TranscriptHistoryState {
                head: commit.parent_revision.clone(),
                commits: Vec::new(),
                revisions: Vec::new(),
            });
        // Snapshot the pre-rewrite transcript if no body is stored for it yet.
        if !state
            .revisions
            .iter()
            .any(|body| body.revision == commit.parent_revision)
        {
            state.revisions.push(TranscriptRevisionBody {
                revision: commit.parent_revision.clone(),
                parent_revision: None,
                messages: self.messages().to_vec(),
                created_at: self.updated_at,
            });
        }
        // Store the new revision body unless an identical revision id exists.
        if !state
            .revisions
            .iter()
            .any(|body| body.revision == commit.revision)
        {
            state.revisions.push(TranscriptRevisionBody {
                revision: commit.revision.clone(),
                parent_revision: Some(commit.parent_revision.clone()),
                messages: rewritten.clone(),
                created_at: commit.committed_at,
            });
        }
        state.head = revision;
        state.commits.push(commit.clone());
        let value = serde_json::to_value(state)
            .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
        self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);

        self.messages = Arc::new(rewritten);
        self.updated_at = SystemTime::now();
        Ok(commit)
    }
2766
2767 fn refresh_transcript_head_after_message_mutation(&mut self) {
2768 if !self
2769 .metadata
2770 .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
2771 {
2772 return;
2773 }
2774 let Ok(Some(mut state)) = self.transcript_history_state() else {
2775 tracing::warn!(
2776 session_id = %self.id,
2777 "transcript history state is malformed; leaving head unchanged after message mutation"
2778 );
2779 return;
2780 };
2781 let Ok(head) = transcript_messages_digest(self.messages()) else {
2782 tracing::warn!(
2783 session_id = %self.id,
2784 "failed to digest transcript after message mutation; leaving head unchanged"
2785 );
2786 return;
2787 };
2788 let previous_head = state.head.clone();
2789 if !state.revisions.iter().any(|body| body.revision == head) {
2790 state.revisions.push(TranscriptRevisionBody {
2791 revision: head.clone(),
2792 parent_revision: Some(previous_head),
2793 messages: self.messages().to_vec(),
2794 created_at: SystemTime::now(),
2795 });
2796 }
2797 state.head = head;
2798 match serde_json::to_value(state) {
2799 Ok(value) => self.set_metadata(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value),
2800 Err(error) => {
2801 tracing::warn!(
2802 session_id = %self.id,
2803 error = %error,
2804 "failed to serialize transcript history state after message mutation"
2805 );
2806 }
2807 }
2808 }
2809
2810 pub fn set_mob_tool_authority_context(
2812 &mut self,
2813 authority_context: Option<MobToolAuthorityContext>,
2814 ) -> Result<(), serde_json::Error> {
2815 let mut build_state = self.build_state().unwrap_or_default();
2816 build_state.mob_tool_authority_context = authority_context;
2817 self.set_build_state(build_state)
2818 }
2819
2820 pub fn mob_tool_authority_context(&self) -> Option<MobToolAuthorityContext> {
2822 self.build_state()
2823 .and_then(|state| state.mob_tool_authority_context)
2824 }
2825
2826 pub fn fork_at(&self, index: usize) -> Self {
2831 let now = SystemTime::now();
2832 let truncated = self.messages[..index.min(self.messages.len())].to_vec();
2833 Self {
2834 version: SESSION_VERSION,
2835 id: SessionId::new(),
2836 messages: Arc::new(truncated),
2837 created_at: now,
2838 updated_at: now,
2839 metadata: self.branch_metadata(),
2840 usage: self.usage.clone(),
2841 }
2842 }
2843
    /// Forks the session just before `message_index` and appends an edited
    /// version of that message to the fork.
    ///
    /// The fork contains the messages before `message_index` followed by the
    /// replacement message; later messages of this session are not carried
    /// over. `replacement` either supplies a whole message or swaps a single
    /// block inside the original message.
    ///
    /// # Errors
    ///
    /// - `TranscriptEditError::MessageIndexOutOfBounds` when `message_index`
    ///   does not refer to an existing message.
    /// - `TranscriptEditError::MessageRoleMismatch` when a block-level
    ///   replacement targets a message of a different kind.
    /// - `TranscriptEditError::BlockIndexOutOfBounds` when a block (or tool
    ///   result) index is out of range.
    pub fn fork_replacing(
        &self,
        message_index: usize,
        replacement: TranscriptReplacement,
    ) -> Result<Self, TranscriptEditError> {
        let Some(original) = self.messages.get(message_index) else {
            return Err(TranscriptEditError::MessageIndexOutOfBounds {
                message_index,
                message_count: self.messages.len(),
            });
        };

        // Build the edited message first so every validation error is
        // reported before any fork is created.
        let replacement_message = match replacement {
            TranscriptReplacement::Message { message } => message,
            TranscriptReplacement::UserContentBlock { block_index, block } => {
                let Message::User(user) = original else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "user",
                        actual: message_role_name(original),
                    });
                };
                if block_index >= user.content.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "user content block",
                        block_index,
                        block_count: user.content.len(),
                    });
                }
                let mut edited = user.clone();
                edited.content[block_index] = block;
                Message::User(edited)
            }
            TranscriptReplacement::AssistantBlock { block_index, block } => {
                let Message::BlockAssistant(assistant) = original else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "block_assistant",
                        actual: message_role_name(original),
                    });
                };
                if block_index >= assistant.blocks.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "assistant block",
                        block_index,
                        block_count: assistant.blocks.len(),
                    });
                }
                let mut edited = assistant.clone();
                edited.blocks[block_index] = block;
                Message::BlockAssistant(edited)
            }
            TranscriptReplacement::ToolResultContentBlock {
                result_index,
                block_index,
                block,
            } => {
                let Message::ToolResults {
                    results,
                    created_at,
                } = original
                else {
                    return Err(TranscriptEditError::MessageRoleMismatch {
                        message_index,
                        expected: "tool_results",
                        actual: message_role_name(original),
                    });
                };
                // An out-of-range result index is reported through the same
                // error variant as a block index, with kind "tool result".
                let Some(result) = results.get(result_index) else {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "tool result",
                        block_index: result_index,
                        block_count: results.len(),
                    });
                };
                if block_index >= result.content.len() {
                    return Err(TranscriptEditError::BlockIndexOutOfBounds {
                        block_kind: "tool result content block",
                        block_index,
                        block_count: result.content.len(),
                    });
                }
                let mut edited_results = results.clone();
                edited_results[result_index].content[block_index] = block;
                // The original message's `created_at` is preserved.
                Message::ToolResults {
                    results: edited_results,
                    created_at: *created_at,
                }
            }
        };

        let mut forked = self.fork_at(message_index);
        forked.push(replacement_message);
        Ok(forked)
    }
2945
2946 pub fn fork(&self) -> Self {
2951 let now = SystemTime::now();
2952 Self {
2953 version: SESSION_VERSION,
2954 id: SessionId::new(),
2955 messages: Arc::clone(&self.messages),
2956 created_at: now,
2957 updated_at: now,
2958 metadata: self.branch_metadata(),
2959 usage: self.usage.clone(),
2960 }
2961 }
2962
2963 fn branch_metadata(&self) -> serde_json::Map<String, serde_json::Value> {
2964 let mut metadata = self.metadata.clone();
2965 metadata.remove(SESSION_TRANSCRIPT_HISTORY_STATE_KEY);
2966 metadata
2967 }
2968}
2969
2970impl Default for Session {
2971 fn default() -> Self {
2972 Self::new()
2973 }
2974}
2975
/// Serializable summary of a session: identity, timestamps, counters, and
/// the metadata map.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMeta {
    /// Identifier of the session.
    pub id: SessionId,
    /// Creation timestamp.
    pub created_at: SystemTime,
    /// Last-update timestamp.
    pub updated_at: SystemTime,
    /// Message count; presumably the number of messages in the session at
    /// the time the summary was produced (verify against the producer).
    pub message_count: usize,
    /// Total token count; presumably derived from the session usage (verify
    /// against the producer).
    pub total_tokens: u64,
    /// Metadata map; defaults to an empty map when absent from the
    /// serialized form.
    #[serde(default)]
    pub metadata: serde_json::Map<String, serde_json::Value>,
}
2988
/// Typed session metadata stored in the session's metadata map under
/// `SESSION_METADATA_KEY`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SessionMetadata {
    /// Schema version of this record. When the field is missing from the
    /// serialized form it is filled by
    /// `default_session_metadata_schema_version`.
    #[serde(default = "default_session_metadata_schema_version")]
    pub schema_version: u32,
    /// Model name.
    pub model: String,
    /// Maximum token setting for the session.
    pub max_tokens: u32,
    /// Structured-output retry count. When missing from the serialized form
    /// it is filled by `default_structured_output_retries`.
    #[serde(default = "default_structured_output_retries")]
    pub structured_output_retries: u32,
    /// LLM provider.
    pub provider: Provider,
    /// Self-hosted server id; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific parameters; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Tooling configuration for the session.
    pub tooling: SessionTooling,
    /// Keep-alive flag; `false` when missing from the serialized form.
    #[serde(default)]
    pub keep_alive: bool,
    /// Optional comms name.
    pub comms_name: Option<String>,
    /// Optional peer metadata; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_meta: Option<PeerMeta>,
    /// Optional realm id; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub realm_id: Option<String>,
    /// Optional instance id; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub instance_id: Option<String>,
    /// Optional backend name; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// Optional config generation counter; omitted from serialization when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_generation: Option<u64>,
    /// Optional auth binding reference; omitted from serialization when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
3040
/// Serde default for `SessionMetadata::structured_output_retries`.
fn default_structured_output_retries() -> u32 {
    const DEFAULT_RETRIES: u32 = 2;
    DEFAULT_RETRIES
}
3044
/// Serde default for `SessionMetadata::schema_version`, used when the field
/// is absent from the serialized form.
fn default_session_metadata_schema_version() -> u32 {
    const MISSING_FIELD_SCHEMA_VERSION: u32 = 1;
    MISSING_FIELD_SCHEMA_VERSION
}
3048
/// The LLM-related subset of `SessionMetadata`: model, provider, self-hosted
/// server, provider parameters, and auth binding.
///
/// Produced by `SessionMetadata::llm_identity` and written back with
/// `SessionMetadata::apply_llm_identity`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmIdentity {
    /// Model name.
    pub model: String,
    /// LLM provider.
    pub provider: Provider,
    /// Self-hosted server id; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub self_hosted_server_id: Option<String>,
    /// Provider-specific parameters; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Auth binding reference; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_binding: Option<crate::AuthBindingRef>,
}
3074
/// Requested changes to a `SessionLlmIdentity`, consumed by
/// `resolve_session_llm_identity_override`.
pub struct SessionLlmIdentityOverride<'a> {
    /// New model name; `None` keeps the current model.
    pub model: Option<&'a str>,
    /// New provider; only valid together with `model`.
    pub provider: Option<Provider>,
    /// New provider parameters; `None` keeps the current ones unless
    /// `clear_provider_params` is set.
    pub provider_params: Option<&'a serde_json::Value>,
    /// Drop the current provider parameters; cannot be combined with
    /// `provider_params`.
    pub clear_provider_params: bool,
    /// New auth binding; `None` keeps the current one unless it is cleared
    /// explicitly or by a provider change.
    pub auth_binding: Option<&'a crate::AuthBindingRef>,
    /// Drop the current auth binding; cannot be combined with
    /// `auth_binding`.
    pub clear_auth_binding: bool,
}
3084
/// Reasons `resolve_session_llm_identity_override` rejects an override.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SessionLlmIdentityOverrideError {
    /// A provider was supplied without a model.
    #[error("provider override requires model on an existing session")]
    ProviderRequiresModel,
    /// Both `clear_provider_params` and `provider_params` were supplied.
    #[error("clear_provider_params cannot be combined with provider_params")]
    SetAndClearProviderParams,
    /// Both `clear_auth_binding` and `auth_binding` were supplied.
    #[error("clear_auth_binding cannot be combined with auth_binding")]
    SetAndClearAuthBinding,
    /// The registry reported a mismatch between the resolved provider and
    /// model; the payload is the registry's reason text.
    #[error("{0}")]
    ProviderModelMismatch(String),
    /// The resolved provider is self-hosted but the registry has no
    /// self-hosted entry for the requested model.
    #[error("self-hosted provider requires a registered model alias; '{model}' is not configured")]
    MissingSelfHostedAlias { model: String },
}
3098
/// Applies `overrides` on top of `current` and returns the resulting LLM
/// identity, using `registry` to infer and validate provider/model pairs.
///
/// Resolution rules, as implemented below:
/// - Model: the override if given, otherwise the current model.
/// - Provider: the override if given; otherwise, when the model is
///   overridden, the registry entry's provider for that model (falling back
///   to the current provider when the registry has no entry); otherwise the
///   current provider.
/// - Provider params: cleared when requested, else the override, else the
///   current value.
/// - Self-hosted server id: only set for the self-hosted provider (details
///   inline).
/// - Auth binding: cleared when requested or when the provider changes
///   without a new binding; else the override, else the current value.
///
/// # Errors
///
/// Returns a `SessionLlmIdentityOverrideError` for contradictory override
/// flags, a provider without a model, a provider/model mismatch reported by
/// the registry, or a self-hosted model override with no registry entry.
pub fn resolve_session_llm_identity_override(
    current: &SessionLlmIdentity,
    registry: &crate::ModelRegistry,
    overrides: SessionLlmIdentityOverride<'_>,
) -> Result<SessionLlmIdentity, SessionLlmIdentityOverrideError> {
    // Reject contradictory or incomplete override combinations up front.
    if overrides.provider.is_some() && overrides.model.is_none() {
        return Err(SessionLlmIdentityOverrideError::ProviderRequiresModel);
    }
    if overrides.clear_provider_params && overrides.provider_params.is_some() {
        return Err(SessionLlmIdentityOverrideError::SetAndClearProviderParams);
    }
    if overrides.clear_auth_binding && overrides.auth_binding.is_some() {
        return Err(SessionLlmIdentityOverrideError::SetAndClearAuthBinding);
    }

    let model = overrides
        .model
        .map(str::to_string)
        .unwrap_or_else(|| current.model.clone());
    let provider = if let Some(provider) = overrides.provider {
        provider
    } else if overrides.model.is_some() {
        // Model changed without an explicit provider: infer it from the
        // registry, keeping the current provider for unknown models.
        registry
            .entry(&model)
            .map_or(current.provider, |entry| entry.provider)
    } else {
        current.provider
    };

    // Only validate the pair when the caller actually changed one of them.
    if (overrides.model.is_some() || overrides.provider.is_some())
        && let Some(reason) = registry.provider_override_mismatch_reason(provider, &model)
    {
        return Err(SessionLlmIdentityOverrideError::ProviderModelMismatch(
            reason,
        ));
    }

    let provider_params = if overrides.clear_provider_params {
        None
    } else {
        overrides
            .provider_params
            .cloned()
            .or_else(|| current.provider_params.clone())
    };
    let self_hosted_server_id = if provider == Provider::SelfHosted {
        if overrides.model.is_none() {
            // Model unchanged: keep the current server id, falling back to
            // the registry's self-hosted entry when none is recorded.
            current.self_hosted_server_id.clone().or_else(|| {
                registry
                    .entry_for_provider(Provider::SelfHosted, &model)
                    .and_then(|entry| entry.self_hosted.as_ref())
                    .map(|server| server.server_id.clone())
            })
        } else {
            // Model changed: the new model must have a self-hosted registry
            // entry; its server id (if any) replaces the current one.
            let entry = registry
                .entry_for_provider(Provider::SelfHosted, &model)
                .ok_or_else(|| SessionLlmIdentityOverrideError::MissingSelfHostedAlias {
                    model: model.clone(),
                })?;
            entry
                .self_hosted
                .as_ref()
                .map(|server| server.server_id.clone())
        }
    } else {
        None
    };

    // A provider switch drops the existing auth binding unless the caller
    // supplied a replacement.
    let auth_binding = if overrides.clear_auth_binding
        || (provider != current.provider && overrides.auth_binding.is_none())
    {
        None
    } else {
        overrides
            .auth_binding
            .cloned()
            .or_else(|| current.auth_binding.clone())
    };

    Ok(SessionLlmIdentity {
        model,
        provider,
        self_hosted_server_id,
        provider_params,
        auth_binding,
    })
}
3193
/// Serializable per-request LLM policy: a model plus optional provider
/// parameters and provider tool defaults.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct SessionLlmRequestPolicy {
    /// Model name.
    pub model: String,
    /// Provider-specific parameters; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_params: Option<serde_json::Value>,
    /// Provider tool defaults; omitted from serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_tool_defaults: Option<serde_json::Value>,
}
3209
3210impl SessionMetadata {
3211 pub fn llm_identity(&self) -> SessionLlmIdentity {
3213 SessionLlmIdentity {
3214 model: self.model.clone(),
3215 provider: self.provider,
3216 self_hosted_server_id: self.self_hosted_server_id.clone(),
3217 provider_params: self.provider_params.clone(),
3218 auth_binding: self.auth_binding.clone(),
3219 }
3220 }
3221
3222 pub fn apply_llm_identity(&mut self, identity: &SessionLlmIdentity) {
3224 self.model = identity.model.clone();
3225 self.provider = identity.provider;
3226 self.self_hosted_server_id = identity.self_hosted_server_id.clone();
3227 self.provider_params = identity.provider_params.clone();
3228 self.auth_binding = identity.auth_binding.clone();
3229 }
3230}
3231
/// Metadata-map key under which the serialized `SessionMetadata` is stored
/// (see `Session::set_session_metadata` / `Session::session_metadata`).
pub const SESSION_METADATA_KEY: &str = "session_metadata";
3234
/// Tri-state override for a tool category, serialized as `"inherit"`,
/// `"enable"`, or `"disable"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCategoryOverride {
    /// No explicit choice; `resolve` falls back to the runtime default.
    #[default]
    Inherit,
    /// Explicitly enabled; `resolve` returns `true`.
    Enable,
    /// Explicitly disabled; `resolve` returns `false`.
    Disable,
}
3253
3254impl ToolCategoryOverride {
3255 #[must_use]
3261 pub fn resolve(self, runtime_default: bool) -> bool {
3262 match self {
3263 Self::Enable => true,
3264 Self::Disable => false,
3265 Self::Inherit => runtime_default,
3266 }
3267 }
3268
3269 #[must_use]
3275 pub fn to_override(self) -> Option<bool> {
3276 match self {
3277 Self::Enable => Some(true),
3278 Self::Disable => Some(false),
3279 Self::Inherit => None,
3280 }
3281 }
3282
3283 #[must_use]
3291 pub fn from_effective(enabled: bool) -> Self {
3292 if enabled { Self::Enable } else { Self::Disable }
3293 }
3294
3295 #[must_use]
3305 pub fn from_override(value: Option<bool>) -> Self {
3306 match value {
3307 Some(true) => Self::Enable,
3308 Some(false) => Self::Disable,
3309 None => Self::Inherit,
3310 }
3311 }
3312}
3313
3314fn deserialize_tool_category_compat<'de, D>(
3322 deserializer: D,
3323) -> Result<ToolCategoryOverride, D::Error>
3324where
3325 D: serde::Deserializer<'de>,
3326{
3327 use serde::de;
3328
3329 struct ToolCategoryVisitor;
3330
3331 impl de::Visitor<'_> for ToolCategoryVisitor {
3332 type Value = ToolCategoryOverride;
3333
3334 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3335 formatter.write_str("a boolean or one of \"inherit\", \"enable\", \"disable\"")
3336 }
3337
3338 fn visit_bool<E: de::Error>(self, v: bool) -> Result<Self::Value, E> {
3339 Ok(if v {
3340 ToolCategoryOverride::Enable
3341 } else {
3342 ToolCategoryOverride::Inherit
3343 })
3344 }
3345
3346 fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
3347 match v {
3348 "inherit" => Ok(ToolCategoryOverride::Inherit),
3349 "enable" => Ok(ToolCategoryOverride::Enable),
3350 "disable" => Ok(ToolCategoryOverride::Disable),
3351 _ => Err(de::Error::unknown_variant(
3352 v,
3353 &["inherit", "enable", "disable"],
3354 )),
3355 }
3356 }
3357 }
3358
3359 deserializer.deserialize_any(ToolCategoryVisitor)
3360}
3361
/// Trims surrounding whitespace from a realtime item id.
///
/// Returns `None` when the id is empty or whitespace-only. When the id needs
/// no trimming the owned input is returned as-is, avoiding a second
/// allocation; otherwise the trimmed text is copied into a new `String`.
fn normalize_realtime_item_id(item_id: String) -> Option<String> {
    let trimmed = item_id.trim();
    if trimmed.is_empty() {
        None
    } else if trimmed.len() == item_id.len() {
        // Nothing was trimmed, so the input already is the normalized id.
        Some(item_id)
    } else {
        Some(trimmed.to_owned())
    }
}
3366
3367fn normalize_realtime_previous_item_id(previous_item_id: Option<String>) -> Option<String> {
3368 previous_item_id.and_then(normalize_realtime_item_id)
3369}
3370
/// Normalizes a realtime response id using the same rules as
/// `normalize_realtime_item_id` (this simply delegates to it).
fn normalize_realtime_response_id(response_id: String) -> Option<String> {
    normalize_realtime_item_id(response_id)
}
3374
3375fn normalize_realtime_optional_response_id(response_id: Option<String>) -> Option<String> {
3376 response_id.and_then(normalize_realtime_response_id)
3377}
3378
3379#[must_use]
3395fn promote_item_lane(item: &mut RealtimeTranscriptItemState, lane: TranscriptLane) -> bool {
3396 if item.lane == lane {
3397 return true;
3398 }
3399 let has_content = item.content_segments.values().any(|s| !s.is_empty());
3400 if !has_content {
3401 item.lane = lane;
3404 return true;
3405 }
3406 tracing::warn!(
3407 existing_lane = ?item.lane,
3408 observed_lane = ?lane,
3409 "ignoring realtime transcript lane conflict on item with staged content"
3410 );
3411 false
3412}
3413
/// Registers an observation of a content-bearing realtime item and returns
/// its mutable state, or `None` when the observation must be ignored.
///
/// Ids are normalized first; an empty item id yields `None`. A previously
/// unseen item id is appended to `first_seen_order` and gets a new state
/// entry. The observation is rejected (with a warning) when the item is
/// already marked skipped, when `role` differs from the recorded role, or
/// when a different response id is already recorded. A missing
/// `previous_item_id` or response id on the stored item is filled in from
/// the observation.
fn observe_realtime_item(
    state: &mut SessionRealtimeTranscriptState,
    item_id: String,
    previous_item_id: Option<String>,
    role: RealtimeTranscriptRole,
    response_id: Option<String>,
) -> Option<&mut RealtimeTranscriptItemState> {
    let item_id = normalize_realtime_item_id(item_id)?;
    let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
    let response_id = normalize_realtime_optional_response_id(response_id);
    // Record first-seen order before any conflict check, so rejected
    // observations still claim their position.
    if !state
        .first_seen_order
        .iter()
        .any(|existing| existing == &item_id)
    {
        state.first_seen_order.push(item_id.clone());
    }
    let item = state.items.entry(item_id.clone()).or_insert_with(|| {
        RealtimeTranscriptItemState::new(role, previous_item_id.clone(), response_id.clone())
    });
    if item.skipped {
        // Even when content is rejected, a newly learned predecessor link is
        // still recorded on the skipped item.
        if item.previous_item_id.is_none() && previous_item_id.is_some() {
            item.previous_item_id = previous_item_id;
        }
        tracing::warn!(
            item_id = %item_id,
            observed_role = ?role,
            "ignoring realtime transcript content for item already marked as a contentless causal anchor"
        );
        return None;
    }
    if item.role != role {
        tracing::warn!(
            item_id = %item_id,
            existing_role = ?item.role,
            observed_role = ?role,
            "ignoring realtime transcript item role conflict"
        );
        return None;
    }
    // Never overwrite an existing predecessor link; only fill a missing one.
    if item.previous_item_id.is_none() && previous_item_id.is_some() {
        item.previous_item_id = previous_item_id;
    }
    if let Some(response_id) = response_id {
        match item.response_id.as_ref() {
            Some(existing) if existing != &response_id => {
                tracing::warn!(
                    item_id = %item_id,
                    existing_response_id = %existing,
                    observed_response_id = %response_id,
                    "ignoring realtime transcript item response conflict"
                );
                return None;
            }
            // Same response id already recorded: nothing to update.
            Some(_) => {}
            None => item.response_id = Some(response_id),
        }
    }
    Some(item)
}
3474
3475fn observe_realtime_skipped_item(
3476 state: &mut SessionRealtimeTranscriptState,
3477 item_id: String,
3478 previous_item_id: Option<String>,
3479) {
3480 let Some(item_id) = normalize_realtime_item_id(item_id) else {
3481 return;
3482 };
3483 let previous_item_id = normalize_realtime_previous_item_id(previous_item_id);
3484 if !state
3485 .first_seen_order
3486 .iter()
3487 .any(|existing| existing == &item_id)
3488 {
3489 state.first_seen_order.push(item_id.clone());
3490 }
3491 let item = state
3492 .items
3493 .entry(item_id)
3494 .or_insert_with(|| RealtimeTranscriptItemState::skipped(previous_item_id.clone()));
3495 if item.previous_item_id.is_none() && previous_item_id.is_some() {
3496 item.previous_item_id = previous_item_id;
3497 }
3498 if item.materialized || item.skipped {
3499 return;
3500 }
3501 if item.role != RealtimeTranscriptRole::Assistant {
3502 tracing::warn!(
3503 existing_role = ?item.role,
3504 "ignoring realtime skipped-item observation for non-assistant item"
3505 );
3506 return;
3507 }
3508 if !item.content_segments.is_empty() {
3509 tracing::warn!("ignoring realtime skipped-item observation for content-bearing item");
3510 return;
3511 }
3512 item.skipped = true;
3513 item.ready = true;
3514}
3515
3516fn mark_realtime_assistant_response_ready(
3517 state: &mut SessionRealtimeTranscriptState,
3518 response_id: &str,
3519) {
3520 for item in state.items.values_mut() {
3521 if item.role == RealtimeTranscriptRole::Assistant
3522 && item.response_id.as_deref() == Some(response_id)
3523 && !item.materialized
3524 && !item.text().is_empty()
3525 {
3526 item.ready = true;
3527 }
3528 }
3529}
3530
3531fn discard_realtime_assistant_response(
3532 state: &mut SessionRealtimeTranscriptState,
3533 response_id: &str,
3534) {
3535 state
3536 .discarded_assistant_response_ids
3537 .insert(response_id.to_string());
3538 for item in state.items.values_mut() {
3539 if item.role == RealtimeTranscriptRole::Assistant
3540 && item.response_id.as_deref() == Some(response_id)
3541 && !item.materialized
3542 {
3543 item.content_segments.clear();
3544 item.skipped = true;
3545 item.ready = true;
3546 }
3547 }
3548 state.assistant_completions.remove(response_id);
3549}
3550
3551fn discard_realtime_assistant_response_by_lane(
3567 state: &mut SessionRealtimeTranscriptState,
3568 response_id: &str,
3569) {
3570 state
3571 .discarded_assistant_response_ids
3572 .insert(response_id.to_string());
3573 for item in state.items.values_mut() {
3574 if item.role != RealtimeTranscriptRole::Assistant
3575 || item.response_id.as_deref() != Some(response_id)
3576 || item.materialized
3577 {
3578 continue;
3579 }
3580 let has_content = item.content_segments.values().any(|s| !s.is_empty());
3581 if item.lane == TranscriptLane::Display && has_content {
3582 continue;
3586 }
3587 item.content_segments.clear();
3591 item.skipped = true;
3592 item.ready = true;
3593 }
3594}
3595
3596fn realtime_transcript_order(state: &SessionRealtimeTranscriptState) -> Vec<String> {
3597 let mut roots = Vec::new();
3598 let mut children: BTreeMap<String, Vec<String>> = BTreeMap::new();
3599 for item_id in &state.first_seen_order {
3600 let Some(item) = state.items.get(item_id) else {
3601 continue;
3602 };
3603 if let Some(previous) = item.previous_item_id.as_ref()
3604 && state.items.contains_key(previous)
3605 {
3606 children
3607 .entry(previous.clone())
3608 .or_default()
3609 .push(item_id.clone());
3610 } else {
3611 roots.push(item_id.clone());
3612 }
3613 }
3614 roots.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
3615 for child_ids in children.values_mut() {
3616 child_ids.sort_by_key(|item_id| realtime_first_seen_index(state, item_id));
3617 }
3618
3619 let mut ordered = Vec::new();
3620 let mut visited = BTreeSet::new();
3621 for root in roots {
3622 visit_realtime_transcript_item(&root, &children, &mut visited, &mut ordered);
3623 }
3624 for item_id in &state.first_seen_order {
3625 visit_realtime_transcript_item(item_id, &children, &mut visited, &mut ordered);
3626 }
3627 ordered
3628}
3629
3630fn realtime_first_seen_index(state: &SessionRealtimeTranscriptState, item_id: &str) -> usize {
3631 state
3632 .first_seen_order
3633 .iter()
3634 .position(|existing| existing == item_id)
3635 .unwrap_or(usize::MAX)
3636}
3637
/// Appends `item_id` and all of its not-yet-visited descendants to `ordered`
/// in pre-order (parent first, then each child subtree in list order).
///
/// `visited` is updated as ids are emitted; ids already present in it are
/// skipped, which also makes cycles in `children` terminate.
///
/// The walk uses an explicit stack rather than recursion so that a very long
/// predecessor chain cannot overflow the call stack.
fn visit_realtime_transcript_item(
    item_id: &str,
    children: &BTreeMap<String, Vec<String>>,
    visited: &mut BTreeSet<String>,
    ordered: &mut Vec<String>,
) {
    let mut pending: Vec<&str> = vec![item_id];
    while let Some(current) = pending.pop() {
        if !visited.insert(current.to_string()) {
            continue;
        }
        ordered.push(current.to_string());
        if let Some(child_ids) = children.get(current) {
            // Push in reverse so the first child is popped (and fully
            // expanded) before its later siblings.
            pending.extend(child_ids.iter().rev().map(String::as_str));
        }
    }
}
3654
3655fn realtime_predecessor_materialized(
3656 state: &SessionRealtimeTranscriptState,
3657 previous_item_id: Option<&str>,
3658) -> bool {
3659 match previous_item_id {
3660 None => true,
3661 Some(previous_item_id) => state
3662 .items
3663 .get(previous_item_id)
3664 .is_some_and(|item| item.materialized),
3665 }
3666}
3667
/// Per-session tooling configuration: one [`ToolCategoryOverride`] per tool
/// category plus an optional list of active skills.
///
/// Every category field is optional in the serialized form (`serde(default)`
/// falls back to `ToolCategoryOverride::default()`) and is decoded through
/// `deserialize_tool_category_compat`.
// NOTE(review): the `_compat` deserializer presumably accepts an older
// on-disk encoding of the override — confirm against its definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub struct SessionTooling {
    /// Override for the `builtins` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub builtins: ToolCategoryOverride,
    /// Override for the `shell` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub shell: ToolCategoryOverride,
    /// Override for the `comms` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub comms: ToolCategoryOverride,
    /// Override for the `mob` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub mob: ToolCategoryOverride,
    /// Override for the `memory` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub memory: ToolCategoryOverride,
    /// Override for the `schedule` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub schedule: ToolCategoryOverride,
    /// Override for the `workgraph` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub workgraph: ToolCategoryOverride,
    /// Override for the `image_generation` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub image_generation: ToolCategoryOverride,
    /// Override for the `web_search` tool category.
    #[serde(default, deserialize_with = "deserialize_tool_category_compat")]
    pub web_search: ToolCategoryOverride,
    /// Optional list of active skill keys; omitted from the serialized form
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub active_skills: Option<Vec<crate::skills::SkillKey>>,
}
3705
3706impl From<&Session> for SessionMeta {
3707 fn from(session: &Session) -> Self {
3708 Self {
3709 id: session.id.clone(),
3710 created_at: session.created_at,
3711 updated_at: session.updated_at,
3712 message_count: session.messages.len(),
3713 total_tokens: session.total_tokens(),
3714 metadata: session.metadata.clone(),
3715 }
3716 }
3717}
3718
3719#[cfg(test)]
3720#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
3721mod tests {
3722 use super::*;
3723 use crate::types::{
3724 AssistantMessage, BlockAssistantMessage, ContentBlock, StopReason, SystemMessage, Usage,
3725 UserMessage,
3726 };
3727 use std::sync::Arc;
3728
3729 fn block_assistant_text(message: &BlockAssistantMessage) -> String {
3730 message
3731 .blocks
3732 .iter()
3733 .filter_map(|block| match block {
3734 AssistantBlock::Text { text, .. } => Some(text.as_str()),
3735 _ => None,
3736 })
3737 .collect()
3738 }
3739
3740 #[test]
3741 fn transcript_rewrite_preserves_full_assistant_block_trace() {
3742 let mut session = Session::new();
3743 session.push(Message::User(UserMessage::text(
3744 "run the trace".to_string(),
3745 )));
3746 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3747 vec![AssistantBlock::Text {
3748 text: "original assistant trace".to_string(),
3749 meta: None,
3750 }],
3751 StopReason::EndTurn,
3752 )));
3753
3754 let parent_revision = session.transcript_revision().expect("parent revision");
3755 let replacement = vec![
3756 Message::BlockAssistant(BlockAssistantMessage::new(
3757 vec![
3758 AssistantBlock::Text {
3759 text: "compacted assistant trace".to_string(),
3760 meta: None,
3761 },
3762 AssistantBlock::ToolUse {
3763 id: "toolu_trace".to_string(),
3764 name: "trace_probe".to_string(),
3765 args: serde_json::value::RawValue::from_string(
3766 r#"{"path":"N-3"}"#.to_string(),
3767 )
3768 .expect("valid tool args"),
3769 meta: None,
3770 },
3771 ],
3772 StopReason::ToolUse,
3773 )),
3774 Message::tool_results(vec![ToolResult::new(
3775 "toolu_trace".to_string(),
3776 "trace complete".to_string(),
3777 false,
3778 )]),
3779 ];
3780
3781 let commit = session
3782 .commit_transcript_rewrite(
3783 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3784 replacement,
3785 TranscriptRewriteReason::new("compaction"),
3786 Some("unit-test".to_string()),
3787 Some(parent_revision.clone()),
3788 )
3789 .expect("rewrite should commit");
3790
3791 assert_eq!(commit.parent_revision, parent_revision);
3792 let current = session
3793 .transcript_revision_messages(&commit.revision)
3794 .expect("history state should decode")
3795 .expect("current revision should be retained");
3796 let Message::BlockAssistant(assistant) = ¤t[1] else {
3797 panic!("replacement should remain a block assistant message");
3798 };
3799 assert!(assistant.blocks.iter().any(|block| matches!(
3800 block,
3801 AssistantBlock::ToolUse { name, args, .. }
3802 if name == "trace_probe" && args.get().contains("\"N-3\"")
3803 )));
3804
3805 let parent = session
3806 .transcript_revision_messages(&parent_revision)
3807 .expect("history state should decode")
3808 .expect("parent revision should remain retained");
3809 assert!(matches!(
3810 &parent[1],
3811 Message::BlockAssistant(assistant)
3812 if block_assistant_text(assistant).contains("original assistant trace")
3813 ));
3814 }
3815
3816 #[test]
3817 fn transcript_rewrite_rejects_trailing_block_assistant_tool_call() {
3818 let mut session = Session::new();
3819 session.push(Message::User(UserMessage::text("question".to_string())));
3820 session.push(Message::Assistant(AssistantMessage {
3821 content: "plain answer".to_string(),
3822 tool_calls: Vec::new(),
3823 stop_reason: StopReason::EndTurn,
3824 usage: Usage::default(),
3825 created_at: crate::types::message_timestamp_now(),
3826 }));
3827 let parent_revision = session.transcript_revision().expect("parent revision");
3828
3829 let err = session
3830 .commit_transcript_rewrite(
3831 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3832 vec![Message::BlockAssistant(BlockAssistantMessage::new(
3833 vec![AssistantBlock::ToolUse {
3834 id: "toolu_1".to_string(),
3835 name: "lookup".to_string(),
3836 args: serde_json::value::RawValue::from_string("{}".to_string())
3837 .expect("valid args"),
3838 meta: None,
3839 }],
3840 StopReason::ToolUse,
3841 ))],
3842 TranscriptRewriteReason::new("compaction"),
3843 Some("unit-test".to_string()),
3844 Some(parent_revision),
3845 )
3846 .expect_err("rewrite should reject trailing unresolved block-assistant tool call");
3847 assert!(matches!(
3848 err,
3849 TranscriptEditError::InvalidTranscriptShape(_)
3850 ));
3851 }
3852
3853 #[test]
3854 fn transcript_rewrite_rejects_no_op_self_edge() {
3855 let mut session = Session::new();
3856 session.push(Message::User(UserMessage::text(
3857 "keep this exact transcript".to_string(),
3858 )));
3859 session.push(Message::Assistant(AssistantMessage {
3860 content: "unchanged".to_string(),
3861 tool_calls: Vec::new(),
3862 stop_reason: StopReason::EndTurn,
3863 usage: Usage::default(),
3864 created_at: crate::types::message_timestamp_now(),
3865 }));
3866
3867 let parent_revision = session.transcript_revision().expect("parent revision");
3868 let err = session
3869 .commit_transcript_rewrite(
3870 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3871 vec![session.messages()[1].clone()],
3872 TranscriptRewriteReason::new("retry"),
3873 Some("unit-test".to_string()),
3874 Some(parent_revision.clone()),
3875 )
3876 .expect_err("same-content rewrite should not emit a self-edge commit");
3877
3878 assert!(matches!(
3879 err,
3880 TranscriptEditError::NoOpRewrite { revision } if revision == parent_revision
3881 ));
3882 assert!(
3883 session
3884 .transcript_history_state()
3885 .expect("history state should decode")
3886 .is_none()
3887 );
3888 }
3889
3890 #[test]
3891 fn transcript_rewrite_run_boundary_guard_accepts_rewrite_then_append() {
3892 let mut original = Session::new();
3893 original.push(Message::User(UserMessage::text("question".to_string())));
3894 original.push(Message::Assistant(AssistantMessage {
3895 content: "verbose answer".to_string(),
3896 tool_calls: Vec::new(),
3897 stop_reason: StopReason::EndTurn,
3898 usage: Usage::default(),
3899 created_at: crate::types::message_timestamp_now(),
3900 }));
3901
3902 let parent_revision = original.transcript_revision().expect("parent revision");
3903 let mut incoming = original.clone();
3904 incoming
3905 .commit_transcript_rewrite(
3906 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3907 vec![Message::Assistant(AssistantMessage {
3908 content: "compact answer".to_string(),
3909 tool_calls: Vec::new(),
3910 stop_reason: StopReason::EndTurn,
3911 usage: Usage::default(),
3912 created_at: crate::types::message_timestamp_now(),
3913 })],
3914 TranscriptRewriteReason::new("compaction"),
3915 Some("unit-test".to_string()),
3916 Some(parent_revision),
3917 )
3918 .expect("rewrite should commit");
3919 incoming.push(Message::User(UserMessage::text("follow-up".to_string())));
3920 incoming.push(Message::Assistant(AssistantMessage {
3921 content: "follow-up answer".to_string(),
3922 tool_calls: Vec::new(),
3923 stop_reason: StopReason::EndTurn,
3924 usage: Usage::default(),
3925 created_at: crate::types::message_timestamp_now(),
3926 }));
3927
3928 crate::session_store::run_boundary_snapshot_save_guard(&incoming, Some(&original))
3929 .expect("rewrite plus appended turn should be a valid run-boundary commit");
3930 }
3931
3932 #[test]
3933 fn transcript_rewrite_rejects_orphaned_tool_results() {
3934 let mut session = Session::new();
3935 session.push(Message::User(UserMessage::text("use a tool".to_string())));
3936 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
3937 vec![AssistantBlock::ToolUse {
3938 id: "toolu_1".to_string(),
3939 name: "lookup".to_string(),
3940 args: serde_json::value::RawValue::from_string("{}".to_string())
3941 .expect("valid args"),
3942 meta: None,
3943 }],
3944 StopReason::ToolUse,
3945 )));
3946 session.push(Message::tool_results(vec![ToolResult::new(
3947 "toolu_1".to_string(),
3948 "done".to_string(),
3949 false,
3950 )]));
3951 let parent_revision = session.transcript_revision().expect("parent revision");
3952
3953 let err = session
3954 .commit_transcript_rewrite(
3955 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3956 vec![Message::Assistant(AssistantMessage {
3957 content: "no tool after all".to_string(),
3958 tool_calls: Vec::new(),
3959 stop_reason: StopReason::EndTurn,
3960 usage: Usage::default(),
3961 created_at: crate::types::message_timestamp_now(),
3962 })],
3963 TranscriptRewriteReason::new("compaction"),
3964 Some("unit-test".to_string()),
3965 Some(parent_revision),
3966 )
3967 .expect_err("rewrite should reject stranded tool results");
3968 assert!(matches!(
3969 err,
3970 TranscriptEditError::InvalidTranscriptShape(_)
3971 ));
3972 }
3973
3974 #[test]
3975 fn transcript_rewrite_rejects_trailing_assistant_tool_call() {
3976 let mut session = Session::new();
3977 session.push(Message::User(UserMessage::text("question".to_string())));
3978 session.push(Message::Assistant(AssistantMessage {
3979 content: "plain answer".to_string(),
3980 tool_calls: Vec::new(),
3981 stop_reason: StopReason::EndTurn,
3982 usage: Usage::default(),
3983 created_at: crate::types::message_timestamp_now(),
3984 }));
3985 let parent_revision = session.transcript_revision().expect("parent revision");
3986
3987 let err = session
3988 .commit_transcript_rewrite(
3989 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
3990 vec![Message::Assistant(AssistantMessage {
3991 content: String::new(),
3992 tool_calls: vec![crate::types::ToolCall::new(
3993 "toolu_1".to_string(),
3994 "lookup".to_string(),
3995 serde_json::json!({}),
3996 )],
3997 stop_reason: StopReason::ToolUse,
3998 usage: Usage::default(),
3999 created_at: crate::types::message_timestamp_now(),
4000 })],
4001 TranscriptRewriteReason::new("compaction"),
4002 Some("unit-test".to_string()),
4003 Some(parent_revision),
4004 )
4005 .expect_err("rewrite should reject trailing unresolved tool call");
4006 assert!(matches!(
4007 err,
4008 TranscriptEditError::InvalidTranscriptShape(_)
4009 ));
4010 }
4011
4012 #[test]
4013 fn transcript_rewrite_rejects_duplicate_tool_results() {
4014 let mut session = Session::new();
4015 session.push(Message::User(UserMessage::text("use a tool".to_string())));
4016 session.push(Message::Assistant(AssistantMessage {
4017 content: "plain answer".to_string(),
4018 tool_calls: Vec::new(),
4019 stop_reason: StopReason::EndTurn,
4020 usage: Usage::default(),
4021 created_at: crate::types::message_timestamp_now(),
4022 }));
4023 let parent_revision = session.transcript_revision().expect("parent revision");
4024
4025 let err = session
4026 .commit_transcript_rewrite(
4027 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4028 vec![
4029 Message::BlockAssistant(BlockAssistantMessage::new(
4030 vec![AssistantBlock::ToolUse {
4031 id: "toolu_1".to_string(),
4032 name: "lookup".to_string(),
4033 args: serde_json::value::RawValue::from_string("{}".to_string())
4034 .expect("valid args"),
4035 meta: None,
4036 }],
4037 StopReason::ToolUse,
4038 )),
4039 Message::tool_results(vec![
4040 ToolResult::new("toolu_1".to_string(), "one".to_string(), false),
4041 ToolResult::new("toolu_1".to_string(), "two".to_string(), false),
4042 ]),
4043 ],
4044 TranscriptRewriteReason::new("compaction"),
4045 Some("unit-test".to_string()),
4046 Some(parent_revision),
4047 )
4048 .expect_err("rewrite should reject duplicate tool results");
4049 assert!(matches!(
4050 err,
4051 TranscriptEditError::InvalidTranscriptShape(_)
4052 ));
4053 }
4054
4055 #[test]
4056 fn transcript_rewrite_record_rejects_prefix_or_suffix_tampering() {
4057 let mut session = Session::new();
4058 session.push(Message::System(SystemMessage::new("keep prefix")));
4059 session.push(Message::Assistant(AssistantMessage {
4060 content: "verbose answer".to_string(),
4061 tool_calls: Vec::new(),
4062 stop_reason: StopReason::EndTurn,
4063 usage: Usage::default(),
4064 created_at: crate::types::message_timestamp_now(),
4065 }));
4066 session.push(Message::User(UserMessage::text("keep suffix".to_string())));
4067
4068 let parent_revision = session.transcript_revision().expect("parent revision");
4069 let commit = session
4070 .commit_transcript_rewrite(
4071 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4072 vec![Message::Assistant(AssistantMessage {
4073 content: "compact answer".to_string(),
4074 tool_calls: Vec::new(),
4075 stop_reason: StopReason::EndTurn,
4076 usage: Usage::default(),
4077 created_at: crate::types::message_timestamp_now(),
4078 })],
4079 TranscriptRewriteReason::new("compaction"),
4080 Some("unit-test".to_string()),
4081 Some(parent_revision),
4082 )
4083 .expect("rewrite should commit");
4084 let state = session
4085 .transcript_history_state()
4086 .expect("history state should decode")
4087 .expect("history state should exist");
4088 let parent_body = state
4089 .revisions
4090 .iter()
4091 .find(|body| body.revision == commit.parent_revision)
4092 .expect("parent body retained")
4093 .clone();
4094 let revision_body = state
4095 .revisions
4096 .iter()
4097 .find(|body| body.revision == commit.revision)
4098 .expect("revision body retained")
4099 .clone();
4100
4101 let mut forged_body = revision_body;
4102 forged_body.messages[0] = Message::System(SystemMessage::new("tampered prefix"));
4103 forged_body.revision =
4104 transcript_messages_digest(&forged_body.messages).expect("forged digest");
4105 let mut forged_commit = commit;
4106 forged_commit.revision = forged_body.revision.clone();
4107 let err = TranscriptRewriteRecord::new(forged_commit, parent_body, forged_body)
4108 .expect_err("record validation must reject changes outside selected span");
4109 assert!(
4110 err.to_string().contains("before the selected span"),
4111 "unexpected error: {err}"
4112 );
4113 }
4114
4115 #[test]
4116 fn transcript_rewrite_replay_allows_normal_turn_revisions_between_rewrites() {
4117 let mut session = Session::new();
4118 session.push(Message::User(UserMessage::text("first".to_string())));
4119 session.push(Message::Assistant(AssistantMessage {
4120 content: "verbose first answer".to_string(),
4121 tool_calls: Vec::new(),
4122 stop_reason: StopReason::EndTurn,
4123 usage: crate::types::Usage::default(),
4124 created_at: crate::types::message_timestamp_now(),
4125 }));
4126
4127 let first_parent = session.transcript_revision().expect("first parent");
4128 let first_commit = session
4129 .commit_transcript_rewrite(
4130 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4131 vec![Message::Assistant(AssistantMessage {
4132 content: "compact first answer".to_string(),
4133 tool_calls: Vec::new(),
4134 stop_reason: StopReason::EndTurn,
4135 usage: crate::types::Usage::default(),
4136 created_at: crate::types::message_timestamp_now(),
4137 })],
4138 TranscriptRewriteReason::new("compaction"),
4139 Some("unit-test".to_string()),
4140 Some(first_parent),
4141 )
4142 .expect("first rewrite");
4143
4144 session.push(Message::User(UserMessage::text("normal turn".to_string())));
4145 session.push(Message::Assistant(AssistantMessage {
4146 content: "verbose second answer".to_string(),
4147 tool_calls: Vec::new(),
4148 stop_reason: StopReason::EndTurn,
4149 usage: crate::types::Usage::default(),
4150 created_at: crate::types::message_timestamp_now(),
4151 }));
4152 let bridge_parent = session
4153 .transcript_revision()
4154 .expect("normal turn should advance transcript head");
4155 assert_ne!(bridge_parent, first_commit.revision);
4156 validate_transcript_history_state(
4157 &session
4158 .transcript_history_state()
4159 .expect("history state should decode")
4160 .expect("history state should exist"),
4161 )
4162 .expect("normal turn head may legitimately differ from last rewrite commit");
4163
4164 let second_commit = session
4165 .commit_transcript_rewrite(
4166 TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
4167 vec![Message::Assistant(AssistantMessage {
4168 content: "compact second answer".to_string(),
4169 tool_calls: Vec::new(),
4170 stop_reason: StopReason::EndTurn,
4171 usage: crate::types::Usage::default(),
4172 created_at: crate::types::message_timestamp_now(),
4173 })],
4174 TranscriptRewriteReason::new("compaction"),
4175 Some("unit-test".to_string()),
4176 Some(bridge_parent.clone()),
4177 )
4178 .expect("second rewrite");
4179
4180 let state = session
4181 .transcript_history_state()
4182 .expect("history state should decode")
4183 .expect("history state should exist");
4184 let records = state.commits.iter().map(|commit| {
4185 let parent_body = state
4186 .revisions
4187 .iter()
4188 .find(|body| body.revision == commit.parent_revision)
4189 .expect("parent body retained")
4190 .clone();
4191 let revision_body = state
4192 .revisions
4193 .iter()
4194 .find(|body| body.revision == commit.revision)
4195 .expect("revision body retained")
4196 .clone();
4197 TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4198 .expect("record should validate")
4199 });
4200
4201 let replayed = TranscriptHistoryState::from_rewrite_records(records)
4202 .expect("rewrite replay should accept normal-turn bridge revisions")
4203 .expect("rewrite records should exist");
4204 assert_eq!(replayed.head, second_commit.revision);
4205 assert!(
4206 replayed
4207 .revisions
4208 .iter()
4209 .any(|body| body.revision == bridge_parent)
4210 );
4211 }
4212
4213 #[test]
4214 fn transcript_rewrite_replay_rejects_branched_rewrite_records() {
4215 let mut base = Session::new();
4216 base.push(Message::User(UserMessage::text("question".to_string())));
4217 base.push(Message::Assistant(AssistantMessage {
4218 content: "verbose answer".to_string(),
4219 tool_calls: Vec::new(),
4220 stop_reason: StopReason::EndTurn,
4221 usage: crate::types::Usage::default(),
4222 created_at: crate::types::message_timestamp_now(),
4223 }));
4224 let parent = base.transcript_revision().expect("parent revision");
4225
4226 let mut first = base.clone();
4227 let first_commit = first
4228 .commit_transcript_rewrite(
4229 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4230 vec![Message::Assistant(AssistantMessage {
4231 content: "first compact answer".to_string(),
4232 tool_calls: Vec::new(),
4233 stop_reason: StopReason::EndTurn,
4234 usage: crate::types::Usage::default(),
4235 created_at: crate::types::message_timestamp_now(),
4236 })],
4237 TranscriptRewriteReason::new("compaction"),
4238 Some("unit-test".to_string()),
4239 Some(parent.clone()),
4240 )
4241 .expect("first rewrite");
4242 let first_state = first
4243 .transcript_history_state()
4244 .expect("first state decodes")
4245 .expect("first state exists");
4246
4247 let mut second = base;
4248 let second_commit = second
4249 .commit_transcript_rewrite(
4250 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4251 vec![Message::Assistant(AssistantMessage {
4252 content: "second compact answer".to_string(),
4253 tool_calls: Vec::new(),
4254 stop_reason: StopReason::EndTurn,
4255 usage: crate::types::Usage::default(),
4256 created_at: crate::types::message_timestamp_now(),
4257 })],
4258 TranscriptRewriteReason::new("compaction"),
4259 Some("unit-test".to_string()),
4260 Some(parent),
4261 )
4262 .expect("second rewrite");
4263 let second_state = second
4264 .transcript_history_state()
4265 .expect("second state decodes")
4266 .expect("second state exists");
4267
4268 let record = |state: &TranscriptHistoryState, commit: &TranscriptRewriteCommit| {
4269 let parent_body = state
4270 .revisions
4271 .iter()
4272 .find(|body| body.revision == commit.parent_revision)
4273 .expect("parent body retained")
4274 .clone();
4275 let revision_body = state
4276 .revisions
4277 .iter()
4278 .find(|body| body.revision == commit.revision)
4279 .expect("revision body retained")
4280 .clone();
4281 TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4282 .expect("record should validate")
4283 };
4284
4285 let err = TranscriptHistoryState::from_rewrite_records(vec![
4286 record(&first_state, &first_commit),
4287 record(&second_state, &second_commit),
4288 ])
4289 .expect_err("branched rewrite records must not replay as a linear source history");
4290 assert!(
4291 err.to_string().contains("does not extend transcript head"),
4292 "unexpected error: {err}"
4293 );
4294 }
4295
4296 #[test]
4297 fn internal_message_rewrites_refresh_transcript_history_head() {
4298 let mut session = Session::new();
4299 session.push(Message::User(UserMessage::text("question".to_string())));
4300 session.push(Message::Assistant(AssistantMessage {
4301 content: "verbose answer".to_string(),
4302 tool_calls: Vec::new(),
4303 stop_reason: StopReason::EndTurn,
4304 usage: crate::types::Usage::default(),
4305 created_at: crate::types::message_timestamp_now(),
4306 }));
4307
4308 let parent = session.transcript_revision().expect("parent revision");
4309 session
4310 .commit_transcript_rewrite(
4311 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4312 vec![Message::Assistant(AssistantMessage {
4313 content: "compact answer".to_string(),
4314 tool_calls: Vec::new(),
4315 stop_reason: StopReason::EndTurn,
4316 usage: crate::types::Usage::default(),
4317 created_at: crate::types::message_timestamp_now(),
4318 })],
4319 TranscriptRewriteReason::new("compaction"),
4320 Some("unit-test".to_string()),
4321 Some(parent),
4322 )
4323 .expect("rewrite should commit");
4324
4325 session.push(Message::User(UserMessage::text(
4326 "notice-bearing turn".to_string(),
4327 )));
4328 session
4329 .retain_messages_internal(
4330 |message| {
4331 !matches!(
4332 message,
4333 Message::User(user)
4334 if user.content.iter().any(|block| matches!(
4335 block,
4336 ContentBlock::Text { text } if text.contains("notice-bearing")
4337 ))
4338 )
4339 },
4340 TranscriptRewriteReason::new("synthetic_notice_cleanup"),
4341 )
4342 .expect("retain should commit internal rewrite");
4343 let retained_digest =
4344 transcript_messages_digest(session.messages()).expect("retained digest");
4345 assert_eq!(
4346 session.transcript_revision().expect("retained head"),
4347 retained_digest
4348 );
4349
4350 session
4351 .replace_messages_internal(
4352 vec![
4353 Message::User(UserMessage::text("compacted question".to_string())),
4354 Message::Assistant(AssistantMessage {
4355 content: "compacted answer".to_string(),
4356 tool_calls: Vec::new(),
4357 stop_reason: StopReason::EndTurn,
4358 usage: crate::types::Usage::default(),
4359 created_at: crate::types::message_timestamp_now(),
4360 }),
4361 ],
4362 TranscriptRewriteReason::new("compaction"),
4363 )
4364 .expect("replace should commit internal rewrite");
4365 let replaced_digest =
4366 transcript_messages_digest(session.messages()).expect("replaced digest");
4367 assert_eq!(
4368 session.transcript_revision().expect("replaced head"),
4369 replaced_digest
4370 );
4371 let state = session
4372 .transcript_history_state()
4373 .expect("history state should decode")
4374 .expect("history state should exist");
4375 assert!(
4376 state
4377 .revisions
4378 .iter()
4379 .any(|body| body.revision == replaced_digest)
4380 );
4381 validate_transcript_history_state(&state).expect("history state remains valid");
4382 }
4383
4384 #[test]
4385 fn set_system_prompt_refreshes_transcript_history_head_after_rewrite() {
4386 let mut session = Session::new();
4387 session.push(Message::User(UserMessage::text("question".to_string())));
4388 session.push(Message::Assistant(AssistantMessage {
4389 content: "verbose answer".to_string(),
4390 tool_calls: Vec::new(),
4391 stop_reason: StopReason::EndTurn,
4392 usage: crate::types::Usage::default(),
4393 created_at: crate::types::message_timestamp_now(),
4394 }));
4395
4396 let parent = session.transcript_revision().expect("parent revision");
4397 let rewrite = session
4398 .commit_transcript_rewrite(
4399 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4400 vec![Message::Assistant(AssistantMessage {
4401 content: "compact answer".to_string(),
4402 tool_calls: Vec::new(),
4403 stop_reason: StopReason::EndTurn,
4404 usage: crate::types::Usage::default(),
4405 created_at: crate::types::message_timestamp_now(),
4406 })],
4407 TranscriptRewriteReason::new("compaction"),
4408 Some("unit-test".to_string()),
4409 Some(parent),
4410 )
4411 .expect("rewrite should commit");
4412
4413 session.set_system_prompt("durable system prompt".to_string());
4414
4415 let head = session
4416 .transcript_revision()
4417 .expect("system prompt should refresh transcript head");
4418 assert_ne!(head, rewrite.revision);
4419 assert_eq!(
4420 head,
4421 transcript_messages_digest(session.messages()).expect("current digest")
4422 );
4423 let head_messages = session
4424 .transcript_revision_messages(&head)
4425 .expect("history state should decode")
4426 .expect("refreshed head body should be retained");
4427 assert_eq!(
4428 serde_json::to_value(&head_messages).expect("head serializes"),
4429 serde_json::to_value(session.messages()).expect("session serializes")
4430 );
4431 validate_transcript_history_state(
4432 &session
4433 .transcript_history_state()
4434 .expect("history state should decode")
4435 .expect("history state should exist"),
4436 )
4437 .expect("history state remains valid after system prompt update");
4438 }
4439
/// Replaying a history state whose head was restored to an earlier revision must
/// leave the replayed session's `updated_at` equal to the restore commit's
/// `committed_at`, even though the retained body for that revision is older.
#[test]
fn apply_transcript_history_state_uses_latest_commit_time_for_restored_head() {
    let mut session = Session::new();
    session.push(Message::User(UserMessage::text("question".to_string())));
    let verbose_answer = Message::Assistant(AssistantMessage {
        content: "verbose answer".to_string(),
        tool_calls: Vec::new(),
        stop_reason: StopReason::EndTurn,
        usage: crate::types::Usage::default(),
        created_at: crate::types::message_timestamp_now(),
    });
    session.push(verbose_answer);
    let original_messages = session.messages().to_vec();
    let parent = session.transcript_revision().expect("parent revision");

    // First commit: replace the assistant message with a compact one.
    let compact_answer = Message::Assistant(AssistantMessage {
        content: "compact answer".to_string(),
        tool_calls: Vec::new(),
        stop_reason: StopReason::EndTurn,
        usage: crate::types::Usage::default(),
        created_at: crate::types::message_timestamp_now(),
    });
    let compact = session
        .commit_transcript_rewrite(
            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
            vec![compact_answer],
            TranscriptRewriteReason::new("compaction"),
            Some("unit-test".to_string()),
            Some(parent.clone()),
        )
        .expect("rewrite should commit");

    // Sleep briefly so the restore commit can be strictly newer than the
    // retained body (checked explicitly below).
    std::thread::sleep(std::time::Duration::from_millis(2));

    // Second commit: rewrite the whole transcript back to the original messages.
    let message_count = session.messages().len();
    let restore = session
        .commit_transcript_rewrite(
            TranscriptRewriteSelection::MessageRange {
                start: 0,
                end: message_count,
            },
            original_messages.clone(),
            TranscriptRewriteReason::new("restore"),
            Some("unit-test".to_string()),
            Some(compact.revision),
        )
        .expect("restore should commit");
    // Restoring the original content yields the original revision again.
    assert_eq!(restore.revision, parent);

    let state = session
        .transcript_history_state()
        .expect("history state should decode")
        .expect("history state should exist");
    let restored_body_created_at = state
        .revisions
        .iter()
        .find(|body| body.revision == restore.revision)
        .map(|body| body.created_at)
        .expect("restored body should be retained");
    // Precondition: the two timestamps must differ, otherwise the final
    // assertion could not tell them apart.
    assert!(
        restored_body_created_at < restore.committed_at,
        "test requires restore commit to be newer than retained body"
    );

    let mut replayed = Session::new();
    replayed
        .apply_transcript_history_state(state)
        .expect("replay should materialize restored head");
    let replayed_json = serde_json::to_value(replayed.messages()).expect("replayed serializes");
    let original_json = serde_json::to_value(&original_messages).expect("original serializes");
    assert_eq!(replayed_json, original_json);
    assert_eq!(replayed.updated_at(), restore.committed_at);
}
4509
/// A freshly constructed session reports the current `SESSION_VERSION`, holds no
/// messages, and has `created_at <= updated_at`.
#[test]
fn test_session_new() {
    let fresh = Session::new();

    assert_eq!(fresh.version(), SESSION_VERSION);
    let messages = fresh.messages();
    assert!(messages.is_empty());
    let (created, updated) = (fresh.created_at(), fresh.updated_at());
    assert!(created <= updated);
}
4517
/// Overriding only the model with `gpt-5.5` is expected to move the identity to
/// `Provider::OpenAI` (resolved through the default config's model registry) and
/// to drop the auth binding that belonged to the previous provider.
#[test]
fn llm_identity_model_override_switches_to_catalog_provider() {
    let registry = crate::Config::default().model_registry().unwrap();
    // Start on Anthropic with an explicit auth binding so the test can observe
    // whether the binding survives the provider switch.
    let current = SessionLlmIdentity {
        model: "claude-sonnet-4-5".to_string(),
        provider: Provider::Anthropic,
        self_hosted_server_id: None,
        provider_params: None,
        auth_binding: Some(crate::AuthBindingRef {
            realm: crate::RealmId::parse("tenant_a").unwrap(),
            binding: crate::BindingId::parse("anthropic_default").unwrap(),
            profile: None,
        }),
    };

    // Only the model is overridden; every other override field is left unset.
    let resolved = resolve_session_llm_identity_override(
        &current,
        &registry,
        SessionLlmIdentityOverride {
            model: Some("gpt-5.5"),
            provider: None,
            provider_params: None,
            clear_provider_params: false,
            auth_binding: None,
            clear_auth_binding: false,
        },
    )
    .unwrap();

    assert_eq!(resolved.model, "gpt-5.5");
    assert_eq!(resolved.provider, Provider::OpenAI);
    assert!(
        resolved.auth_binding.is_none(),
        "provider switches must not inherit a binding from the previous provider"
    );
}
4554
/// Overriding the model with a name the test treats as uncatalogued
/// (`uncatalogued-custom-model`) must keep the identity on its current provider.
#[test]
fn llm_identity_model_override_keeps_uncatalogued_model_on_current_provider() {
    let registry = crate::Config::default().model_registry().unwrap();
    let current = SessionLlmIdentity {
        model: "custom-model".to_string(),
        provider: Provider::Anthropic,
        self_hosted_server_id: None,
        provider_params: None,
        auth_binding: None,
    };

    // Only the model is overridden; the provider is left for the resolver to decide.
    let resolved = resolve_session_llm_identity_override(
        &current,
        &registry,
        SessionLlmIdentityOverride {
            model: Some("uncatalogued-custom-model"),
            provider: None,
            provider_params: None,
            clear_provider_params: false,
            auth_binding: None,
            clear_auth_binding: false,
        },
    )
    .unwrap();

    assert_eq!(resolved.model, "uncatalogued-custom-model");
    assert_eq!(resolved.provider, Provider::Anthropic);
}
4583
/// Applying the same realtime transcript event twice (same item, delta, or
/// response identifiers) must be inert the second time and must not duplicate
/// messages.
#[test]
fn realtime_transcript_append_is_idempotent_by_provider_item_and_delta_id() {
    let mut session = Session::new();

    // User final: first application has an effect, the replay does not.
    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "hello".to_string(),
    };
    let first_user = session.append_realtime_transcript_event(user_final.clone());
    assert!(!first_user.is_inert());
    let replayed_user = session.append_realtime_transcript_event(user_final);
    assert!(replayed_user.is_inert());

    // Assistant delta: inert both on first application and on replay.
    let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_assistant".to_string(),
        delta_id: "evt_delta_1".to_string(),
        item_id: "item_assistant".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "hi".to_string(),
    };
    let first_delta = session.append_realtime_transcript_event(assistant_delta.clone());
    assert!(first_delta.is_inert());
    let replayed_delta = session.append_realtime_transcript_event(assistant_delta);
    assert!(replayed_delta.is_inert());

    // Turn completion: first application has an effect, the replay does not.
    let turn_completed = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_assistant".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let first_completion = session.append_realtime_transcript_event(turn_completed.clone());
    assert!(!first_completion.is_inert());
    let replayed_completion = session.append_realtime_transcript_event(turn_completed);
    assert!(replayed_completion.is_inert());

    // Exactly one user and one assistant message, with the delta text applied once.
    let messages = session.messages();
    assert_eq!(messages.len(), 2);
    assert!(matches!(
        &messages[0],
        Message::User(user) if user.text_content() == "hello"
    ));
    assert!(matches!(
        &messages[1],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "hi"
    ));
}
4642
/// A final transcript text for an item that already received a partial transcript
/// delta must win over the partial text, and the item must materialize as an
/// `AssistantBlock::Transcript` once the turn completes.
#[test]
fn realtime_transcript_final_text_overrides_partial_delta_and_promotes_to_spoken_lane() {
    let mut session = Session::new();

    // Partial delta for the item: inert on its own.
    let partial_delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
        response_id: "resp_a".to_string(),
        delta_id: "evt_1".to_string(),
        item_id: "item_a".to_string(),
        previous_item_id: None,
        content_index: 0,
        delta: "incom".to_string(),
    };
    let delta_outcome = session.append_realtime_transcript_event(partial_delta);
    assert!(delta_outcome.is_inert());

    // Final text for the same item: also inert until the turn terminates.
    let final_text = RealtimeTranscriptEvent::AssistantTranscriptFinalText {
        response_id: "resp_a".to_string(),
        item_id: "item_a".to_string(),
        content_index: 0,
        text: "complete answer".to_string(),
    };
    let final_outcome = session.append_realtime_transcript_event(final_text);
    assert!(final_outcome.is_inert());

    let turn_completed = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let outcome = session.append_realtime_transcript_event(turn_completed);
    assert!(!outcome.is_inert());

    assert_eq!(session.messages().len(), 1);
    match &session.messages()[0] {
        Message::BlockAssistant(assistant) => {
            // Every transcript block must carry the final text, and at least one
            // such block must exist.
            let mut transcript_blocks = 0_usize;
            for block in assistant.blocks.iter() {
                if let AssistantBlock::Transcript { text, .. } = block {
                    assert_eq!(text, "complete answer");
                    transcript_blocks += 1;
                }
            }
            assert!(
                transcript_blocks > 0,
                "AssistantTranscriptFinalText must promote to the Spoken lane and \
                 materialize as AssistantBlock::Transcript"
            );
        }
        other => unreachable!("expected BlockAssistant, got {other:?}"),
    }
}
4713
/// A final transcript text with no preceding delta for the item must still
/// materialize as an `AssistantBlock::Transcript` once the turn completes.
#[test]
fn realtime_transcript_final_text_creates_item_when_no_delta_staged() {
    let mut session = Session::new();

    // The final text is the first and only content event for this item.
    let final_only = RealtimeTranscriptEvent::AssistantTranscriptFinalText {
        response_id: "resp_a".to_string(),
        item_id: "item_a".to_string(),
        content_index: 0,
        text: "spoken-final-only".to_string(),
    };
    let final_outcome = session.append_realtime_transcript_event(final_only);
    assert!(final_outcome.is_inert());

    let turn_completed = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let outcome = session.append_realtime_transcript_event(turn_completed);
    assert!(!outcome.is_inert());

    assert_eq!(session.messages().len(), 1);
    match &session.messages()[0] {
        Message::BlockAssistant(assistant) => {
            let has_transcript = assistant.blocks.iter().any(|block| {
                matches!(
                    block,
                    AssistantBlock::Transcript { text, .. } if text == "spoken-final-only"
                )
            });
            assert!(
                has_transcript,
                "final-only provider path must materialize as Transcript on the Spoken lane"
            );
        }
        other => unreachable!("expected BlockAssistant, got {other:?}"),
    }
}
4756
/// When an assistant item (delta plus completion) arrives before the user item it
/// names as `previous_item_id`, nothing materializes until the user item shows up;
/// then both messages appear, user first.
#[test]
fn realtime_transcript_append_orders_causally_equivalent_out_of_order_items() {
    let mut session = Session::new();

    // Assistant content arrives first, pointing at a not-yet-seen "item_user".
    let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_assistant".to_string(),
        delta_id: "evt_delta_1".to_string(),
        item_id: "item_assistant".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "answer".to_string(),
    };
    let delta_outcome = session.append_realtime_transcript_event(assistant_delta);
    assert!(delta_outcome.is_inert());

    let turn_completed = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_assistant".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let completion_outcome = session.append_realtime_transcript_event(turn_completed);
    assert!(completion_outcome.is_inert());

    // The missing predecessor arrives and releases both messages at once.
    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "question".to_string(),
    };
    let outcome = session.append_realtime_transcript_event(user_final);
    assert_eq!(outcome.materialized_messages.len(), 2);

    let messages = session.messages();
    assert_eq!(messages.len(), 2);
    assert!(matches!(
        &messages[0],
        Message::User(user) if user.text_content() == "question"
    ));
    assert!(matches!(
        &messages[1],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer"
    ));
}
4803
/// Replaying a full, already-applied event sequence must be inert for every event
/// and must leave the serialized messages unchanged.
#[test]
fn realtime_transcript_replay_of_seen_provider_items_is_inert() {
    let mut session = Session::new();
    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "hello".to_string(),
    };
    let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_assistant".to_string(),
        delta_id: "evt_delta_1".to_string(),
        item_id: "item_assistant".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "world".to_string(),
    };
    let turn_completed = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_assistant".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let events = vec![user_final, assistant_delta, turn_completed];

    // First pass: apply everything, ignoring the individual outcomes.
    for event in &events {
        let _ = session.append_realtime_transcript_event(event.clone());
    }
    let first_messages = serde_json::to_value(session.messages()).unwrap();

    // Second pass: every replayed event must be inert.
    for event in events {
        let replay_outcome = session.append_realtime_transcript_event(event);
        assert!(replay_outcome.is_inert());
    }

    let messages_after_replay = serde_json::to_value(session.messages()).unwrap();
    assert_eq!(messages_after_replay, first_messages);
}
4843
/// Once a user final with text has been applied, a later final for the same item
/// with empty text must be inert and must not change the stored messages.
#[test]
fn realtime_transcript_user_final_replay_cannot_erase_existing_segment() {
    let mut session = Session::new();

    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "remember amber lantern".to_string(),
    };
    let first_outcome = session.append_realtime_transcript_event(user_final.clone());
    assert!(!first_outcome.is_inert());
    let first_messages = serde_json::to_value(session.messages()).unwrap();

    // Same item id, but with empty text.
    let empty_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: String::new(),
    };
    let empty_outcome = session.append_realtime_transcript_event(empty_final);
    assert!(empty_outcome.is_inert());

    // Replaying the original non-empty final is inert as well.
    let replay_outcome = session.append_realtime_transcript_event(user_final);
    assert!(replay_outcome.is_inert());

    let messages_after_replay = serde_json::to_value(session.messages()).unwrap();
    assert_eq!(messages_after_replay, first_messages);
}
4877
/// An empty user final is inert and produces no message; a later final for the
/// same item that carries text must then materialize exactly one user message.
#[test]
fn realtime_transcript_empty_user_final_can_be_filled_by_later_nonempty_replay() {
    let mut session = Session::new();

    let empty_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: String::new(),
    };
    let empty_outcome = session.append_realtime_transcript_event(empty_final);
    assert!(empty_outcome.is_inert());
    assert!(session.messages().is_empty());

    // Same item id, now with actual text.
    let filled_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "remember amber lantern".to_string(),
    };
    let outcome = session.append_realtime_transcript_event(filled_final);
    assert_eq!(outcome.materialized_messages.len(), 1);

    let messages = session.messages();
    assert_eq!(messages.len(), 1);
    assert!(matches!(
        &messages[0],
        Message::User(user) if user.text_content() == "remember amber lantern"
    ));
}
4909
/// A skipped provider item sits between a user item and an assistant item in the
/// `previous_item_id` chain. It must contribute no content of its own, yet the
/// user and assistant messages must still materialize in chain order, and
/// replaying any of the events afterwards must be inert.
#[test]
fn realtime_transcript_skipped_provider_items_preserve_causal_order_without_content() {
    let mut session = Session::new();

    // Chain under test: item_user -> item_tool (skipped) -> item_assistant.
    // Events are delivered in reverse: assistant first, then the skip, then the user.
    let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_assistant".to_string(),
        delta_id: "evt_delta_1".to_string(),
        item_id: "item_assistant".to_string(),
        previous_item_id: Some("item_tool".to_string()),
        content_index: 0,
        delta: "done".to_string(),
    };
    let delta_outcome = session.append_realtime_transcript_event(assistant_delta.clone());
    assert!(delta_outcome.is_inert());

    let assistant_complete = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_assistant".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let completion_outcome =
        session.append_realtime_transcript_event(assistant_complete.clone());
    assert!(completion_outcome.is_inert());

    let skipped = RealtimeTranscriptEvent::ItemSkipped {
        item_id: "item_tool".to_string(),
        previous_item_id: Some("item_user".to_string()),
    };
    let skipped_outcome = session.append_realtime_transcript_event(skipped.clone());
    assert!(
        skipped_outcome.is_inert(),
        "a skipped provider item must not append transcript content"
    );
    assert!(session.messages().is_empty());

    // The head of the chain arrives and releases user + assistant together.
    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "please use the tool".to_string(),
    };
    let outcome = session.append_realtime_transcript_event(user_final);
    assert_eq!(outcome.materialized_messages.len(), 2);
    {
        let messages = session.messages();
        assert_eq!(messages.len(), 2);
        assert!(matches!(
            &messages[0],
            Message::User(user) if user.text_content() == "please use the tool"
        ));
        assert!(matches!(
            &messages[1],
            Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "done"
        ));
    }

    // Replays of the skip, the delta, and the completion change nothing.
    let first_messages = serde_json::to_value(session.messages()).unwrap();
    let skipped_replay = session.append_realtime_transcript_event(skipped);
    assert!(skipped_replay.is_inert());
    let delta_replay = session.append_realtime_transcript_event(assistant_delta);
    assert!(delta_replay.is_inert());
    let completion_replay = session.append_realtime_transcript_event(assistant_complete);
    assert!(completion_replay.is_inert());

    let messages_after_replay = serde_json::to_value(session.messages()).unwrap();
    assert_eq!(messages_after_replay, first_messages);
}
4986
/// A user turn chained after an in-flight assistant item stays pending until that
/// assistant response is interrupted. The interruption then materializes both the
/// assistant text received so far and the waiting user turn, in chain order.
#[test]
fn realtime_transcript_interrupted_assistant_item_unblocks_later_provider_items() {
    let mut session = Session::new();

    let repeat_request = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_repeat".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "repeat until stop".to_string(),
    };
    let _ = session.append_realtime_transcript_event(repeat_request);

    let loop_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_loop".to_string(),
        delta_id: "evt_loop_1".to_string(),
        item_id: "item_loop".to_string(),
        previous_item_id: Some("item_repeat".to_string()),
        content_index: 0,
        delta: "Looping now".to_string(),
    };
    let delta_outcome = session.append_realtime_transcript_event(loop_delta);
    assert!(delta_outcome.is_inert());

    // The stop turn names the unresolved assistant item as its predecessor.
    let stop_request = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_stop".to_string(),
        previous_item_id: Some("item_loop".to_string()),
        content_index: 0,
        text: "Stop.".to_string(),
    };
    let stop_outcome = session.append_realtime_transcript_event(stop_request);
    assert!(
        stop_outcome.is_inert(),
        "the stop turn waits until the interrupted assistant provider item is resolved"
    );

    let interrupted = RealtimeTranscriptEvent::AssistantTurnInterrupted {
        response_id: "resp_loop".to_string(),
    };
    let outcome = session.append_realtime_transcript_event(interrupted);

    // The interruption releases the assistant item and the waiting stop turn.
    assert_eq!(outcome.materialized_messages.len(), 2);
    assert_eq!(session.messages().len(), 3);
    assert!(matches!(
        &session.messages()[0],
        Message::User(user) if user.text_content() == "repeat until stop"
    ));
    match &session.messages()[1] {
        Message::BlockAssistant(assistant) => {
            assert_eq!(block_assistant_text(assistant), "Looping now");
        }
        other => unreachable!(
            "Display lane assistant item must be retained on Interrupted, got {other:?}"
        ),
    }
    assert!(matches!(
        &session.messages()[2],
        Message::User(user) if user.text_content() == "Stop."
    ));
}
5058
/// When the interruption for a response arrives before any of its deltas, a
/// delta that shows up later must not put assistant text into the transcript. It
/// only resolves the item so the waiting user turn can materialize, and a late
/// completion for that response must be inert.
#[test]
fn realtime_transcript_late_interrupted_assistant_delta_stays_noncanonical() {
    let mut session = Session::new();

    let repeat_request = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_repeat".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "repeat until stop".to_string(),
    };
    let _ = session.append_realtime_transcript_event(repeat_request);

    // The assistant item is observed without a response id.
    let observed = RealtimeTranscriptEvent::ItemObserved {
        item_id: "item_loop".to_string(),
        previous_item_id: Some("item_repeat".to_string()),
        role: RealtimeTranscriptRole::Assistant,
        response_id: None,
    };
    let observed_outcome = session.append_realtime_transcript_event(observed);
    assert!(
        observed_outcome.is_inert(),
        "provider can observe an assistant item before the adapter learns its response id"
    );

    let interrupted = RealtimeTranscriptEvent::AssistantTurnInterrupted {
        response_id: "resp_loop".to_string(),
    };
    let interrupted_outcome = session.append_realtime_transcript_event(interrupted);
    assert!(
        interrupted_outcome.is_inert(),
        "an interruption can arrive before delayed transcript deltas for the response"
    );

    let stop_request = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_stop".to_string(),
        previous_item_id: Some("item_loop".to_string()),
        content_index: 0,
        text: "Stop.".to_string(),
    };
    let stop_outcome = session.append_realtime_transcript_event(stop_request);
    assert!(
        stop_outcome.is_inert(),
        "the stop turn waits for the provider's interrupted assistant item anchor"
    );

    // The delayed delta ties "item_loop" to the already interrupted response.
    let late_delta = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_loop".to_string(),
        delta_id: "evt_loop_late".to_string(),
        item_id: "item_loop".to_string(),
        previous_item_id: Some("item_repeat".to_string()),
        content_index: 0,
        delta: "Looping now".to_string(),
    };
    let late_delta_outcome = session.append_realtime_transcript_event(late_delta);
    assert_eq!(late_delta_outcome.materialized_messages.len(), 1);
    assert!(matches!(
        &session.messages()[1],
        Message::User(user) if user.text_content() == "Stop."
    ));

    let late_completion = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_loop".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let late_completion_outcome = session.append_realtime_transcript_event(late_completion);
    assert!(
        late_completion_outcome.is_inert(),
        "late completion for an interrupted response must not resurrect its deltas"
    );

    // No assistant message may contain the late delta's text.
    let interrupted_text_leaked = session.messages().iter().any(|message| match message {
        Message::BlockAssistant(assistant) => {
            block_assistant_text(assistant).contains("Looping now")
        }
        _ => false,
    });
    assert!(
        !interrupted_text_leaked,
        "late interrupted assistant text must remain non-canonical"
    );
}
5140
/// A completion carrying a different `response_id` must not finalize assistant
/// text received for another response; only the matching completion does.
#[test]
fn realtime_transcript_completion_only_finalizes_matching_response() {
    let mut session = Session::new();

    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "question".to_string(),
    };
    let _ = session.append_realtime_transcript_event(user_final);

    let delta_a = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_a".to_string(),
        delta_id: "evt_a".to_string(),
        item_id: "item_a".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "answer a".to_string(),
    };
    let delta_outcome = session.append_realtime_transcript_event(delta_a);
    assert!(delta_outcome.is_inert());

    // Completion for an unrelated response: nothing may be finalized.
    let completion_b = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_b".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let foreign_outcome = session.append_realtime_transcript_event(completion_b);
    assert!(
        foreign_outcome.is_inert(),
        "a completion for another response must not finalize buffered assistant text"
    );
    assert_eq!(session.messages().len(), 1);

    // Completion for the matching response finalizes the assistant message.
    let completion_a = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let outcome = session.append_realtime_transcript_event(completion_a);
    assert_eq!(outcome.materialized_messages.len(), 1);

    let messages = session.messages();
    assert_eq!(messages.len(), 2);
    assert!(matches!(
        &messages[1],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer a"
    ));
}
5192
/// A completion that arrives before any delta of its response must only apply to
/// deltas of that same response: a later delta for another response stays inert,
/// while the first delta for the completed response materializes a message.
#[test]
fn realtime_transcript_completion_before_later_delta_is_response_scoped() {
    let mut session = Session::new();

    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "question".to_string(),
    };
    let _ = session.append_realtime_transcript_event(user_final);

    // Completion for resp_a arrives before any resp_a delta.
    let early_completion = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let completion_outcome = session.append_realtime_transcript_event(early_completion);
    assert!(completion_outcome.is_inert());

    let delta_b = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_b".to_string(),
        delta_id: "evt_b".to_string(),
        item_id: "item_b".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "wrong response".to_string(),
    };
    let foreign_outcome = session.append_realtime_transcript_event(delta_b);
    assert!(
        foreign_outcome.is_inert(),
        "a later delta for another response must not be finalized by resp_a's pending completion"
    );

    let delta_a = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_a".to_string(),
        delta_id: "evt_a".to_string(),
        item_id: "item_a".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "right response".to_string(),
    };
    let outcome = session.append_realtime_transcript_event(delta_a);
    assert_eq!(outcome.materialized_messages.len(), 1);

    let messages = session.messages();
    assert_eq!(messages.len(), 2);
    assert!(matches!(
        &messages[1],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "right response"
    ));
}
5245
/// After `resp_a` has been completed once, a duplicate completion for `resp_a`
/// must not finalize text received for `resp_b`; `resp_b` only materializes on
/// its own completion.
#[test]
fn realtime_transcript_late_duplicate_completion_cannot_finalize_unrelated_response() {
    let mut session = Session::new();

    // Question plus a fully completed resp_a: two messages.
    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "question".to_string(),
    };
    let _ = session.append_realtime_transcript_event(user_final);
    let delta_a = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_a".to_string(),
        delta_id: "evt_a".to_string(),
        item_id: "item_a".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "first".to_string(),
    };
    let _ = session.append_realtime_transcript_event(delta_a);
    let completion_a = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let _ = session.append_realtime_transcript_event(completion_a);
    assert_eq!(session.messages().len(), 2);

    // resp_b starts streaming.
    let delta_b = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_b".to_string(),
        delta_id: "evt_b".to_string(),
        item_id: "item_b".to_string(),
        previous_item_id: Some("item_a".to_string()),
        content_index: 0,
        delta: "second".to_string(),
    };
    let delta_b_outcome = session.append_realtime_transcript_event(delta_b);
    assert!(delta_b_outcome.is_inert());

    // A duplicate terminal for resp_a shows up while resp_b is still open.
    let duplicate_completion_a = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_a".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let duplicate_outcome = session.append_realtime_transcript_event(duplicate_completion_a);
    assert!(
        duplicate_outcome.is_inert(),
        "a duplicate late terminal for resp_a must not finalize resp_b"
    );
    assert_eq!(session.messages().len(), 2);

    let completion_b = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_b".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let outcome = session.append_realtime_transcript_event(completion_b);
    assert_eq!(outcome.materialized_messages.len(), 1);
    assert_eq!(session.messages().len(), 3);
}
5310
/// Interrupting `resp_a` must only affect `resp_a`: its text materializes on the
/// interruption, while `resp_b` is left alone and materializes on its own
/// completion.
#[test]
fn realtime_transcript_interruption_discards_only_matching_response() {
    let mut session = Session::new();

    let user_final = RealtimeTranscriptEvent::UserTranscriptFinal {
        item_id: "item_user".to_string(),
        previous_item_id: None,
        content_index: 0,
        text: "question".to_string(),
    };
    let _ = session.append_realtime_transcript_event(user_final);

    // Two responses stream text, both chained after the user item.
    let delta_a = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_a".to_string(),
        delta_id: "evt_a".to_string(),
        item_id: "item_a".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "interrupted display".to_string(),
    };
    let _ = session.append_realtime_transcript_event(delta_a);
    let delta_b = RealtimeTranscriptEvent::AssistantTextDelta {
        response_id: "resp_b".to_string(),
        delta_id: "evt_b".to_string(),
        item_id: "item_b".to_string(),
        previous_item_id: Some("item_user".to_string()),
        content_index: 0,
        delta: "keep me".to_string(),
    };
    let _ = session.append_realtime_transcript_event(delta_b);

    let interrupt_a = RealtimeTranscriptEvent::AssistantTurnInterrupted {
        response_id: "resp_a".to_string(),
    };
    let interrupt_outcome = session.append_realtime_transcript_event(interrupt_a);
    assert_eq!(
        interrupt_outcome.materialized_messages.len(),
        1,
        "resp_a's Display item commits on Interrupted"
    );

    let completion_b = RealtimeTranscriptEvent::AssistantTurnCompleted {
        response_id: "resp_b".to_string(),
        stop_reason: StopReason::EndTurn,
        usage: Usage::default(),
    };
    let outcome = session.append_realtime_transcript_event(completion_b);
    assert_eq!(
        outcome.materialized_messages.len(),
        1,
        "resp_b commits on its TurnCompleted, untouched by resp_a's Interrupted"
    );

    // User question, then resp_a's text, then resp_b's text.
    let messages = session.messages();
    assert_eq!(messages.len(), 3);
    assert!(matches!(
        &messages[1],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "interrupted display"
    ));
    assert!(matches!(
        &messages[2],
        Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "keep me"
    ));
}
5384
/// A full fork must point at the very same `Arc` allocation for its messages as
/// the source session, and must expose all 100 messages.
#[test]
fn test_fork_shares_arc_no_clone() {
    let mut session = Session::new();
    (0..100).for_each(|i| {
        session.push(Message::User(UserMessage::text(format!("Message {i}"))));
    });

    let forked = session.fork();

    // Pointer equality: both sessions reference the same allocation.
    let shares_allocation = Arc::ptr_eq(&session.messages, &forked.messages);
    assert!(shares_allocation);
    assert_eq!(forked.messages().len(), 100);
}
5401
/// Forking at index 50 yields a session with the first 50 messages and leaves the
/// source session's 100 messages untouched.
#[test]
fn test_fork_at_shares_arc_prefix() {
    let mut session = Session::new();
    (0..100).for_each(|i| {
        session.push(Message::User(UserMessage::text(format!("Message {i}"))));
    });

    let forked = session.fork_at(50);
    let forked_len = forked.messages().len();
    assert_eq!(forked_len, 50);

    // The source keeps its full history.
    let source_len = session.messages().len();
    assert_eq!(source_len, 100);
}
5416
/// `fork_at` must not carry the source session's transcript history into the
/// fork: the fork gets a new id, no stored history-state metadata, a head equal
/// to the digest of its own messages, and cannot look up the source head. A
/// rewrite on the fork must then commit against the fork's own head.
#[test]
fn test_fork_at_resets_transcript_history_state_for_branch_identity() {
    let mut session = Session::new();
    session.push(Message::User(UserMessage::text(
        "summarize this".to_string(),
    )));
    let long_trace = BlockAssistantMessage::new(
        vec![AssistantBlock::Text {
            text: "long assistant trace".to_string(),
            meta: None,
        }],
        StopReason::EndTurn,
    );
    session.push(Message::BlockAssistant(long_trace));

    // Commit a rewrite on the source so it has a non-trivial history.
    let parent_revision = session.transcript_revision().expect("parent revision");
    let compact_trace = BlockAssistantMessage::new(
        vec![AssistantBlock::Text {
            text: "compact trace".to_string(),
            meta: None,
        }],
        StopReason::EndTurn,
    );
    session
        .commit_transcript_rewrite(
            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
            vec![Message::BlockAssistant(compact_trace)],
            TranscriptRewriteReason::new("compaction"),
            Some("test".to_string()),
            Some(parent_revision),
        )
        .expect("rewrite should commit");

    let source_head = session.transcript_revision().expect("source head");
    let mut forked = session.fork_at(1);
    assert_ne!(forked.id(), session.id());

    // The fork starts without the stored history-state metadata entry.
    let has_history_state = forked
        .metadata()
        .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY);
    assert!(!has_history_state);

    // The fork's head is the digest of the fork's own messages.
    let fork_head = forked.transcript_revision().expect("fork head");
    let fork_digest = transcript_messages_digest(forked.messages()).expect("fork digest");
    assert_eq!(fork_head, fork_digest);

    // The source head is not resolvable from the fork.
    let inherited_body = forked
        .transcript_revision_messages(&source_head)
        .expect("fork history lookup");
    assert!(inherited_body.is_none());

    let fork_parent = forked.transcript_revision().expect("fork parent");
    let branch_prompt = Message::User(UserMessage::text("branch prompt".to_string()));
    let commit = forked
        .commit_transcript_rewrite(
            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
            vec![branch_prompt],
            TranscriptRewriteReason::new("branch_edit"),
            Some("test".to_string()),
            Some(fork_parent.clone()),
        )
        .expect("fork rewrite should use fork-local parent");
    assert_eq!(commit.parent_revision, fork_parent);
}
5480
/// A fork initially shares the source's message `Arc`; pushing to the source
/// afterwards must stop the sharing and leave the fork's messages unchanged.
#[test]
fn test_push_cow_behavior() {
    let mut session = Session::new();
    session.push(Message::User(UserMessage::text("First".to_string())));

    let forked = session.fork();
    let shared_before_push = Arc::ptr_eq(&session.messages, &forked.messages);
    assert!(shared_before_push);

    session.push(Message::User(UserMessage::text("Second".to_string())));

    // After the push the two sessions no longer reference the same allocation.
    let shared_after_push = Arc::ptr_eq(&session.messages, &forked.messages);
    assert!(!shared_after_push);
    let (source_len, fork_len) = (session.messages().len(), forked.messages().len());
    assert_eq!(source_len, 2);
    assert_eq!(fork_len, 1);
}
5498
/// `push_batch` appends every message of the batch and does not move
/// `updated_at` backwards.
#[test]
fn test_push_batch_single_timestamp() {
    let mut session = Session::new();
    let initial_updated = session.updated_at();

    let batch: Vec<Message> = ["First", "Second", "Third"]
        .into_iter()
        .map(|text| Message::User(UserMessage::text(text.to_string())))
        .collect();
    session.push_batch(batch);

    let stored = session.messages().len();
    assert_eq!(stored, 3);
    let updated = session.updated_at();
    assert!(updated >= initial_updated);
}
5517
/// `touch` must advance `updated_at` past its previous value.
#[test]
fn test_touch_updates_timestamp() {
    let mut session = Session::new();
    let before_touch = session.updated_at();

    // Wait so the new timestamp can be strictly greater than the old one.
    let pause = std::time::Duration::from_millis(10);
    std::thread::sleep(pause);

    session.touch();

    let after_touch = session.updated_at();
    assert!(after_touch > before_touch);
}
5530
5531 #[test]
5532 fn test_session_push() {
5533 let mut session = Session::new();
5534 let initial_updated = session.updated_at();
5535
5536 std::thread::sleep(std::time::Duration::from_millis(10));
5538
5539 session.push(Message::User(UserMessage::text("Hello".to_string())));
5540
5541 assert_eq!(session.messages().len(), 1);
5542 assert!(session.updated_at() > initial_updated);
5543 }
5544
5545 #[test]
5546 fn test_session_fork() {
5547 let mut session = Session::new();
5548 session.push(Message::System(SystemMessage::new("System prompt")));
5549 session.push(Message::User(UserMessage::text("Hello".to_string())));
5550 session.push(Message::Assistant(AssistantMessage {
5551 content: "Hi!".to_string(),
5552 tool_calls: vec![],
5553 stop_reason: StopReason::EndTurn,
5554 usage: Usage::default(),
5555 created_at: crate::types::message_timestamp_now(),
5556 }));
5557
5558 let forked = session.fork_at(2);
5560 assert_eq!(forked.messages().len(), 2);
5561 assert_ne!(forked.id(), session.id());
5562
5563 let full_fork = session.fork();
5565 assert_eq!(full_fork.messages().len(), 3);
5566 }
5567
5568 #[test]
5569 fn test_session_metadata() {
5570 let mut session = Session::new();
5571 session.set_metadata("key", serde_json::json!("value"));
5572
5573 assert_eq!(session.metadata().get("key").unwrap(), "value");
5574 }
5575
5576 #[test]
5577 fn test_session_metadata_backfill_preserves_timestamp() {
5578 let mut session = Session::new();
5579 let initial_updated = session.updated_at();
5580
5581 std::thread::sleep(std::time::Duration::from_millis(10));
5582
5583 assert!(session.backfill_metadata_if_absent("key", serde_json::json!("value")));
5584 assert_eq!(session.metadata().get("key").unwrap(), "value");
5585 assert_eq!(session.updated_at(), initial_updated);
5586 assert!(!session.backfill_metadata_if_absent("key", serde_json::json!("other")));
5587 assert_eq!(session.metadata().get("key").unwrap(), "value");
5588 assert_eq!(session.updated_at(), initial_updated);
5589 }
5590
5591 #[test]
5592 fn test_session_mob_tool_authority_context_roundtrip() {
5593 let mut session = Session::new();
5594 let authority = MobToolAuthorityContext::new(
5595 crate::service::OpaquePrincipalToken::new("opaque-principal"),
5596 false,
5597 )
5598 .with_managed_mob_scope(["mob-a"])
5599 .with_audit_invocation_id("audit-1");
5600
5601 session
5602 .set_mob_tool_authority_context(Some(authority.clone()))
5603 .expect("authority should serialize");
5604 assert_eq!(session.mob_tool_authority_context(), Some(authority));
5605
5606 session
5607 .set_mob_tool_authority_context(None)
5608 .expect("authority should clear");
5609 assert!(session.mob_tool_authority_context().is_none());
5610 }
5611
5612 #[test]
5613 fn test_session_tool_visibility_state_roundtrip() {
5614 let mut session = Session::new();
5615 let state = SessionToolVisibilityState {
5616 inherited_base_filter: ToolFilter::Allow(["visible".to_string()].into_iter().collect()),
5617 active_filter: ToolFilter::Allow(
5618 ["visible".to_string(), "missing".to_string()]
5619 .into_iter()
5620 .collect(),
5621 ),
5622 staged_filter: ToolFilter::Allow(
5623 ["visible".to_string(), "missing".to_string()]
5624 .into_iter()
5625 .collect(),
5626 ),
5627 active_revision: 1,
5628 staged_revision: 2,
5629 ..Default::default()
5630 };
5631
5632 session
5633 .set_tool_visibility_state(state.clone())
5634 .expect("tool visibility state should serialize");
5635 assert_eq!(session.tool_visibility_state().unwrap(), Some(state));
5636 }
5637
5638 #[test]
5639 fn test_session_tool_visibility_state_malformed_returns_error() {
5640 let mut session = Session::new();
5641 session.set_metadata(
5642 SESSION_TOOL_VISIBILITY_STATE_KEY,
5643 serde_json::json!({
5644 "active_filter": {
5645 "unexpected_filter_kind": ["secret"]
5646 }
5647 }),
5648 );
5649
5650 assert!(
5651 session.tool_visibility_state().is_err(),
5652 "malformed canonical visibility metadata must not decode as absent/default"
5653 );
5654 }
5655
5656 #[test]
5657 fn test_session_serialization() {
5658 let mut session = Session::new();
5659 session.push(Message::User(UserMessage::text("Test".to_string())));
5660
5661 let json = serde_json::to_string(&session).unwrap();
5662 let parsed: Session = serde_json::from_str(&json).unwrap();
5663
5664 assert_eq!(parsed.id(), session.id());
5665 assert_eq!(parsed.messages().len(), 1);
5666 assert_eq!(parsed.version(), SESSION_VERSION);
5667 }
5668
5669 #[test]
5670 fn test_session_meta_from_session() {
5671 let mut session = Session::new();
5672 session.push(Message::User(UserMessage::text("Hello".to_string())));
5673 session.push(Message::Assistant(AssistantMessage {
5674 content: "Hi!".to_string(),
5675 tool_calls: vec![],
5676 stop_reason: StopReason::EndTurn,
5677 usage: Usage {
5678 input_tokens: 10,
5679 output_tokens: 5,
5680 cache_creation_tokens: None,
5681 cache_read_tokens: None,
5682 },
5683 created_at: crate::types::message_timestamp_now(),
5684 }));
5685 session.record_usage(Usage {
5686 input_tokens: 10,
5687 output_tokens: 5,
5688 cache_creation_tokens: None,
5689 cache_read_tokens: None,
5690 });
5691
5692 let meta = SessionMeta::from(&session);
5693 assert_eq!(meta.id, *session.id());
5694 assert_eq!(meta.message_count, 2);
5695 assert_eq!(meta.total_tokens, 15);
5696 }
5697
5698 #[test]
5699 fn has_pending_boundary_empty_session() {
5700 let session = Session::new();
5701 assert!(!session.has_pending_boundary());
5702 }
5703
5704 #[test]
5705 fn has_pending_boundary_after_user_message() {
5706 let mut session = Session::new();
5707 session.push(Message::User(UserMessage::text("hello")));
5708 assert!(session.has_pending_boundary());
5709 }
5710
5711 #[test]
5712 fn has_pending_boundary_after_assistant_message() {
5713 let mut session = Session::new();
5714 session.push(Message::User(UserMessage::text("hello")));
5715 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
5716 vec![],
5717 StopReason::EndTurn,
5718 )));
5719 assert!(!session.has_pending_boundary());
5720 }
5721
5722 #[test]
5723 fn has_pending_boundary_after_tool_results() {
5724 let mut session = Session::new();
5725 session.push(Message::User(UserMessage::text("hello")));
5726 session.push(Message::tool_results(vec![]));
5727 assert!(session.has_pending_boundary());
5728 }
5729
5730 #[test]
5731 fn has_pending_boundary_after_system() {
5732 let mut session = Session::new();
5733 session.push(Message::System(SystemMessage::new("system")));
5734 assert!(!session.has_pending_boundary());
5735 }
5736
5737 #[test]
5738 fn system_context_state_preserves_applied_runtime_context() {
5739 let accepted_at = SystemTime::UNIX_EPOCH;
5740 let mut state = SessionSystemContextState::default();
5741 state
5742 .stage_append(
5743 &AppendSystemContextRequest {
5744 text: "Authoritative peer token is birch seventeen.".to_string(),
5745 source: Some(
5746 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
5747 .to_string(),
5748 ),
5749 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
5750 },
5751 accepted_at,
5752 )
5753 .expect("append should stage");
5754
5755 state.mark_pending_applied();
5756
5757 assert!(state.pending.is_empty());
5758 assert_eq!(state.applied.len(), 1);
5759 assert_eq!(
5760 state.applied[0].text,
5761 "Authoritative peer token is birch seventeen."
5762 );
5763 assert_eq!(
5764 state.applied[0].source.as_deref(),
5765 Some("peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
5766 );
5767
5768 let round_tripped: SessionSystemContextState =
5769 serde_json::from_value(serde_json::to_value(&state).expect("serialize state"))
5770 .expect("deserialize state");
5771 assert_eq!(round_tripped.applied, state.applied);
5772 }
5773
5774 #[test]
5775 fn active_turn_system_context_is_discarded_when_not_applied() {
5776 let mut state = SessionSystemContextState::default();
5777 state
5778 .stage_active_turn_append(
5779 &AppendSystemContextRequest {
5780 text: "only for the active run".to_string(),
5781 source: Some("runtime:steer:input-1".to_string()),
5782 idempotency_key: Some("runtime:steer:input-1".to_string()),
5783 },
5784 SystemTime::UNIX_EPOCH,
5785 )
5786 .expect("active context should stage");
5787
5788 let discarded = state.discard_unapplied_active_turn_pending();
5789
5790 assert_eq!(discarded.len(), 1);
5791 assert!(state.pending.is_empty());
5792 assert!(state.applied.is_empty());
5793 assert!(state.active_turn_pending_keys.is_empty());
5794 assert!(
5795 state.seen.is_empty(),
5796 "discarded active-turn context should not block later idempotency keys"
5797 );
5798 }
5799
5800 #[test]
5801 fn active_turn_system_context_can_roll_back_targeted_keys() {
5802 let mut state = SessionSystemContextState::default();
5803 for key in ["runtime:steer:input-1", "runtime:steer:input-2"] {
5804 state
5805 .stage_active_turn_append(
5806 &AppendSystemContextRequest {
5807 text: format!("context for {key}"),
5808 source: Some(key.to_string()),
5809 idempotency_key: Some(key.to_string()),
5810 },
5811 SystemTime::UNIX_EPOCH,
5812 )
5813 .expect("active context should stage");
5814 }
5815
5816 let discarded =
5817 state.discard_active_turn_pending_by_keys(&["runtime:steer:input-1".to_string()]);
5818
5819 assert_eq!(discarded.len(), 1);
5820 assert_eq!(
5821 discarded[0].idempotency_key.as_deref(),
5822 Some("runtime:steer:input-1")
5823 );
5824 assert_eq!(state.pending.len(), 1);
5825 assert_eq!(
5826 state.pending[0].idempotency_key.as_deref(),
5827 Some("runtime:steer:input-2")
5828 );
5829 assert!(!state.seen.contains_key("runtime:steer:input-1"));
5830 assert!(state.seen.contains_key("runtime:steer:input-2"));
5831 assert!(
5832 !state
5833 .active_turn_pending_keys
5834 .contains("runtime:steer:input-1")
5835 );
5836 assert!(
5837 state
5838 .active_turn_pending_keys
5839 .contains("runtime:steer:input-2")
5840 );
5841 }
5842
5843 #[test]
5844 fn active_turn_system_context_is_transient_when_boundary_consumes_it() {
5845 let mut state = SessionSystemContextState::default();
5846 state
5847 .stage_active_turn_append(
5848 &AppendSystemContextRequest {
5849 text: "visible to this run".to_string(),
5850 source: Some("runtime:steer:input-2".to_string()),
5851 idempotency_key: Some("runtime:steer:input-2".to_string()),
5852 },
5853 SystemTime::UNIX_EPOCH,
5854 )
5855 .expect("active context should stage");
5856
5857 state.mark_pending_applied();
5858 let discarded = state.discard_unapplied_active_turn_pending();
5859
5860 assert!(discarded.is_empty());
5861 assert!(state.pending.is_empty());
5862 assert!(state.applied.is_empty());
5863 assert!(state.active_turn_pending_keys.is_empty());
5864 assert_eq!(
5865 state.seen.get("runtime:steer:input-2"),
5866 None,
5867 "consumed active-turn steer context must not become durable state"
5868 );
5869 }
5870
5871 #[test]
5872 fn discard_transient_runtime_steer_context_removes_legacy_prompt_and_state() {
5873 let mut session = Session::new();
5874 session.set_system_prompt(format!(
5875 "base{}{}{}{}",
5876 SYSTEM_CONTEXT_SEPARATOR,
5877 render_system_context_block(&PendingSystemContextAppend {
5878 text: "old steer".to_string(),
5879 source: Some("runtime:steer:old".to_string()),
5880 idempotency_key: Some("runtime:steer:old".to_string()),
5881 accepted_at: SystemTime::UNIX_EPOCH,
5882 }),
5883 SYSTEM_CONTEXT_SEPARATOR,
5884 render_system_context_block(&PendingSystemContextAppend {
5885 text: "durable peer fact".to_string(),
5886 source: Some("peer_response_terminal:analyst:req".to_string()),
5887 idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
5888 accepted_at: SystemTime::UNIX_EPOCH,
5889 })
5890 ));
5891 session
5892 .set_system_context_state(SessionSystemContextState {
5893 pending: vec![PendingSystemContextAppend {
5894 text: "pending steer".to_string(),
5895 source: Some("runtime:steer:pending".to_string()),
5896 idempotency_key: Some("runtime:steer:pending".to_string()),
5897 accepted_at: SystemTime::UNIX_EPOCH,
5898 }],
5899 applied: vec![
5900 PendingSystemContextAppend {
5901 text: "old steer".to_string(),
5902 source: Some("runtime:steer:old".to_string()),
5903 idempotency_key: Some("runtime:steer:old".to_string()),
5904 accepted_at: SystemTime::UNIX_EPOCH,
5905 },
5906 PendingSystemContextAppend {
5907 text: "durable peer fact".to_string(),
5908 source: Some("peer_response_terminal:analyst:req".to_string()),
5909 idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
5910 accepted_at: SystemTime::UNIX_EPOCH,
5911 },
5912 ],
5913 seen: BTreeMap::from([(
5914 "runtime:steer:old".to_string(),
5915 SeenSystemContextKey {
5916 text: "old steer".to_string(),
5917 source: Some("runtime:steer:old".to_string()),
5918 state: SeenSystemContextState::Applied,
5919 },
5920 )]),
5921 active_turn_pending_keys: BTreeSet::from(["runtime:steer:pending".to_string()]),
5922 })
5923 .expect("system context state should serialize");
5924
5925 let removed = session.discard_transient_runtime_steer_context();
5926
5927 assert!(removed >= 4);
5928 let system_prompt = match session.messages().first() {
5929 Some(Message::System(system)) => system.content.as_str(),
5930 other => panic!("expected system prompt, got {other:?}"),
5931 };
5932 assert!(!system_prompt.contains("runtime:steer:"));
5933 assert!(system_prompt.contains("durable peer fact"));
5934 let state = session.system_context_state().unwrap_or_default();
5935 assert!(state.pending.is_empty());
5936 assert_eq!(state.applied.len(), 1);
5937 assert_eq!(state.applied[0].text, "durable peer fact");
5938 assert!(state.seen.is_empty());
5939 assert!(state.active_turn_pending_keys.is_empty());
5940 }
5941
5942 #[test]
5943 fn append_system_context_blocks_records_typed_applied_context() {
5944 let append = PendingSystemContextAppend {
5945 text: "Authoritative peer token is birch seventeen.".to_string(),
5946 source: Some(
5947 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string(),
5948 ),
5949 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
5950 accepted_at: SystemTime::UNIX_EPOCH,
5951 };
5952 let mut session = Session::new();
5953
5954 session.append_system_context_blocks(std::slice::from_ref(&append));
5955
5956 let state = session
5957 .system_context_state()
5958 .expect("append should persist typed context state");
5959 assert_eq!(state.applied, vec![append]);
5960 }
5961
5962 #[test]
5963 fn append_system_context_blocks_renders_pre_marked_pending_context() {
5964 let accepted_at = SystemTime::UNIX_EPOCH;
5965 let mut state = SessionSystemContextState::default();
5966 state
5967 .stage_append(
5968 &AppendSystemContextRequest {
5969 text: "Apply this staged context at the request boundary.".to_string(),
5970 source: Some("rpc/session_inject_context".to_string()),
5971 idempotency_key: Some("ctx-boundary".to_string()),
5972 },
5973 accepted_at,
5974 )
5975 .expect("append should stage");
5976 let pending = state.pending.clone();
5977 state.mark_pending_applied();
5978 let mut session = Session::new();
5979 session
5980 .set_system_context_state(state)
5981 .expect("state should serialize");
5982
5983 session.append_system_context_blocks(&pending);
5984
5985 let system_prompt = session
5986 .messages()
5987 .first()
5988 .and_then(|message| match message {
5989 Message::System(system) => Some(system.content.as_str()),
5990 _ => None,
5991 })
5992 .unwrap_or_default();
5993 assert!(system_prompt.contains("Apply this staged context at the request boundary."));
5994 let state = session
5995 .system_context_state()
5996 .expect("append should persist typed context state");
5997 assert_eq!(state.applied.len(), 1);
5998 assert_eq!(
5999 state.seen["ctx-boundary"].state,
6000 SeenSystemContextState::Applied
6001 );
6002 }
6003
6004 #[test]
6005 fn append_system_context_blocks_renders_pre_marked_context_without_idempotency_key() {
6006 let accepted_at = SystemTime::UNIX_EPOCH;
6007 let mut state = SessionSystemContextState::default();
6008 state
6009 .stage_append(
6010 &AppendSystemContextRequest {
6011 text: "Apply this unkeyed staged context at the request boundary.".to_string(),
6012 source: Some("rpc/session_inject_context".to_string()),
6013 idempotency_key: None,
6014 },
6015 accepted_at,
6016 )
6017 .expect("append should stage");
6018 let pending = state.pending.clone();
6019 state.mark_pending_applied();
6020 let mut session = Session::new();
6021 session
6022 .set_system_context_state(state)
6023 .expect("state should serialize");
6024
6025 session.append_system_context_blocks(&pending);
6026
6027 let system_prompt = session
6028 .messages()
6029 .first()
6030 .and_then(|message| match message {
6031 Message::System(system) => Some(system.content.as_str()),
6032 _ => None,
6033 })
6034 .unwrap_or_default();
6035 assert!(
6036 system_prompt.contains("Apply this unkeyed staged context at the request boundary.")
6037 );
6038 }
6039
6040 #[test]
6041 fn append_system_context_blocks_skips_duplicate_idempotency_key() {
6042 let first = PendingSystemContextAppend {
6043 text: "Authoritative peer token is birch seventeen.".to_string(),
6044 source: Some("peer_response_terminal:analyst:req-1".to_string()),
6045 idempotency_key: Some("req-1".to_string()),
6046 accepted_at: SystemTime::UNIX_EPOCH,
6047 };
6048 let duplicate = PendingSystemContextAppend {
6049 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
6050 ..first.clone()
6051 };
6052 let mut session = Session::new();
6053
6054 session.append_system_context_blocks(std::slice::from_ref(&first));
6055 session.append_system_context_blocks(std::slice::from_ref(&duplicate));
6056
6057 let state = session
6058 .system_context_state()
6059 .expect("append should persist typed context state");
6060 assert_eq!(state.applied, vec![first]);
6061 let system_prompt = session
6062 .messages()
6063 .first()
6064 .and_then(|message| match message {
6065 Message::System(system) => Some(system.content.as_str()),
6066 _ => None,
6067 })
6068 .unwrap_or_default();
6069 assert_eq!(
6070 system_prompt
6071 .matches("Authoritative peer token is birch seventeen.")
6072 .count(),
6073 1
6074 );
6075 }
6076
6077 #[test]
6078 fn append_system_context_blocks_skips_conflicting_duplicate_idempotency_key() {
6079 let first = PendingSystemContextAppend {
6080 text: "Authoritative peer token is birch seventeen.".to_string(),
6081 source: Some("peer_response_terminal:analyst:req-1".to_string()),
6082 idempotency_key: Some("req-1".to_string()),
6083 accepted_at: SystemTime::UNIX_EPOCH,
6084 };
6085 let conflicting = PendingSystemContextAppend {
6086 text: "Conflicting peer token should not reach the prompt.".to_string(),
6087 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
6088 ..first.clone()
6089 };
6090 let mut session = Session::new();
6091
6092 session.append_system_context_blocks(std::slice::from_ref(&first));
6093 session.append_system_context_blocks(std::slice::from_ref(&conflicting));
6094
6095 let state = session
6096 .system_context_state()
6097 .expect("append should persist typed context state");
6098 assert_eq!(state.applied, vec![first]);
6099 let system_prompt = session
6100 .messages()
6101 .first()
6102 .and_then(|message| match message {
6103 Message::System(system) => Some(system.content.as_str()),
6104 _ => None,
6105 })
6106 .unwrap_or_default();
6107 assert!(system_prompt.contains("Authoritative peer token is birch seventeen."));
6108 assert!(!system_prompt.contains("Conflicting peer token should not reach the prompt."));
6109 }
6110
6111 #[test]
6123 fn realtime_transcript_assistant_transcript_delta_materializes_transcript_block() {
6124 let mut session = Session::new();
6125
6126 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6127 response_id: "resp_spoken".to_string(),
6128 delta_id: "evt_delta_spoken_1".to_string(),
6129 item_id: "item_spoken".to_string(),
6130 previous_item_id: None,
6131 content_index: 0,
6132 delta: "I said hi".to_string(),
6133 };
6134 assert!(
6135 session.append_realtime_transcript_event(delta).is_inert(),
6136 "delta alone is inert until turn-completed flushes"
6137 );
6138
6139 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
6140 response_id: "resp_spoken".to_string(),
6141 stop_reason: StopReason::EndTurn,
6142 usage: Usage::default(),
6143 };
6144 let outcome = session.append_realtime_transcript_event(terminal);
6145 assert_eq!(outcome.materialized_messages.len(), 1);
6146
6147 let messages = session.messages();
6149 assert_eq!(messages.len(), 1);
6150 match &messages[0] {
6151 Message::BlockAssistant(assistant) => {
6152 assert_eq!(assistant.blocks.len(), 1);
6153 match &assistant.blocks[0] {
6154 AssistantBlock::Transcript { text, source, .. } => {
6155 assert_eq!(text, "I said hi");
6156 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6157 }
6158 other => unreachable!(
6159 "AssistantTranscriptDelta must materialize as AssistantBlock::Transcript, got {other:?}"
6160 ),
6161 }
6162 }
6163 other => unreachable!("expected BlockAssistant message, got {other:?}"),
6164 }
6165 }
6166
6167 #[test]
6168 fn round4_cc4_in_flight_response_ids_lists_distinct_unmaterialized_responses() {
6169 let mut session = Session::new();
6175
6176 for (i, response_id) in [
6180 ("resp_a", "resp_a"),
6181 ("resp_a_extra", "resp_a"),
6182 ("resp_b", "resp_b"),
6183 ("resp_c", "resp_c"),
6184 ]
6185 .iter()
6186 .enumerate()
6187 {
6188 let event = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6189 response_id: response_id.1.to_string(),
6190 delta_id: format!("delta_{i}"),
6191 item_id: response_id.0.to_string(),
6192 previous_item_id: None,
6193 content_index: 0,
6194 delta: "x".to_string(),
6195 };
6196 let _ = session.append_realtime_transcript_event(event);
6197 }
6198
6199 let _ = session.append_realtime_transcript_event(
6201 RealtimeTranscriptEvent::AssistantTurnInterrupted {
6202 response_id: "resp_c".to_string(),
6203 },
6204 );
6205
6206 let _ = session.append_realtime_transcript_event(
6209 RealtimeTranscriptEvent::UserTranscriptFinal {
6210 item_id: "u_item".to_string(),
6211 previous_item_id: None,
6212 content_index: 0,
6213 text: "hi".to_string(),
6214 },
6215 );
6216
6217 let in_flight = session.in_flight_realtime_assistant_response_ids();
6218 assert!(in_flight.contains(&"resp_a".to_string()), "{in_flight:?}");
6219 assert!(in_flight.contains(&"resp_b".to_string()), "{in_flight:?}");
6220 assert!(
6221 !in_flight.contains(&"resp_c".to_string()),
6222 "discarded response must not appear in in_flight: {in_flight:?}"
6223 );
6224 assert_eq!(
6226 in_flight.iter().filter(|r| *r == "resp_a").count(),
6227 1,
6228 "distinct response_ids only: {in_flight:?}"
6229 );
6230 }
6231
6232 #[test]
6233 fn round4_cc2_assistant_turn_completed_after_transcript_deltas_materializes_transcript() {
6234 let mut session = Session::new();
6241
6242 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6243 response_id: "resp_cc2".to_string(),
6244 delta_id: "delta_cc2_1".to_string(),
6245 item_id: "item_cc2".to_string(),
6246 previous_item_id: None,
6247 content_index: 0,
6248 delta: "hello world".to_string(),
6249 };
6250 assert!(session.append_realtime_transcript_event(delta).is_inert());
6251
6252 assert_eq!(
6254 session.in_flight_realtime_assistant_response_ids(),
6255 vec!["resp_cc2".to_string()]
6256 );
6257
6258 let outcome = session.append_realtime_transcript_event(
6259 RealtimeTranscriptEvent::AssistantTurnCompleted {
6260 response_id: "resp_cc2".to_string(),
6261 stop_reason: StopReason::EndTurn,
6262 usage: Usage::default(),
6263 },
6264 );
6265 assert_eq!(outcome.materialized_messages.len(), 1);
6266
6267 assert!(
6269 session
6270 .in_flight_realtime_assistant_response_ids()
6271 .is_empty(),
6272 "materialized items must not appear in in_flight_realtime_assistant_response_ids"
6273 );
6274
6275 let messages = session.messages();
6276 let assistant = messages.iter().find_map(|m| match m {
6277 Message::BlockAssistant(a) => Some(a),
6278 _ => None,
6279 });
6280 let assistant = assistant.expect("assistant block message expected");
6281 assert_eq!(assistant.blocks.len(), 1);
6282 assert!(matches!(
6283 &assistant.blocks[0],
6284 AssistantBlock::Transcript {
6285 source: crate::types::TranscriptSource::Spoken,
6286 ..
6287 }
6288 ));
6289 }
6290
6291 #[test]
6292 fn realtime_transcript_assistant_text_delta_still_materializes_text_block() {
6293 let mut session = Session::new();
6297
6298 let delta = RealtimeTranscriptEvent::AssistantTextDelta {
6299 response_id: "resp_display".to_string(),
6300 delta_id: "evt_delta_display_1".to_string(),
6301 item_id: "item_display".to_string(),
6302 previous_item_id: None,
6303 content_index: 0,
6304 delta: "I wrote".to_string(),
6305 };
6306 let _ = session.append_realtime_transcript_event(delta);
6307
6308 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
6309 response_id: "resp_display".to_string(),
6310 stop_reason: StopReason::EndTurn,
6311 usage: Usage::default(),
6312 };
6313 let outcome = session.append_realtime_transcript_event(terminal);
6314 assert_eq!(outcome.materialized_messages.len(), 1);
6315
6316 let messages = session.messages();
6317 match &messages[0] {
6318 Message::BlockAssistant(assistant) => match &assistant.blocks[0] {
6319 AssistantBlock::Text { text, .. } => assert_eq!(text, "I wrote"),
6320 other => unreachable!(
6321 "AssistantTextDelta must keep materializing AssistantBlock::Text, got {other:?}"
6322 ),
6323 },
6324 other => unreachable!("expected BlockAssistant message, got {other:?}"),
6325 }
6326 }
6327
6328 #[test]
6329 fn round4_cc7_mixed_response_persists_text_and_transcript_in_order() {
6330 let mut session = Session::new();
6348
6349 let display_a = RealtimeTranscriptEvent::AssistantTextDelta {
6351 response_id: "resp_mixed_1".to_string(),
6352 delta_id: "delta_disp_1".to_string(),
6353 item_id: "item_display".to_string(),
6354 previous_item_id: None,
6355 content_index: 0,
6356 delta: "Here's the report:".to_string(),
6357 };
6358 assert!(
6359 session
6360 .append_realtime_transcript_event(display_a)
6361 .is_inert()
6362 );
6363
6364 let display_b = RealtimeTranscriptEvent::AssistantTextDelta {
6365 response_id: "resp_mixed_1".to_string(),
6366 delta_id: "delta_disp_2".to_string(),
6367 item_id: "item_display".to_string(),
6368 previous_item_id: None,
6369 content_index: 0,
6370 delta: " (still writing)".to_string(),
6371 };
6372 assert!(
6373 session
6374 .append_realtime_transcript_event(display_b)
6375 .is_inert()
6376 );
6377
6378 let spoken_a = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6383 response_id: "resp_mixed_1".to_string(),
6384 delta_id: "delta_spoken_1".to_string(),
6385 item_id: "item_spoken".to_string(),
6386 previous_item_id: Some("item_display".to_string()),
6387 content_index: 0,
6388 delta: "I'm reading the report aloud:".to_string(),
6389 };
6390 assert!(
6391 session
6392 .append_realtime_transcript_event(spoken_a)
6393 .is_inert()
6394 );
6395
6396 let spoken_b = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6397 response_id: "resp_mixed_1".to_string(),
6398 delta_id: "delta_spoken_2".to_string(),
6399 item_id: "item_spoken".to_string(),
6400 previous_item_id: Some("item_display".to_string()),
6401 content_index: 0,
6402 delta: " sentence two.".to_string(),
6403 };
6404 assert!(
6405 session
6406 .append_realtime_transcript_event(spoken_b)
6407 .is_inert()
6408 );
6409
6410 let outcome = session.append_realtime_transcript_event(
6413 RealtimeTranscriptEvent::AssistantTurnCompleted {
6414 response_id: "resp_mixed_1".to_string(),
6415 stop_reason: StopReason::EndTurn,
6416 usage: Usage {
6417 input_tokens: 11,
6418 output_tokens: 22,
6419 cache_creation_tokens: None,
6420 cache_read_tokens: None,
6421 },
6422 },
6423 );
6424 assert_eq!(outcome.materialized_messages.len(), 2);
6426
6427 let messages = session.messages();
6430 let assistants: Vec<&BlockAssistantMessage> = messages
6431 .iter()
6432 .filter_map(|m| match m {
6433 Message::BlockAssistant(a) => Some(a),
6434 _ => None,
6435 })
6436 .collect();
6437 assert_eq!(
6438 assistants.len(),
6439 1,
6440 "mixed display+spoken response under one response_id must produce exactly ONE BlockAssistant message, got: {assistants:?}"
6441 );
6442 let assistant = assistants[0];
6443 assert_eq!(
6444 assistant.blocks.len(),
6445 2,
6446 "mixed response message must carry both blocks: {:?}",
6447 assistant.blocks
6448 );
6449
6450 match &assistant.blocks[0] {
6452 AssistantBlock::Text { text, .. } => {
6453 assert_eq!(text, "Here's the report: (still writing)");
6454 }
6455 other => unreachable!(
6456 "first block must be AssistantBlock::Text (display lane), got {other:?}"
6457 ),
6458 }
6459 match &assistant.blocks[1] {
6461 AssistantBlock::Transcript { text, source, .. } => {
6462 assert_eq!(text, "I'm reading the report aloud: sentence two.");
6463 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6464 }
6465 other => unreachable!(
6466 "second block must be AssistantBlock::Transcript {{ source: Spoken }}, got {other:?}"
6467 ),
6468 }
6469
6470 assert_eq!(session.usage.input_tokens, 11);
6472 assert_eq!(session.usage.output_tokens, 22);
6473 }
6474
6475 #[test]
6476 fn round5_r55_mixed_response_barge_in_preserves_display_drops_spoken() {
6477 let mut session = Session::new();
6493
6494 let display = RealtimeTranscriptEvent::AssistantTextDelta {
6495 response_id: "resp_mixed_2".to_string(),
6496 delta_id: "delta_disp_1".to_string(),
6497 item_id: "item_display_2".to_string(),
6498 previous_item_id: None,
6499 content_index: 0,
6500 delta: "Working on the report...".to_string(),
6501 };
6502 let _ = session.append_realtime_transcript_event(display);
6503
6504 let spoken = RealtimeTranscriptEvent::AssistantTranscriptDelta {
6505 response_id: "resp_mixed_2".to_string(),
6506 delta_id: "delta_spoken_1".to_string(),
6507 item_id: "item_spoken_2".to_string(),
6508 previous_item_id: Some("item_display_2".to_string()),
6509 content_index: 0,
6510 delta: "I'm reading the report".to_string(),
6511 };
6512 let _ = session.append_realtime_transcript_event(spoken);
6513
6514 let outcome = session.append_realtime_transcript_event(
6518 RealtimeTranscriptEvent::AssistantTurnInterrupted {
6519 response_id: "resp_mixed_2".to_string(),
6520 },
6521 );
6522 assert_eq!(
6523 outcome.materialized_messages.len(),
6524 1,
6525 "Display lane item must materialize on Interrupted: {outcome:?}"
6526 );
6527
6528 let late_completion = session.append_realtime_transcript_event(
6532 RealtimeTranscriptEvent::AssistantTurnCompleted {
6533 response_id: "resp_mixed_2".to_string(),
6534 stop_reason: StopReason::Cancelled,
6535 usage: Usage::default(),
6536 },
6537 );
6538 assert_eq!(
6539 late_completion.materialized_messages.len(),
6540 0,
6541 "post-barge-in TurnCompleted must not resurrect anything"
6542 );
6543
6544 let messages = session.messages();
6547 let assistants: Vec<&BlockAssistantMessage> = messages
6548 .iter()
6549 .filter_map(|m| match m {
6550 Message::BlockAssistant(a) => Some(a),
6551 _ => None,
6552 })
6553 .collect();
6554 assert_eq!(
6555 assistants.len(),
6556 1,
6557 "barge-in must commit exactly one BlockAssistant containing the Display lane: {assistants:?}"
6558 );
6559 let assistant = assistants[0];
6560 assert_eq!(assistant.blocks.len(), 1, "blocks: {:?}", assistant.blocks);
6561 match &assistant.blocks[0] {
6562 AssistantBlock::Text { text, .. } => {
6563 assert_eq!(text, "Working on the report...");
6564 }
6565 other => {
6566 unreachable!("Display lane must materialize as AssistantBlock::Text, got {other:?}")
6567 }
6568 }
6569 assert!(
6571 !assistant
6572 .blocks
6573 .iter()
6574 .any(|b| matches!(b, AssistantBlock::Transcript { .. })),
6575 "Spoken lane must be dropped on barge-in"
6576 );
6577
6578 assert!(
6581 !session
6582 .in_flight_realtime_assistant_response_ids()
6583 .contains(&"resp_mixed_2".to_string()),
6584 "barged-in response must not appear in in_flight_realtime_assistant_response_ids"
6585 );
6586 }
6587
6588 #[test]
6589 fn round5_r55_barge_in_preserves_display_lane_drops_spoken() {
6590 let mut session = Session::new();
6594
6595 let _ =
6596 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6597 response_id: "resp_a".to_string(),
6598 delta_id: "delta_d_1".to_string(),
6599 item_id: "item_display".to_string(),
6600 previous_item_id: None,
6601 content_index: 0,
6602 delta: "display-text".to_string(),
6603 });
6604 let _ = session.append_realtime_transcript_event(
6605 RealtimeTranscriptEvent::AssistantTranscriptDelta {
6606 response_id: "resp_a".to_string(),
6607 delta_id: "delta_s_1".to_string(),
6608 item_id: "item_spoken".to_string(),
6609 previous_item_id: None,
6610 content_index: 0,
6611 delta: "spoken-transcript".to_string(),
6612 },
6613 );
6614
6615 let outcome = session.append_realtime_transcript_event(
6616 RealtimeTranscriptEvent::AssistantTurnInterrupted {
6617 response_id: "resp_a".to_string(),
6618 },
6619 );
6620 assert_eq!(outcome.materialized_messages.len(), 1);
6622
6623 let messages = session.messages();
6624 let assistants: Vec<&BlockAssistantMessage> = messages
6625 .iter()
6626 .filter_map(|m| match m {
6627 Message::BlockAssistant(a) => Some(a),
6628 _ => None,
6629 })
6630 .collect();
6631 assert_eq!(assistants.len(), 1);
6632 assert_eq!(assistants[0].blocks.len(), 1);
6634 match &assistants[0].blocks[0] {
6635 AssistantBlock::Text { text, .. } => assert_eq!(text, "display-text"),
6636 other => unreachable!("expected Text, got {other:?}"),
6637 }
6638 }
6639
6640 #[test]
6641 fn round5_r55_barge_in_finalizes_retained_display_into_committed_block() {
6642 let mut session = Session::new();
6647
6648 let _ =
6649 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6650 response_id: "resp_a".to_string(),
6651 delta_id: "delta_d_1".to_string(),
6652 item_id: "item_display".to_string(),
6653 previous_item_id: None,
6654 content_index: 0,
6655 delta: "committed-display-text".to_string(),
6656 });
6657
6658 assert!(session.messages().is_empty());
6660
6661 let outcome = session.append_realtime_transcript_event(
6662 RealtimeTranscriptEvent::AssistantTurnInterrupted {
6663 response_id: "resp_a".to_string(),
6664 },
6665 );
6666 assert_eq!(
6667 outcome.materialized_messages.len(),
6668 1,
6669 "Interrupted must finalize retained Display lane immediately"
6670 );
6671
6672 let messages = session.messages();
6674 assert_eq!(messages.len(), 1);
6675 match &messages[0] {
6676 Message::BlockAssistant(assistant) => {
6677 assert_eq!(assistant.blocks.len(), 1);
6678 match &assistant.blocks[0] {
6679 AssistantBlock::Text { text, .. } => {
6680 assert_eq!(text, "committed-display-text");
6681 }
6682 other => unreachable!("expected Text, got {other:?}"),
6683 }
6684 }
6685 other => unreachable!("expected BlockAssistant, got {other:?}"),
6686 }
6687 }
6688
6689 #[test]
6690 fn round5_r56_truncation_promotes_default_lane_item_to_spoken() {
6691 let mut session = Session::new();
6698
6699 let _ = session.append_realtime_transcript_event(
6700 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6701 response_id: "resp_a".to_string(),
6702 item_id: "item_a".to_string(),
6703 content_index: 0,
6704 text: "what was actually heard".to_string(),
6705 },
6706 );
6707
6708 let outcome = session.append_realtime_transcript_event(
6709 RealtimeTranscriptEvent::AssistantTurnCompleted {
6710 response_id: "resp_a".to_string(),
6711 stop_reason: StopReason::EndTurn,
6712 usage: Usage::default(),
6713 },
6714 );
6715 assert_eq!(outcome.materialized_messages.len(), 1);
6716
6717 assert_eq!(session.messages().len(), 1);
6718 match &session.messages()[0] {
6719 Message::BlockAssistant(assistant) => {
6720 assert_eq!(assistant.blocks.len(), 1);
6721 match &assistant.blocks[0] {
6722 AssistantBlock::Transcript { text, source, .. } => {
6723 assert_eq!(text, "what was actually heard");
6724 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6725 }
6726 other => unreachable!(
6727 "truncation-only path must materialize as AssistantBlock::Transcript, got {other:?}"
6728 ),
6729 }
6730 }
6731 other => unreachable!("expected BlockAssistant, got {other:?}"),
6732 }
6733 }
6734
6735 #[test]
6736 fn round5_r56_truncation_after_display_delta_is_no_op_keeping_display_content() {
6737 let mut session = Session::new();
6745
6746 let _ =
6747 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6748 response_id: "resp_a".to_string(),
6749 delta_id: "delta_d_1".to_string(),
6750 item_id: "item_a".to_string(),
6751 previous_item_id: None,
6752 content_index: 0,
6753 delta: "display-text-from-delta".to_string(),
6754 });
6755
6756 let _ = session.append_realtime_transcript_event(
6757 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6758 response_id: "resp_a".to_string(),
6759 item_id: "item_a".to_string(),
6760 content_index: 0,
6761 text: "spoken-truncation-text".to_string(),
6762 },
6763 );
6764
6765 let _ = session.append_realtime_transcript_event(
6766 RealtimeTranscriptEvent::AssistantTurnCompleted {
6767 response_id: "resp_a".to_string(),
6768 stop_reason: StopReason::EndTurn,
6769 usage: Usage::default(),
6770 },
6771 );
6772
6773 assert_eq!(session.messages().len(), 1);
6776 match &session.messages()[0] {
6777 Message::BlockAssistant(assistant) => {
6778 assert_eq!(assistant.blocks.len(), 1);
6779 match &assistant.blocks[0] {
6780 AssistantBlock::Text { text, .. } => {
6781 assert_eq!(text, "display-text-from-delta");
6782 }
6783 other => unreachable!(
6784 "Display content must survive misrouted truncation, got {other:?}"
6785 ),
6786 }
6787 }
6788 other => unreachable!("expected BlockAssistant, got {other:?}"),
6789 }
6790 }
6791
6792 #[test]
6800 fn round5_r56_sibling_display_delta_skipped_on_spoken_item() {
6801 let mut session = Session::new();
6802
6803 let _ = session.append_realtime_transcript_event(
6805 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
6806 response_id: "resp_a".to_string(),
6807 item_id: "item_a".to_string(),
6808 content_index: 0,
6809 text: "what was actually heard".to_string(),
6810 },
6811 );
6812
6813 let _ =
6816 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6817 response_id: "resp_a".to_string(),
6818 delta_id: "delta_d_1".to_string(),
6819 item_id: "item_a".to_string(),
6820 previous_item_id: None,
6821 content_index: 0,
6822 delta: "should-not-appear".to_string(),
6823 });
6824
6825 let _ = session.append_realtime_transcript_event(
6826 RealtimeTranscriptEvent::AssistantTurnCompleted {
6827 response_id: "resp_a".to_string(),
6828 stop_reason: StopReason::EndTurn,
6829 usage: Usage::default(),
6830 },
6831 );
6832
6833 assert_eq!(session.messages().len(), 1);
6836 match &session.messages()[0] {
6837 Message::BlockAssistant(assistant) => {
6838 assert_eq!(assistant.blocks.len(), 1);
6839 match &assistant.blocks[0] {
6840 AssistantBlock::Transcript { text, source, .. } => {
6841 assert_eq!(text, "what was actually heard");
6842 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
6843 }
6844 other => unreachable!(
6845 "Spoken-locked item must materialize as Transcript, got {other:?}"
6846 ),
6847 }
6848 }
6849 other => unreachable!("expected BlockAssistant, got {other:?}"),
6850 }
6851 }
6852
6853 #[test]
6860 fn round5_r56_sibling_spoken_delta_skipped_on_display_item() {
6861 let mut session = Session::new();
6862
6863 let _ =
6865 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
6866 response_id: "resp_a".to_string(),
6867 delta_id: "delta_d_1".to_string(),
6868 item_id: "item_a".to_string(),
6869 previous_item_id: None,
6870 content_index: 0,
6871 delta: "display-locked-text".to_string(),
6872 });
6873
6874 let _ = session.append_realtime_transcript_event(
6877 RealtimeTranscriptEvent::AssistantTranscriptDelta {
6878 response_id: "resp_a".to_string(),
6879 delta_id: "delta_s_1".to_string(),
6880 item_id: "item_a".to_string(),
6881 previous_item_id: None,
6882 content_index: 0,
6883 delta: "should-not-appear".to_string(),
6884 },
6885 );
6886
6887 let _ = session.append_realtime_transcript_event(
6888 RealtimeTranscriptEvent::AssistantTurnCompleted {
6889 response_id: "resp_a".to_string(),
6890 stop_reason: StopReason::EndTurn,
6891 usage: Usage::default(),
6892 },
6893 );
6894
6895 assert_eq!(session.messages().len(), 1);
6897 match &session.messages()[0] {
6898 Message::BlockAssistant(assistant) => {
6899 assert_eq!(assistant.blocks.len(), 1);
6900 match &assistant.blocks[0] {
6901 AssistantBlock::Text { text, .. } => {
6902 assert_eq!(text, "display-locked-text");
6903 }
6904 other => {
6905 unreachable!("Display-locked item must materialize as Text, got {other:?}")
6906 }
6907 }
6908 }
6909 other => unreachable!("expected BlockAssistant, got {other:?}"),
6910 }
6911 }
6912
6913 #[test]
6921 fn round5_r57_late_final_text_after_turn_completed_warns_and_skips() {
6922 let mut session = Session::new();
6923
6924 let _ = session.append_realtime_transcript_event(
6926 RealtimeTranscriptEvent::AssistantTranscriptDelta {
6927 response_id: "resp_a".to_string(),
6928 delta_id: "delta_s_1".to_string(),
6929 item_id: "item_a".to_string(),
6930 previous_item_id: None,
6931 content_index: 0,
6932 delta: "delta-accumulated".to_string(),
6933 },
6934 );
6935
6936 let commit_outcome = session.append_realtime_transcript_event(
6938 RealtimeTranscriptEvent::AssistantTurnCompleted {
6939 response_id: "resp_a".to_string(),
6940 stop_reason: StopReason::EndTurn,
6941 usage: Usage::default(),
6942 },
6943 );
6944 assert_eq!(commit_outcome.materialized_messages.len(), 1);
6945
6946 let late_outcome = session.append_realtime_transcript_event(
6950 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
6951 response_id: "resp_a".to_string(),
6952 item_id: "item_a".to_string(),
6953 content_index: 0,
6954 text: "authoritative-final-that-must-not-land".to_string(),
6955 },
6956 );
6957 assert!(
6958 late_outcome.is_inert(),
6959 "late FinalText after materialization must produce inert outcome"
6960 );
6961
6962 assert_eq!(session.messages().len(), 1);
6965 match &session.messages()[0] {
6966 Message::BlockAssistant(assistant) => {
6967 assert_eq!(assistant.blocks.len(), 1);
6968 match &assistant.blocks[0] {
6969 AssistantBlock::Transcript { text, .. } => {
6970 assert_eq!(
6971 text, "delta-accumulated",
6972 "canonical message must preserve delta-accumulated text; \
6973 append-only history forbids late FinalText repair"
6974 );
6975 }
6976 other => unreachable!("expected Transcript, got {other:?}"),
6977 }
6978 }
6979 other => unreachable!("expected BlockAssistant, got {other:?}"),
6980 }
6981 }
6982}