1use crate::Provider;
13use crate::generated::{session_document, session_persistence_version_authority};
14use crate::lifecycle::run_primitive::TurnMetadataOverride;
15use crate::peer_meta::PeerMeta;
16use crate::realtime_transcript::{
17 RealtimeTranscriptApplyOutcome, RealtimeTranscriptEvent, SESSION_REALTIME_TRANSCRIPT_STATE_KEY,
18};
19use crate::realtime_transcript_revision::{self, SessionRealtimeTranscriptState};
20use crate::service::{AppendSystemContextRequest, MobToolAuthorityContext};
21use crate::session_durable_config_authority;
22use crate::time_compat::SystemTime;
23use crate::tool_scope::ToolFilter;
24use crate::types::{
25 AssistantBlock, BlockAssistantMessage, ContentBlock, ContentInput, Message, SessionId,
26 StopReason, ToolDef, ToolName, ToolProvenance, ToolResult, Usage, UserMessage,
27};
28use serde::{Deserialize, Deserializer, Serialize, Serializer};
29use sha2::{Digest, Sha256};
30use std::collections::{BTreeMap, BTreeSet, HashMap};
31use std::sync::Arc;
32
33pub use crate::generated::session_persistence_version_authority::SESSION_VERSION;
40
41pub use crate::generated::session_persistence_version_authority::SESSION_METADATA_SCHEMA_VERSION;
46
47pub fn session_version() -> u32 {
49 session_persistence_version_authority::session_envelope_version()
50}
51
52pub fn session_metadata_schema_version() -> u32 {
54 session_persistence_version_authority::session_metadata_schema_version()
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(tag = "type", rename_all = "snake_case")]
63pub enum TranscriptReplacement {
64 Message { message: Message },
66 UserContentBlock {
68 block_index: usize,
69 block: ContentBlock,
70 },
71 AssistantBlock {
73 block_index: usize,
74 block: AssistantBlock,
75 },
76 ToolResultContentBlock {
78 result_index: usize,
79 block_index: usize,
80 block: ContentBlock,
81 },
82}
83
84pub const SESSION_TRANSCRIPT_HISTORY_STATE_KEY: &str = "session_transcript_history_state_v1";
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
89#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
90#[serde(tag = "type", rename_all = "snake_case")]
91pub enum TranscriptRewriteSelection {
92 MessageRange { start: usize, end: usize },
94}
95
96impl TranscriptRewriteSelection {
97 fn bounds(&self) -> (usize, usize) {
98 match self {
99 Self::MessageRange { start, end } => (*start, *end),
100 }
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
110#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
111#[serde(rename_all = "snake_case")]
112pub struct TranscriptRewriteReason {
113 pub kind: String,
114 #[serde(default, skip_serializing_if = "Option::is_none")]
115 pub note: Option<String>,
116}
117
118impl TranscriptRewriteReason {
119 pub fn new(kind: impl Into<String>) -> Self {
120 Self {
121 kind: kind.into(),
122 note: None,
123 }
124 }
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
129#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
130#[serde(rename_all = "snake_case")]
131pub struct TranscriptRewriteCommit {
132 pub parent_revision: String,
133 pub revision: String,
134 pub selection: TranscriptRewriteSelection,
135 pub original_span_digest: String,
136 pub replacement_digest: String,
137 pub messages_before: usize,
138 pub messages_after: usize,
139 pub reason: TranscriptRewriteReason,
140 #[serde(default, skip_serializing_if = "Option::is_none")]
141 pub actor: Option<String>,
142 #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
143 pub committed_at: SystemTime,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
149#[serde(rename_all = "snake_case")]
150pub struct TranscriptRevisionBody {
151 pub revision: String,
152 #[serde(default, skip_serializing_if = "Option::is_none")]
153 pub parent_revision: Option<String>,
154 #[cfg_attr(feature = "schema", schemars(with = "Vec<serde_json::Value>"))]
155 pub messages: Vec<Message>,
156 #[cfg_attr(feature = "schema", schemars(with = "SchemaSystemTime"))]
157 pub created_at: SystemTime,
158}
159
160#[cfg(feature = "schema")]
161#[allow(dead_code)]
162#[derive(schemars::JsonSchema)]
163#[schemars(rename = "SystemTime")]
164struct SchemaSystemTime {
165 secs_since_epoch: u64,
166 nanos_since_epoch: u32,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
172#[serde(rename_all = "snake_case")]
173pub struct TranscriptRewriteRecord {
174 pub commit: TranscriptRewriteCommit,
175 pub parent_body: TranscriptRevisionBody,
176 pub revision_body: TranscriptRevisionBody,
177}
178
179impl TranscriptRewriteRecord {
180 pub fn new(
181 commit: TranscriptRewriteCommit,
182 parent_body: TranscriptRevisionBody,
183 revision_body: TranscriptRevisionBody,
184 ) -> Result<Self, TranscriptEditError> {
185 validate_transcript_rewrite_record(&commit, &parent_body, &revision_body)?;
186 Ok(Self {
187 commit,
188 parent_body,
189 revision_body,
190 })
191 }
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
196#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
197#[serde(rename_all = "snake_case")]
198pub struct TranscriptHistoryState {
199 pub head: String,
200 #[serde(default, skip_serializing_if = "Vec::is_empty")]
201 pub commits: Vec<TranscriptRewriteCommit>,
202 #[serde(default, skip_serializing_if = "Vec::is_empty")]
203 pub revisions: Vec<TranscriptRevisionBody>,
204}
205
206impl TranscriptHistoryState {
207 pub fn from_rewrite_records<I>(records: I) -> Result<Option<Self>, TranscriptEditError>
209 where
210 I: IntoIterator<Item = TranscriptRewriteRecord>,
211 {
212 let mut state: Option<Self> = None;
213 for record in records {
214 validate_transcript_rewrite_record(
215 &record.commit,
216 &record.parent_body,
217 &record.revision_body,
218 )?;
219 let state = state.get_or_insert_with(|| Self {
220 head: record.commit.parent_revision.clone(),
221 commits: Vec::new(),
222 revisions: Vec::new(),
223 });
224 if record.commit.parent_revision != state.head {
225 if revision_body_extends_head(&record.parent_body, &state.revisions, &state.head)? {
226 state.head = record.commit.parent_revision.clone();
227 } else {
228 return Err(TranscriptEditError::HistoryStateMalformed(format!(
229 "rewrite record parent {} does not extend transcript head {}",
230 record.commit.parent_revision, state.head
231 )));
232 }
233 }
234 if !state
235 .revisions
236 .iter()
237 .any(|body| body.revision == record.parent_body.revision)
238 {
239 state.revisions.push(record.parent_body);
240 }
241 if !state
242 .revisions
243 .iter()
244 .any(|body| body.revision == record.revision_body.revision)
245 {
246 state.revisions.push(record.revision_body);
247 }
248 state.head = record.commit.revision.clone();
249 state.commits.push(record.commit);
250 }
251 Ok(state)
252 }
253}
254
255#[derive(Debug, Clone, thiserror::Error)]
257pub enum TranscriptEditError {
258 #[error("message index {message_index} out of bounds for {message_count} messages")]
259 MessageIndexOutOfBounds {
260 message_index: usize,
261 message_count: usize,
262 },
263 #[error("{block_kind} index {block_index} out of bounds for {block_count} blocks")]
264 BlockIndexOutOfBounds {
265 block_kind: &'static str,
266 block_index: usize,
267 block_count: usize,
268 },
269 #[error("replacement expected {expected} at message index {message_index}, found {actual}")]
270 MessageRoleMismatch {
271 message_index: usize,
272 expected: &'static str,
273 actual: &'static str,
274 },
275 #[error("invalid transcript rewrite range {start}..{end} for {message_count} messages")]
276 InvalidRewriteRange {
277 start: usize,
278 end: usize,
279 message_count: usize,
280 },
281 #[error("transcript rewrite does not change transcript revision {revision}")]
282 NoOpRewrite { revision: String },
283 #[error("transcript rewrite parent revision mismatch: expected {expected}, actual {actual}")]
284 RevisionConflict { expected: String, actual: String },
285 #[error("transcript history state is malformed: {0}")]
286 HistoryStateMalformed(String),
287 #[error("invalid transcript shape after rewrite: {0}")]
288 InvalidTranscriptShape(String),
289}
290
291fn message_role_name(message: &Message) -> &'static str {
292 match message {
293 Message::System(_) => "system",
294 Message::SystemNotice(_) => "system_notice",
295 Message::User(_) => "user",
296 Message::BlockAssistant(_) => "block_assistant",
297 Message::ToolResults { .. } => "tool_results",
298 }
299}
300
301fn assistant_tool_use_ids(message: &Message) -> Vec<&str> {
302 match message {
303 Message::BlockAssistant(assistant) => assistant
304 .blocks
305 .iter()
306 .filter_map(|block| match block {
307 AssistantBlock::ToolUse { id, .. } => Some(id.as_str()),
308 _ => None,
309 })
310 .collect(),
311 _ => Vec::new(),
312 }
313}
314
315fn validate_transcript_tool_result_shape(messages: &[Message]) -> Result<(), TranscriptEditError> {
316 for (index, message) in messages.iter().enumerate() {
317 if let Message::ToolResults { results, .. } = message {
318 let Some(previous) = index
319 .checked_sub(1)
320 .and_then(|previous| messages.get(previous))
321 else {
322 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
323 "tool_results at message {index} has no preceding assistant tool-use message"
324 )));
325 };
326 let expected = assistant_tool_use_ids(previous);
327 if expected.is_empty() {
328 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
329 "tool_results at message {index} follows {}, not an assistant tool-use message",
330 message_role_name(previous)
331 )));
332 }
333 let actual = results
334 .iter()
335 .map(|result| result.tool_use_id.as_str())
336 .collect::<Vec<_>>();
337 let actual_set = actual.iter().copied().collect::<BTreeSet<_>>();
338 let expected_set = expected.iter().copied().collect::<BTreeSet<_>>();
339 if actual.len() != actual_set.len() {
340 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
341 "tool_results at message {index} contains duplicate tool ids"
342 )));
343 }
344 if expected.len() != expected_set.len() {
345 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
346 "assistant tool-use message before tool_results at message {index} contains duplicate tool ids"
347 )));
348 }
349 if actual_set != expected_set {
350 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
351 "tool_results at message {index} resolve tool ids {actual_set:?}, expected {expected_set:?}"
352 )));
353 }
354 }
355
356 let tool_use_ids = assistant_tool_use_ids(message);
357 if tool_use_ids.is_empty() {
358 continue;
359 }
360 let Some(next) = messages.get(index + 1) else {
361 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
362 "assistant tool-use message {index} has no following tool_results"
363 )));
364 };
365 if !matches!(next, Message::ToolResults { .. }) {
366 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
367 "assistant tool-use message {index} is followed by {}, not tool_results",
368 message_role_name(next)
369 )));
370 }
371 }
372 Ok(())
373}
374
375fn canonicalize_digest_image_blocks(blocks: &mut [crate::types::ContentBlock]) {
376 for block in blocks.iter_mut() {
377 if let crate::types::ContentBlock::Image {
378 media_type,
379 data: crate::types::ImageData::Inline { data },
380 } = block
381 {
382 let blob_id = crate::blob::content_blob_id(media_type, data);
385 *block = crate::types::ContentBlock::Image {
386 media_type: media_type.clone(),
387 data: crate::types::ImageData::Blob { blob_id },
388 };
389 }
390 }
391}
392
393fn canonicalize_messages_for_digest(messages: &[Message]) -> Vec<Message> {
402 let mut canonical = messages.to_vec();
403 for message in &mut canonical {
404 match message {
405 Message::User(user) => canonicalize_digest_image_blocks(&mut user.content),
406 Message::ToolResults { results, .. } => {
407 for result in results.iter_mut() {
408 canonicalize_digest_image_blocks(&mut result.content);
409 }
410 }
411 Message::SystemNotice(notice) => {
412 for block in &mut notice.blocks {
413 match block {
414 crate::types::SystemNoticeBlock::Comms { content, .. }
415 | crate::types::SystemNoticeBlock::ExternalEvent { content, .. } => {
416 canonicalize_digest_image_blocks(content);
417 }
418 _ => {}
419 }
420 }
421 }
422 _ => {}
423 }
424 }
425 canonical
426}
427
428pub fn transcript_messages_digest(messages: &[Message]) -> Result<String, serde_json::Error> {
429 sha256_json_digest(&canonicalize_messages_for_digest(messages))
430}
431
432fn validate_transcript_rewrite_record(
433 commit: &TranscriptRewriteCommit,
434 parent_body: &TranscriptRevisionBody,
435 revision_body: &TranscriptRevisionBody,
436) -> Result<(), TranscriptEditError> {
437 if parent_body.revision != commit.parent_revision {
438 return Err(TranscriptEditError::HistoryStateMalformed(format!(
439 "parent body revision {} does not match commit parent {}",
440 parent_body.revision, commit.parent_revision
441 )));
442 }
443 if revision_body.revision != commit.revision {
444 return Err(TranscriptEditError::HistoryStateMalformed(format!(
445 "revision body {} does not match commit revision {}",
446 revision_body.revision, commit.revision
447 )));
448 }
449 if commit.parent_revision == commit.revision {
450 return Err(TranscriptEditError::NoOpRewrite {
451 revision: commit.revision.clone(),
452 });
453 }
454 let parent_digest = transcript_messages_digest(&parent_body.messages)
455 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
456 if parent_digest != commit.parent_revision {
457 return Err(TranscriptEditError::HistoryStateMalformed(format!(
458 "parent body digest {parent_digest} does not match commit parent {}",
459 commit.parent_revision
460 )));
461 }
462 let revision_digest = transcript_messages_digest(&revision_body.messages)
463 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
464 if revision_digest != commit.revision {
465 return Err(TranscriptEditError::HistoryStateMalformed(format!(
466 "revision body digest {revision_digest} does not match commit revision {}",
467 commit.revision
468 )));
469 }
470 let (start, end) = commit.selection.bounds();
471 if start > end || end > parent_body.messages.len() {
472 return Err(TranscriptEditError::InvalidRewriteRange {
473 start,
474 end,
475 message_count: parent_body.messages.len(),
476 });
477 }
478 if commit.messages_before != parent_body.messages.len()
479 || commit.messages_after != revision_body.messages.len()
480 {
481 return Err(TranscriptEditError::HistoryStateMalformed(format!(
482 "commit message counts {} -> {} do not match revision bodies {} -> {}",
483 commit.messages_before,
484 commit.messages_after,
485 parent_body.messages.len(),
486 revision_body.messages.len()
487 )));
488 }
489 let original_span_digest = transcript_messages_digest(&parent_body.messages[start..end])
490 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
491 if original_span_digest != commit.original_span_digest {
492 return Err(TranscriptEditError::HistoryStateMalformed(format!(
493 "original span digest {original_span_digest} does not match commit digest {}",
494 commit.original_span_digest
495 )));
496 }
497 let removed_len = end - start;
498 let retained_len = commit
499 .messages_before
500 .checked_sub(removed_len)
501 .ok_or_else(|| {
502 TranscriptEditError::HistoryStateMalformed(
503 "commit removed more messages than it recorded before rewrite".to_string(),
504 )
505 })?;
506 let replacement_len = commit
507 .messages_after
508 .checked_sub(retained_len)
509 .ok_or_else(|| {
510 TranscriptEditError::HistoryStateMalformed(
511 "commit message counts cannot describe a replacement span".to_string(),
512 )
513 })?;
514 let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
515 TranscriptEditError::HistoryStateMalformed("replacement span end overflowed".to_string())
516 })?;
517 if replacement_end > revision_body.messages.len() {
518 return Err(TranscriptEditError::InvalidRewriteRange {
519 start,
520 end: replacement_end,
521 message_count: revision_body.messages.len(),
522 });
523 }
524 let parent_prefix_digest = transcript_messages_digest(&parent_body.messages[..start])
525 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
526 let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
527 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
528 if parent_prefix_digest != revision_prefix_digest {
529 return Err(TranscriptEditError::HistoryStateMalformed(
530 "rewrite revision changed messages before the selected span".to_string(),
531 ));
532 }
533 let parent_suffix_digest = transcript_messages_digest(&parent_body.messages[end..])
534 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
535 let revision_suffix_digest =
536 transcript_messages_digest(&revision_body.messages[replacement_end..])
537 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
538 if parent_suffix_digest != revision_suffix_digest {
539 return Err(TranscriptEditError::HistoryStateMalformed(
540 "rewrite revision changed messages after the selected span".to_string(),
541 ));
542 }
543 let replacement_digest =
544 transcript_messages_digest(&revision_body.messages[start..replacement_end])
545 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
546 if replacement_digest != commit.replacement_digest {
547 return Err(TranscriptEditError::HistoryStateMalformed(format!(
548 "replacement span digest {replacement_digest} does not match commit digest {}",
549 commit.replacement_digest
550 )));
551 }
552 Ok(())
553}
554
555fn validate_transcript_history_state(
556 state: &TranscriptHistoryState,
557) -> Result<(), TranscriptEditError> {
558 if state
559 .revisions
560 .iter()
561 .all(|body| body.revision != state.head)
562 {
563 return Err(TranscriptEditError::HistoryStateMalformed(format!(
564 "missing transcript head body {}",
565 state.head
566 )));
567 }
568 for body in &state.revisions {
569 let digest = transcript_messages_digest(&body.messages)
570 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
571 if digest != body.revision {
572 return Err(TranscriptEditError::HistoryStateMalformed(format!(
573 "transcript revision body {} has digest {digest}",
574 body.revision
575 )));
576 }
577 }
578 for commit in &state.commits {
579 let parent_body = state
580 .revisions
581 .iter()
582 .find(|body| body.revision == commit.parent_revision)
583 .ok_or_else(|| {
584 TranscriptEditError::HistoryStateMalformed(format!(
585 "missing parent transcript body {}",
586 commit.parent_revision
587 ))
588 })?;
589 let revision_body = state
590 .revisions
591 .iter()
592 .find(|body| body.revision == commit.revision)
593 .ok_or_else(|| {
594 TranscriptEditError::HistoryStateMalformed(format!(
595 "missing transcript revision body {}",
596 commit.revision
597 ))
598 })?;
599 validate_transcript_rewrite_record(commit, parent_body, revision_body)?;
600 }
601 let Some(first_commit) = state.commits.first() else {
602 return Ok(());
603 };
604 let mut expected_head = first_commit.parent_revision.clone();
605 for commit in &state.commits {
606 let parent_body = state
607 .revisions
608 .iter()
609 .find(|body| body.revision == commit.parent_revision)
610 .ok_or_else(|| {
611 TranscriptEditError::HistoryStateMalformed(format!(
612 "missing parent transcript body {}",
613 commit.parent_revision
614 ))
615 })?;
616 if commit.parent_revision != expected_head
617 && !revision_body_extends_head(parent_body, &state.revisions, &expected_head)?
618 {
619 return Err(TranscriptEditError::HistoryStateMalformed(format!(
620 "rewrite commit parent {} does not extend transcript head {}",
621 commit.parent_revision, expected_head
622 )));
623 }
624 expected_head = commit.revision.clone();
625 }
626 let mut cursor = state.head.clone();
627 while cursor != expected_head {
628 let Some(head_body) = state.revisions.iter().find(|body| body.revision == cursor) else {
629 break;
630 };
631 match head_body.parent_revision.as_deref() {
632 Some(parent) => cursor = parent.to_string(),
633 None => break,
634 }
635 }
636 if cursor != expected_head {
637 return Err(TranscriptEditError::HistoryStateMalformed(format!(
638 "transcript head {} does not extend the rewrite chain",
639 state.head
640 )));
641 }
642 Ok(())
643}
644
645fn revision_body_extends_head(
646 candidate: &TranscriptRevisionBody,
647 revisions: &[TranscriptRevisionBody],
648 head: &str,
649) -> Result<bool, TranscriptEditError> {
650 if candidate.parent_revision.as_deref() == Some(head) {
651 return Ok(true);
652 }
653 let Some(head_body) = revisions.iter().find(|body| body.revision == head) else {
654 return Ok(false);
655 };
656 if candidate.messages.len() < head_body.messages.len() {
657 return Ok(false);
658 }
659 let prefix_digest = transcript_messages_digest(&candidate.messages[..head_body.messages.len()])
660 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
661 Ok(prefix_digest == head)
662}
663
664fn sha256_json_digest<T: Serialize + ?Sized>(value: &T) -> Result<String, serde_json::Error> {
665 let bytes = serde_json::to_vec(value)?;
666 let digest = Sha256::digest(bytes);
667 let mut out = String::with_capacity(digest.len() * 2);
668 const HEX: &[u8; 16] = b"0123456789abcdef";
669 for byte in digest {
670 out.push(HEX[(byte >> 4) as usize] as char);
671 out.push(HEX[(byte & 0x0f) as usize] as char);
672 }
673 Ok(format!("sha256:{out}"))
674}
675
676#[derive(Debug, Clone)]
680pub struct Session {
681 version: u32,
684 id: SessionId,
686 pub(crate) messages: Arc<Vec<Message>>,
688 created_at: SystemTime,
690 updated_at: SystemTime,
692 metadata: serde_json::Map<String, serde_json::Value>,
694 usage: Usage,
696}
697
698#[derive(Serialize, Deserialize)]
700#[serde(rename_all = "snake_case")]
701struct SessionSerde {
702 version: u32,
703 id: SessionId,
704 messages: Vec<Message>,
705 created_at: SystemTime,
706 updated_at: SystemTime,
707 #[serde(default)]
708 metadata: serde_json::Map<String, serde_json::Value>,
709 #[serde(default)]
710 usage: Usage,
711}
712
713impl Serialize for Session {
714 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
715 where
716 S: Serializer,
717 {
718 let serde_repr = SessionSerde {
719 version: self.version,
720 id: self.id.clone(),
721 messages: (*self.messages).clone(),
722 created_at: self.created_at,
723 updated_at: self.updated_at,
724 metadata: self.metadata.clone(),
725 usage: self.usage.clone(),
726 };
727 serde_repr.serialize(serializer)
728 }
729}
730
731impl<'de> Deserialize<'de> for Session {
732 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
733 where
734 D: Deserializer<'de>,
735 {
736 let serde_repr = SessionSerde::deserialize(deserializer)?;
737 let version = session_persistence_version_authority::restore_session_envelope_version(
738 serde_repr.version,
739 )
740 .map_err(<D::Error as serde::de::Error>::custom)?;
741 Ok(Session {
742 version,
743 id: serde_repr.id,
744 messages: Arc::new(serde_repr.messages),
745 created_at: serde_repr.created_at,
746 updated_at: serde_repr.updated_at,
747 metadata: serde_repr.metadata,
748 usage: serde_repr.usage,
749 })
750 }
751}
752
753pub const SESSION_SYSTEM_CONTEXT_STATE_KEY: &str = "session_system_context_state";
755
756pub const SESSION_DEFERRED_TURN_STATE_KEY: &str = "session_deferred_turn_state";
758
759pub const SESSION_BUILD_STATE_KEY: &str = "session_build_state";
761
762pub const SESSION_TOOL_VISIBILITY_STATE_KEY: &str = "session_tool_visibility_state_v1";
764
765pub const SESSION_LIFECYCLE_TERMINAL_KEY: &str = "session_lifecycle_terminal";
767
768pub const VIEW_IMAGE_TOOL_NAME: &str = "view_image";
770
771pub const SYSTEM_CONTEXT_SEPARATOR: &str = "\n\n---\n\n";
773
774#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
775#[error("metadata key `{key}` is reserved for session authority")]
776pub struct ReservedSessionMetadataKey {
777 key: String,
778}
779
780impl ReservedSessionMetadataKey {
781 fn new(key: &str) -> Self {
782 Self {
783 key: key.to_string(),
784 }
785 }
786}
787
788fn is_session_authority_metadata_key(key: &str) -> bool {
789 crate::surface_metadata::ReservedMetadataKey::is_session_authority(key)
792}
793
794#[allow(clippy::panic)]
795fn fail_closed_generated_restore(authority: &'static str, err: serde_json::Error) -> ! {
796 tracing::error!(
797 authority,
798 error = %err,
799 "generated authority rejected durable restore"
800 );
801 panic!("generated {authority} authority rejected durable restore: {err}");
802}
803
804#[derive(Clone)]
810pub struct SystemContextStateHandle {
811 inner: Arc<std::sync::Mutex<SessionSystemContextState>>,
812}
813
814impl std::fmt::Debug for SystemContextStateHandle {
815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
816 f.debug_struct("SystemContextStateHandle")
817 .field("inner", &"<Arc<Mutex<SessionSystemContextState>>>")
818 .finish()
819 }
820}
821
822impl SystemContextStateHandle {
823 pub fn new(state: SessionSystemContextState) -> Result<Self, serde_json::Error> {
824 let state = system_context_authority::restore_system_context_state(state)
825 .map_err(<serde_json::Error as serde::de::Error>::custom)?;
826 Ok(Self {
827 inner: Arc::new(std::sync::Mutex::new(state)),
828 })
829 }
830
831 pub fn from_shared_authority_state(
832 inner: Arc<std::sync::Mutex<SessionSystemContextState>>,
833 ) -> Self {
834 Self { inner }
835 }
836
837 pub fn snapshot(&self) -> SessionSystemContextState {
838 match self.inner.lock() {
839 Ok(guard) => guard.clone(),
840 Err(poisoned) => {
841 tracing::warn!("system-context state lock poisoned while reading snapshot");
842 poisoned.into_inner().clone()
843 }
844 }
845 }
846
847 pub fn replace_from_generated_restore(
848 &self,
849 state: SessionSystemContextState,
850 ) -> Result<(), serde_json::Error> {
851 let state = system_context_authority::restore_system_context_state(state)
852 .map_err(<serde_json::Error as serde::de::Error>::custom)?;
853 match self.inner.lock() {
854 Ok(mut guard) => {
855 *guard = state;
856 }
857 Err(poisoned) => {
858 tracing::warn!("system-context state lock poisoned while restoring state");
859 *poisoned.into_inner() = state;
860 }
861 }
862 Ok(())
863 }
864
865 pub fn replace_from_generated_restore_if_changed(
866 &self,
867 state: SessionSystemContextState,
868 ) -> Result<bool, serde_json::Error> {
869 let state = system_context_authority::restore_system_context_state(state)
870 .map_err(<serde_json::Error as serde::de::Error>::custom)?;
871 let mut guard = match self.inner.lock() {
872 Ok(guard) => guard,
873 Err(poisoned) => {
874 tracing::warn!(
875 "system-context state lock poisoned while replacing generated-restored state"
876 );
877 poisoned.into_inner()
878 }
879 };
880 if *guard == state {
881 return Ok(false);
882 }
883 *guard = state;
884 Ok(true)
885 }
886
887 pub fn replace_from_generated_restore_if_current(
888 &self,
889 current: &SessionSystemContextState,
890 replacement: SessionSystemContextState,
891 ) -> Result<bool, serde_json::Error> {
892 let replacement = system_context_authority::restore_system_context_state(replacement)
893 .map_err(<serde_json::Error as serde::de::Error>::custom)?;
894 let mut guard = match self.inner.lock() {
895 Ok(guard) => guard,
896 Err(poisoned) => {
897 tracing::warn!(
898 "system-context state lock poisoned while conditionally replacing generated-restored state"
899 );
900 poisoned.into_inner()
901 }
902 };
903 if *guard != *current {
904 return Ok(false);
905 }
906 *guard = replacement;
907 Ok(true)
908 }
909
910 pub fn stage_append_with_snapshot(
911 &self,
912 req: &AppendSystemContextRequest,
913 accepted_at: SystemTime,
914 ) -> Result<
915 (
916 crate::service::AppendSystemContextStatus,
917 SessionSystemContextState,
918 SessionSystemContextState,
919 ),
920 SystemContextStageError,
921 > {
922 let mut guard = match self.inner.lock() {
923 Ok(guard) => guard,
924 Err(poisoned) => {
925 tracing::warn!("system-context state lock poisoned while staging append");
926 poisoned.into_inner()
927 }
928 };
929 let snapshot = guard.clone();
930 let status = guard.stage_append(req, accepted_at)?;
931 let staged = guard.clone();
932 Ok((status, snapshot, staged))
933 }
934
935 pub fn stage_active_turn_appends_with_snapshot(
936 &self,
937 appends: Vec<(AppendSystemContextRequest, SystemTime)>,
938 ) -> Result<(SessionSystemContextState, SessionSystemContextState), SystemContextStageError>
939 {
940 let mut guard = match self.inner.lock() {
941 Ok(guard) => guard,
942 Err(poisoned) => {
943 tracing::warn!(
944 "system-context state lock poisoned while staging active-turn appends"
945 );
946 poisoned.into_inner()
947 }
948 };
949 let snapshot = guard.clone();
950 let mut candidate = snapshot.clone();
951 for (req, accepted_at) in appends {
952 candidate.stage_active_turn_append(&req, accepted_at)?;
953 }
954 *guard = candidate.clone();
955 let staged = candidate;
956 Ok((snapshot, staged))
957 }
958
959 pub fn discard_unapplied_active_turn_pending(&self) -> usize {
960 let discarded = match self.inner.lock() {
961 Ok(mut guard) => guard.discard_unapplied_active_turn_pending(),
962 Err(poisoned) => {
963 tracing::warn!(
964 "system-context state lock poisoned while discarding active-turn context"
965 );
966 poisoned
967 .into_inner()
968 .discard_unapplied_active_turn_pending()
969 }
970 };
971 discarded.len()
972 }
973
974 pub fn discard_active_turn_pending_by_keys(
975 &self,
976 idempotency_keys: &[String],
977 ) -> Vec<PendingSystemContextAppend> {
978 match self.inner.lock() {
979 Ok(mut guard) => guard.discard_active_turn_pending_by_keys(idempotency_keys),
980 Err(poisoned) => {
981 tracing::warn!(
982 "system-context state lock poisoned while discarding active-turn pending appends"
983 );
984 poisoned
985 .into_inner()
986 .discard_active_turn_pending_by_keys(idempotency_keys)
987 }
988 }
989 }
990
991 pub fn stage_active_turn_append(
992 &self,
993 req: &AppendSystemContextRequest,
994 accepted_at: SystemTime,
995 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
996 match self.inner.lock() {
997 Ok(mut guard) => guard.stage_active_turn_append(req, accepted_at),
998 Err(poisoned) => {
999 tracing::warn!(
1000 "system-context state lock poisoned while staging active-turn context"
1001 );
1002 poisoned
1003 .into_inner()
1004 .stage_active_turn_append(req, accepted_at)
1005 }
1006 }
1007 }
1008}
1009
1010#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
1014#[serde(rename_all = "snake_case")]
1015pub struct SessionSystemContextState {
1016 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1017 pub(crate) pending: Vec<PendingSystemContextAppend>,
1018 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1019 pub(crate) applied: Vec<PendingSystemContextAppend>,
1020 #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
1021 pub(crate) seen: std::collections::BTreeMap<String, SeenSystemContextKey>,
1022 #[serde(default, skip_serializing_if = "std::collections::BTreeSet::is_empty")]
1023 pub(crate) active_turn_pending_keys: std::collections::BTreeSet<String>,
1024}
1025
1026#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
1036#[serde(rename_all = "snake_case")]
1037pub enum SystemContextSource {
1038 #[default]
1040 Normal,
1041 RuntimeSteer,
1044}
1045
1046impl From<SystemContextSource> for session_document::SystemContextSource {
1047 fn from(value: SystemContextSource) -> Self {
1048 match value {
1049 SystemContextSource::Normal => Self::Normal,
1050 SystemContextSource::RuntimeSteer => Self::RuntimeSteer,
1051 }
1052 }
1053}
1054
1055impl SystemContextSource {
1056 #[must_use]
1059 pub fn is_normal(&self) -> bool {
1060 matches!(self, Self::Normal)
1061 }
1062
1063 #[must_use]
1065 pub fn is_runtime_steer(&self) -> bool {
1066 matches!(self, Self::RuntimeSteer)
1067 }
1068}
1069
1070#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1074#[serde(rename_all = "snake_case")]
1075pub struct PendingSystemContextAppend {
1076 pub content: crate::lifecycle::run_primitive::CoreRenderable,
1084 #[serde(default, skip_serializing_if = "Option::is_none")]
1085 pub source: Option<String>,
1086 #[serde(default, skip_serializing_if = "Option::is_none")]
1087 pub idempotency_key: Option<String>,
1088 #[serde(default, skip_serializing_if = "SystemContextSource::is_normal")]
1090 pub source_kind: SystemContextSource,
1091 #[serde(default, skip_serializing_if = "Option::is_none")]
1099 pub peer_response_terminal: Option<crate::handles::PeerResponseTerminalFact>,
1100 pub accepted_at: SystemTime,
1101}
1102
1103#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1118#[serde(rename_all = "snake_case")]
1119pub enum SessionLifecycleTerminal {
1120 Active,
1122 Archived,
1124}
1125
1126impl SessionLifecycleTerminal {
1127 #[must_use]
1129 pub fn is_archived(self) -> bool {
1130 matches!(self, Self::Archived)
1131 }
1132}
1133
1134impl From<SessionLifecycleTerminal> for session_document::SessionDocumentLifecycle {
1135 fn from(value: SessionLifecycleTerminal) -> Self {
1136 match value {
1137 SessionLifecycleTerminal::Active => Self::Active,
1138 SessionLifecycleTerminal::Archived => Self::Archived,
1139 }
1140 }
1141}
1142
1143impl From<session_document::SessionDocumentLifecycle> for SessionLifecycleTerminal {
1144 fn from(value: session_document::SessionDocumentLifecycle) -> Self {
1145 match value {
1146 session_document::SessionDocumentLifecycle::Active => Self::Active,
1147 session_document::SessionDocumentLifecycle::Archived => Self::Archived,
1148 }
1149 }
1150}
1151
1152#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
1154#[serde(rename_all = "snake_case")]
1155pub struct SessionDeferredTurnState {
1156 #[serde(default, skip_serializing_if = "DeferredFirstTurnPhase::is_inactive")]
1157 pub(crate) first_turn_phase: DeferredFirstTurnPhase,
1158 #[serde(default, skip_serializing_if = "Option::is_none")]
1159 pub(crate) pending_initial_prompt: Option<PendingDeferredPrompt>,
1160 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1161 pub(crate) pending_tool_results: Vec<PendingToolResultsMessage>,
1162}
1163
1164#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
1166#[serde(rename_all = "snake_case")]
1167pub enum DeferredFirstTurnPhase {
1168 #[default]
1170 Inactive,
1171 Pending,
1173 Consumed,
1175}
1176
1177impl DeferredFirstTurnPhase {
1178 pub fn is_inactive(&self) -> bool {
1179 matches!(self, Self::Inactive)
1180 }
1181}
1182
1183impl From<DeferredFirstTurnPhase> for session_document::SessionFirstTurnPhase {
1184 fn from(value: DeferredFirstTurnPhase) -> Self {
1185 match value {
1186 DeferredFirstTurnPhase::Inactive => Self::Inactive,
1187 DeferredFirstTurnPhase::Pending => Self::Pending,
1188 DeferredFirstTurnPhase::Consumed => Self::Consumed,
1189 }
1190 }
1191}
1192
1193impl From<session_document::SessionFirstTurnPhase> for DeferredFirstTurnPhase {
1194 fn from(value: session_document::SessionFirstTurnPhase) -> Self {
1195 match value {
1196 session_document::SessionFirstTurnPhase::Inactive => Self::Inactive,
1197 session_document::SessionFirstTurnPhase::Pending => Self::Pending,
1198 session_document::SessionFirstTurnPhase::Consumed => Self::Consumed,
1199 }
1200 }
1201}
1202
1203fn is_default_hook_run_overrides(value: &crate::HookRunOverrides) -> bool {
1204 value == &crate::HookRunOverrides::default()
1205}
1206
1207fn is_default_call_timeout_override(value: &crate::CallTimeoutOverride) -> bool {
1208 value == &crate::CallTimeoutOverride::default()
1209}
1210
1211fn is_tool_filter_all(value: &ToolFilter) -> bool {
1212 matches!(value, ToolFilter::All)
1213}
1214
1215fn is_zero(value: &u64) -> bool {
1216 *value == 0
1217}
1218
1219pub fn capability_base_filter_for_image_tool_results(image_tool_results: bool) -> ToolFilter {
1221 if image_tool_results {
1222 ToolFilter::All
1223 } else {
1224 ToolFilter::Deny([VIEW_IMAGE_TOOL_NAME.to_string()].into_iter().collect())
1225 }
1226}
1227
1228#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
1235#[serde(rename_all = "snake_case")]
1236pub struct ToolVisibilityWitness {
1237 #[serde(default, skip_serializing_if = "Option::is_none")]
1238 pub last_seen_provenance: Option<ToolProvenance>,
1239}
1240
1241impl ToolVisibilityWitness {
1242 pub fn has_identity_witness(&self) -> bool {
1243 self.last_seen_provenance.is_some()
1244 }
1245}
1246
1247#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1253#[serde(rename_all = "snake_case")]
1254pub struct DeferredToolLoadAuthority {
1255 pub name: ToolName,
1256 pub witness: ToolVisibilityWitness,
1257}
1258
1259impl DeferredToolLoadAuthority {
1260 pub fn new(name: impl Into<ToolName>, witness: ToolVisibilityWitness) -> Self {
1261 Self {
1262 name: name.into(),
1263 witness,
1264 }
1265 }
1266
1267 pub fn into_parts(self) -> (ToolName, ToolVisibilityWitness) {
1268 (self.name, self.witness)
1269 }
1270}
1271
1272#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
1275#[serde(rename_all = "snake_case")]
1276pub struct WitnessedToolFilter {
1277 pub filter: ToolFilter,
1278 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
1279 pub witnesses: BTreeMap<ToolName, ToolVisibilityWitness>,
1280}
1281
1282impl WitnessedToolFilter {
1283 pub fn new(filter: ToolFilter, witnesses: BTreeMap<ToolName, ToolVisibilityWitness>) -> Self {
1284 Self { filter, witnesses }
1285 }
1286
1287 pub fn into_parts(self) -> (ToolFilter, BTreeMap<ToolName, ToolVisibilityWitness>) {
1288 (self.filter, self.witnesses)
1289 }
1290}
1291
1292#[derive(Debug, Clone, PartialEq, Eq)]
1299pub struct InheritedToolVisibilityAuthority {
1300 filter: ToolFilter,
1301 witnesses: BTreeMap<ToolName, ToolVisibilityWitness>,
1302}
1303
1304impl InheritedToolVisibilityAuthority {
1305 pub(crate) fn from_generated_composition_authority(
1306 filter: ToolFilter,
1307 witnesses: BTreeMap<ToolName, ToolVisibilityWitness>,
1308 ) -> Self {
1309 Self { filter, witnesses }
1310 }
1311
1312 pub fn filter(&self) -> &ToolFilter {
1313 &self.filter
1314 }
1315
1316 pub fn witnesses(&self) -> &BTreeMap<ToolName, ToolVisibilityWitness> {
1317 &self.witnesses
1318 }
1319
1320 pub(crate) fn into_initial_visibility_state(self) -> SessionToolVisibilityState {
1321 SessionToolVisibilityState {
1322 inherited_base_filter: self.filter,
1323 filter_witnesses: self.witnesses,
1324 ..Default::default()
1325 }
1326 }
1327}
1328
1329#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
1331#[serde(rename_all = "snake_case")]
1332pub struct SessionToolVisibilityState {
1333 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
1334 pub capability_base_filter: ToolFilter,
1335 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
1336 pub inherited_base_filter: ToolFilter,
1337 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
1338 pub active_filter: ToolFilter,
1339 #[serde(default, skip_serializing_if = "is_tool_filter_all")]
1340 pub staged_filter: ToolFilter,
1341 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
1342 pub active_requested_deferred_names: BTreeSet<ToolName>,
1343 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
1344 pub staged_requested_deferred_names: BTreeSet<ToolName>,
1345 #[serde(default, skip_serializing_if = "is_zero")]
1346 pub active_revision: u64,
1347 #[serde(default, skip_serializing_if = "is_zero")]
1348 pub staged_revision: u64,
1349 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
1350 pub requested_witnesses: BTreeMap<ToolName, ToolVisibilityWitness>,
1351 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
1352 pub filter_witnesses: BTreeMap<ToolName, ToolVisibilityWitness>,
1353}
1354
1355#[derive(Debug, Clone, PartialEq, Eq)]
1361pub struct AuthorizedSessionToolVisibilityState {
1362 state: SessionToolVisibilityState,
1363}
1364
1365impl AuthorizedSessionToolVisibilityState {
1366 pub(crate) fn from_generated_authority(state: SessionToolVisibilityState) -> Self {
1367 Self { state }
1368 }
1369
1370 pub fn as_state(&self) -> &SessionToolVisibilityState {
1371 &self.state
1372 }
1373
1374 pub fn into_state(self) -> SessionToolVisibilityState {
1375 self.state
1376 }
1377}
1378
1379#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1382#[serde(rename_all = "snake_case")]
1383pub struct SessionBuildState {
1384 #[serde(
1385 default,
1386 skip_serializing_if = "crate::config::SystemPromptOverride::is_inherit"
1387 )]
1388 pub system_prompt: crate::config::SystemPromptOverride,
1389 #[serde(default, skip_serializing_if = "Option::is_none")]
1390 pub output_schema: Option<crate::OutputSchema>,
1391 #[serde(default, skip_serializing_if = "is_default_hook_run_overrides")]
1392 pub hooks_override: crate::HookRunOverrides,
1393 #[serde(default, skip_serializing_if = "Option::is_none")]
1394 pub budget_limits: Option<crate::BudgetLimits>,
1395 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1396 pub recoverable_tool_defs: Vec<ToolDef>,
1397 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1398 pub silent_comms_intents: Vec<String>,
1399 #[serde(default, skip_serializing_if = "Option::is_none")]
1400 pub max_inline_peer_notifications: Option<i32>,
1401 #[serde(default, skip_serializing_if = "Option::is_none")]
1402 pub app_context: Option<serde_json::Value>,
1403 #[serde(default, skip_serializing_if = "Option::is_none")]
1404 pub additional_instructions: Option<Vec<String>>,
1405 #[serde(default, skip_serializing_if = "Option::is_none")]
1406 pub shell_env: Option<HashMap<String, String>>,
1407 #[serde(default, skip_serializing_if = "Option::is_none")]
1413 pub mob_tool_authority_context: Option<MobToolAuthorityContext>,
1414 #[serde(default, skip_serializing_if = "is_default_call_timeout_override")]
1415 pub call_timeout_override: crate::CallTimeoutOverride,
1416}
1417
1418#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1420#[serde(rename_all = "snake_case")]
1421pub struct PendingDeferredPrompt {
1422 pub prompt: ContentInput,
1423 pub accepted_at: SystemTime,
1424}
1425
1426#[derive(Debug, Clone, Serialize, Deserialize)]
1428#[serde(rename_all = "snake_case")]
1429pub struct PendingToolResultsMessage {
1430 pub results: Vec<ToolResult>,
1431 pub accepted_at: SystemTime,
1432}
1433
1434impl PartialEq for PendingToolResultsMessage {
1435 fn eq(&self, other: &Self) -> bool {
1436 self.accepted_at == other.accepted_at
1437 && serde_json::to_value(&self.results).ok() == serde_json::to_value(&other.results).ok()
1438 }
1439}
1440
1441#[derive(Debug, Clone, Default, PartialEq)]
1443pub struct ConsumedDeferredTurnInputs {
1444 pub(crate) restore_first_turn_pending: bool,
1445 pub(crate) pending_initial_prompt: Option<PendingDeferredPrompt>,
1446 pub(crate) pending_tool_results: Vec<PendingToolResultsMessage>,
1447}
1448
1449impl ConsumedDeferredTurnInputs {
1450 pub fn is_empty(&self) -> bool {
1451 !self.restore_first_turn_pending
1452 && self.pending_initial_prompt.is_none()
1453 && self.pending_tool_results.is_empty()
1454 }
1455
1456 pub fn pending_initial_prompt(&self) -> Option<&PendingDeferredPrompt> {
1457 self.pending_initial_prompt.as_ref()
1458 }
1459
1460 pub fn pending_tool_results(&self) -> &[PendingToolResultsMessage] {
1461 &self.pending_tool_results
1462 }
1463}
1464
1465#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1467#[serde(rename_all = "snake_case")]
1468pub struct SeenSystemContextKey {
1469 pub content: crate::lifecycle::run_primitive::CoreRenderable,
1471 #[serde(default, skip_serializing_if = "Option::is_none")]
1472 pub source: Option<String>,
1473 #[serde(default, skip_serializing_if = "SystemContextSource::is_normal")]
1476 pub source_kind: SystemContextSource,
1477 pub state: SeenSystemContextState,
1478}
1479
1480#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
1482#[serde(rename_all = "snake_case")]
1483pub enum SeenSystemContextState {
1484 Pending,
1485 Applied,
1486}
1487
1488impl SessionSystemContextState {
1489 pub fn pending(&self) -> &[PendingSystemContextAppend] {
1490 &self.pending
1491 }
1492
1493 pub fn applied(&self) -> &[PendingSystemContextAppend] {
1494 &self.applied
1495 }
1496
1497 pub fn seen(&self) -> &BTreeMap<String, SeenSystemContextKey> {
1498 &self.seen
1499 }
1500
1501 pub fn active_turn_pending_keys(&self) -> &BTreeSet<String> {
1502 &self.active_turn_pending_keys
1503 }
1504
1505 pub fn pending_len(&self) -> usize {
1506 self.pending.len()
1507 }
1508
1509 pub fn applied_len(&self) -> usize {
1510 self.applied.len()
1511 }
1512
1513 pub fn active_turn_pending_len(&self) -> usize {
1514 self.active_turn_pending_keys.len()
1515 }
1516
1517 pub fn realtime_projection_appends(&self) -> Vec<PendingSystemContextAppend> {
1518 self.applied
1519 .iter()
1520 .chain(self.pending.iter())
1521 .cloned()
1522 .collect()
1523 }
1524
1525 pub fn stage_append(
1527 &mut self,
1528 req: &AppendSystemContextRequest,
1529 accepted_at: SystemTime,
1530 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1531 system_context_authority::stage_append(self, req, accepted_at, false)
1532 }
1533
1534 fn stage_append_with_generated_authority(
1535 &mut self,
1536 req: &AppendSystemContextRequest,
1537 accepted_at: SystemTime,
1538 active_turn_scoped: bool,
1539 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1540 system_context_authority::stage_append(self, req, accepted_at, active_turn_scoped)
1541 }
1542
1543 pub fn stage_active_turn_append(
1550 &mut self,
1551 req: &AppendSystemContextRequest,
1552 accepted_at: SystemTime,
1553 ) -> Result<crate::service::AppendSystemContextStatus, SystemContextStageError> {
1554 self.stage_append_with_generated_authority(req, accepted_at, true)
1555 }
1556
1557 pub fn mark_pending_applied(&mut self) {
1559 system_context_authority::mark_pending_applied(self);
1560 }
1561
1562 pub fn discard_unapplied_active_turn_pending(&mut self) -> Vec<PendingSystemContextAppend> {
1565 system_context_authority::discard_unapplied_active_turn_pending(self)
1566 }
1567
1568 pub fn discard_active_turn_pending_by_keys(
1575 &mut self,
1576 idempotency_keys: &[String],
1577 ) -> Vec<PendingSystemContextAppend> {
1578 system_context_authority::discard_active_turn_pending_by_keys(self, idempotency_keys)
1579 }
1580
1581 pub fn restore_from_snapshot(self) -> Result<Self, SystemContextStageError> {
1585 system_context_authority::restore_system_context_state(self)
1586 }
1587
1588 pub fn record_applied_blocks(
1592 &mut self,
1593 appends: &[PendingSystemContextAppend],
1594 current_system_prompt: &str,
1595 ) -> Vec<PendingSystemContextAppend> {
1596 system_context_authority::record_applied_system_context_blocks(
1597 self,
1598 appends,
1599 current_system_prompt,
1600 )
1601 }
1602}
1603
1604const SESSION_DOCUMENT_FIRST_TURN_KEY: &str = "first_turn";
1609
1610fn usize_to_u64(value: usize) -> u64 {
1611 u64::try_from(value).unwrap_or(u64::MAX)
1612}
1613
1614fn validate_deferred_turn_snapshot(
1622 state: SessionDeferredTurnState,
1623) -> Result<SessionDeferredTurnState, session_document::SessionDocumentError> {
1624 let mut authority = session_document::SessionDocumentMachineAuthority::new();
1625 let key = session_document::SessionDocumentKey::new(SESSION_DOCUMENT_FIRST_TURN_KEY);
1626 authority.recover_session_first_turn_phase(
1630 key,
1631 state.first_turn_phase.into(),
1632 state.pending_initial_prompt.is_some(),
1633 usize_to_u64(state.pending_tool_results.len()),
1634 )?;
1635 Ok(state)
1636}
1637
1638impl SessionDeferredTurnState {
1639 pub fn first_turn_phase(&self) -> DeferredFirstTurnPhase {
1640 self.first_turn_phase
1641 }
1642
1643 pub fn pending_initial_prompt(&self) -> Option<&PendingDeferredPrompt> {
1644 self.pending_initial_prompt.as_ref()
1645 }
1646
1647 pub fn pending_tool_results(&self) -> &[PendingToolResultsMessage] {
1648 &self.pending_tool_results
1649 }
1650
1651 pub fn pending_tool_results_len(&self) -> usize {
1652 self.pending_tool_results.len()
1653 }
1654
1655 pub(crate) fn pending_initial_prompt_mut_for_blob_rewrite(
1656 &mut self,
1657 ) -> Option<&mut PendingDeferredPrompt> {
1658 self.pending_initial_prompt.as_mut()
1659 }
1660
1661 pub(crate) fn pending_tool_results_mut_for_blob_rewrite(
1662 &mut self,
1663 ) -> &mut [PendingToolResultsMessage] {
1664 &mut self.pending_tool_results
1665 }
1666
1667 fn document_authority(
1677 &self,
1678 ) -> (
1679 session_document::SessionDocumentMachineAuthority,
1680 session_document::SessionDocumentKey,
1681 ) {
1682 let mut authority = session_document::SessionDocumentMachineAuthority::new();
1683 let key = session_document::SessionDocumentKey::new(SESSION_DOCUMENT_FIRST_TURN_KEY);
1684 if let Err(err) = authority.recover_session_first_turn_phase(
1685 key.clone(),
1686 self.first_turn_phase.into(),
1687 self.pending_initial_prompt.is_some(),
1688 usize_to_u64(self.pending_tool_results.len()),
1689 ) {
1690 tracing::warn!(
1691 error = %err,
1692 "generated session document authority rejected first-turn recovery"
1693 );
1694 }
1695 (authority, key)
1696 }
1697
1698 fn mirror_first_turn_phase(
1701 &mut self,
1702 effects: &[session_document::SessionDocumentEffect],
1703 ) -> Option<bool> {
1704 for effect in effects {
1705 if let session_document::SessionDocumentEffect::SessionFirstTurnPhaseResolved {
1706 phase,
1707 was_pending,
1708 } = effect
1709 {
1710 self.first_turn_phase = (*phase).into();
1711 return Some(*was_pending);
1712 }
1713 }
1714 None
1715 }
1716
1717 pub fn mark_initial_turn_pending(&mut self) {
1719 let (mut authority, key) = self.document_authority();
1720 match authority.mark_session_initial_turn_pending(key) {
1721 Ok(effects) => {
1722 self.mirror_first_turn_phase(&effects);
1723 }
1724 Err(err) => tracing::warn!(
1725 error = %err,
1726 "generated session document authority rejected pending mark"
1727 ),
1728 }
1729 }
1730
1731 pub fn mark_initial_turn_started(&mut self) -> bool {
1735 let (mut authority, key) = self.document_authority();
1736 match authority.start_session_initial_turn(key) {
1737 Ok(effects) => self.mirror_first_turn_phase(&effects).unwrap_or(false),
1738 Err(err) => {
1739 tracing::warn!(
1740 error = %err,
1741 "generated session document authority rejected first-turn start"
1742 );
1743 false
1744 }
1745 }
1746 }
1747
1748 pub fn restore_initial_turn_pending(&mut self) {
1750 let (mut authority, key) = self.document_authority();
1755 match authority.restore_session_consumed_inputs(
1756 key.clone(),
1757 true,
1758 self.pending_initial_prompt.is_some(),
1759 usize_to_u64(self.pending_tool_results.len()),
1760 ) {
1761 Ok(_) => {
1762 if let Some(phase) = authority.session_first_turn_phase_for(&key) {
1765 self.first_turn_phase = phase.into();
1766 }
1767 }
1768 Err(err) => tracing::warn!(
1769 error = %err,
1770 "generated session document authority rejected pending restore"
1771 ),
1772 }
1773 }
1774
1775 pub fn allows_initial_turn_overrides(&self) -> bool {
1777 let (mut authority, key) = self.document_authority();
1778 match authority.resolve_session_first_turn_overrides_allowed(key) {
1779 Ok(effects) => effects
1780 .iter()
1781 .find_map(|effect| {
1782 match effect {
1783 session_document::SessionDocumentEffect::SessionFirstTurnOverridesResolved {
1784 allowed,
1785 } => Some(*allowed),
1786 _ => None,
1787 }
1788 })
1789 .unwrap_or(false),
1790 Err(err) => {
1791 tracing::warn!(
1792 error = %err,
1793 "generated session document authority rejected override resolution"
1794 );
1795 false
1796 }
1797 }
1798 }
1799
1800 pub fn stage_initial_prompt(&mut self, prompt: ContentInput, accepted_at: SystemTime) {
1802 let prompt_has_content = prompt.has_images() || !prompt.text_content().trim().is_empty();
1803 let (mut authority, key) = self.document_authority();
1804 match authority.stage_session_initial_prompt(key, prompt_has_content) {
1805 Ok(effects) => {
1806 let decision = effects.iter().find_map(|effect| {
1807 match effect {
1808 session_document::SessionDocumentEffect::SessionInitialPromptStageResolved {
1809 decision,
1810 } => Some(*decision),
1811 _ => None,
1812 }
1813 });
1814 match decision {
1815 Some(session_document::SessionInitialPromptStageDecision::Store) => {
1816 self.pending_initial_prompt = Some(PendingDeferredPrompt {
1817 prompt,
1818 accepted_at,
1819 });
1820 }
1821 Some(session_document::SessionInitialPromptStageDecision::Clear) => {
1822 self.pending_initial_prompt = None;
1823 }
1824 None => tracing::warn!(
1825 "generated session document authority returned no prompt-stage decision"
1826 ),
1827 }
1828 }
1829 Err(err) => tracing::warn!(
1830 error = %err,
1831 "generated session document authority rejected initial prompt stage"
1832 ),
1833 }
1834 }
1835
1836 pub fn stage_tool_results(
1838 &mut self,
1839 results: Vec<ToolResult>,
1840 accepted_at: SystemTime,
1841 ) -> usize {
1842 let (mut authority, key) = self.document_authority();
1843 let accepted = match authority.stage_session_tool_results(key, usize_to_u64(results.len()))
1844 {
1845 Ok(effects) => effects.iter().find_map(|effect| match effect {
1846 session_document::SessionDocumentEffect::SessionToolResultsStageResolved {
1847 accepted_count,
1848 } => Some(*accepted_count),
1849 _ => None,
1850 }),
1851 Err(err) => {
1852 tracing::warn!(
1853 error = %err,
1854 "generated session document authority rejected tool-results stage"
1855 );
1856 return 0;
1857 }
1858 };
1859 let Some(accepted) = accepted else {
1860 tracing::warn!(
1861 "generated session document authority returned no tool-results decision"
1862 );
1863 return 0;
1864 };
1865 if accepted == 0 {
1866 return 0;
1867 }
1868 let accepted = usize::try_from(accepted).unwrap_or(usize::MAX);
1869 self.pending_tool_results.push(PendingToolResultsMessage {
1870 results,
1871 accepted_at,
1872 });
1873 accepted
1874 }
1875
1876 pub fn has_pending_tool_results(&self) -> bool {
1878 !self.pending_tool_results.is_empty()
1879 }
1880
1881 pub fn consume_for_started_turn(&mut self) -> ConsumedDeferredTurnInputs {
1883 let (mut authority, key) = self.document_authority();
1884 let was_pending = match authority.consume_session_deferred_inputs(key) {
1885 Ok(effects) => self.mirror_first_turn_phase(&effects).unwrap_or(false),
1886 Err(err) => {
1887 tracing::warn!(
1888 error = %err,
1889 "generated session document authority rejected started-turn consumption"
1890 );
1891 return ConsumedDeferredTurnInputs::default();
1892 }
1893 };
1894 ConsumedDeferredTurnInputs {
1895 restore_first_turn_pending: was_pending,
1896 pending_initial_prompt: self.pending_initial_prompt.take(),
1897 pending_tool_results: std::mem::take(&mut self.pending_tool_results),
1898 }
1899 }
1900
1901 pub fn restore_consumed_turn_inputs(&mut self, consumed: ConsumedDeferredTurnInputs) {
1903 if consumed.is_empty() {
1904 return;
1905 }
1906 let (mut authority, key) = self.document_authority();
1907 let effects = match authority.restore_session_consumed_inputs(
1908 key,
1909 consumed.restore_first_turn_pending,
1910 consumed.pending_initial_prompt.is_some(),
1911 usize_to_u64(consumed.pending_tool_results.len()),
1912 ) {
1913 Ok(effects) => effects,
1914 Err(err) => {
1915 tracing::warn!(
1916 error = %err,
1917 "generated session document authority rejected consumed input restore"
1918 );
1919 return;
1920 }
1921 };
1922 let Some((restore_first_turn_pending, restore_initial_prompt, restore_tool_results)) =
1923 effects.iter().find_map(|effect| match effect {
1924 session_document::SessionDocumentEffect::SessionConsumedInputsRestoreResolved {
1925 restore_first_turn_pending,
1926 restore_initial_prompt,
1927 restore_tool_results,
1928 } => Some((
1929 *restore_first_turn_pending,
1930 *restore_initial_prompt,
1931 *restore_tool_results,
1932 )),
1933 _ => None,
1934 })
1935 else {
1936 tracing::warn!(
1937 "generated session document authority returned no consumed-input restore decision"
1938 );
1939 return;
1940 };
1941 if restore_first_turn_pending {
1942 self.restore_initial_turn_pending();
1943 }
1944 if restore_initial_prompt && self.pending_initial_prompt.is_none() {
1945 self.pending_initial_prompt = consumed.pending_initial_prompt;
1946 }
1947 if restore_tool_results {
1948 let mut restored = consumed.pending_tool_results;
1949 restored.extend(std::mem::take(&mut self.pending_tool_results));
1950 self.pending_tool_results = restored;
1951 }
1952 }
1953}
1954
1955#[derive(Debug, Clone, PartialEq, Eq)]
1957pub enum SystemContextStageError {
1958 InvalidRequest(String),
1959 Conflict {
1960 key: String,
1961 existing_text: String,
1962 existing_source: Option<String>,
1963 },
1964}
1965
1966impl std::fmt::Display for SystemContextStageError {
1967 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1968 match self {
1969 Self::InvalidRequest(message) => {
1970 write!(f, "invalid system-context append request: {message}")
1971 }
1972 Self::Conflict { key, .. } => {
1973 write!(
1974 f,
1975 "system-context append conflict for idempotency key `{key}`"
1976 )
1977 }
1978 }
1979 }
1980}
1981
1982impl std::error::Error for SystemContextStageError {}
1983
1984fn render_system_context_block(append: &PendingSystemContextAppend) -> String {
1992 let mut rendered = String::from(SYSTEM_CONTEXT_RENDER_LABEL);
1993 if let Some(source) = &append.source {
1994 rendered.push_str("\nsource: ");
1995 rendered.push_str(source);
1996 }
1997 rendered.push_str("\n\n");
1998 rendered.push_str(append.content.render_text().trim());
2001 rendered
2002}
2003
2004const SYSTEM_CONTEXT_RENDER_LABEL: &str = "[Runtime System Context]";
2009
2010mod system_context_authority {
2021 use super::{
2022 AppendSystemContextRequest, BTreeSet, PendingSystemContextAppend, SeenSystemContextKey,
2023 SeenSystemContextState, SessionSystemContextState, SystemContextSource,
2024 SystemContextStageError, SystemTime, render_system_context_block, session_document,
2025 usize_to_u64,
2026 };
2027 use crate::service::AppendSystemContextStatus;
2028
2029 fn document_authority() -> session_document::SessionDocumentMachineAuthority {
2030 session_document::SessionDocumentMachineAuthority::new()
2031 }
2032
2033 fn resolve_append_decision(
2035 trimmed_text_byte_count: u64,
2036 idempotency_key_present: bool,
2037 existing_key_matches: bool,
2038 existing_key_conflicts: bool,
2039 active_turn_scoped: bool,
2040 ) -> Result<session_document::SystemContextAppendDecision, SystemContextStageError> {
2041 let mut authority = document_authority();
2042 let effects = authority
2043 .resolve_system_context_append(
2044 trimmed_text_byte_count,
2045 idempotency_key_present,
2046 existing_key_matches,
2047 existing_key_conflicts,
2048 active_turn_scoped,
2049 )
2050 .map_err(|err| SystemContextStageError::InvalidRequest(err.to_string()))?;
2051 effects
2052 .into_iter()
2053 .find_map(|effect| match effect {
2054 session_document::SessionDocumentEffect::SystemContextAppendResolved {
2055 decision,
2056 ..
2057 } => Some(decision),
2058 _ => None,
2059 })
2060 .ok_or_else(|| {
2061 SystemContextStageError::InvalidRequest(
2062 "generated session document authority returned no append decision".to_string(),
2063 )
2064 })
2065 }
2066
2067 fn pending_apply_item(source_kind: SystemContextSource) -> Option<(bool, bool, bool)> {
2070 let mut authority = document_authority();
2071 match authority.resolve_system_context_pending_apply_item(source_kind.into()) {
2072 Ok(effects) => effects.into_iter().find_map(|effect| {
2073 match effect {
2074 session_document::SessionDocumentEffect::SystemContextPendingApplyItemResolved {
2075 promote_to_applied,
2076 mark_seen_applied,
2077 remove_seen,
2078 } => Some((promote_to_applied, mark_seen_applied, remove_seen)),
2079 _ => None,
2080 }
2081 }),
2082 Err(err) => {
2083 tracing::warn!(
2084 error = %err,
2085 "generated session document authority rejected system-context apply item"
2086 );
2087 None
2088 }
2089 }
2090 }
2091
2092 fn steer_cleanup_discards(source_kind: SystemContextSource) -> bool {
2095 let mut authority = document_authority();
2096 match authority.resolve_system_context_steer_cleanup_item(source_kind.into()) {
2097 Ok(effects) => effects
2098 .into_iter()
2099 .find_map(|effect| {
2100 match effect {
2101 session_document::SessionDocumentEffect::SystemContextSteerCleanupItemResolved {
2102 discard,
2103 } => Some(discard),
2104 _ => None,
2105 }
2106 })
2107 .unwrap_or(false),
2108 Err(err) => {
2109 tracing::warn!(
2110 error = %err,
2111 "generated session document authority rejected system-context steer cleanup item"
2112 );
2113 false
2114 }
2115 }
2116 }
2117
2118 pub(super) fn restore_system_context_state(
2119 state: SessionSystemContextState,
2120 ) -> Result<SessionSystemContextState, SystemContextStageError> {
2121 let active_keys_have_known_pending_or_seen =
2122 state.active_turn_pending_keys.iter().all(|key| {
2123 state.seen.contains_key(key)
2124 || state
2125 .pending
2126 .iter()
2127 .any(|append| append.idempotency_key.as_ref() == Some(key))
2128 });
2129 let seen_keys_match_known_appends = state.seen.iter().all(|(key, seen)| {
2130 state
2131 .pending
2132 .iter()
2133 .chain(state.applied.iter())
2134 .any(|append| {
2135 append.idempotency_key.as_ref() == Some(key)
2136 && seen.content == append.content
2137 && seen.source.as_deref() == append.source.as_deref()
2138 })
2139 });
2140 let mut authority = document_authority();
2141 authority
2142 .restore_system_context_snapshot(
2143 active_keys_have_known_pending_or_seen,
2144 seen_keys_match_known_appends,
2145 )
2146 .map_err(|err| SystemContextStageError::InvalidRequest(err.to_string()))?;
2147 Ok(state)
2148 }
2149
2150 pub(super) fn stage_append(
2151 state: &mut SessionSystemContextState,
2152 req: &AppendSystemContextRequest,
2153 accepted_at: SystemTime,
2154 active_turn_scoped: bool,
2155 ) -> Result<AppendSystemContextStatus, SystemContextStageError> {
2156 let rendered_text = req.content.render_text();
2160 let rendered_len = rendered_text.trim().len();
2161 let existing = req
2162 .idempotency_key
2163 .as_ref()
2164 .and_then(|key| state.seen.get(key));
2165 let existing_key_matches = existing.is_some_and(|existing| {
2166 existing.content == req.content && existing.source.as_deref() == req.source.as_deref()
2167 });
2168 let existing_key_conflicts = existing.is_some() && !existing_key_matches;
2169 let decision = resolve_append_decision(
2170 usize_to_u64(rendered_len),
2171 req.idempotency_key.is_some(),
2172 existing_key_matches,
2173 existing_key_conflicts,
2174 active_turn_scoped,
2175 )?;
2176
2177 match decision {
2178 session_document::SystemContextAppendDecision::RejectEmpty => {
2179 return Err(SystemContextStageError::InvalidRequest(
2180 "system context text must not be empty".to_string(),
2181 ));
2182 }
2183 session_document::SystemContextAppendDecision::RejectConflict => {
2184 let Some(key) = req.idempotency_key.as_ref() else {
2185 return Err(SystemContextStageError::InvalidRequest(
2186 "generated system-context authority rejected append without a key"
2187 .to_string(),
2188 ));
2189 };
2190 let Some(existing) = existing else {
2191 return Err(SystemContextStageError::InvalidRequest(
2192 "generated system-context authority rejected append without a conflict"
2193 .to_string(),
2194 ));
2195 };
2196 return Err(SystemContextStageError::Conflict {
2197 key: key.clone(),
2198 existing_text: existing.content.render_text(),
2199 existing_source: existing.source.clone(),
2200 });
2201 }
2202 session_document::SystemContextAppendDecision::Duplicate => {
2203 return Ok(AppendSystemContextStatus::Duplicate);
2204 }
2205 session_document::SystemContextAppendDecision::Staged => {}
2206 }
2207
2208 let append = PendingSystemContextAppend {
2209 content: req.content.clone(),
2210 source: req.source.clone(),
2211 idempotency_key: req.idempotency_key.clone(),
2212 source_kind: req.source_kind,
2213 peer_response_terminal: req.peer_response_terminal.clone(),
2217 accepted_at,
2218 };
2219 if let Some(key) = req.idempotency_key.as_ref() {
2220 state.seen.insert(
2221 key.clone(),
2222 SeenSystemContextKey {
2223 content: append.content.clone(),
2224 source: append.source.clone(),
2225 source_kind: append.source_kind,
2226 state: SeenSystemContextState::Pending,
2227 },
2228 );
2229 }
2230 if active_turn_scoped && let Some(key) = req.idempotency_key.as_ref() {
2231 state.active_turn_pending_keys.insert(key.clone());
2232 }
2233 state.pending.push(append);
2234 Ok(AppendSystemContextStatus::Staged)
2235 }
2236
2237 pub(super) fn mark_pending_applied(state: &mut SessionSystemContextState) {
2238 let pending = std::mem::take(&mut state.pending);
2241 let mut seen_to_remove = Vec::new();
2242 for append in &pending {
2243 let Some((promote_to_applied, mark_seen_applied, remove_seen)) =
2244 pending_apply_item(append.source_kind)
2245 else {
2246 continue;
2247 };
2248 if promote_to_applied && !state.applied.contains(append) {
2249 state.applied.push(append.clone());
2250 }
2251 if let Some(key) = append.idempotency_key.as_ref() {
2252 if remove_seen {
2253 seen_to_remove.push(key.clone());
2254 } else if mark_seen_applied && let Some(seen) = state.seen.get_mut(key) {
2255 seen.state = SeenSystemContextState::Applied;
2256 }
2257 }
2258 }
2259 for key in seen_to_remove {
2260 state.seen.remove(&key);
2261 }
2262 state.active_turn_pending_keys.clear();
2263 }
2264
2265 pub(super) fn discard_unapplied_active_turn_pending(
2266 state: &mut SessionSystemContextState,
2267 ) -> Vec<PendingSystemContextAppend> {
2268 if state.active_turn_pending_keys.is_empty() {
2269 return Vec::new();
2270 }
2271 let active_keys = std::mem::take(&mut state.active_turn_pending_keys);
2272 let mut discarded = Vec::new();
2273 state.pending.retain(|append| {
2274 let should_discard = append
2275 .idempotency_key
2276 .as_ref()
2277 .is_some_and(|key| active_keys.contains(key));
2278 if should_discard {
2279 discarded.push(append.clone());
2280 }
2281 !should_discard
2282 });
2283
2284 for append in &discarded {
2285 if let Some(key) = append.idempotency_key.as_ref()
2286 && state
2287 .seen
2288 .get(key)
2289 .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
2290 {
2291 state.seen.remove(key);
2292 }
2293 }
2294
2295 discarded
2296 }
2297
2298 pub(super) fn discard_active_turn_pending_by_keys(
2299 state: &mut SessionSystemContextState,
2300 idempotency_keys: &[String],
2301 ) -> Vec<PendingSystemContextAppend> {
2302 if idempotency_keys.is_empty() || state.active_turn_pending_keys.is_empty() {
2303 return Vec::new();
2304 }
2305 let requested_keys: BTreeSet<&str> = idempotency_keys.iter().map(String::as_str).collect();
2306 let mut discarded = Vec::new();
2307 let mut discarded_keys = Vec::new();
2308 state.pending.retain(|append| {
2309 let should_discard = append.idempotency_key.as_ref().is_some_and(|key| {
2310 requested_keys.contains(key.as_str())
2311 && state.active_turn_pending_keys.contains(key)
2312 });
2313 if should_discard {
2314 if let Some(key) = append.idempotency_key.as_ref() {
2315 discarded_keys.push(key.clone());
2316 }
2317 discarded.push(append.clone());
2318 }
2319 !should_discard
2320 });
2321
2322 for key in discarded_keys {
2323 state.active_turn_pending_keys.remove(&key);
2324 if state
2325 .seen
2326 .get(&key)
2327 .is_some_and(|seen| seen.state == SeenSystemContextState::Pending)
2328 {
2329 state.seen.remove(&key);
2330 }
2331 }
2332
2333 discarded
2334 }
2335
2336 pub(super) fn discard_transient_runtime_steer_state(
2337 state: &mut SessionSystemContextState,
2338 ) -> usize {
2339 let mut removed = 0usize;
2340
2341 let before_pending = state.pending.len();
2342 state
2343 .pending
2344 .retain(|append| !steer_cleanup_discards(append.source_kind));
2345 removed += before_pending.saturating_sub(state.pending.len());
2346
2347 let before_applied = state.applied.len();
2348 state
2349 .applied
2350 .retain(|append| !steer_cleanup_discards(append.source_kind));
2351 removed += before_applied.saturating_sub(state.applied.len());
2352
2353 let before_seen = state.seen.len();
2354 state
2355 .seen
2356 .retain(|_key, seen| !steer_cleanup_discards(seen.source_kind));
2357 removed += before_seen.saturating_sub(state.seen.len());
2358
2359 let before_active = state.active_turn_pending_keys.len();
2363 let steer_keys: BTreeSet<String> = state
2364 .seen
2365 .iter()
2366 .filter(|(_key, seen)| steer_cleanup_discards(seen.source_kind))
2367 .map(|(key, _seen)| key.clone())
2368 .collect();
2369 state
2373 .active_turn_pending_keys
2374 .retain(|key| state.seen.contains_key(key) && !steer_keys.contains(key));
2375 removed += before_active.saturating_sub(state.active_turn_pending_keys.len());
2376
2377 removed
2378 }
2379
2380 pub(super) fn remove_runtime_steer_blocks_for_rendered(
2381 system_prompt: &str,
2382 runtime_steer_appends: &[PendingSystemContextAppend],
2383 ) -> (String, usize) {
2384 if runtime_steer_appends.is_empty() {
2385 return (system_prompt.to_string(), 0);
2386 }
2387 let steer_blocks: BTreeSet<String> = runtime_steer_appends
2391 .iter()
2392 .map(render_system_context_block)
2393 .collect();
2394 let parts = system_prompt
2395 .split(super::SYSTEM_CONTEXT_SEPARATOR)
2396 .map(str::to_string)
2397 .collect::<Vec<_>>();
2398 let original_len = parts.len();
2399 let retained = parts
2400 .into_iter()
2401 .filter(|part| !steer_blocks.contains(part))
2402 .collect::<Vec<_>>();
2403 let removed = original_len.saturating_sub(retained.len());
2404 (retained.join(super::SYSTEM_CONTEXT_SEPARATOR), removed)
2405 }
2406
2407 pub(super) fn record_applied_system_context_blocks(
2408 state: &mut SessionSystemContextState,
2409 appends: &[PendingSystemContextAppend],
2410 current_system_prompt: &str,
2411 ) -> Vec<PendingSystemContextAppend> {
2412 let mut new_appends: Vec<PendingSystemContextAppend> = Vec::new();
2413 for append in appends {
2414 if append.content.render_text().trim().is_empty() {
2415 continue;
2416 }
2417 let rendered = render_system_context_block(append);
2418 if let Some(key) = append.idempotency_key.as_ref() {
2419 if let Some(existing) = state.seen.get(key)
2420 && !seen_system_context_matches(existing, append)
2421 {
2422 tracing::warn!(
2423 idempotency_key = %key,
2424 "skipping conflicting runtime system-context append"
2425 );
2426 continue;
2427 }
2428 if let Some(existing) = state
2429 .applied
2430 .iter()
2431 .find(|applied| applied.idempotency_key.as_ref() == Some(key))
2432 && !pending_system_context_matches(existing, append)
2433 {
2434 tracing::warn!(
2435 idempotency_key = %key,
2436 "skipping conflicting runtime system-context append"
2437 );
2438 continue;
2439 }
2440 if let Some(existing) = new_appends
2441 .iter()
2442 .find(|pending| pending.idempotency_key.as_ref() == Some(key))
2443 {
2444 if !pending_system_context_matches(existing, append) {
2445 tracing::warn!(
2446 idempotency_key = %key,
2447 "skipping conflicting runtime system-context append"
2448 );
2449 }
2450 continue;
2451 }
2452 if current_system_prompt.contains(&rendered) {
2453 record_applied_append(state, append);
2454 continue;
2455 }
2456 } else if new_appends.contains(append) || current_system_prompt.contains(&rendered) {
2457 continue;
2458 }
2459 record_applied_append(state, append);
2460 new_appends.push(append.clone());
2461 }
2462 new_appends
2463 }
2464
2465 fn record_applied_append(
2466 state: &mut SessionSystemContextState,
2467 append: &PendingSystemContextAppend,
2468 ) {
2469 if let Some(key) = append.idempotency_key.as_ref() {
2470 state.seen.insert(
2471 key.clone(),
2472 SeenSystemContextKey {
2473 content: append.content.clone(),
2474 source: append.source.clone(),
2475 source_kind: append.source_kind,
2476 state: SeenSystemContextState::Applied,
2477 },
2478 );
2479 if state
2480 .applied
2481 .iter()
2482 .any(|applied| applied.idempotency_key.as_ref() == Some(key))
2483 {
2484 return;
2485 }
2486 } else if state.applied.contains(append) {
2487 return;
2488 }
2489 state.applied.push(append.clone());
2490 }
2491
2492 fn seen_system_context_matches(
2493 seen: &SeenSystemContextKey,
2494 append: &PendingSystemContextAppend,
2495 ) -> bool {
2496 seen.content == append.content && seen.source.as_deref() == append.source.as_deref()
2497 }
2498
2499 fn pending_system_context_matches(
2500 existing: &PendingSystemContextAppend,
2501 append: &PendingSystemContextAppend,
2502 ) -> bool {
2503 existing.content == append.content && existing.source.as_deref() == append.source.as_deref()
2504 }
2505}
2506
2507impl Session {
2508 pub fn new() -> Self {
2510 let now = SystemTime::now();
2511 Self {
2512 version: session_version(),
2513 id: SessionId::new(),
2514 messages: Arc::new(Vec::new()),
2515 created_at: now,
2516 updated_at: now,
2517 metadata: serde_json::Map::new(),
2518 usage: Usage::default(),
2519 }
2520 }
2521
2522 pub fn with_id(id: SessionId) -> Self {
2524 let mut session = Self::new();
2525 session.id = id;
2526 session
2527 }
2528
2529 pub fn id(&self) -> &SessionId {
2531 &self.id
2532 }
2533
2534 pub fn version(&self) -> u32 {
2536 self.version
2537 }
2538
2539 pub fn messages(&self) -> &[Message] {
2541 &self.messages
2542 }
2543
2544 pub(crate) fn replace_messages_internal(
2550 &mut self,
2551 messages: Vec<Message>,
2552 reason: TranscriptRewriteReason,
2553 ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError> {
2554 if transcript_messages_digest(self.messages()).ok()
2555 == transcript_messages_digest(&messages).ok()
2556 {
2557 return Ok(None);
2558 }
2559 let commit = self.commit_transcript_rewrite(
2560 TranscriptRewriteSelection::MessageRange {
2561 start: 0,
2562 end: self.messages.len(),
2563 },
2564 messages,
2565 reason,
2566 Some("meerkat-core".to_string()),
2567 None,
2568 )?;
2569 Ok(Some(commit))
2570 }
2571
2572 pub(crate) fn retain_messages_internal<F>(
2574 &mut self,
2575 mut retain: F,
2576 reason: TranscriptRewriteReason,
2577 ) -> Result<Option<TranscriptRewriteCommit>, TranscriptEditError>
2578 where
2579 F: FnMut(&Message) -> bool,
2580 {
2581 let retained = self
2582 .messages
2583 .iter()
2584 .filter(|message| retain(message))
2585 .cloned()
2586 .collect::<Vec<_>>();
2587 if retained.len() == self.messages.len()
2588 && transcript_messages_digest(self.messages()).ok()
2589 == transcript_messages_digest(&retained).ok()
2590 {
2591 return Ok(None);
2592 }
2593 self.replace_messages_internal(retained, reason)
2594 }
2595
2596 pub fn replace_synthetic_notices(
2606 &mut self,
2607 kind: crate::types::SystemNoticeKind,
2608 replacements: Vec<Message>,
2609 ) -> Result<(), TranscriptEditError> {
2610 for (index, message) in replacements.iter().enumerate() {
2611 let matches_kind =
2612 matches!(message, Message::SystemNotice(notice) if notice.kind == kind);
2613 if !matches_kind {
2614 return Err(TranscriptEditError::InvalidTranscriptShape(format!(
2615 "replacement {index} for synthetic notice kind {kind:?} is not a system notice of that kind"
2616 )));
2617 }
2618 }
2619 self.retain_messages_internal(
2620 |message| !matches!(message, Message::SystemNotice(notice) if notice.kind == kind),
2621 TranscriptRewriteReason::new("synthetic_notice_cleanup"),
2622 )?;
2623 for message in replacements {
2624 self.push(message);
2625 }
2626 Ok(())
2627 }
2628
2629 pub fn created_at(&self) -> SystemTime {
2631 self.created_at
2632 }
2633
2634 pub fn updated_at(&self) -> SystemTime {
2636 self.updated_at
2637 }
2638
2639 pub fn push(&mut self, message: Message) {
2643 Arc::make_mut(&mut self.messages).push(message);
2644 self.updated_at = SystemTime::now();
2645 self.refresh_transcript_head_after_message_mutation();
2646 }
2647
2648 pub fn push_batch(&mut self, messages: Vec<Message>) {
2652 if messages.is_empty() {
2653 return;
2654 }
2655 let inner = Arc::make_mut(&mut self.messages);
2656 inner.extend(messages);
2657 self.updated_at = SystemTime::now();
2658 self.refresh_transcript_head_after_message_mutation();
2659 }
2660
2661 pub async fn externalize_media(
2672 &mut self,
2673 blob_store: &dyn crate::BlobStore,
2674 start: usize,
2675 ) -> Result<(), crate::blob::BlobStoreError> {
2676 let previous_digest = if self
2677 .metadata
2678 .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
2679 {
2680 transcript_messages_digest(self.messages()).ok()
2681 } else {
2682 None
2683 };
2684 let messages = Arc::make_mut(&mut self.messages);
2685 crate::image_content::externalize_messages_from(blob_store, messages, start).await?;
2686 if let Some(previous_digest) = previous_digest
2687 && transcript_messages_digest(self.messages()).ok().as_ref() != Some(&previous_digest)
2688 {
2689 self.refresh_transcript_head_after_message_mutation();
2690 }
2691 Ok(())
2692 }
2693
2694 pub fn touch(&mut self) {
2698 self.updated_at = SystemTime::now();
2699 }
2700
2701 pub fn last_n(&self, n: usize) -> &[Message] {
2703 let start = self.messages.len().saturating_sub(n);
2704 &self.messages[start..]
2705 }
2706
2707 pub fn total_tokens(&self) -> u64 {
2709 self.usage.total_tokens()
2710 }
2711
2712 pub fn total_usage(&self) -> Usage {
2714 self.usage.clone()
2715 }
2716
2717 pub fn record_usage(&mut self, turn_usage: Usage) {
2719 self.usage.add(&turn_usage);
2720 self.updated_at = SystemTime::now();
2721 }
2722
2723 pub fn append_external_user_content(&mut self, content: ContentInput) {
2725 self.push(Message::User(UserMessage::with_blocks(
2726 content.into_blocks(),
2727 )));
2728 }
2729
2730 pub fn append_external_assistant_blocks(
2732 &mut self,
2733 blocks: Vec<AssistantBlock>,
2734 stop_reason: StopReason,
2735 usage: Usage,
2736 ) {
2737 if !blocks.is_empty() {
2738 self.push(Message::BlockAssistant(BlockAssistantMessage::new(
2739 blocks,
2740 stop_reason,
2741 )));
2742 }
2743 if usage != Usage::default() {
2744 self.record_usage(usage);
2745 }
2746 }
2747
2748 pub fn append_realtime_transcript_event(
2756 &mut self,
2757 event: RealtimeTranscriptEvent,
2758 ) -> RealtimeTranscriptApplyOutcome {
2759 let mut state = self.realtime_transcript_state();
2760 let commit =
2761 realtime_transcript_revision::apply_realtime_transcript_event(&mut state, event)
2762 .unwrap_or_else(|err| {
2763 fail_closed_generated_restore(
2764 "realtime-transcript",
2765 <serde_json::Error as serde::de::Error>::custom(err),
2766 )
2767 });
2768 self.store_realtime_transcript_state(&state);
2769 self.push_batch(commit.messages);
2770 if commit.usage != Usage::default() {
2771 self.record_usage(commit.usage);
2772 }
2773 commit.outcome
2774 }
2775
2776 #[must_use]
2795 pub fn in_flight_realtime_assistant_response_ids(&self) -> Vec<String> {
2796 let state = self.realtime_transcript_state();
2797 realtime_transcript_revision::in_flight_realtime_assistant_response_ids(&state)
2798 }
2799
2800 fn realtime_transcript_state(&self) -> SessionRealtimeTranscriptState {
2801 match self.try_realtime_transcript_state() {
2802 Ok(Some(state)) => state,
2803 Ok(None) => SessionRealtimeTranscriptState::default(),
2804 Err(err) => fail_closed_generated_restore("realtime-transcript", err),
2805 }
2806 }
2807
2808 fn try_realtime_transcript_state(
2809 &self,
2810 ) -> Result<Option<SessionRealtimeTranscriptState>, serde_json::Error> {
2811 self.metadata
2812 .get(SESSION_REALTIME_TRANSCRIPT_STATE_KEY)
2813 .map(|value| {
2814 let state = serde_json::from_value(value.clone())?;
2815 realtime_transcript_revision::restore_realtime_transcript_state(state)
2816 .map_err(<serde_json::Error as serde::de::Error>::custom)
2817 })
2818 .transpose()
2819 }
2820
2821 fn store_realtime_transcript_state(&mut self, state: &SessionRealtimeTranscriptState) {
2822 match serde_json::to_value(state) {
2823 Ok(value) => self.set_metadata_unchecked(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, value),
2824 Err(error) => {
2825 tracing::warn!(error = %error, "failed to serialize realtime transcript state");
2826 }
2827 }
2828 }
2829
2830 fn apply_authorized_system_prompt(
2831 &mut self,
2832 prompt: session_durable_config_authority::AuthorizedSystemPrompt,
2833 ) {
2834 use crate::types::SystemMessage;
2835
2836 let mutation_kind = prompt.mutation_kind();
2841 let (prompt, _replacing_existing) = prompt.into_parts();
2842 let message = SystemMessage::with_mutation_kind(prompt, mutation_kind);
2843 let inner = Arc::make_mut(&mut self.messages);
2844 if let Some(Message::System(_)) = inner.first() {
2846 inner[0] = Message::System(message);
2847 } else {
2848 inner.insert(0, Message::System(message));
2849 }
2850 self.updated_at = SystemTime::now();
2851 self.refresh_transcript_head_after_message_mutation();
2852 }
2853
2854 pub fn set_system_prompt_with_source(
2856 &mut self,
2857 prompt: String,
2858 source: session_durable_config_authority::SessionSystemPromptSource,
2859 ) -> Result<(), session_durable_config_authority::SessionDurableConfigAuthorityError> {
2860 let replacing_existing = matches!(self.messages.first(), Some(Message::System(_)));
2861 let prompt = session_durable_config_authority::authorize_system_prompt_mutation(
2862 prompt,
2863 source,
2864 replacing_existing,
2865 )?;
2866 self.apply_authorized_system_prompt(prompt);
2867 Ok(())
2868 }
2869
2870 pub fn set_system_prompt(&mut self, prompt: String) {
2872 if let Err(err) = self.set_system_prompt_with_source(
2873 prompt,
2874 session_durable_config_authority::SessionSystemPromptSource::DirectMutation,
2875 ) {
2876 tracing::warn!(error = %err, "generated session durable-config authority rejected system prompt mutation");
2877 }
2878 }
2879
2880 pub fn discard_transient_runtime_steer_context(&mut self) -> usize {
2886 let mut removed = 0usize;
2887
2888 let mut state = match self.try_system_context_state() {
2889 Ok(state) => state.unwrap_or_default(),
2890 Err(err) => {
2891 tracing::warn!(
2892 error = %err,
2893 "generated system-context authority rejected runtime steer cleanup state"
2894 );
2895 return removed;
2896 }
2897 };
2898
2899 let runtime_steer_appends = state
2904 .pending
2905 .iter()
2906 .chain(state.applied.iter())
2907 .filter(|append| append.source_kind.is_runtime_steer())
2908 .cloned()
2909 .collect::<Vec<_>>();
2910 if let Some(Message::System(system)) = self.messages.first() {
2911 let (retained_prompt, removed_blocks) =
2912 system_context_authority::remove_runtime_steer_blocks_for_rendered(
2913 &system.content,
2914 &runtime_steer_appends,
2915 );
2916 if removed_blocks > 0 {
2917 removed += removed_blocks;
2918 if let Err(err) = self.set_system_prompt_with_source(
2919 retained_prompt,
2920 session_durable_config_authority::SessionSystemPromptSource::RuntimeSteerCleanup,
2921 ) {
2922 tracing::warn!(
2923 error = %err,
2924 "generated session durable-config authority rejected runtime steer prompt cleanup"
2925 );
2926 }
2927 }
2928 }
2929
2930 removed += system_context_authority::discard_transient_runtime_steer_state(&mut state);
2931
2932 if removed > 0
2933 && let Err(err) = self.set_system_context_state(state)
2934 {
2935 tracing::warn!(
2936 error = %err,
2937 "failed to persist runtime steer context cleanup"
2938 );
2939 }
2940
2941 removed
2942 }
2943
2944 pub fn append_system_context_blocks(&mut self, appends: &[PendingSystemContextAppend]) {
2946 if appends.is_empty() {
2947 return;
2948 }
2949
2950 let current_system_prompt = self
2951 .messages
2952 .first()
2953 .and_then(|message| match message {
2954 Message::System(system) => Some(system.content.as_str()),
2955 _ => None,
2956 })
2957 .unwrap_or_default();
2958 let mut state = match self.try_system_context_state() {
2959 Ok(state) => state.unwrap_or_default(),
2960 Err(err) => {
2961 tracing::warn!(
2962 error = %err,
2963 "generated system-context authority rejected applied context state"
2964 );
2965 return;
2966 }
2967 };
2968 let new_appends = system_context_authority::record_applied_system_context_blocks(
2969 &mut state,
2970 appends,
2971 current_system_prompt,
2972 );
2973 if new_appends.is_empty() {
2974 if let Err(err) = self.set_system_context_state(state) {
2975 tracing::warn!(error = %err, "failed to persist applied system-context state");
2976 }
2977 return;
2978 }
2979
2980 let rendered = new_appends
2981 .iter()
2982 .map(render_system_context_block)
2983 .collect::<Vec<_>>()
2984 .join(SYSTEM_CONTEXT_SEPARATOR);
2985
2986 let next = match self.messages.first() {
2987 Some(Message::System(sys)) if !sys.content.is_empty() => {
2988 format!("{}{}{}", sys.content, SYSTEM_CONTEXT_SEPARATOR, rendered)
2989 }
2990 _ => rendered,
2991 };
2992 if let Err(err) = self.set_system_prompt_with_source(
2993 next,
2994 session_durable_config_authority::SessionSystemPromptSource::RuntimeContextAppend,
2995 ) {
2996 tracing::warn!(
2997 error = %err,
2998 "generated session durable-config authority rejected system-context prompt append"
2999 );
3000 return;
3001 }
3002 if let Err(err) = self.set_system_context_state(state) {
3003 tracing::warn!(error = %err, "failed to persist applied system-context state");
3004 }
3005 }
3006
3007 pub fn last_assistant_text(&self) -> Option<String> {
3014 self.messages.iter().rev().find_map(|m| match m {
3015 Message::BlockAssistant(a) => {
3016 let mut buf = String::new();
3017 for block in &a.blocks {
3018 match block {
3019 crate::types::AssistantBlock::Text { text, .. }
3020 | crate::types::AssistantBlock::Transcript { text, .. } => {
3021 buf.push_str(text);
3022 }
3023 _ => {}
3024 }
3025 }
3026 if buf.is_empty() { None } else { Some(buf) }
3027 }
3028 _ => None,
3029 })
3030 }
3031
3032 pub fn tool_call_count(&self) -> usize {
3034 self.messages
3035 .iter()
3036 .filter_map(|m| match m {
3037 Message::BlockAssistant(a) => Some(
3038 a.blocks
3039 .iter()
3040 .filter(|b| matches!(b, crate::types::AssistantBlock::ToolUse { .. }))
3041 .count(),
3042 ),
3043 _ => None,
3044 })
3045 .sum()
3046 }
3047
3048 pub fn metadata(&self) -> &serde_json::Map<String, serde_json::Value> {
3050 &self.metadata
3051 }
3052
3053 fn set_metadata_unchecked(&mut self, key: &str, value: serde_json::Value) {
3054 self.metadata.insert(key.to_string(), value);
3055 self.updated_at = SystemTime::now();
3056 }
3057
3058 #[cfg(test)]
3059 pub(crate) fn set_metadata_unchecked_for_test(&mut self, key: &str, value: serde_json::Value) {
3060 self.set_metadata_unchecked(key, value);
3061 }
3062
3063 fn fork_metadata_projection(&self) -> serde_json::Map<String, serde_json::Value> {
3064 let mut metadata = self.metadata.clone();
3065 metadata.retain(|key, _| !is_session_authority_metadata_key(key));
3066 metadata
3067 }
3068
3069 fn remove_metadata_unchecked(&mut self, key: &str) {
3070 self.metadata.remove(key);
3071 self.updated_at = SystemTime::now();
3072 }
3073
3074 pub fn try_set_metadata(
3076 &mut self,
3077 key: &str,
3078 value: serde_json::Value,
3079 ) -> Result<(), ReservedSessionMetadataKey> {
3080 if is_session_authority_metadata_key(key) {
3081 return Err(ReservedSessionMetadataKey::new(key));
3082 }
3083 self.set_metadata_unchecked(key, value);
3084 Ok(())
3085 }
3086
3087 pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
3092 if let Err(err) = self.try_set_metadata(key, value) {
3093 tracing::warn!(error = %err, "rejected raw session metadata mutation");
3094 }
3095 }
3096
3097 pub fn backfill_metadata_if_absent(&mut self, key: &str, value: serde_json::Value) -> bool {
3103 if is_session_authority_metadata_key(key) {
3104 tracing::warn!(
3105 metadata_key = key,
3106 "rejected raw session metadata backfill for authority key"
3107 );
3108 return false;
3109 }
3110 if self.metadata.contains_key(key) {
3111 false
3112 } else {
3113 self.metadata.insert(key.to_string(), value);
3114 true
3115 }
3116 }
3117
3118 pub fn remove_metadata(&mut self, key: &str) {
3120 if is_session_authority_metadata_key(key) {
3121 tracing::warn!(
3122 metadata_key = key,
3123 "rejected raw session metadata removal for authority key"
3124 );
3125 return;
3126 }
3127 self.metadata.remove(key);
3128 self.updated_at = SystemTime::now();
3129 }
3130
3131 pub fn set_session_metadata(
3133 &mut self,
3134 metadata: SessionMetadata,
3135 ) -> Result<(), serde_json::Error> {
3136 let metadata =
3137 session_durable_config_authority::authorize_session_metadata_persist(metadata)
3138 .map_err(<serde_json::Error as serde::ser::Error>::custom)?
3139 .into_metadata();
3140 let value = serde_json::to_value(metadata)?;
3141 self.set_metadata_unchecked(SESSION_METADATA_KEY, value);
3142 Ok(())
3143 }
3144
3145 pub fn session_metadata(&self) -> Option<SessionMetadata> {
3150 match self.try_session_metadata() {
3151 Ok(metadata) => metadata,
3152 Err(err) => fail_closed_generated_restore("session-metadata", err),
3153 }
3154 }
3155
3156 pub fn try_session_metadata(&self) -> Result<Option<SessionMetadata>, serde_json::Error> {
3158 let Some(value) = self.metadata.get(SESSION_METADATA_KEY) else {
3159 return Ok(None);
3160 };
3161 let mut metadata = serde_json::from_value::<SessionMetadata>(value.clone())?;
3162 metadata.schema_version =
3163 session_persistence_version_authority::restore_session_metadata_schema_version(
3164 metadata.schema_version,
3165 )
3166 .map_err(<serde_json::Error as serde::de::Error>::custom)?;
3167 session_durable_config_authority::restore_session_metadata(metadata)
3168 .map(Some)
3169 .map_err(<serde_json::Error as serde::de::Error>::custom)
3170 }
3171
3172 pub fn set_system_context_state(
3174 &mut self,
3175 state: SessionSystemContextState,
3176 ) -> Result<(), serde_json::Error> {
3177 let state = system_context_authority::restore_system_context_state(state)
3178 .map_err(<serde_json::Error as serde::ser::Error>::custom)?;
3179 let value = serde_json::to_value(state)?;
3180 self.set_metadata_unchecked(SESSION_SYSTEM_CONTEXT_STATE_KEY, value);
3181 Ok(())
3182 }
3183
3184 pub fn try_system_context_state(
3186 &self,
3187 ) -> Result<Option<SessionSystemContextState>, serde_json::Error> {
3188 self.metadata
3189 .get(SESSION_SYSTEM_CONTEXT_STATE_KEY)
3190 .map(|value| {
3191 let state = serde_json::from_value(value.clone())?;
3192 system_context_authority::restore_system_context_state(state)
3193 .map_err(<serde_json::Error as serde::de::Error>::custom)
3194 })
3195 .transpose()
3196 }
3197
3198 pub fn system_context_state(&self) -> Option<SessionSystemContextState> {
3204 match self.try_system_context_state() {
3205 Ok(state) => state,
3206 Err(err) => fail_closed_generated_restore("system-context", err),
3207 }
3208 }
3209
3210 pub fn set_deferred_turn_state(
3212 &mut self,
3213 state: SessionDeferredTurnState,
3214 ) -> Result<(), serde_json::Error> {
3215 let state = validate_deferred_turn_snapshot(state)
3216 .map_err(<serde_json::Error as serde::ser::Error>::custom)?;
3217 let value = serde_json::to_value(state)?;
3218 self.set_metadata_unchecked(SESSION_DEFERRED_TURN_STATE_KEY, value);
3219 Ok(())
3220 }
3221
3222 pub fn try_deferred_turn_state(
3224 &self,
3225 ) -> Result<Option<SessionDeferredTurnState>, serde_json::Error> {
3226 self.metadata
3227 .get(SESSION_DEFERRED_TURN_STATE_KEY)
3228 .map(|value| {
3229 let state = serde_json::from_value(value.clone())?;
3230 validate_deferred_turn_snapshot(state)
3231 .map_err(<serde_json::Error as serde::de::Error>::custom)
3232 })
3233 .transpose()
3234 }
3235
3236 pub fn deferred_turn_state(&self) -> Option<SessionDeferredTurnState> {
3242 match self.try_deferred_turn_state() {
3243 Ok(state) => state,
3244 Err(err) => fail_closed_generated_restore("deferred-turn", err),
3245 }
3246 }
3247
3248 pub fn set_lifecycle_terminal(
3257 &mut self,
3258 terminal: SessionLifecycleTerminal,
3259 ) -> Result<(), serde_json::Error> {
3260 let value = serde_json::to_value(terminal)?;
3261 self.set_metadata_unchecked(SESSION_LIFECYCLE_TERMINAL_KEY, value);
3262 Ok(())
3263 }
3264
3265 pub fn try_lifecycle_terminal(
3270 &self,
3271 ) -> Result<Option<SessionLifecycleTerminal>, serde_json::Error> {
3272 match self.metadata.get(SESSION_LIFECYCLE_TERMINAL_KEY) {
3273 Some(value) => serde_json::from_value(value.clone()).map(Some),
3274 None => Ok(None),
3275 }
3276 }
3277
3278 pub fn lifecycle_terminal(&self) -> Option<SessionLifecycleTerminal> {
3284 match self.try_lifecycle_terminal() {
3285 Ok(state) => state,
3286 Err(err) => fail_closed_generated_restore("session-lifecycle-terminal", err),
3287 }
3288 }
3289
3290 pub fn set_build_state(&mut self, state: SessionBuildState) -> Result<(), serde_json::Error> {
3292 let state = session_durable_config_authority::authorize_session_build_state_persist(state)
3293 .map_err(<serde_json::Error as serde::ser::Error>::custom)?
3294 .into_state();
3295 let value = serde_json::to_value(state)?;
3296 self.set_metadata_unchecked(SESSION_BUILD_STATE_KEY, value);
3297 Ok(())
3298 }
3299
3300 pub fn build_state(&self) -> Option<SessionBuildState> {
3305 match self.try_build_state() {
3306 Ok(state) => state,
3307 Err(err) => fail_closed_generated_restore("session-build-state", err),
3308 }
3309 }
3310
3311 pub fn try_build_state(&self) -> Result<Option<SessionBuildState>, serde_json::Error> {
3313 let Some(value) = self.metadata.get(SESSION_BUILD_STATE_KEY) else {
3314 return Ok(None);
3315 };
3316 let state = serde_json::from_value::<SessionBuildState>(value.clone())?;
3317 session_durable_config_authority::restore_session_build_state(state)
3318 .map(Some)
3319 .map_err(<serde_json::Error as serde::de::Error>::custom)
3320 }
3321
3322 pub fn set_tool_visibility_state(
3324 &mut self,
3325 state: AuthorizedSessionToolVisibilityState,
3326 ) -> Result<(), serde_json::Error> {
3327 let value = serde_json::to_value(state.into_state())?;
3328 self.set_metadata_unchecked(SESSION_TOOL_VISIBILITY_STATE_KEY, value);
3329 Ok(())
3330 }
3331
3332 #[cfg(test)]
3337 pub(crate) fn clear_tool_visibility_state(&mut self) {
3338 self.remove_metadata_unchecked(SESSION_TOOL_VISIBILITY_STATE_KEY);
3339 }
3340
3341 pub fn tool_visibility_state(
3343 &self,
3344 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
3345 self.try_tool_visibility_state()
3346 }
3347
3348 pub fn try_tool_visibility_state(
3351 &self,
3352 ) -> Result<Option<SessionToolVisibilityState>, serde_json::Error> {
3353 self.metadata
3354 .get(SESSION_TOOL_VISIBILITY_STATE_KEY)
3355 .map(|value| serde_json::from_value(value.clone()))
3356 .transpose()
3357 }
3358
3359 pub fn transcript_history_state(
3361 &self,
3362 ) -> Result<Option<TranscriptHistoryState>, serde_json::Error> {
3363 self.metadata
3364 .get(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
3365 .map(|value| serde_json::from_value(value.clone()))
3366 .transpose()
3367 }
3368
3369 pub fn validate_transcript_history_state(&self) -> Result<(), TranscriptEditError> {
3371 let Some(state) = self
3372 .transcript_history_state()
3373 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
3374 else {
3375 return Ok(());
3376 };
3377 validate_transcript_history_state(&state)
3378 }
3379
3380 pub fn clear_transcript_history_state(&mut self) {
3383 self.remove_metadata_unchecked(SESSION_TRANSCRIPT_HISTORY_STATE_KEY);
3384 }
3385
3386 pub fn transcript_revision_body(
3388 &self,
3389 revision: &str,
3390 ) -> Result<Option<TranscriptRevisionBody>, serde_json::Error> {
3391 Ok(self.transcript_history_state()?.and_then(|state| {
3392 state
3393 .revisions
3394 .into_iter()
3395 .find(|body| body.revision == revision)
3396 }))
3397 }
3398
3399 pub fn transcript_revision_messages(
3401 &self,
3402 revision: &str,
3403 ) -> Result<Option<Vec<Message>>, serde_json::Error> {
3404 Ok(self
3405 .transcript_revision_body(revision)?
3406 .map(|body| body.messages))
3407 }
3408
3409 pub fn apply_transcript_history_state(
3411 &mut self,
3412 state: TranscriptHistoryState,
3413 ) -> Result<(), TranscriptEditError> {
3414 validate_transcript_history_state(&state)?;
3415 let head_body = state
3416 .revisions
3417 .iter()
3418 .find(|body| body.revision == state.head)
3419 .ok_or_else(|| {
3420 TranscriptEditError::HistoryStateMalformed(format!(
3421 "missing transcript head body {}",
3422 state.head
3423 ))
3424 })?
3425 .clone();
3426 let value = serde_json::to_value(&state)
3427 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3428 self.set_metadata_unchecked(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);
3429 let mut updated_at = head_body.created_at;
3430 for commit in &state.commits {
3431 if commit.committed_at > updated_at {
3432 updated_at = commit.committed_at;
3433 }
3434 }
3435 self.messages = Arc::new(head_body.messages);
3436 self.updated_at = updated_at;
3437 Ok(())
3438 }
3439
3440 pub fn transcript_revision(&self) -> Result<String, serde_json::Error> {
3443 if let Some(state) = self.transcript_history_state()? {
3444 Ok(state.head)
3445 } else {
3446 transcript_messages_digest(self.messages())
3447 }
3448 }
3449
3450 pub fn commit_transcript_rewrite(
3452 &mut self,
3453 selection: TranscriptRewriteSelection,
3454 replacement: Vec<Message>,
3455 reason: TranscriptRewriteReason,
3456 actor: Option<String>,
3457 expected_parent_revision: Option<String>,
3458 ) -> Result<TranscriptRewriteCommit, TranscriptEditError> {
3459 let parent_revision = self
3460 .transcript_revision()
3461 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3462 if let Some(expected) = expected_parent_revision
3463 && expected != parent_revision
3464 {
3465 return Err(TranscriptEditError::RevisionConflict {
3466 expected,
3467 actual: parent_revision,
3468 });
3469 }
3470
3471 let (start, end) = selection.bounds();
3472 let message_count = self.messages.len();
3473 if start > end || end > message_count {
3474 return Err(TranscriptEditError::InvalidRewriteRange {
3475 start,
3476 end,
3477 message_count,
3478 });
3479 }
3480
3481 let replacement_len = replacement.len();
3482 let mut rewritten = Vec::with_capacity(
3483 start
3484 .saturating_add(replacement_len)
3485 .saturating_add(message_count.saturating_sub(end)),
3486 );
3487 rewritten.extend_from_slice(&self.messages[..start]);
3488 rewritten.extend(replacement);
3489 rewritten.extend_from_slice(&self.messages[end..]);
3490 validate_transcript_tool_result_shape(&rewritten)?;
3491
3492 let original_span_digest = transcript_messages_digest(&self.messages[start..end])
3493 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3494 let replacement_digest =
3495 transcript_messages_digest(&rewritten[start..start + replacement_len])
3496 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3497 let revision = transcript_messages_digest(&rewritten)
3498 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3499 if revision == parent_revision {
3500 return Err(TranscriptEditError::NoOpRewrite { revision });
3501 }
3502
3503 let commit = TranscriptRewriteCommit {
3504 parent_revision,
3505 revision: revision.clone(),
3506 selection,
3507 original_span_digest,
3508 replacement_digest,
3509 messages_before: message_count,
3510 messages_after: rewritten.len(),
3511 reason,
3512 actor,
3513 committed_at: SystemTime::now(),
3514 };
3515
3516 let mut state = self
3517 .transcript_history_state()
3518 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?
3519 .unwrap_or_else(|| TranscriptHistoryState {
3520 head: commit.parent_revision.clone(),
3521 commits: Vec::new(),
3522 revisions: Vec::new(),
3523 });
3524 if !state
3525 .revisions
3526 .iter()
3527 .any(|body| body.revision == commit.parent_revision)
3528 {
3529 state.revisions.push(TranscriptRevisionBody {
3530 revision: commit.parent_revision.clone(),
3531 parent_revision: None,
3532 messages: self.messages().to_vec(),
3533 created_at: self.updated_at,
3534 });
3535 }
3536 if !state
3537 .revisions
3538 .iter()
3539 .any(|body| body.revision == commit.revision)
3540 {
3541 state.revisions.push(TranscriptRevisionBody {
3542 revision: commit.revision.clone(),
3543 parent_revision: Some(commit.parent_revision.clone()),
3544 messages: rewritten.clone(),
3545 created_at: commit.committed_at,
3546 });
3547 }
3548 state.head = revision;
3549 state.commits.push(commit.clone());
3550 let value = serde_json::to_value(state)
3551 .map_err(|err| TranscriptEditError::HistoryStateMalformed(err.to_string()))?;
3552 self.set_metadata_unchecked(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value);
3553
3554 self.messages = Arc::new(rewritten);
3555 self.updated_at = SystemTime::now();
3556 Ok(commit)
3557 }
3558
3559 fn refresh_transcript_head_after_message_mutation(&mut self) {
3560 if !self
3561 .metadata
3562 .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
3563 {
3564 return;
3565 }
3566 let Ok(Some(mut state)) = self.transcript_history_state() else {
3567 tracing::warn!(
3568 session_id = %self.id,
3569 "transcript history state is malformed; leaving head unchanged after message mutation"
3570 );
3571 return;
3572 };
3573 let Ok(head) = transcript_messages_digest(self.messages()) else {
3574 tracing::warn!(
3575 session_id = %self.id,
3576 "failed to digest transcript after message mutation; leaving head unchanged"
3577 );
3578 return;
3579 };
3580 let previous_head = state.head.clone();
3581 if !state.revisions.iter().any(|body| body.revision == head) {
3582 state.revisions.push(TranscriptRevisionBody {
3583 revision: head.clone(),
3584 parent_revision: Some(previous_head),
3585 messages: self.messages().to_vec(),
3586 created_at: SystemTime::now(),
3587 });
3588 }
3589 state.head = head;
3590 match serde_json::to_value(state) {
3591 Ok(value) => self.set_metadata_unchecked(SESSION_TRANSCRIPT_HISTORY_STATE_KEY, value),
3592 Err(error) => {
3593 tracing::warn!(
3594 session_id = %self.id,
3595 error = %error,
3596 "failed to serialize transcript history state after message mutation"
3597 );
3598 }
3599 }
3600 }
3601
3602 pub fn set_mob_tool_authority_context(
3610 &mut self,
3611 authority_context: Option<MobToolAuthorityContext>,
3612 ) -> Result<(), serde_json::Error> {
3613 if let Some(authority_context) = authority_context.as_ref()
3614 && !authority_context.is_generated_authority_context()
3615 {
3616 return Err(<serde_json::Error as serde::de::Error>::custom(
3617 "mob authority context was not minted by generated authority",
3618 ));
3619 }
3620 let mut build_state = self.build_state().ok_or_else(|| {
3621 <serde_json::Error as serde::de::Error>::custom(format!(
3622 "session {} is missing session build state",
3623 self.id
3624 ))
3625 })?;
3626 build_state.mob_tool_authority_context = authority_context;
3627 self.set_build_state(build_state)
3628 }
3629
3630 pub fn mob_tool_authority_context(&self) -> Option<MobToolAuthorityContext> {
3635 self.build_state()
3636 .and_then(|state| state.mob_tool_authority_context)
3637 .filter(MobToolAuthorityContext::is_generated_authority_context)
3638 }
3639
3640 pub fn fork_at(&self, index: usize) -> Self {
3645 let now = SystemTime::now();
3646 let truncated = self.messages[..index.min(self.messages.len())].to_vec();
3647 Self {
3648 version: session_version(),
3649 id: SessionId::new(),
3650 messages: Arc::new(truncated),
3651 created_at: now,
3652 updated_at: now,
3653 metadata: self.fork_metadata_projection(),
3654 usage: self.usage.clone(),
3655 }
3656 }
3657
3658 pub fn fork_replacing(
3665 &self,
3666 message_index: usize,
3667 replacement: TranscriptReplacement,
3668 ) -> Result<Self, TranscriptEditError> {
3669 let Some(original) = self.messages.get(message_index) else {
3670 return Err(TranscriptEditError::MessageIndexOutOfBounds {
3671 message_index,
3672 message_count: self.messages.len(),
3673 });
3674 };
3675
3676 let replacement_message = match replacement {
3677 TranscriptReplacement::Message { message } => message,
3678 TranscriptReplacement::UserContentBlock { block_index, block } => {
3679 let Message::User(user) = original else {
3680 return Err(TranscriptEditError::MessageRoleMismatch {
3681 message_index,
3682 expected: "user",
3683 actual: message_role_name(original),
3684 });
3685 };
3686 if block_index >= user.content.len() {
3687 return Err(TranscriptEditError::BlockIndexOutOfBounds {
3688 block_kind: "user content block",
3689 block_index,
3690 block_count: user.content.len(),
3691 });
3692 }
3693 let mut edited = user.clone();
3694 edited.content[block_index] = block;
3695 Message::User(edited)
3696 }
3697 TranscriptReplacement::AssistantBlock { block_index, block } => {
3698 let Message::BlockAssistant(assistant) = original else {
3699 return Err(TranscriptEditError::MessageRoleMismatch {
3700 message_index,
3701 expected: "block_assistant",
3702 actual: message_role_name(original),
3703 });
3704 };
3705 if block_index >= assistant.blocks.len() {
3706 return Err(TranscriptEditError::BlockIndexOutOfBounds {
3707 block_kind: "assistant block",
3708 block_index,
3709 block_count: assistant.blocks.len(),
3710 });
3711 }
3712 let mut edited = assistant.clone();
3713 edited.blocks[block_index] = block;
3714 Message::BlockAssistant(edited)
3715 }
3716 TranscriptReplacement::ToolResultContentBlock {
3717 result_index,
3718 block_index,
3719 block,
3720 } => {
3721 let Message::ToolResults {
3722 results,
3723 created_at,
3724 } = original
3725 else {
3726 return Err(TranscriptEditError::MessageRoleMismatch {
3727 message_index,
3728 expected: "tool_results",
3729 actual: message_role_name(original),
3730 });
3731 };
3732 let Some(result) = results.get(result_index) else {
3733 return Err(TranscriptEditError::BlockIndexOutOfBounds {
3734 block_kind: "tool result",
3735 block_index: result_index,
3736 block_count: results.len(),
3737 });
3738 };
3739 if block_index >= result.content.len() {
3740 return Err(TranscriptEditError::BlockIndexOutOfBounds {
3741 block_kind: "tool result content block",
3742 block_index,
3743 block_count: result.content.len(),
3744 });
3745 }
3746 let mut edited_results = results.clone();
3747 edited_results[result_index].content[block_index] = block;
3748 Message::ToolResults {
3749 results: edited_results,
3750 created_at: *created_at,
3751 }
3752 }
3753 };
3754
3755 let mut forked = self.fork_at(message_index);
3756 forked.push(replacement_message);
3757 Ok(forked)
3758 }
3759
3760 pub fn fork(&self) -> Self {
3765 let now = SystemTime::now();
3766 Self {
3767 version: session_version(),
3768 id: SessionId::new(),
3769 messages: Arc::clone(&self.messages),
3770 created_at: now,
3771 updated_at: now,
3772 metadata: self.fork_metadata_projection(),
3773 usage: self.usage.clone(),
3774 }
3775 }
3776}
3777
3778impl Default for Session {
3779 fn default() -> Self {
3780 Self::new()
3781 }
3782}
3783
3784#[derive(Debug, Clone, Serialize, Deserialize)]
3786#[serde(rename_all = "snake_case")]
3787pub struct SessionMeta {
3788 pub id: SessionId,
3789 pub created_at: SystemTime,
3790 pub updated_at: SystemTime,
3791 pub message_count: usize,
3792 pub total_tokens: u64,
3793 #[serde(default)]
3794 pub metadata: serde_json::Map<String, serde_json::Value>,
3795}
3796
3797#[derive(Debug, Clone, Serialize, Deserialize)]
3799#[serde(rename_all = "snake_case")]
3800pub struct SessionMetadata {
3801 pub schema_version: u32,
3808 pub model: String,
3809 pub max_tokens: u32,
3810 #[serde(default = "crate::config::default_structured_output_retries")]
3811 pub structured_output_retries: u32,
3812 pub provider: Provider,
3813 #[serde(default, skip_serializing_if = "Option::is_none")]
3814 pub self_hosted_server_id: Option<String>,
3815 #[serde(default, skip_serializing_if = "Option::is_none")]
3818 pub provider_params: Option<crate::lifecycle::run_primitive::ProviderParamsOverride>,
3819 pub tooling: SessionTooling,
3820 #[serde(default)]
3821 pub keep_alive: bool,
3822 pub comms_name: Option<String>,
3823 #[serde(default, skip_serializing_if = "Option::is_none")]
3825 pub peer_meta: Option<PeerMeta>,
3826 #[serde(default, skip_serializing_if = "Option::is_none")]
3832 pub realm_id: Option<crate::RealmId>,
3833 #[serde(default, skip_serializing_if = "Option::is_none")]
3835 pub instance_id: Option<String>,
3836 #[serde(default, skip_serializing_if = "Option::is_none")]
3838 pub backend: Option<String>,
3839 #[serde(default, skip_serializing_if = "Option::is_none")]
3841 pub config_generation: Option<u64>,
3842 #[serde(default, skip_serializing_if = "Option::is_none")]
3852 pub auth_binding: Option<crate::AuthBindingRef>,
3853 #[serde(default, skip_serializing_if = "Option::is_none")]
3866 pub mob_member_binding: Option<crate::MobMemberBinding>,
3867}
3868
3869#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
3871#[serde(rename_all = "snake_case")]
3872pub struct SessionLlmIdentity {
3873 pub model: String,
3874 pub provider: Provider,
3875 #[serde(default, skip_serializing_if = "Option::is_none")]
3876 pub self_hosted_server_id: Option<String>,
3877 #[serde(default, skip_serializing_if = "Option::is_none")]
3879 pub provider_params: Option<crate::lifecycle::run_primitive::ProviderParamsOverride>,
3880 #[serde(default, skip_serializing_if = "Option::is_none")]
3893 pub auth_binding: Option<crate::AuthBindingRef>,
3894}
3895
3896pub struct SessionLlmIdentityOverride<'a> {
3904 pub model: Option<&'a str>,
3905 pub provider: Option<Provider>,
3906 pub provider_params:
3907 Option<TurnMetadataOverride<&'a crate::lifecycle::run_primitive::ProviderParamsOverride>>,
3908 pub auth_binding: Option<TurnMetadataOverride<&'a crate::AuthBindingRef>>,
3909}
3910
3911#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
3912pub enum SessionLlmIdentityOverrideError {
3913 #[error("provider override requires model on an existing session")]
3914 ProviderRequiresModel,
3915 #[error("{0}")]
3916 ProviderModelMismatch(String),
3917 #[error("self-hosted provider requires a registered model alias; '{model}' is not configured")]
3918 MissingSelfHostedAlias { model: String },
3919}
3920
3921pub fn resolve_session_llm_identity_override(
3929 current: &SessionLlmIdentity,
3930 registry: &crate::ModelRegistry,
3931 overrides: SessionLlmIdentityOverride<'_>,
3932) -> Result<SessionLlmIdentity, SessionLlmIdentityOverrideError> {
3933 if overrides.provider.is_some() && overrides.model.is_none() {
3934 return Err(SessionLlmIdentityOverrideError::ProviderRequiresModel);
3935 }
3936
3937 let model = overrides
3938 .model
3939 .map(str::to_string)
3940 .unwrap_or_else(|| current.model.clone());
3941 let provider = if let Some(provider) = overrides.provider {
3942 provider
3943 } else if overrides.model.is_some() {
3944 registry
3945 .entry(&model)
3946 .map_or(current.provider, |entry| entry.provider)
3947 } else {
3948 current.provider
3949 };
3950
3951 if (overrides.model.is_some() || overrides.provider.is_some())
3952 && let Some(reason) = registry.provider_override_mismatch_reason(provider, &model)
3953 {
3954 return Err(SessionLlmIdentityOverrideError::ProviderModelMismatch(
3955 reason,
3956 ));
3957 }
3958
3959 let provider_params = match overrides.provider_params {
3960 Some(TurnMetadataOverride::Clear) => None,
3961 Some(TurnMetadataOverride::Set(value)) => Some(value.clone()),
3962 None => current.provider_params.clone(),
3963 };
3964 let self_hosted_server_id = if provider == Provider::SelfHosted {
3965 if overrides.model.is_none() {
3966 current.self_hosted_server_id.clone().or_else(|| {
3967 registry
3968 .entry_for_provider(Provider::SelfHosted, &model)
3969 .and_then(|entry| entry.self_hosted.as_ref())
3970 .map(|server| server.server_id.clone())
3971 })
3972 } else {
3973 let entry = registry
3974 .entry_for_provider(Provider::SelfHosted, &model)
3975 .ok_or_else(|| SessionLlmIdentityOverrideError::MissingSelfHostedAlias {
3976 model: model.clone(),
3977 })?;
3978 entry
3979 .self_hosted
3980 .as_ref()
3981 .map(|server| server.server_id.clone())
3982 }
3983 } else {
3984 None
3985 };
3986
3987 let auth_binding = match overrides.auth_binding {
3988 Some(TurnMetadataOverride::Clear) => None,
3989 Some(TurnMetadataOverride::Set(value)) => Some(value.clone()),
3990 None if provider != current.provider => None,
3993 None => current.auth_binding.clone(),
3994 };
3995
3996 Ok(SessionLlmIdentity {
3997 model,
3998 provider,
3999 self_hosted_server_id,
4000 provider_params,
4001 auth_binding,
4002 })
4003}
4004
4005#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
4012#[serde(rename_all = "snake_case")]
4013pub struct SessionLlmRequestPolicy {
4014 pub model: String,
4015 #[serde(default, skip_serializing_if = "Option::is_none")]
4017 pub provider_params: Option<crate::lifecycle::run_primitive::ProviderParamsOverride>,
4018 #[serde(default, skip_serializing_if = "Option::is_none")]
4020 pub provider_tool_defaults: Option<crate::lifecycle::run_primitive::ProviderTag>,
4021}
4022
4023impl SessionMetadata {
4024 pub fn llm_identity(&self) -> SessionLlmIdentity {
4026 SessionLlmIdentity {
4027 model: self.model.clone(),
4028 provider: self.provider,
4029 self_hosted_server_id: self.self_hosted_server_id.clone(),
4030 provider_params: self.provider_params.clone(),
4031 auth_binding: self.auth_binding.clone(),
4032 }
4033 }
4034
4035 pub fn apply_llm_identity(&mut self, identity: &SessionLlmIdentity) {
4037 self.model = identity.model.clone();
4038 self.provider = identity.provider;
4039 self.self_hosted_server_id = identity.self_hosted_server_id.clone();
4040 self.provider_params = identity.provider_params.clone();
4041 self.auth_binding = identity.auth_binding.clone();
4042 }
4043}
4044
4045pub const SESSION_METADATA_KEY: &str = "session_metadata";
4047
4048#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
4056#[serde(rename_all = "snake_case")]
4057pub enum ToolCategoryOverride {
4058 #[default]
4060 Inherit,
4061 Enable,
4063 Disable,
4065}
4066
4067impl ToolCategoryOverride {
4068 #[must_use]
4074 pub fn resolve(self, runtime_default: bool) -> bool {
4075 match self {
4076 Self::Enable => true,
4077 Self::Disable => false,
4078 Self::Inherit => runtime_default,
4079 }
4080 }
4081
4082 #[must_use]
4088 pub fn to_override(self) -> Option<bool> {
4089 match self {
4090 Self::Enable => Some(true),
4091 Self::Disable => Some(false),
4092 Self::Inherit => None,
4093 }
4094 }
4095
4096 #[must_use]
4104 pub fn from_effective(enabled: bool) -> Self {
4105 if enabled { Self::Enable } else { Self::Disable }
4106 }
4107
4108 #[must_use]
4118 pub fn from_override(value: Option<bool>) -> Self {
4119 match value {
4120 Some(true) => Self::Enable,
4121 Some(false) => Self::Disable,
4122 None => Self::Inherit,
4123 }
4124 }
4125}
4126
4127#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
4134#[serde(rename_all = "snake_case")]
4135pub struct SessionTooling {
4136 #[serde(default)]
4137 pub builtins: ToolCategoryOverride,
4138 #[serde(default)]
4139 pub shell: ToolCategoryOverride,
4140 #[serde(default)]
4141 pub comms: ToolCategoryOverride,
4142 #[serde(default)]
4144 pub mob: ToolCategoryOverride,
4145 #[serde(default)]
4147 pub memory: ToolCategoryOverride,
4148 #[serde(default)]
4150 pub schedule: ToolCategoryOverride,
4151 #[serde(default)]
4153 pub workgraph: ToolCategoryOverride,
4154 #[serde(default)]
4156 pub image_generation: ToolCategoryOverride,
4157 #[serde(default)]
4159 pub web_search: ToolCategoryOverride,
4160 #[serde(default, skip_serializing_if = "Option::is_none")]
4162 pub active_skills: Option<Vec<crate::skills::SkillKey>>,
4163}
4164
4165impl From<&Session> for SessionMeta {
4166 fn from(session: &Session) -> Self {
4167 Self {
4168 id: session.id.clone(),
4169 created_at: session.created_at,
4170 updated_at: session.updated_at,
4171 message_count: session.messages.len(),
4172 total_tokens: session.total_tokens(),
4173 metadata: session.metadata.clone(),
4174 }
4175 }
4176}
4177
4178#[cfg(test)]
4179#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
4180mod tests {
4181 use super::*;
4182 use crate::realtime_transcript::RealtimeTranscriptRole;
4183 use crate::types::{
4184 AssistantBlock, BlockAssistantMessage, ContentBlock, StopReason, SystemMessage, Usage,
4185 UserMessage,
4186 };
4187 use std::sync::Arc;
4188
4189 fn block_assistant_text(message: &BlockAssistantMessage) -> String {
4190 message
4191 .blocks
4192 .iter()
4193 .filter_map(|block| match block {
4194 AssistantBlock::Text { text, .. } => Some(text.as_str()),
4195 _ => None,
4196 })
4197 .collect()
4198 }
4199
4200 #[test]
4204 fn replace_synthetic_notices_leaves_only_replacements_of_kind() {
4205 use crate::types::{SystemNoticeKind, SystemNoticeMessage};
4206
4207 let mut session = Session::new();
4208 session.push(Message::User(UserMessage::text("hello".to_string())));
4209 session.push(Message::SystemNotice(SystemNoticeMessage::new(
4210 SystemNoticeKind::McpPending,
4211 "stale one",
4212 )));
4213 session.push(Message::SystemNotice(SystemNoticeMessage::new(
4214 SystemNoticeKind::McpPending,
4215 "stale two",
4216 )));
4217 session.push(Message::SystemNotice(SystemNoticeMessage::new(
4219 SystemNoticeKind::BackgroundJob,
4220 "other-kind",
4221 )));
4222
4223 session
4224 .replace_synthetic_notices(
4225 SystemNoticeKind::McpPending,
4226 vec![Message::SystemNotice(SystemNoticeMessage::new(
4227 SystemNoticeKind::McpPending,
4228 "fresh",
4229 ))],
4230 )
4231 .expect("notice refresh succeeds");
4232
4233 let mcp_pending: Vec<&SystemNoticeMessage> = session
4234 .messages()
4235 .iter()
4236 .filter_map(|message| match message {
4237 Message::SystemNotice(notice) if notice.kind == SystemNoticeKind::McpPending => {
4238 Some(notice)
4239 }
4240 _ => None,
4241 })
4242 .collect();
4243 assert_eq!(mcp_pending.len(), 1, "exactly one notice of the kind");
4244 assert_eq!(mcp_pending[0].body.as_deref(), Some("fresh"));
4245 assert!(
4246 session.messages().iter().any(|message| matches!(
4247 message,
4248 Message::SystemNotice(notice) if notice.kind == SystemNoticeKind::BackgroundJob
4249 )),
4250 "other-kind notices are untouched"
4251 );
4252
4253 session
4255 .replace_synthetic_notices(SystemNoticeKind::McpPending, Vec::new())
4256 .expect("pure strip succeeds");
4257 assert!(
4258 !session.messages().iter().any(|message| matches!(
4259 message,
4260 Message::SystemNotice(notice) if notice.kind == SystemNoticeKind::McpPending
4261 )),
4262 "empty replacement clears the kind"
4263 );
4264 }
4265
4266 #[test]
4270 fn replace_synthetic_notices_rejects_mismatched_kind_without_mutation() {
4271 use crate::types::{SystemNoticeKind, SystemNoticeMessage};
4272
4273 let mut session = Session::new();
4274 session.push(Message::SystemNotice(SystemNoticeMessage::new(
4275 SystemNoticeKind::McpPending,
4276 "stale",
4277 )));
4278 let before = session.messages().to_vec();
4279
4280 let err = session
4281 .replace_synthetic_notices(
4282 SystemNoticeKind::McpPending,
4283 vec![Message::User(UserMessage::text("not a notice".to_string()))],
4284 )
4285 .expect_err("mismatched replacement must fail typed");
4286 assert!(
4287 matches!(err, TranscriptEditError::InvalidTranscriptShape(_)),
4288 "expected InvalidTranscriptShape, got {err:?}"
4289 );
4290 assert_eq!(
4291 session.messages(),
4292 before.as_slice(),
4293 "fault must leave the transcript unchanged (no partial strip)"
4294 );
4295 }
4296
4297 #[test]
4298 fn transcript_rewrite_preserves_full_assistant_block_trace() {
4299 let mut session = Session::new();
4300 session.push(Message::User(UserMessage::text(
4301 "run the trace".to_string(),
4302 )));
4303 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
4304 vec![AssistantBlock::Text {
4305 text: "original assistant trace".to_string(),
4306 meta: None,
4307 }],
4308 StopReason::EndTurn,
4309 )));
4310
4311 let parent_revision = session.transcript_revision().expect("parent revision");
4312 let replacement = vec![
4313 Message::BlockAssistant(BlockAssistantMessage::new(
4314 vec![
4315 AssistantBlock::Text {
4316 text: "compacted assistant trace".to_string(),
4317 meta: None,
4318 },
4319 AssistantBlock::ToolUse {
4320 id: "toolu_trace".to_string(),
4321 name: "trace_probe".to_string(),
4322 args: serde_json::value::RawValue::from_string(
4323 r#"{"path":"N-3"}"#.to_string(),
4324 )
4325 .expect("valid tool args"),
4326 meta: None,
4327 },
4328 ],
4329 StopReason::ToolUse,
4330 )),
4331 Message::tool_results(vec![ToolResult::new(
4332 "toolu_trace".to_string(),
4333 "trace complete".to_string(),
4334 false,
4335 )]),
4336 ];
4337
4338 let commit = session
4339 .commit_transcript_rewrite(
4340 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4341 replacement,
4342 TranscriptRewriteReason::new("compaction"),
4343 Some("unit-test".to_string()),
4344 Some(parent_revision.clone()),
4345 )
4346 .expect("rewrite should commit");
4347
4348 assert_eq!(commit.parent_revision, parent_revision);
4349 let current = session
4350 .transcript_revision_messages(&commit.revision)
4351 .expect("history state should decode")
4352 .expect("current revision should be retained");
4353 let Message::BlockAssistant(assistant) = ¤t[1] else {
4354 panic!("replacement should remain a block assistant message");
4355 };
4356 assert!(assistant.blocks.iter().any(|block| matches!(
4357 block,
4358 AssistantBlock::ToolUse { name, args, .. }
4359 if name == "trace_probe" && args.get().contains("\"N-3\"")
4360 )));
4361
4362 let parent = session
4363 .transcript_revision_messages(&parent_revision)
4364 .expect("history state should decode")
4365 .expect("parent revision should remain retained");
4366 assert!(matches!(
4367 &parent[1],
4368 Message::BlockAssistant(assistant)
4369 if block_assistant_text(assistant).contains("original assistant trace")
4370 ));
4371 }
4372
4373 #[test]
4374 fn transcript_rewrite_rejects_trailing_block_assistant_tool_call() {
4375 let mut session = Session::new();
4376 session.push(Message::User(UserMessage::text("question".to_string())));
4377 session.push(Message::BlockAssistant(BlockAssistantMessage {
4378 blocks: vec![AssistantBlock::Text {
4379 text: "plain answer".to_string(),
4380 meta: None,
4381 }],
4382 stop_reason: StopReason::EndTurn,
4383 created_at: crate::types::message_timestamp_now(),
4384 }));
4385 let parent_revision = session.transcript_revision().expect("parent revision");
4386
4387 let err = session
4388 .commit_transcript_rewrite(
4389 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4390 vec![Message::BlockAssistant(BlockAssistantMessage::new(
4391 vec![AssistantBlock::ToolUse {
4392 id: "toolu_1".to_string(),
4393 name: "lookup".to_string(),
4394 args: serde_json::value::RawValue::from_string("{}".to_string())
4395 .expect("valid args"),
4396 meta: None,
4397 }],
4398 StopReason::ToolUse,
4399 ))],
4400 TranscriptRewriteReason::new("compaction"),
4401 Some("unit-test".to_string()),
4402 Some(parent_revision),
4403 )
4404 .expect_err("rewrite should reject trailing unresolved block-assistant tool call");
4405 assert!(matches!(
4406 err,
4407 TranscriptEditError::InvalidTranscriptShape(_)
4408 ));
4409 }
4410
4411 #[test]
4412 fn transcript_rewrite_rejects_no_op_self_edge() {
4413 let mut session = Session::new();
4414 session.push(Message::User(UserMessage::text(
4415 "keep this exact transcript".to_string(),
4416 )));
4417 session.push(Message::BlockAssistant(BlockAssistantMessage {
4418 blocks: vec![AssistantBlock::Text {
4419 text: "unchanged".to_string(),
4420 meta: None,
4421 }],
4422 stop_reason: StopReason::EndTurn,
4423 created_at: crate::types::message_timestamp_now(),
4424 }));
4425
4426 let parent_revision = session.transcript_revision().expect("parent revision");
4427 let err = session
4428 .commit_transcript_rewrite(
4429 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4430 vec![session.messages()[1].clone()],
4431 TranscriptRewriteReason::new("retry"),
4432 Some("unit-test".to_string()),
4433 Some(parent_revision.clone()),
4434 )
4435 .expect_err("same-content rewrite should not emit a self-edge commit");
4436
4437 assert!(matches!(
4438 err,
4439 TranscriptEditError::NoOpRewrite { revision } if revision == parent_revision
4440 ));
4441 assert!(
4442 session
4443 .transcript_history_state()
4444 .expect("history state should decode")
4445 .is_none()
4446 );
4447 }
4448
4449 #[test]
4450 fn transcript_rewrite_run_boundary_guard_accepts_rewrite_then_append() {
4451 let mut original = Session::new();
4452 original.push(Message::User(UserMessage::text("question".to_string())));
4453 original.push(Message::BlockAssistant(BlockAssistantMessage {
4454 blocks: vec![AssistantBlock::Text {
4455 text: "verbose answer".to_string(),
4456 meta: None,
4457 }],
4458 stop_reason: StopReason::EndTurn,
4459 created_at: crate::types::message_timestamp_now(),
4460 }));
4461
4462 let parent_revision = original.transcript_revision().expect("parent revision");
4463 let mut incoming = original.clone();
4464 incoming
4465 .commit_transcript_rewrite(
4466 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4467 vec![Message::BlockAssistant(BlockAssistantMessage {
4468 blocks: vec![AssistantBlock::Text {
4469 text: "compact answer".to_string(),
4470 meta: None,
4471 }],
4472 stop_reason: StopReason::EndTurn,
4473 created_at: crate::types::message_timestamp_now(),
4474 })],
4475 TranscriptRewriteReason::new("compaction"),
4476 Some("unit-test".to_string()),
4477 Some(parent_revision),
4478 )
4479 .expect("rewrite should commit");
4480 incoming.push(Message::User(UserMessage::text("follow-up".to_string())));
4481 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
4482 blocks: vec![AssistantBlock::Text {
4483 text: "follow-up answer".to_string(),
4484 meta: None,
4485 }],
4486 stop_reason: StopReason::EndTurn,
4487 created_at: crate::types::message_timestamp_now(),
4488 }));
4489
4490 crate::session_store::run_boundary_snapshot_save_guard(&incoming, Some(&original))
4491 .expect("rewrite plus appended turn should be a valid run-boundary commit");
4492 }
4493
4494 #[test]
4495 fn transcript_rewrite_rejects_orphaned_tool_results() {
4496 let mut session = Session::new();
4497 session.push(Message::User(UserMessage::text("use a tool".to_string())));
4498 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
4499 vec![AssistantBlock::ToolUse {
4500 id: "toolu_1".to_string(),
4501 name: "lookup".to_string(),
4502 args: serde_json::value::RawValue::from_string("{}".to_string())
4503 .expect("valid args"),
4504 meta: None,
4505 }],
4506 StopReason::ToolUse,
4507 )));
4508 session.push(Message::tool_results(vec![ToolResult::new(
4509 "toolu_1".to_string(),
4510 "done".to_string(),
4511 false,
4512 )]));
4513 let parent_revision = session.transcript_revision().expect("parent revision");
4514
4515 let err = session
4516 .commit_transcript_rewrite(
4517 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4518 vec![Message::BlockAssistant(BlockAssistantMessage {
4519 blocks: vec![AssistantBlock::Text {
4520 text: "no tool after all".to_string(),
4521 meta: None,
4522 }],
4523 stop_reason: StopReason::EndTurn,
4524 created_at: crate::types::message_timestamp_now(),
4525 })],
4526 TranscriptRewriteReason::new("compaction"),
4527 Some("unit-test".to_string()),
4528 Some(parent_revision),
4529 )
4530 .expect_err("rewrite should reject stranded tool results");
4531 assert!(matches!(
4532 err,
4533 TranscriptEditError::InvalidTranscriptShape(_)
4534 ));
4535 }
4536
4537 #[test]
4538 fn transcript_rewrite_rejects_trailing_assistant_tool_call() {
4539 let mut session = Session::new();
4540 session.push(Message::User(UserMessage::text("question".to_string())));
4541 session.push(Message::BlockAssistant(BlockAssistantMessage {
4542 blocks: vec![AssistantBlock::Text {
4543 text: "plain answer".to_string(),
4544 meta: None,
4545 }],
4546 stop_reason: StopReason::EndTurn,
4547 created_at: crate::types::message_timestamp_now(),
4548 }));
4549 let parent_revision = session.transcript_revision().expect("parent revision");
4550
4551 let err = session
4552 .commit_transcript_rewrite(
4553 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4554 vec![Message::BlockAssistant(BlockAssistantMessage {
4555 blocks: vec![AssistantBlock::ToolUse {
4556 id: "toolu_1".to_string(),
4557 name: "lookup".to_string(),
4558 args: serde_json::value::RawValue::from_string("{}".to_string())
4559 .expect("valid args"),
4560 meta: None,
4561 }],
4562 stop_reason: StopReason::ToolUse,
4563 created_at: crate::types::message_timestamp_now(),
4564 })],
4565 TranscriptRewriteReason::new("compaction"),
4566 Some("unit-test".to_string()),
4567 Some(parent_revision),
4568 )
4569 .expect_err("rewrite should reject trailing unresolved tool call");
4570 assert!(matches!(
4571 err,
4572 TranscriptEditError::InvalidTranscriptShape(_)
4573 ));
4574 }
4575
4576 #[test]
4577 fn transcript_rewrite_rejects_duplicate_tool_results() {
4578 let mut session = Session::new();
4579 session.push(Message::User(UserMessage::text("use a tool".to_string())));
4580 session.push(Message::BlockAssistant(BlockAssistantMessage {
4581 blocks: vec![AssistantBlock::Text {
4582 text: "plain answer".to_string(),
4583 meta: None,
4584 }],
4585 stop_reason: StopReason::EndTurn,
4586 created_at: crate::types::message_timestamp_now(),
4587 }));
4588 let parent_revision = session.transcript_revision().expect("parent revision");
4589
4590 let err = session
4591 .commit_transcript_rewrite(
4592 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4593 vec![
4594 Message::BlockAssistant(BlockAssistantMessage::new(
4595 vec![AssistantBlock::ToolUse {
4596 id: "toolu_1".to_string(),
4597 name: "lookup".to_string(),
4598 args: serde_json::value::RawValue::from_string("{}".to_string())
4599 .expect("valid args"),
4600 meta: None,
4601 }],
4602 StopReason::ToolUse,
4603 )),
4604 Message::tool_results(vec![
4605 ToolResult::new("toolu_1".to_string(), "one".to_string(), false),
4606 ToolResult::new("toolu_1".to_string(), "two".to_string(), false),
4607 ]),
4608 ],
4609 TranscriptRewriteReason::new("compaction"),
4610 Some("unit-test".to_string()),
4611 Some(parent_revision),
4612 )
4613 .expect_err("rewrite should reject duplicate tool results");
4614 assert!(matches!(
4615 err,
4616 TranscriptEditError::InvalidTranscriptShape(_)
4617 ));
4618 }
4619
4620 #[test]
4621 fn transcript_rewrite_record_rejects_prefix_or_suffix_tampering() {
4622 let mut session = Session::new();
4623 session.push(Message::System(SystemMessage::new("keep prefix")));
4624 session.push(Message::BlockAssistant(BlockAssistantMessage {
4625 blocks: vec![AssistantBlock::Text {
4626 text: "verbose answer".to_string(),
4627 meta: None,
4628 }],
4629 stop_reason: StopReason::EndTurn,
4630 created_at: crate::types::message_timestamp_now(),
4631 }));
4632 session.push(Message::User(UserMessage::text("keep suffix".to_string())));
4633
4634 let parent_revision = session.transcript_revision().expect("parent revision");
4635 let commit = session
4636 .commit_transcript_rewrite(
4637 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4638 vec![Message::BlockAssistant(BlockAssistantMessage {
4639 blocks: vec![AssistantBlock::Text {
4640 text: "compact answer".to_string(),
4641 meta: None,
4642 }],
4643 stop_reason: StopReason::EndTurn,
4644 created_at: crate::types::message_timestamp_now(),
4645 })],
4646 TranscriptRewriteReason::new("compaction"),
4647 Some("unit-test".to_string()),
4648 Some(parent_revision),
4649 )
4650 .expect("rewrite should commit");
4651 let state = session
4652 .transcript_history_state()
4653 .expect("history state should decode")
4654 .expect("history state should exist");
4655 let parent_body = state
4656 .revisions
4657 .iter()
4658 .find(|body| body.revision == commit.parent_revision)
4659 .expect("parent body retained")
4660 .clone();
4661 let revision_body = state
4662 .revisions
4663 .iter()
4664 .find(|body| body.revision == commit.revision)
4665 .expect("revision body retained")
4666 .clone();
4667
4668 let mut forged_body = revision_body;
4669 forged_body.messages[0] = Message::System(SystemMessage::new("tampered prefix"));
4670 forged_body.revision =
4671 transcript_messages_digest(&forged_body.messages).expect("forged digest");
4672 let mut forged_commit = commit;
4673 forged_commit.revision = forged_body.revision.clone();
4674 let err = TranscriptRewriteRecord::new(forged_commit, parent_body, forged_body)
4675 .expect_err("record validation must reject changes outside selected span");
4676 assert!(
4677 err.to_string().contains("before the selected span"),
4678 "unexpected error: {err}"
4679 );
4680 }
4681
4682 #[test]
4683 fn transcript_rewrite_replay_allows_normal_turn_revisions_between_rewrites() {
4684 let mut session = Session::new();
4685 session.push(Message::User(UserMessage::text("first".to_string())));
4686 session.push(Message::BlockAssistant(BlockAssistantMessage {
4687 blocks: vec![AssistantBlock::Text {
4688 text: "verbose first answer".to_string(),
4689 meta: None,
4690 }],
4691 stop_reason: StopReason::EndTurn,
4692 created_at: crate::types::message_timestamp_now(),
4693 }));
4694
4695 let first_parent = session.transcript_revision().expect("first parent");
4696 let first_commit = session
4697 .commit_transcript_rewrite(
4698 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4699 vec![Message::BlockAssistant(BlockAssistantMessage {
4700 blocks: vec![AssistantBlock::Text {
4701 text: "compact first answer".to_string(),
4702 meta: None,
4703 }],
4704 stop_reason: StopReason::EndTurn,
4705 created_at: crate::types::message_timestamp_now(),
4706 })],
4707 TranscriptRewriteReason::new("compaction"),
4708 Some("unit-test".to_string()),
4709 Some(first_parent),
4710 )
4711 .expect("first rewrite");
4712
4713 session.push(Message::User(UserMessage::text("normal turn".to_string())));
4714 session.push(Message::BlockAssistant(BlockAssistantMessage {
4715 blocks: vec![AssistantBlock::Text {
4716 text: "verbose second answer".to_string(),
4717 meta: None,
4718 }],
4719 stop_reason: StopReason::EndTurn,
4720 created_at: crate::types::message_timestamp_now(),
4721 }));
4722 let bridge_parent = session
4723 .transcript_revision()
4724 .expect("normal turn should advance transcript head");
4725 assert_ne!(bridge_parent, first_commit.revision);
4726 validate_transcript_history_state(
4727 &session
4728 .transcript_history_state()
4729 .expect("history state should decode")
4730 .expect("history state should exist"),
4731 )
4732 .expect("normal turn head may legitimately differ from last rewrite commit");
4733
4734 let second_commit = session
4735 .commit_transcript_rewrite(
4736 TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
4737 vec![Message::BlockAssistant(BlockAssistantMessage {
4738 blocks: vec![AssistantBlock::Text {
4739 text: "compact second answer".to_string(),
4740 meta: None,
4741 }],
4742 stop_reason: StopReason::EndTurn,
4743 created_at: crate::types::message_timestamp_now(),
4744 })],
4745 TranscriptRewriteReason::new("compaction"),
4746 Some("unit-test".to_string()),
4747 Some(bridge_parent.clone()),
4748 )
4749 .expect("second rewrite");
4750
4751 let state = session
4752 .transcript_history_state()
4753 .expect("history state should decode")
4754 .expect("history state should exist");
4755 let records = state.commits.iter().map(|commit| {
4756 let parent_body = state
4757 .revisions
4758 .iter()
4759 .find(|body| body.revision == commit.parent_revision)
4760 .expect("parent body retained")
4761 .clone();
4762 let revision_body = state
4763 .revisions
4764 .iter()
4765 .find(|body| body.revision == commit.revision)
4766 .expect("revision body retained")
4767 .clone();
4768 TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4769 .expect("record should validate")
4770 });
4771
4772 let replayed = TranscriptHistoryState::from_rewrite_records(records)
4773 .expect("rewrite replay should accept normal-turn bridge revisions")
4774 .expect("rewrite records should exist");
4775 assert_eq!(replayed.head, second_commit.revision);
4776 assert!(
4777 replayed
4778 .revisions
4779 .iter()
4780 .any(|body| body.revision == bridge_parent)
4781 );
4782 }
4783
4784 #[test]
4785 fn transcript_rewrite_replay_rejects_branched_rewrite_records() {
4786 let mut base = Session::new();
4787 base.push(Message::User(UserMessage::text("question".to_string())));
4788 base.push(Message::BlockAssistant(BlockAssistantMessage {
4789 blocks: vec![AssistantBlock::Text {
4790 text: "verbose answer".to_string(),
4791 meta: None,
4792 }],
4793 stop_reason: StopReason::EndTurn,
4794 created_at: crate::types::message_timestamp_now(),
4795 }));
4796 let parent = base.transcript_revision().expect("parent revision");
4797
4798 let mut first = base.clone();
4799 let first_commit = first
4800 .commit_transcript_rewrite(
4801 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4802 vec![Message::BlockAssistant(BlockAssistantMessage {
4803 blocks: vec![AssistantBlock::Text {
4804 text: "first compact answer".to_string(),
4805 meta: None,
4806 }],
4807 stop_reason: StopReason::EndTurn,
4808 created_at: crate::types::message_timestamp_now(),
4809 })],
4810 TranscriptRewriteReason::new("compaction"),
4811 Some("unit-test".to_string()),
4812 Some(parent.clone()),
4813 )
4814 .expect("first rewrite");
4815 let first_state = first
4816 .transcript_history_state()
4817 .expect("first state decodes")
4818 .expect("first state exists");
4819
4820 let mut second = base;
4821 let second_commit = second
4822 .commit_transcript_rewrite(
4823 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4824 vec![Message::BlockAssistant(BlockAssistantMessage {
4825 blocks: vec![AssistantBlock::Text {
4826 text: "second compact answer".to_string(),
4827 meta: None,
4828 }],
4829 stop_reason: StopReason::EndTurn,
4830 created_at: crate::types::message_timestamp_now(),
4831 })],
4832 TranscriptRewriteReason::new("compaction"),
4833 Some("unit-test".to_string()),
4834 Some(parent),
4835 )
4836 .expect("second rewrite");
4837 let second_state = second
4838 .transcript_history_state()
4839 .expect("second state decodes")
4840 .expect("second state exists");
4841
4842 let record = |state: &TranscriptHistoryState, commit: &TranscriptRewriteCommit| {
4843 let parent_body = state
4844 .revisions
4845 .iter()
4846 .find(|body| body.revision == commit.parent_revision)
4847 .expect("parent body retained")
4848 .clone();
4849 let revision_body = state
4850 .revisions
4851 .iter()
4852 .find(|body| body.revision == commit.revision)
4853 .expect("revision body retained")
4854 .clone();
4855 TranscriptRewriteRecord::new(commit.clone(), parent_body, revision_body)
4856 .expect("record should validate")
4857 };
4858
4859 let err = TranscriptHistoryState::from_rewrite_records(vec![
4860 record(&first_state, &first_commit),
4861 record(&second_state, &second_commit),
4862 ])
4863 .expect_err("branched rewrite records must not replay as a linear source history");
4864 assert!(
4865 err.to_string().contains("does not extend transcript head"),
4866 "unexpected error: {err}"
4867 );
4868 }
4869
4870 #[test]
4871 fn internal_message_rewrites_refresh_transcript_history_head() {
4872 let mut session = Session::new();
4873 session.push(Message::User(UserMessage::text("question".to_string())));
4874 session.push(Message::BlockAssistant(BlockAssistantMessage {
4875 blocks: vec![AssistantBlock::Text {
4876 text: "verbose answer".to_string(),
4877 meta: None,
4878 }],
4879 stop_reason: StopReason::EndTurn,
4880 created_at: crate::types::message_timestamp_now(),
4881 }));
4882
4883 let parent = session.transcript_revision().expect("parent revision");
4884 session
4885 .commit_transcript_rewrite(
4886 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4887 vec![Message::BlockAssistant(BlockAssistantMessage {
4888 blocks: vec![AssistantBlock::Text {
4889 text: "compact answer".to_string(),
4890 meta: None,
4891 }],
4892 stop_reason: StopReason::EndTurn,
4893 created_at: crate::types::message_timestamp_now(),
4894 })],
4895 TranscriptRewriteReason::new("compaction"),
4896 Some("unit-test".to_string()),
4897 Some(parent),
4898 )
4899 .expect("rewrite should commit");
4900
4901 session.push(Message::User(UserMessage::text(
4902 "notice-bearing turn".to_string(),
4903 )));
4904 session
4905 .retain_messages_internal(
4906 |message| {
4907 !matches!(
4908 message,
4909 Message::User(user)
4910 if user.content.iter().any(|block| matches!(
4911 block,
4912 ContentBlock::Text { text } if text.contains("notice-bearing")
4913 ))
4914 )
4915 },
4916 TranscriptRewriteReason::new("synthetic_notice_cleanup"),
4917 )
4918 .expect("retain should commit internal rewrite");
4919 let retained_digest =
4920 transcript_messages_digest(session.messages()).expect("retained digest");
4921 assert_eq!(
4922 session.transcript_revision().expect("retained head"),
4923 retained_digest
4924 );
4925
4926 session
4927 .replace_messages_internal(
4928 vec![
4929 Message::User(UserMessage::text("compacted question".to_string())),
4930 Message::BlockAssistant(BlockAssistantMessage {
4931 blocks: vec![AssistantBlock::Text {
4932 text: "compacted answer".to_string(),
4933 meta: None,
4934 }],
4935 stop_reason: StopReason::EndTurn,
4936 created_at: crate::types::message_timestamp_now(),
4937 }),
4938 ],
4939 TranscriptRewriteReason::new("compaction"),
4940 )
4941 .expect("replace should commit internal rewrite");
4942 let replaced_digest =
4943 transcript_messages_digest(session.messages()).expect("replaced digest");
4944 assert_eq!(
4945 session.transcript_revision().expect("replaced head"),
4946 replaced_digest
4947 );
4948 let state = session
4949 .transcript_history_state()
4950 .expect("history state should decode")
4951 .expect("history state should exist");
4952 assert!(
4953 state
4954 .revisions
4955 .iter()
4956 .any(|body| body.revision == replaced_digest)
4957 );
4958 validate_transcript_history_state(&state).expect("history state remains valid");
4959 }
4960
4961 #[test]
4962 fn set_system_prompt_refreshes_transcript_history_head_after_rewrite() {
4963 let mut session = Session::new();
4964 session.push(Message::User(UserMessage::text("question".to_string())));
4965 session.push(Message::BlockAssistant(BlockAssistantMessage {
4966 blocks: vec![AssistantBlock::Text {
4967 text: "verbose answer".to_string(),
4968 meta: None,
4969 }],
4970 stop_reason: StopReason::EndTurn,
4971 created_at: crate::types::message_timestamp_now(),
4972 }));
4973
4974 let parent = session.transcript_revision().expect("parent revision");
4975 let rewrite = session
4976 .commit_transcript_rewrite(
4977 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
4978 vec![Message::BlockAssistant(BlockAssistantMessage {
4979 blocks: vec![AssistantBlock::Text {
4980 text: "compact answer".to_string(),
4981 meta: None,
4982 }],
4983 stop_reason: StopReason::EndTurn,
4984 created_at: crate::types::message_timestamp_now(),
4985 })],
4986 TranscriptRewriteReason::new("compaction"),
4987 Some("unit-test".to_string()),
4988 Some(parent),
4989 )
4990 .expect("rewrite should commit");
4991
4992 session.set_system_prompt("durable system prompt".to_string());
4993
4994 let head = session
4995 .transcript_revision()
4996 .expect("system prompt should refresh transcript head");
4997 assert_ne!(head, rewrite.revision);
4998 assert_eq!(
4999 head,
5000 transcript_messages_digest(session.messages()).expect("current digest")
5001 );
5002 let head_messages = session
5003 .transcript_revision_messages(&head)
5004 .expect("history state should decode")
5005 .expect("refreshed head body should be retained");
5006 assert_eq!(
5007 serde_json::to_value(&head_messages).expect("head serializes"),
5008 serde_json::to_value(session.messages()).expect("session serializes")
5009 );
5010 validate_transcript_history_state(
5011 &session
5012 .transcript_history_state()
5013 .expect("history state should decode")
5014 .expect("history state should exist"),
5015 )
5016 .expect("history state remains valid after system prompt update");
5017 }
5018
5019 #[test]
5020 fn apply_transcript_history_state_uses_latest_commit_time_for_restored_head() {
5021 let mut session = Session::new();
5022 session.push(Message::User(UserMessage::text("question".to_string())));
5023 session.push(Message::BlockAssistant(BlockAssistantMessage {
5024 blocks: vec![AssistantBlock::Text {
5025 text: "verbose answer".to_string(),
5026 meta: None,
5027 }],
5028 stop_reason: StopReason::EndTurn,
5029 created_at: crate::types::message_timestamp_now(),
5030 }));
5031 let original_messages = session.messages().to_vec();
5032 let parent = session.transcript_revision().expect("parent revision");
5033 let compact = session
5034 .commit_transcript_rewrite(
5035 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
5036 vec![Message::BlockAssistant(BlockAssistantMessage {
5037 blocks: vec![AssistantBlock::Text {
5038 text: "compact answer".to_string(),
5039 meta: None,
5040 }],
5041 stop_reason: StopReason::EndTurn,
5042 created_at: crate::types::message_timestamp_now(),
5043 })],
5044 TranscriptRewriteReason::new("compaction"),
5045 Some("unit-test".to_string()),
5046 Some(parent.clone()),
5047 )
5048 .expect("rewrite should commit");
5049
5050 std::thread::sleep(std::time::Duration::from_millis(2));
5051 let restore = session
5052 .commit_transcript_rewrite(
5053 TranscriptRewriteSelection::MessageRange {
5054 start: 0,
5055 end: session.messages().len(),
5056 },
5057 original_messages.clone(),
5058 TranscriptRewriteReason::new("restore"),
5059 Some("unit-test".to_string()),
5060 Some(compact.revision),
5061 )
5062 .expect("restore should commit");
5063 assert_eq!(restore.revision, parent);
5064
5065 let state = session
5066 .transcript_history_state()
5067 .expect("history state should decode")
5068 .expect("history state should exist");
5069 let restored_body_created_at = state
5070 .revisions
5071 .iter()
5072 .find(|body| body.revision == restore.revision)
5073 .expect("restored body should be retained")
5074 .created_at;
5075 assert!(
5076 restored_body_created_at < restore.committed_at,
5077 "test requires restore commit to be newer than retained body"
5078 );
5079
5080 let mut replayed = Session::new();
5081 replayed
5082 .apply_transcript_history_state(state)
5083 .expect("replay should materialize restored head");
5084 assert_eq!(
5085 serde_json::to_value(replayed.messages()).expect("replayed serializes"),
5086 serde_json::to_value(&original_messages).expect("original serializes")
5087 );
5088 assert_eq!(replayed.updated_at(), restore.committed_at);
5089 }
5090
5091 #[test]
5092 fn test_session_new() {
5093 let session = Session::new();
5094 assert_eq!(session.version(), SESSION_VERSION);
5095 assert!(session.messages().is_empty());
5096 assert!(session.created_at() <= session.updated_at());
5097 }
5098
5099 #[test]
5100 fn llm_identity_model_override_switches_to_catalog_provider() {
5101 let registry = crate::ModelRegistry::from_config(
5102 &crate::Config::default(),
5103 *crate::model_profile::test_catalog::TEST_CATALOG,
5104 )
5105 .unwrap();
5106 let current = SessionLlmIdentity {
5107 model: "test-anthropic-default".to_string(),
5108 provider: Provider::Anthropic,
5109 self_hosted_server_id: None,
5110 provider_params: None,
5111 auth_binding: Some(crate::AuthBindingRef {
5112 realm: crate::RealmId::parse("tenant_a").unwrap(),
5113 binding: crate::BindingId::parse("anthropic_default").unwrap(),
5114 profile: None,
5115 origin: crate::BindingOrigin::Configured,
5116 }),
5117 };
5118
5119 let resolved = resolve_session_llm_identity_override(
5120 ¤t,
5121 ®istry,
5122 SessionLlmIdentityOverride {
5123 model: Some("test-openai-default"),
5124 provider: None,
5125 provider_params: None,
5126 auth_binding: None,
5127 },
5128 )
5129 .unwrap();
5130
5131 assert_eq!(resolved.model, "test-openai-default");
5132 assert_eq!(resolved.provider, Provider::OpenAI);
5133 assert!(
5134 resolved.auth_binding.is_none(),
5135 "provider switches must not inherit a binding from the previous provider"
5136 );
5137 }
5138
5139 #[test]
5140 fn llm_identity_model_override_keeps_uncatalogued_model_on_current_provider() {
5141 let registry = crate::ModelRegistry::from_config(
5142 &crate::Config::default(),
5143 *crate::model_profile::test_catalog::TEST_CATALOG,
5144 )
5145 .unwrap();
5146 let current = SessionLlmIdentity {
5147 model: "custom-model".to_string(),
5148 provider: Provider::Anthropic,
5149 self_hosted_server_id: None,
5150 provider_params: None,
5151 auth_binding: None,
5152 };
5153
5154 let resolved = resolve_session_llm_identity_override(
5155 ¤t,
5156 ®istry,
5157 SessionLlmIdentityOverride {
5158 model: Some("uncatalogued-custom-model"),
5159 provider: None,
5160 provider_params: None,
5161 auth_binding: None,
5162 },
5163 )
5164 .unwrap();
5165
5166 assert_eq!(resolved.model, "uncatalogued-custom-model");
5167 assert_eq!(resolved.provider, Provider::Anthropic);
5168 }
5169
5170 #[test]
5171 fn realtime_transcript_append_is_idempotent_by_provider_item_and_delta_id() {
5172 let mut session = Session::new();
5173
5174 let user = RealtimeTranscriptEvent::UserTranscriptFinal {
5175 item_id: "item_user".to_string(),
5176 previous_item_id: None,
5177 content_index: 0,
5178 text: "hello".to_string(),
5179 };
5180 assert!(
5181 !session
5182 .append_realtime_transcript_event(user.clone())
5183 .is_inert()
5184 );
5185 assert!(session.append_realtime_transcript_event(user).is_inert());
5186
5187 let delta = RealtimeTranscriptEvent::AssistantTextDelta {
5188 response_id: "resp_assistant".to_string(),
5189 delta_id: "evt_delta_1".to_string(),
5190 item_id: "item_assistant".to_string(),
5191 previous_item_id: Some("item_user".to_string()),
5192 content_index: 0,
5193 delta: "hi".to_string(),
5194 };
5195 assert!(
5196 session
5197 .append_realtime_transcript_event(delta.clone())
5198 .is_inert()
5199 );
5200 assert!(session.append_realtime_transcript_event(delta).is_inert());
5201
5202 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
5203 response_id: "resp_assistant".to_string(),
5204 stop_reason: StopReason::EndTurn,
5205 usage: Usage::default(),
5206 };
5207 assert!(
5208 !session
5209 .append_realtime_transcript_event(terminal.clone())
5210 .is_inert()
5211 );
5212 assert!(
5213 session
5214 .append_realtime_transcript_event(terminal)
5215 .is_inert()
5216 );
5217
5218 assert_eq!(session.messages().len(), 2);
5219 assert!(matches!(
5220 &session.messages()[0],
5221 Message::User(user) if user.text_content() == "hello"
5222 ));
5223 assert!(matches!(
5224 &session.messages()[1],
5225 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "hi"
5226 ));
5227 }
5228
5229 #[test]
5234 fn realtime_transcript_final_text_overrides_partial_delta_and_promotes_to_spoken_lane() {
5235 let mut session = Session::new();
5236
5237 assert!(
5240 session
5241 .append_realtime_transcript_event(
5242 RealtimeTranscriptEvent::AssistantTranscriptDelta {
5243 response_id: "resp_a".to_string(),
5244 delta_id: "evt_1".to_string(),
5245 item_id: "item_a".to_string(),
5246 previous_item_id: None,
5247 content_index: 0,
5248 delta: "incom".to_string(),
5249 }
5250 )
5251 .is_inert()
5252 );
5253
5254 assert!(
5256 session
5257 .append_realtime_transcript_event(
5258 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
5259 response_id: "resp_a".to_string(),
5260 item_id: "item_a".to_string(),
5261 content_index: 0,
5262 text: "complete answer".to_string(),
5263 }
5264 )
5265 .is_inert()
5266 );
5267
5268 let outcome = session.append_realtime_transcript_event(
5270 RealtimeTranscriptEvent::AssistantTurnCompleted {
5271 response_id: "resp_a".to_string(),
5272 stop_reason: StopReason::EndTurn,
5273 usage: Usage::default(),
5274 },
5275 );
5276 assert!(!outcome.is_inert());
5277
5278 assert_eq!(session.messages().len(), 1);
5281 match &session.messages()[0] {
5282 Message::BlockAssistant(assistant) => {
5283 let mut found_transcript = false;
5284 for block in &assistant.blocks {
5285 if let AssistantBlock::Transcript { text, .. } = block {
5286 assert_eq!(text, "complete answer");
5287 found_transcript = true;
5288 }
5289 }
5290 assert!(
5291 found_transcript,
5292 "AssistantTranscriptFinalText must promote to the Spoken lane and \
5293 materialize as AssistantBlock::Transcript"
5294 );
5295 }
5296 other => unreachable!("expected BlockAssistant, got {other:?}"),
5297 }
5298 }
5299
5300 #[test]
5303 fn realtime_transcript_final_text_creates_item_when_no_delta_staged() {
5304 let mut session = Session::new();
5305
5306 assert!(
5307 session
5308 .append_realtime_transcript_event(
5309 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
5310 response_id: "resp_a".to_string(),
5311 item_id: "item_a".to_string(),
5312 content_index: 0,
5313 text: "spoken-final-only".to_string(),
5314 }
5315 )
5316 .is_inert()
5317 );
5318
5319 let outcome = session.append_realtime_transcript_event(
5320 RealtimeTranscriptEvent::AssistantTurnCompleted {
5321 response_id: "resp_a".to_string(),
5322 stop_reason: StopReason::EndTurn,
5323 usage: Usage::default(),
5324 },
5325 );
5326 assert!(!outcome.is_inert());
5327
5328 assert_eq!(session.messages().len(), 1);
5329 match &session.messages()[0] {
5330 Message::BlockAssistant(assistant) => {
5331 let has_transcript = assistant.blocks.iter().any(|b| {
5332 matches!(b, AssistantBlock::Transcript { text, .. } if text == "spoken-final-only")
5333 });
5334 assert!(
5335 has_transcript,
5336 "final-only provider path must materialize as Transcript on the Spoken lane"
5337 );
5338 }
5339 other => unreachable!("expected BlockAssistant, got {other:?}"),
5340 }
5341 }
5342
5343 #[test]
5344 fn realtime_transcript_append_orders_causally_equivalent_out_of_order_items() {
5345 let mut session = Session::new();
5346
5347 assert!(
5348 session
5349 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5350 response_id: "resp_assistant".to_string(),
5351 delta_id: "evt_delta_1".to_string(),
5352 item_id: "item_assistant".to_string(),
5353 previous_item_id: Some("item_user".to_string()),
5354 content_index: 0,
5355 delta: "answer".to_string(),
5356 })
5357 .is_inert()
5358 );
5359 assert!(
5360 session
5361 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5362 response_id: "resp_assistant".to_string(),
5363 stop_reason: StopReason::EndTurn,
5364 usage: Usage::default(),
5365 })
5366 .is_inert()
5367 );
5368
5369 let outcome = session.append_realtime_transcript_event(
5370 RealtimeTranscriptEvent::UserTranscriptFinal {
5371 item_id: "item_user".to_string(),
5372 previous_item_id: None,
5373 content_index: 0,
5374 text: "question".to_string(),
5375 },
5376 );
5377
5378 assert_eq!(outcome.materialized_messages.len(), 2);
5379 assert_eq!(session.messages().len(), 2);
5380 assert!(matches!(
5381 &session.messages()[0],
5382 Message::User(user) if user.text_content() == "question"
5383 ));
5384 assert!(matches!(
5385 &session.messages()[1],
5386 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer"
5387 ));
5388 }
5389
5390 #[test]
5391 fn realtime_transcript_replay_of_seen_provider_items_is_inert() {
5392 let mut session = Session::new();
5393 let events = vec![
5394 RealtimeTranscriptEvent::UserTranscriptFinal {
5395 item_id: "item_user".to_string(),
5396 previous_item_id: None,
5397 content_index: 0,
5398 text: "hello".to_string(),
5399 },
5400 RealtimeTranscriptEvent::AssistantTextDelta {
5401 response_id: "resp_assistant".to_string(),
5402 delta_id: "evt_delta_1".to_string(),
5403 item_id: "item_assistant".to_string(),
5404 previous_item_id: Some("item_user".to_string()),
5405 content_index: 0,
5406 delta: "world".to_string(),
5407 },
5408 RealtimeTranscriptEvent::AssistantTurnCompleted {
5409 response_id: "resp_assistant".to_string(),
5410 stop_reason: StopReason::EndTurn,
5411 usage: Usage::default(),
5412 },
5413 ];
5414
5415 for event in events.iter().cloned() {
5416 let _ = session.append_realtime_transcript_event(event);
5417 }
5418 let first_messages = serde_json::to_value(session.messages()).unwrap();
5419
5420 for event in events {
5421 assert!(session.append_realtime_transcript_event(event).is_inert());
5422 }
5423
5424 assert_eq!(
5425 serde_json::to_value(session.messages()).unwrap(),
5426 first_messages
5427 );
5428 }
5429
5430 #[test]
5431 fn realtime_transcript_user_final_replay_cannot_erase_existing_segment() {
5432 let mut session = Session::new();
5433
5434 let user = RealtimeTranscriptEvent::UserTranscriptFinal {
5435 item_id: "item_user".to_string(),
5436 previous_item_id: None,
5437 content_index: 0,
5438 text: "remember amber lantern".to_string(),
5439 };
5440 assert!(
5441 !session
5442 .append_realtime_transcript_event(user.clone())
5443 .is_inert()
5444 );
5445 let first_messages = serde_json::to_value(session.messages()).unwrap();
5446
5447 assert!(
5448 session
5449 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5450 item_id: "item_user".to_string(),
5451 previous_item_id: None,
5452 content_index: 0,
5453 text: String::new(),
5454 })
5455 .is_inert()
5456 );
5457 assert!(session.append_realtime_transcript_event(user).is_inert());
5458 assert_eq!(
5459 serde_json::to_value(session.messages()).unwrap(),
5460 first_messages
5461 );
5462 }
5463
5464 #[test]
5465 fn realtime_transcript_empty_user_final_can_be_filled_by_later_nonempty_replay() {
5466 let mut session = Session::new();
5467
5468 assert!(
5469 session
5470 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5471 item_id: "item_user".to_string(),
5472 previous_item_id: None,
5473 content_index: 0,
5474 text: String::new(),
5475 })
5476 .is_inert()
5477 );
5478 assert!(session.messages().is_empty());
5479
5480 let outcome = session.append_realtime_transcript_event(
5481 RealtimeTranscriptEvent::UserTranscriptFinal {
5482 item_id: "item_user".to_string(),
5483 previous_item_id: None,
5484 content_index: 0,
5485 text: "remember amber lantern".to_string(),
5486 },
5487 );
5488 assert_eq!(outcome.materialized_messages.len(), 1);
5489 assert_eq!(session.messages().len(), 1);
5490 assert!(matches!(
5491 &session.messages()[0],
5492 Message::User(user) if user.text_content() == "remember amber lantern"
5493 ));
5494 }
5495
5496 #[test]
5497 fn realtime_transcript_skipped_provider_items_preserve_causal_order_without_content() {
5498 let mut session = Session::new();
5499
5500 let assistant_delta = RealtimeTranscriptEvent::AssistantTextDelta {
5501 response_id: "resp_assistant".to_string(),
5502 delta_id: "evt_delta_1".to_string(),
5503 item_id: "item_assistant".to_string(),
5504 previous_item_id: Some("item_tool".to_string()),
5505 content_index: 0,
5506 delta: "done".to_string(),
5507 };
5508 assert!(
5509 session
5510 .append_realtime_transcript_event(assistant_delta.clone())
5511 .is_inert()
5512 );
5513 let assistant_complete = RealtimeTranscriptEvent::AssistantTurnCompleted {
5514 response_id: "resp_assistant".to_string(),
5515 stop_reason: StopReason::EndTurn,
5516 usage: Usage::default(),
5517 };
5518 assert!(
5519 session
5520 .append_realtime_transcript_event(assistant_complete.clone())
5521 .is_inert()
5522 );
5523
5524 let skipped = RealtimeTranscriptEvent::ItemSkipped {
5525 item_id: "item_tool".to_string(),
5526 previous_item_id: Some("item_user".to_string()),
5527 };
5528 assert!(
5529 session
5530 .append_realtime_transcript_event(skipped.clone())
5531 .is_inert(),
5532 "a skipped provider item must not append transcript content"
5533 );
5534 assert!(session.messages().is_empty());
5535
5536 let outcome = session.append_realtime_transcript_event(
5537 RealtimeTranscriptEvent::UserTranscriptFinal {
5538 item_id: "item_user".to_string(),
5539 previous_item_id: None,
5540 content_index: 0,
5541 text: "please use the tool".to_string(),
5542 },
5543 );
5544 assert_eq!(outcome.materialized_messages.len(), 2);
5545 assert_eq!(session.messages().len(), 2);
5546 assert!(matches!(
5547 &session.messages()[0],
5548 Message::User(user) if user.text_content() == "please use the tool"
5549 ));
5550 assert!(matches!(
5551 &session.messages()[1],
5552 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "done"
5553 ));
5554
5555 let first_messages = serde_json::to_value(session.messages()).unwrap();
5556 assert!(session.append_realtime_transcript_event(skipped).is_inert());
5557 assert!(
5558 session
5559 .append_realtime_transcript_event(assistant_delta)
5560 .is_inert()
5561 );
5562 assert!(
5563 session
5564 .append_realtime_transcript_event(assistant_complete)
5565 .is_inert()
5566 );
5567 assert_eq!(
5568 serde_json::to_value(session.messages()).unwrap(),
5569 first_messages
5570 );
5571 }
5572
5573 #[test]
5574 fn realtime_transcript_interrupted_assistant_item_unblocks_later_provider_items() {
5575 let mut session = Session::new();
5582
5583 let _ = session.append_realtime_transcript_event(
5584 RealtimeTranscriptEvent::UserTranscriptFinal {
5585 item_id: "item_repeat".to_string(),
5586 previous_item_id: None,
5587 content_index: 0,
5588 text: "repeat until stop".to_string(),
5589 },
5590 );
5591 assert!(
5592 session
5593 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5594 response_id: "resp_loop".to_string(),
5595 delta_id: "evt_loop_1".to_string(),
5596 item_id: "item_loop".to_string(),
5597 previous_item_id: Some("item_repeat".to_string()),
5598 content_index: 0,
5599 delta: "Looping now".to_string(),
5600 })
5601 .is_inert()
5602 );
5603 assert!(
5604 session
5605 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5606 item_id: "item_stop".to_string(),
5607 previous_item_id: Some("item_loop".to_string()),
5608 content_index: 0,
5609 text: "Stop.".to_string(),
5610 })
5611 .is_inert(),
5612 "the stop turn waits until the interrupted assistant provider item is resolved"
5613 );
5614
5615 let outcome = session.append_realtime_transcript_event(
5616 RealtimeTranscriptEvent::AssistantTurnInterrupted {
5617 response_id: "resp_loop".to_string(),
5618 },
5619 );
5620
5621 assert_eq!(outcome.materialized_messages.len(), 2);
5624 assert_eq!(session.messages().len(), 3);
5626 assert!(matches!(
5627 &session.messages()[0],
5628 Message::User(user) if user.text_content() == "repeat until stop"
5629 ));
5630 match &session.messages()[1] {
5631 Message::BlockAssistant(assistant) => {
5632 let text = block_assistant_text(assistant);
5633 assert_eq!(text, "Looping now");
5634 }
5635 other => unreachable!(
5636 "Display lane assistant item must be retained on Interrupted, got {other:?}"
5637 ),
5638 }
5639 assert!(matches!(
5640 &session.messages()[2],
5641 Message::User(user) if user.text_content() == "Stop."
5642 ));
5643 }
5644
5645 #[test]
5646 fn realtime_transcript_late_interrupted_assistant_delta_stays_noncanonical() {
5647 let mut session = Session::new();
5648
5649 let _ = session.append_realtime_transcript_event(
5650 RealtimeTranscriptEvent::UserTranscriptFinal {
5651 item_id: "item_repeat".to_string(),
5652 previous_item_id: None,
5653 content_index: 0,
5654 text: "repeat until stop".to_string(),
5655 },
5656 );
5657 assert!(
5658 session
5659 .append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
5660 item_id: "item_loop".to_string(),
5661 previous_item_id: Some("item_repeat".to_string()),
5662 role: RealtimeTranscriptRole::Assistant,
5663 response_id: None,
5664 })
5665 .is_inert(),
5666 "provider can observe an assistant item before the adapter learns its response id"
5667 );
5668 assert!(
5669 session
5670 .append_realtime_transcript_event(
5671 RealtimeTranscriptEvent::AssistantTurnInterrupted {
5672 response_id: "resp_loop".to_string(),
5673 }
5674 )
5675 .is_inert(),
5676 "an interruption can arrive before delayed transcript deltas for the response"
5677 );
5678 assert!(
5679 session
5680 .append_realtime_transcript_event(RealtimeTranscriptEvent::UserTranscriptFinal {
5681 item_id: "item_stop".to_string(),
5682 previous_item_id: Some("item_loop".to_string()),
5683 content_index: 0,
5684 text: "Stop.".to_string(),
5685 })
5686 .is_inert(),
5687 "the stop turn waits for the provider's interrupted assistant item anchor"
5688 );
5689
5690 let late_delta_outcome =
5691 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5692 response_id: "resp_loop".to_string(),
5693 delta_id: "evt_loop_late".to_string(),
5694 item_id: "item_loop".to_string(),
5695 previous_item_id: Some("item_repeat".to_string()),
5696 content_index: 0,
5697 delta: "Looping now".to_string(),
5698 });
5699 assert_eq!(late_delta_outcome.materialized_messages.len(), 1);
5700 assert!(matches!(
5701 &session.messages()[1],
5702 Message::User(user) if user.text_content() == "Stop."
5703 ));
5704 assert!(
5705 session
5706 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5707 response_id: "resp_loop".to_string(),
5708 stop_reason: StopReason::EndTurn,
5709 usage: Usage::default(),
5710 })
5711 .is_inert(),
5712 "late completion for an interrupted response must not resurrect its deltas"
5713 );
5714 assert!(
5715 session
5716 .messages()
5717 .iter()
5718 .filter_map(|message| match message {
5719 Message::BlockAssistant(assistant) => Some(block_assistant_text(assistant)),
5720 _ => None,
5721 })
5722 .all(|text| !text.contains("Looping now")),
5723 "late interrupted assistant text must remain non-canonical"
5724 );
5725 }
5726
5727 #[test]
5728 fn realtime_transcript_completion_only_finalizes_matching_response() {
5729 let mut session = Session::new();
5730
5731 let _ = session.append_realtime_transcript_event(
5732 RealtimeTranscriptEvent::UserTranscriptFinal {
5733 item_id: "item_user".to_string(),
5734 previous_item_id: None,
5735 content_index: 0,
5736 text: "question".to_string(),
5737 },
5738 );
5739 assert!(
5740 session
5741 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5742 response_id: "resp_a".to_string(),
5743 delta_id: "evt_a".to_string(),
5744 item_id: "item_a".to_string(),
5745 previous_item_id: Some("item_user".to_string()),
5746 content_index: 0,
5747 delta: "answer a".to_string(),
5748 })
5749 .is_inert()
5750 );
5751
5752 assert!(
5753 session
5754 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5755 response_id: "resp_b".to_string(),
5756 stop_reason: StopReason::EndTurn,
5757 usage: Usage::default(),
5758 })
5759 .is_inert(),
5760 "a completion for another response must not finalize buffered assistant text"
5761 );
5762 assert_eq!(session.messages().len(), 1);
5763
5764 let outcome = session.append_realtime_transcript_event(
5765 RealtimeTranscriptEvent::AssistantTurnCompleted {
5766 response_id: "resp_a".to_string(),
5767 stop_reason: StopReason::EndTurn,
5768 usage: Usage::default(),
5769 },
5770 );
5771 assert_eq!(outcome.materialized_messages.len(), 1);
5772 assert_eq!(session.messages().len(), 2);
5773 assert!(matches!(
5774 &session.messages()[1],
5775 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "answer a"
5776 ));
5777 }
5778
5779 #[test]
5780 fn realtime_transcript_completion_before_later_delta_is_response_scoped() {
5781 let mut session = Session::new();
5782
5783 let _ = session.append_realtime_transcript_event(
5784 RealtimeTranscriptEvent::UserTranscriptFinal {
5785 item_id: "item_user".to_string(),
5786 previous_item_id: None,
5787 content_index: 0,
5788 text: "question".to_string(),
5789 },
5790 );
5791 assert!(
5792 session
5793 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5794 response_id: "resp_a".to_string(),
5795 stop_reason: StopReason::EndTurn,
5796 usage: Usage::default(),
5797 })
5798 .is_inert()
5799 );
5800 assert!(
5801 session
5802 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5803 response_id: "resp_b".to_string(),
5804 delta_id: "evt_b".to_string(),
5805 item_id: "item_b".to_string(),
5806 previous_item_id: Some("item_user".to_string()),
5807 content_index: 0,
5808 delta: "wrong response".to_string(),
5809 })
5810 .is_inert(),
5811 "a later delta for another response must not be finalized by resp_a's pending completion"
5812 );
5813
5814 let outcome =
5815 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5816 response_id: "resp_a".to_string(),
5817 delta_id: "evt_a".to_string(),
5818 item_id: "item_a".to_string(),
5819 previous_item_id: Some("item_user".to_string()),
5820 content_index: 0,
5821 delta: "right response".to_string(),
5822 });
5823
5824 assert_eq!(outcome.materialized_messages.len(), 1);
5825 assert_eq!(session.messages().len(), 2);
5826 assert!(matches!(
5827 &session.messages()[1],
5828 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "right response"
5829 ));
5830 }
5831
5832 #[test]
5833 fn realtime_transcript_late_duplicate_completion_cannot_finalize_unrelated_response() {
5834 let mut session = Session::new();
5835
5836 let _ = session.append_realtime_transcript_event(
5837 RealtimeTranscriptEvent::UserTranscriptFinal {
5838 item_id: "item_user".to_string(),
5839 previous_item_id: None,
5840 content_index: 0,
5841 text: "question".to_string(),
5842 },
5843 );
5844 let _ =
5845 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5846 response_id: "resp_a".to_string(),
5847 delta_id: "evt_a".to_string(),
5848 item_id: "item_a".to_string(),
5849 previous_item_id: Some("item_user".to_string()),
5850 content_index: 0,
5851 delta: "first".to_string(),
5852 });
5853 let _ = session.append_realtime_transcript_event(
5854 RealtimeTranscriptEvent::AssistantTurnCompleted {
5855 response_id: "resp_a".to_string(),
5856 stop_reason: StopReason::EndTurn,
5857 usage: Usage::default(),
5858 },
5859 );
5860 assert_eq!(session.messages().len(), 2);
5861
5862 assert!(
5863 session
5864 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5865 response_id: "resp_b".to_string(),
5866 delta_id: "evt_b".to_string(),
5867 item_id: "item_b".to_string(),
5868 previous_item_id: Some("item_a".to_string()),
5869 content_index: 0,
5870 delta: "second".to_string(),
5871 })
5872 .is_inert()
5873 );
5874 assert!(
5875 session
5876 .append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTurnCompleted {
5877 response_id: "resp_a".to_string(),
5878 stop_reason: StopReason::EndTurn,
5879 usage: Usage::default(),
5880 })
5881 .is_inert(),
5882 "a duplicate late terminal for resp_a must not finalize resp_b"
5883 );
5884 assert_eq!(session.messages().len(), 2);
5885
5886 let outcome = session.append_realtime_transcript_event(
5887 RealtimeTranscriptEvent::AssistantTurnCompleted {
5888 response_id: "resp_b".to_string(),
5889 stop_reason: StopReason::EndTurn,
5890 usage: Usage::default(),
5891 },
5892 );
5893 assert_eq!(outcome.materialized_messages.len(), 1);
5894 assert_eq!(session.messages().len(), 3);
5895 }
5896
5897 #[test]
5898 fn realtime_transcript_interruption_discards_only_matching_response() {
5899 let mut session = Session::new();
5905
5906 let _ = session.append_realtime_transcript_event(
5907 RealtimeTranscriptEvent::UserTranscriptFinal {
5908 item_id: "item_user".to_string(),
5909 previous_item_id: None,
5910 content_index: 0,
5911 text: "question".to_string(),
5912 },
5913 );
5914 let _ =
5915 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5916 response_id: "resp_a".to_string(),
5917 delta_id: "evt_a".to_string(),
5918 item_id: "item_a".to_string(),
5919 previous_item_id: Some("item_user".to_string()),
5920 content_index: 0,
5921 delta: "interrupted display".to_string(),
5922 });
5923 let _ =
5924 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
5925 response_id: "resp_b".to_string(),
5926 delta_id: "evt_b".to_string(),
5927 item_id: "item_b".to_string(),
5928 previous_item_id: Some("item_user".to_string()),
5929 content_index: 0,
5930 delta: "keep me".to_string(),
5931 });
5932
5933 let interrupt_outcome = session.append_realtime_transcript_event(
5936 RealtimeTranscriptEvent::AssistantTurnInterrupted {
5937 response_id: "resp_a".to_string(),
5938 },
5939 );
5940 assert_eq!(
5941 interrupt_outcome.materialized_messages.len(),
5942 1,
5943 "resp_a's Display item commits on Interrupted"
5944 );
5945
5946 let outcome = session.append_realtime_transcript_event(
5947 RealtimeTranscriptEvent::AssistantTurnCompleted {
5948 response_id: "resp_b".to_string(),
5949 stop_reason: StopReason::EndTurn,
5950 usage: Usage::default(),
5951 },
5952 );
5953 assert_eq!(
5954 outcome.materialized_messages.len(),
5955 1,
5956 "resp_b commits on its TurnCompleted, untouched by resp_a's Interrupted"
5957 );
5958
5959 assert_eq!(session.messages().len(), 3);
5961 assert!(matches!(
5962 &session.messages()[1],
5963 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "interrupted display"
5964 ));
5965 assert!(matches!(
5966 &session.messages()[2],
5967 Message::BlockAssistant(assistant) if block_assistant_text(assistant) == "keep me"
5968 ));
5969 }
5970
5971 #[test]
5974 fn test_fork_shares_arc_no_clone() {
5975 let mut session = Session::new();
5976 for i in 0..100 {
5977 session.push(Message::User(UserMessage::text(format!("Message {i}"))));
5978 }
5979
5980 let forked = session.fork();
5982
5983 assert!(Arc::ptr_eq(&session.messages, &forked.messages));
5985 assert_eq!(forked.messages().len(), 100);
5986 }
5987
5988 #[test]
5989 fn test_fork_at_shares_arc_prefix() {
5990 let mut session = Session::new();
5991 for i in 0..100 {
5992 session.push(Message::User(UserMessage::text(format!("Message {i}"))));
5993 }
5994
5995 let forked = session.fork_at(50);
5997 assert_eq!(forked.messages().len(), 50);
5998
5999 assert_eq!(session.messages().len(), 100);
6001 }
6002
6003 #[test]
6004 fn test_fork_at_resets_transcript_history_state_for_branch_identity() {
6005 let mut session = Session::new();
6006 session.push(Message::User(UserMessage::text(
6007 "summarize this".to_string(),
6008 )));
6009 session.push(Message::BlockAssistant(BlockAssistantMessage::new(
6010 vec![AssistantBlock::Text {
6011 text: "long assistant trace".to_string(),
6012 meta: None,
6013 }],
6014 StopReason::EndTurn,
6015 )));
6016 let parent_revision = session.transcript_revision().expect("parent revision");
6017 session
6018 .commit_transcript_rewrite(
6019 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
6020 vec![Message::BlockAssistant(BlockAssistantMessage::new(
6021 vec![AssistantBlock::Text {
6022 text: "compact trace".to_string(),
6023 meta: None,
6024 }],
6025 StopReason::EndTurn,
6026 ))],
6027 TranscriptRewriteReason::new("compaction"),
6028 Some("test".to_string()),
6029 Some(parent_revision),
6030 )
6031 .expect("rewrite should commit");
6032
6033 let source_head = session.transcript_revision().expect("source head");
6034 let mut forked = session.fork_at(1);
6035 assert_ne!(forked.id(), session.id());
6036 assert!(
6037 !forked
6038 .metadata()
6039 .contains_key(SESSION_TRANSCRIPT_HISTORY_STATE_KEY)
6040 );
6041 assert_eq!(
6042 forked.transcript_revision().expect("fork head"),
6043 transcript_messages_digest(forked.messages()).expect("fork digest")
6044 );
6045 assert!(
6046 forked
6047 .transcript_revision_messages(&source_head)
6048 .expect("fork history lookup")
6049 .is_none()
6050 );
6051
6052 let fork_parent = forked.transcript_revision().expect("fork parent");
6053 let commit = forked
6054 .commit_transcript_rewrite(
6055 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
6056 vec![Message::User(UserMessage::text(
6057 "branch prompt".to_string(),
6058 ))],
6059 TranscriptRewriteReason::new("branch_edit"),
6060 Some("test".to_string()),
6061 Some(fork_parent.clone()),
6062 )
6063 .expect("fork rewrite should use fork-local parent");
6064 assert_eq!(commit.parent_revision, fork_parent);
6065 }
6066
6067 #[test]
6068 fn test_push_cow_behavior() {
6069 let mut session = Session::new();
6070 session.push(Message::User(UserMessage::text("First".to_string())));
6071
6072 let forked = session.fork();
6074 assert!(Arc::ptr_eq(&session.messages, &forked.messages));
6075
6076 session.push(Message::User(UserMessage::text("Second".to_string())));
6078
6079 assert!(!Arc::ptr_eq(&session.messages, &forked.messages));
6081 assert_eq!(session.messages().len(), 2);
6082 assert_eq!(forked.messages().len(), 1);
6083 }
6084
6085 #[test]
6088 fn test_push_batch_single_timestamp() {
6089 let mut session = Session::new();
6090 let initial_updated = session.updated_at();
6091
6092 session.push_batch(vec![
6094 Message::User(UserMessage::text("First".to_string())),
6095 Message::User(UserMessage::text("Second".to_string())),
6096 Message::User(UserMessage::text("Third".to_string())),
6097 ]);
6098
6099 assert_eq!(session.messages().len(), 3);
6100 assert!(session.updated_at() >= initial_updated);
6102 }
6103
6104 #[test]
6105 fn test_touch_updates_timestamp() {
6106 let mut session = Session::new();
6107 let initial = session.updated_at();
6108
6109 std::thread::sleep(std::time::Duration::from_millis(10));
6110
6111 session.touch();
6113
6114 assert!(session.updated_at() > initial);
6115 }
6116
6117 #[test]
6118 fn test_session_push() {
6119 let mut session = Session::new();
6120 let initial_updated = session.updated_at();
6121
6122 std::thread::sleep(std::time::Duration::from_millis(10));
6124
6125 session.push(Message::User(UserMessage::text("Hello".to_string())));
6126
6127 assert_eq!(session.messages().len(), 1);
6128 assert!(session.updated_at() > initial_updated);
6129 }
6130
6131 #[test]
6132 fn test_session_fork() {
6133 let mut session = Session::new();
6134 session.push(Message::System(SystemMessage::new("System prompt")));
6135 session.push(Message::User(UserMessage::text("Hello".to_string())));
6136 session.push(Message::BlockAssistant(BlockAssistantMessage {
6137 blocks: vec![AssistantBlock::Text {
6138 text: "Hi!".to_string(),
6139 meta: None,
6140 }],
6141 stop_reason: StopReason::EndTurn,
6142 created_at: crate::types::message_timestamp_now(),
6143 }));
6144
6145 let forked = session.fork_at(2);
6147 assert_eq!(forked.messages().len(), 2);
6148 assert_ne!(forked.id(), session.id());
6149
6150 let full_fork = session.fork();
6152 assert_eq!(full_fork.messages().len(), 3);
6153 }
6154
6155 #[test]
6156 fn test_session_forks_drop_generated_authority_metadata() {
6157 let mut session = Session::new();
6158 session.push(Message::User(UserMessage::text("original")));
6159 session.set_metadata("ordinary", serde_json::json!("keep"));
6160 session
6161 .set_build_state(SessionBuildState::default())
6162 .expect("build state should serialize");
6163 session
6164 .set_system_context_state(SessionSystemContextState::default())
6165 .expect("system-context state should serialize");
6166 session
6167 .set_deferred_turn_state(SessionDeferredTurnState::default())
6168 .expect("deferred-turn state should serialize");
6169 session
6170 .set_tool_visibility_state(
6171 AuthorizedSessionToolVisibilityState::from_generated_authority(
6172 SessionToolVisibilityState::default(),
6173 ),
6174 )
6175 .expect("visibility state should serialize");
6176 let _ = session.append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
6177 item_id: "rt-item".to_string(),
6178 previous_item_id: None,
6179 role: RealtimeTranscriptRole::User,
6180 response_id: None,
6181 });
6182 assert!(
6183 session
6184 .metadata()
6185 .contains_key(SESSION_REALTIME_TRANSCRIPT_STATE_KEY),
6186 "test setup should install realtime transcript authority state"
6187 );
6188
6189 let forked_at = session.fork_at(1);
6190 let full_fork = session.fork();
6191 let replaced = session
6192 .fork_replacing(
6193 0,
6194 TranscriptReplacement::Message {
6195 message: Message::User(UserMessage::text("replacement")),
6196 },
6197 )
6198 .expect("replacement fork should succeed");
6199
6200 for forked in [&forked_at, &full_fork, &replaced] {
6201 assert_eq!(forked.metadata().get("ordinary").unwrap(), "keep");
6202 assert!(
6203 !forked.metadata().contains_key(SESSION_BUILD_STATE_KEY),
6204 "forked sessions must not raw-copy durable build-state authority"
6205 );
6206 assert!(
6207 !forked
6208 .metadata()
6209 .contains_key(SESSION_SYSTEM_CONTEXT_STATE_KEY),
6210 "forked sessions must not raw-copy system-context authority state"
6211 );
6212 assert!(
6213 !forked
6214 .metadata()
6215 .contains_key(SESSION_DEFERRED_TURN_STATE_KEY),
6216 "forked sessions must not raw-copy deferred-turn authority state"
6217 );
6218 assert!(
6219 !forked
6220 .metadata()
6221 .contains_key(SESSION_TOOL_VISIBILITY_STATE_KEY),
6222 "forked sessions must not raw-copy tool-visibility authority state"
6223 );
6224 assert!(
6225 !forked
6226 .metadata()
6227 .contains_key(SESSION_REALTIME_TRANSCRIPT_STATE_KEY),
6228 "forked sessions must not raw-copy realtime transcript authority state"
6229 );
6230 }
6231 }
6232
6233 #[test]
6234 fn test_session_metadata() {
6235 let mut session = Session::new();
6236 session.set_metadata("key", serde_json::json!("value"));
6237
6238 assert_eq!(session.metadata().get("key").unwrap(), "value");
6239 }
6240
6241 #[test]
6242 fn session_metadata_realm_id_is_back_read_compatible_string() {
6243 let metadata = SessionMetadata {
6246 schema_version: SESSION_METADATA_SCHEMA_VERSION,
6247 model: "test-model".to_string(),
6248 max_tokens: 1024,
6249 structured_output_retries: 2,
6250 provider: Provider::Other,
6251 self_hosted_server_id: None,
6252 provider_params: None,
6253 tooling: SessionTooling::default(),
6254 keep_alive: false,
6255 comms_name: None,
6256 peer_meta: None,
6257 realm_id: Some(crate::RealmId::parse("env_default").unwrap()),
6258 instance_id: None,
6259 backend: None,
6260 config_generation: None,
6261 auth_binding: None,
6262 mob_member_binding: None,
6263 };
6264 let value = serde_json::to_value(&metadata).unwrap();
6265 assert_eq!(
6266 value.get("realm_id"),
6267 Some(&serde_json::json!("env_default")),
6268 "typed realm_id must serialize as a bare slug string"
6269 );
6270
6271 let legacy = serde_json::json!({
6274 "schema_version": SESSION_METADATA_SCHEMA_VERSION,
6275 "model": "test-model",
6276 "max_tokens": 1024,
6277 "structured_output_retries": 2,
6278 "provider": "other",
6279 "tooling": SessionTooling::default(),
6280 "keep_alive": false,
6281 "comms_name": null,
6282 "realm_id": "legacy_realm",
6283 });
6284 let restored: SessionMetadata = serde_json::from_value(legacy).unwrap();
6285 assert_eq!(
6286 restored.realm_id.as_ref().map(crate::RealmId::as_str),
6287 Some("legacy_realm")
6288 );
6289 }
6290
6291 #[test]
6292 fn lifecycle_terminal_typed_round_trip() {
6293 let mut session = Session::new();
6294 assert_eq!(session.lifecycle_terminal(), None);
6295
6296 session
6297 .set_lifecycle_terminal(SessionLifecycleTerminal::Archived)
6298 .expect("typed terminal write should serialize");
6299 assert_eq!(
6300 session.lifecycle_terminal(),
6301 Some(SessionLifecycleTerminal::Archived)
6302 );
6303 assert!(
6304 session
6305 .lifecycle_terminal()
6306 .is_some_and(SessionLifecycleTerminal::is_archived)
6307 );
6308 assert_eq!(
6310 session
6311 .metadata()
6312 .get(SESSION_LIFECYCLE_TERMINAL_KEY)
6313 .unwrap(),
6314 &serde_json::json!("archived")
6315 );
6316 }
6317
6318 #[test]
6319 fn lifecycle_terminal_key_rejects_raw_mutation() {
6320 let mut session = Session::new();
6321 assert!(
6322 session
6323 .try_set_metadata(
6324 SESSION_LIFECYCLE_TERMINAL_KEY,
6325 serde_json::json!("archived")
6326 )
6327 .is_err(),
6328 "the typed lifecycle-terminal key is reserved for session authority"
6329 );
6330 }
6331
6332 #[test]
6333 fn test_session_metadata_backfill_preserves_timestamp() {
6334 let mut session = Session::new();
6335 let initial_updated = session.updated_at();
6336
6337 std::thread::sleep(std::time::Duration::from_millis(10));
6338
6339 assert!(session.backfill_metadata_if_absent("key", serde_json::json!("value")));
6340 assert_eq!(session.metadata().get("key").unwrap(), "value");
6341 assert_eq!(session.updated_at(), initial_updated);
6342 assert!(!session.backfill_metadata_if_absent("key", serde_json::json!("other")));
6343 assert_eq!(session.metadata().get("key").unwrap(), "value");
6344 assert_eq!(session.updated_at(), initial_updated);
6345 }
6346
6347 #[test]
6348 fn test_reserved_generated_authority_metadata_rejects_raw_mutation() {
6349 let mut session = Session::new();
6350
6351 assert!(
6352 session
6353 .try_set_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY, serde_json::json!({}))
6354 .is_err()
6355 );
6356 assert!(
6357 session
6358 .try_set_metadata(SESSION_METADATA_KEY, serde_json::json!({}))
6359 .is_err()
6360 );
6361 assert!(
6362 session
6363 .try_set_metadata(SESSION_BUILD_STATE_KEY, serde_json::json!({}))
6364 .is_err()
6365 );
6366 session
6367 .set_session_metadata(SessionMetadata {
6368 schema_version: SESSION_METADATA_SCHEMA_VERSION,
6369 model: "test-model".to_string(),
6370 max_tokens: 1024,
6371 structured_output_retries: 2,
6372 provider: Provider::Other,
6373 self_hosted_server_id: None,
6374 provider_params: None,
6375 tooling: SessionTooling::default(),
6376 keep_alive: false,
6377 comms_name: None,
6378 peer_meta: None,
6379 realm_id: None,
6380 instance_id: None,
6381 backend: None,
6382 config_generation: None,
6383 auth_binding: None,
6384 mob_member_binding: None,
6385 })
6386 .expect("typed metadata setter should route through generated authority");
6387 session
6388 .set_build_state(SessionBuildState::default())
6389 .expect("typed build-state setter should route through generated authority");
6390 session.remove_metadata(SESSION_METADATA_KEY);
6391 session.remove_metadata(SESSION_BUILD_STATE_KEY);
6392 assert!(
6393 session.metadata().contains_key(SESSION_METADATA_KEY),
6394 "raw removal must not delete generated-authority session metadata"
6395 );
6396 assert!(
6397 session.metadata().contains_key(SESSION_BUILD_STATE_KEY),
6398 "raw removal must not delete generated-authority build state"
6399 );
6400 session.set_metadata(SESSION_DEFERRED_TURN_STATE_KEY, serde_json::json!({}));
6401 assert!(
6402 !session
6403 .metadata()
6404 .contains_key(SESSION_DEFERRED_TURN_STATE_KEY)
6405 );
6406 assert!(
6407 !session.backfill_metadata_if_absent(
6408 SESSION_SYSTEM_CONTEXT_STATE_KEY,
6409 serde_json::json!({})
6410 )
6411 );
6412
6413 let state = SessionSystemContextState::default();
6414 session
6415 .set_system_context_state(state.clone())
6416 .expect("typed setter should route through generated authority");
6417 session.remove_metadata(SESSION_SYSTEM_CONTEXT_STATE_KEY);
6418 assert_eq!(
6419 session
6420 .try_system_context_state()
6421 .expect("typed state should restore"),
6422 Some(state)
6423 );
6424
6425 session.metadata.insert(
6426 SESSION_SYSTEM_CONTEXT_STATE_KEY.to_string(),
6427 serde_json::json!("not-a-state"),
6428 );
6429 assert!(
6430 session.try_system_context_state().is_err(),
6431 "malformed generated authority state must not decode as absent/default"
6432 );
6433
6434 session.metadata.insert(
6435 SESSION_METADATA_KEY.to_string(),
6436 serde_json::json!("not-metadata"),
6437 );
6438 assert!(
6439 session.try_session_metadata().is_err(),
6440 "malformed session metadata must not decode as absent/default"
6441 );
6442
6443 session.metadata.insert(
6444 SESSION_BUILD_STATE_KEY.to_string(),
6445 serde_json::json!("not-build-state"),
6446 );
6447 assert!(
6448 session.try_build_state().is_err(),
6449 "malformed build state must not decode as absent/default"
6450 );
6451
6452 assert!(
6453 session
6454 .try_set_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY, serde_json::json!({}))
6455 .is_err()
6456 );
6457 session
6458 .set_tool_visibility_state(
6459 AuthorizedSessionToolVisibilityState::from_generated_authority(
6460 SessionToolVisibilityState::default(),
6461 ),
6462 )
6463 .expect("typed visibility setter should route through typed authority handoff");
6464 session.remove_metadata(SESSION_TOOL_VISIBILITY_STATE_KEY);
6465 assert!(
6466 session
6467 .metadata()
6468 .contains_key(SESSION_TOOL_VISIBILITY_STATE_KEY)
6469 );
6470 session.clear_tool_visibility_state();
6471 assert!(
6472 !session
6473 .metadata()
6474 .contains_key(SESSION_TOOL_VISIBILITY_STATE_KEY)
6475 );
6476 assert!(
6477 session
6478 .try_set_metadata(SESSION_REALTIME_TRANSCRIPT_STATE_KEY, serde_json::json!({}))
6479 .is_err()
6480 );
6481 let _ = session.append_realtime_transcript_event(RealtimeTranscriptEvent::ItemObserved {
6482 item_id: "rt-item".to_string(),
6483 previous_item_id: None,
6484 role: RealtimeTranscriptRole::User,
6485 response_id: None,
6486 });
6487 assert!(
6488 session
6489 .metadata()
6490 .contains_key(SESSION_REALTIME_TRANSCRIPT_STATE_KEY),
6491 "typed realtime transcript append should retain authority to persist its state"
6492 );
6493 session.metadata.insert(
6494 SESSION_REALTIME_TRANSCRIPT_STATE_KEY.to_string(),
6495 serde_json::json!("not-a-state"),
6496 );
6497 assert!(
6498 session.try_realtime_transcript_state().is_err(),
6499 "malformed realtime generated authority state must not decode as absent/default"
6500 );
6501 }
6502
6503 #[test]
6504 fn test_session_mob_tool_authority_context_persists_projection_without_authority_seal() {
6505 let mut session = Session::new();
6506 session
6507 .set_build_state(SessionBuildState::default())
6508 .expect("session build state should serialize");
6509 let authority = MobToolAuthorityContext::generated_for_test(
6510 crate::service::OpaquePrincipalToken::new("opaque-principal"),
6511 false,
6512 false,
6513 false,
6514 std::collections::BTreeSet::from(["mob-a".to_string()]),
6515 std::collections::BTreeMap::new(),
6516 None,
6517 Some("audit-1".to_string()),
6518 );
6519
6520 session
6521 .set_mob_tool_authority_context(Some(authority))
6522 .expect("authority should serialize");
6523 assert!(session.mob_tool_authority_context().is_none());
6524 let stored = session
6525 .build_state()
6526 .and_then(|state| state.mob_tool_authority_context)
6527 .expect("stored projection should deserialize");
6528 assert!(!stored.is_generated_authority_context());
6529 assert!(!stored.can_manage_mob("mob-a"));
6530
6531 session
6532 .set_mob_tool_authority_context(None)
6533 .expect("authority should clear");
6534 assert!(session.mob_tool_authority_context().is_none());
6535 }
6536
6537 #[test]
6538 fn test_session_build_state_rejects_forged_mob_authority_projection() {
6539 let mut session = Session::new();
6540 let authority = MobToolAuthorityContext::generated_for_test(
6541 crate::service::OpaquePrincipalToken::new("opaque-principal"),
6542 false,
6543 false,
6544 false,
6545 std::collections::BTreeSet::from(["mob-a".to_string()]),
6546 std::collections::BTreeMap::new(),
6547 None,
6548 Some("audit-1".to_string()),
6549 );
6550 let forged_projection: MobToolAuthorityContext =
6551 serde_json::from_value(serde_json::to_value(authority).expect("serialize authority"))
6552 .expect("deserialize projection");
6553 assert!(!forged_projection.is_generated_authority_context());
6554
6555 let err = session
6556 .set_build_state(SessionBuildState {
6557 mob_tool_authority_context: Some(forged_projection),
6558 ..Default::default()
6559 })
6560 .expect_err("forged build state must be rejected by generated authority");
6561 assert!(
6565 err.to_string()
6566 .contains("generated session document authority rejected"),
6567 "unexpected error: {err}"
6568 );
6569 }
6570
6571 #[test]
6572 fn test_session_tool_visibility_state_roundtrip() {
6573 let mut session = Session::new();
6574 let state = SessionToolVisibilityState {
6575 inherited_base_filter: ToolFilter::Allow(["visible".to_string()].into_iter().collect()),
6576 active_filter: ToolFilter::Allow(
6577 ["visible".to_string(), "missing".to_string()]
6578 .into_iter()
6579 .collect(),
6580 ),
6581 staged_filter: ToolFilter::Allow(
6582 ["visible".to_string(), "missing".to_string()]
6583 .into_iter()
6584 .collect(),
6585 ),
6586 active_revision: 1,
6587 staged_revision: 2,
6588 ..Default::default()
6589 };
6590
6591 session
6592 .set_tool_visibility_state(
6593 AuthorizedSessionToolVisibilityState::from_generated_authority(state.clone()),
6594 )
6595 .expect("tool visibility state should serialize");
6596 assert_eq!(session.tool_visibility_state().unwrap(), Some(state));
6597 }
6598
6599 #[test]
6600 fn test_session_tool_visibility_state_malformed_returns_error() {
6601 let mut session = Session::new();
6602 session.metadata.insert(
6603 SESSION_TOOL_VISIBILITY_STATE_KEY.to_string(),
6604 serde_json::json!({
6605 "active_filter": {
6606 "unexpected_filter_kind": ["secret"]
6607 }
6608 }),
6609 );
6610
6611 assert!(
6612 session.tool_visibility_state().is_err(),
6613 "malformed canonical visibility metadata must not decode as absent/default"
6614 );
6615 }
6616
6617 #[test]
6618 fn test_session_serialization() {
6619 let mut session = Session::new();
6620 session.push(Message::User(UserMessage::text("Test".to_string())));
6621
6622 let json = serde_json::to_string(&session).unwrap();
6623 let parsed: Session = serde_json::from_str(&json).unwrap();
6624
6625 assert_eq!(parsed.id(), session.id());
6626 assert_eq!(parsed.messages().len(), 1);
6627 assert_eq!(parsed.version(), SESSION_VERSION);
6628 }
6629
6630 #[test]
6631 fn test_session_meta_from_session() {
6632 let mut session = Session::new();
6633 session.push(Message::User(UserMessage::text("Hello".to_string())));
6634 session.push(Message::BlockAssistant(BlockAssistantMessage {
6635 blocks: vec![AssistantBlock::Text {
6636 text: "Hi!".to_string(),
6637 meta: None,
6638 }],
6639 stop_reason: StopReason::EndTurn,
6640 created_at: crate::types::message_timestamp_now(),
6641 }));
6642 session.record_usage(Usage {
6643 input_tokens: 10,
6644 output_tokens: 5,
6645 cache_creation_tokens: None,
6646 cache_read_tokens: None,
6647 });
6648
6649 let meta = SessionMeta::from(&session);
6650 assert_eq!(meta.id, *session.id());
6651 assert_eq!(meta.message_count, 2);
6652 assert_eq!(meta.total_tokens, 15);
6653 }
6654
6655 #[test]
6656 fn system_context_state_preserves_applied_runtime_context() {
6657 let accepted_at = SystemTime::UNIX_EPOCH;
6658 let mut state = SessionSystemContextState::default();
6659 state
6660 .stage_append(
6661 &AppendSystemContextRequest {
6662 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6663 "Authoritative peer token is birch seventeen.".to_string(),
6664 ),
6665 source: Some(
6666 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
6667 .to_string(),
6668 ),
6669 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
6670 source_kind: SystemContextSource::Normal,
6671 peer_response_terminal: None,
6672 },
6673 accepted_at,
6674 )
6675 .expect("append should stage");
6676
6677 state.mark_pending_applied();
6678
6679 assert!(state.pending.is_empty());
6680 assert_eq!(state.applied.len(), 1);
6681 assert_eq!(
6682 state.applied[0].content.render_text(),
6683 "Authoritative peer token is birch seventeen."
6684 );
6685 assert_eq!(
6686 state.applied[0].source.as_deref(),
6687 Some("peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1")
6688 );
6689
6690 let round_tripped: SessionSystemContextState =
6691 serde_json::from_value(serde_json::to_value(&state).expect("serialize state"))
6692 .expect("deserialize state");
6693 assert_eq!(round_tripped.applied, state.applied);
6694 }
6695
6696 #[test]
6697 fn active_turn_system_context_is_discarded_when_not_applied() {
6698 let mut state = SessionSystemContextState::default();
6699 state
6700 .stage_active_turn_append(
6701 &AppendSystemContextRequest {
6702 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6703 "only for the active run".to_string(),
6704 ),
6705 source: Some("runtime:steer:input-1".to_string()),
6706 idempotency_key: Some("runtime:steer:input-1".to_string()),
6707 source_kind: SystemContextSource::RuntimeSteer,
6708 peer_response_terminal: None,
6709 },
6710 SystemTime::UNIX_EPOCH,
6711 )
6712 .expect("active context should stage");
6713
6714 let discarded = state.discard_unapplied_active_turn_pending();
6715
6716 assert_eq!(discarded.len(), 1);
6717 assert!(state.pending.is_empty());
6718 assert!(state.applied.is_empty());
6719 assert!(state.active_turn_pending_keys.is_empty());
6720 assert!(
6721 state.seen.is_empty(),
6722 "discarded active-turn context should not block later idempotency keys"
6723 );
6724 }
6725
6726 #[test]
6727 fn active_turn_system_context_can_roll_back_targeted_keys() {
6728 let mut state = SessionSystemContextState::default();
6729 for key in ["runtime:steer:input-1", "runtime:steer:input-2"] {
6730 state
6731 .stage_active_turn_append(
6732 &AppendSystemContextRequest {
6733 content: crate::lifecycle::run_primitive::CoreRenderable::text(format!(
6734 "context for {key}"
6735 )),
6736 source: Some(key.to_string()),
6737 idempotency_key: Some(key.to_string()),
6738 source_kind: SystemContextSource::RuntimeSteer,
6739 peer_response_terminal: None,
6740 },
6741 SystemTime::UNIX_EPOCH,
6742 )
6743 .expect("active context should stage");
6744 }
6745
6746 let discarded =
6747 state.discard_active_turn_pending_by_keys(&["runtime:steer:input-1".to_string()]);
6748
6749 assert_eq!(discarded.len(), 1);
6750 assert_eq!(
6751 discarded[0].idempotency_key.as_deref(),
6752 Some("runtime:steer:input-1")
6753 );
6754 assert_eq!(state.pending.len(), 1);
6755 assert_eq!(
6756 state.pending[0].idempotency_key.as_deref(),
6757 Some("runtime:steer:input-2")
6758 );
6759 assert!(!state.seen.contains_key("runtime:steer:input-1"));
6760 assert!(state.seen.contains_key("runtime:steer:input-2"));
6761 assert!(
6762 !state
6763 .active_turn_pending_keys
6764 .contains("runtime:steer:input-1")
6765 );
6766 assert!(
6767 state
6768 .active_turn_pending_keys
6769 .contains("runtime:steer:input-2")
6770 );
6771 }
6772
6773 #[test]
6774 fn active_turn_system_context_is_transient_when_boundary_consumes_it() {
6775 let mut state = SessionSystemContextState::default();
6776 state
6777 .stage_active_turn_append(
6778 &AppendSystemContextRequest {
6779 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6780 "visible to this run".to_string(),
6781 ),
6782 source: Some("runtime:steer:input-2".to_string()),
6783 idempotency_key: Some("runtime:steer:input-2".to_string()),
6784 source_kind: SystemContextSource::RuntimeSteer,
6785 peer_response_terminal: None,
6786 },
6787 SystemTime::UNIX_EPOCH,
6788 )
6789 .expect("active context should stage");
6790
6791 state.mark_pending_applied();
6792 let discarded = state.discard_unapplied_active_turn_pending();
6793
6794 assert!(discarded.is_empty());
6795 assert!(state.pending.is_empty());
6796 assert!(state.applied.is_empty());
6797 assert!(state.active_turn_pending_keys.is_empty());
6798 assert_eq!(
6799 state.seen.get("runtime:steer:input-2"),
6800 None,
6801 "consumed active-turn steer context must not become durable state"
6802 );
6803 }
6804
6805 #[test]
6806 fn discard_transient_runtime_steer_context_removes_steer_via_typed_marker() {
6807 let mut session = Session::new();
6808 session.set_system_prompt(format!(
6812 "base{}{}{}{}",
6813 SYSTEM_CONTEXT_SEPARATOR,
6814 render_system_context_block(&PendingSystemContextAppend {
6815 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6816 "old steer".to_string()
6817 ),
6818 source: Some("steer-source-old".to_string()),
6819 idempotency_key: Some("steer-key-old".to_string()),
6820 source_kind: SystemContextSource::RuntimeSteer,
6821 peer_response_terminal: None,
6822 accepted_at: SystemTime::UNIX_EPOCH,
6823 }),
6824 SYSTEM_CONTEXT_SEPARATOR,
6825 render_system_context_block(&PendingSystemContextAppend {
6826 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6827 "durable peer fact".to_string()
6828 ),
6829 source: Some("peer_response_terminal:analyst:req".to_string()),
6830 idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
6831 source_kind: SystemContextSource::Normal,
6832 peer_response_terminal: None,
6833 accepted_at: SystemTime::UNIX_EPOCH,
6834 })
6835 ));
6836 session
6837 .set_system_context_state(SessionSystemContextState {
6838 pending: vec![PendingSystemContextAppend {
6839 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6840 "pending steer".to_string(),
6841 ),
6842 source: Some("steer-source-pending".to_string()),
6843 idempotency_key: Some("steer-key-pending".to_string()),
6844 source_kind: SystemContextSource::RuntimeSteer,
6845 peer_response_terminal: None,
6846 accepted_at: SystemTime::UNIX_EPOCH,
6847 }],
6848 applied: vec![
6849 PendingSystemContextAppend {
6850 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6851 "old steer".to_string(),
6852 ),
6853 source: Some("steer-source-old".to_string()),
6854 idempotency_key: Some("steer-key-old".to_string()),
6855 source_kind: SystemContextSource::RuntimeSteer,
6856 peer_response_terminal: None,
6857 accepted_at: SystemTime::UNIX_EPOCH,
6858 },
6859 PendingSystemContextAppend {
6860 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6861 "durable peer fact".to_string(),
6862 ),
6863 source: Some("peer_response_terminal:analyst:req".to_string()),
6864 idempotency_key: Some("peer_response_terminal:analyst:req".to_string()),
6865 source_kind: SystemContextSource::Normal,
6866 peer_response_terminal: None,
6867 accepted_at: SystemTime::UNIX_EPOCH,
6868 },
6869 ],
6870 seen: BTreeMap::from([(
6871 "steer-key-old".to_string(),
6872 SeenSystemContextKey {
6873 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6874 "old steer".to_string(),
6875 ),
6876 source: Some("steer-source-old".to_string()),
6877 source_kind: SystemContextSource::RuntimeSteer,
6878 state: SeenSystemContextState::Applied,
6879 },
6880 )]),
6881 active_turn_pending_keys: BTreeSet::from(["steer-key-pending".to_string()]),
6882 })
6883 .expect("system context state should serialize");
6884
6885 let removed = session.discard_transient_runtime_steer_context();
6886
6887 assert!(removed >= 4);
6888 let system_prompt = match session.messages().first() {
6889 Some(Message::System(system)) => system.content.as_str(),
6890 other => panic!("expected system prompt, got {other:?}"),
6891 };
6892 assert!(!system_prompt.contains("old steer"));
6893 assert!(system_prompt.contains("durable peer fact"));
6894 let state = session.system_context_state().unwrap_or_default();
6895 assert!(state.pending.is_empty());
6896 assert_eq!(state.applied.len(), 1);
6897 assert_eq!(state.applied[0].content.render_text(), "durable peer fact");
6898 assert!(state.seen.is_empty());
6899 assert!(state.active_turn_pending_keys.is_empty());
6900 }
6901
6902 #[test]
6903 fn append_system_context_blocks_records_typed_applied_context() {
6904 let append = PendingSystemContextAppend {
6905 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6906 "Authoritative peer token is birch seventeen.".to_string(),
6907 ),
6908 source: Some(
6909 "peer_response_terminal:analyst:018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string(),
6910 ),
6911 idempotency_key: Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1".to_string()),
6912 source_kind: SystemContextSource::Normal,
6913 peer_response_terminal: None,
6914 accepted_at: SystemTime::UNIX_EPOCH,
6915 };
6916 let mut session = Session::new();
6917
6918 session.append_system_context_blocks(std::slice::from_ref(&append));
6919
6920 let state = session
6921 .system_context_state()
6922 .expect("append should persist typed context state");
6923 assert_eq!(state.applied, vec![append]);
6924 }
6925
6926 #[test]
6927 fn append_system_context_blocks_renders_pre_marked_pending_context() {
6928 let accepted_at = SystemTime::UNIX_EPOCH;
6929 let mut state = SessionSystemContextState::default();
6930 state
6931 .stage_append(
6932 &AppendSystemContextRequest {
6933 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6934 "Apply this staged context at the request boundary.".to_string(),
6935 ),
6936 source: Some("rpc/session_inject_context".to_string()),
6937 idempotency_key: Some("ctx-boundary".to_string()),
6938 source_kind: SystemContextSource::Normal,
6939 peer_response_terminal: None,
6940 },
6941 accepted_at,
6942 )
6943 .expect("append should stage");
6944 let pending = state.pending.clone();
6945 state.mark_pending_applied();
6946 let mut session = Session::new();
6947 session
6948 .set_system_context_state(state)
6949 .expect("state should serialize");
6950
6951 session.append_system_context_blocks(&pending);
6952
6953 let system_prompt = session
6954 .messages()
6955 .first()
6956 .and_then(|message| match message {
6957 Message::System(system) => Some(system.content.as_str()),
6958 _ => None,
6959 })
6960 .unwrap_or_default();
6961 assert!(system_prompt.contains("Apply this staged context at the request boundary."));
6962 let state = session
6963 .system_context_state()
6964 .expect("append should persist typed context state");
6965 assert_eq!(state.applied.len(), 1);
6966 assert_eq!(
6967 state.seen["ctx-boundary"].state,
6968 SeenSystemContextState::Applied
6969 );
6970 }
6971
6972 #[test]
6973 fn append_system_context_blocks_renders_pre_marked_context_without_idempotency_key() {
6974 let accepted_at = SystemTime::UNIX_EPOCH;
6975 let mut state = SessionSystemContextState::default();
6976 state
6977 .stage_append(
6978 &AppendSystemContextRequest {
6979 content: crate::lifecycle::run_primitive::CoreRenderable::text(
6980 "Apply this unkeyed staged context at the request boundary.".to_string(),
6981 ),
6982 source: Some("rpc/session_inject_context".to_string()),
6983 idempotency_key: None,
6984 source_kind: SystemContextSource::Normal,
6985 peer_response_terminal: None,
6986 },
6987 accepted_at,
6988 )
6989 .expect("append should stage");
6990 let pending = state.pending.clone();
6991 state.mark_pending_applied();
6992 let mut session = Session::new();
6993 session
6994 .set_system_context_state(state)
6995 .expect("state should serialize");
6996
6997 session.append_system_context_blocks(&pending);
6998
6999 let system_prompt = session
7000 .messages()
7001 .first()
7002 .and_then(|message| match message {
7003 Message::System(system) => Some(system.content.as_str()),
7004 _ => None,
7005 })
7006 .unwrap_or_default();
7007 assert!(
7008 system_prompt.contains("Apply this unkeyed staged context at the request boundary.")
7009 );
7010 }
7011
7012 #[test]
7016 fn staged_system_context_carries_typed_renderable_to_render_seam() {
7017 use crate::lifecycle::run_primitive::CoreRenderable;
7018
7019 let accepted_at = SystemTime::UNIX_EPOCH;
7020 let mut state = SessionSystemContextState::default();
7021 let renderable = CoreRenderable::Json {
7022 value: serde_json::json!({"alert": "disk-full", "severity": 2}),
7023 };
7024 state
7025 .stage_append(
7026 &AppendSystemContextRequest {
7027 content: renderable.clone(),
7028 source: Some("ops/monitor".to_string()),
7029 idempotency_key: Some("alert-1".to_string()),
7030 source_kind: SystemContextSource::Normal,
7031 peer_response_terminal: None,
7032 },
7033 accepted_at,
7034 )
7035 .expect("typed renderable append should stage");
7036
7037 assert_eq!(state.pending.len(), 1);
7040 assert_eq!(state.pending[0].content, renderable);
7041
7042 let rendered = render_system_context_block(&state.pending[0]);
7045 assert!(rendered.starts_with(SYSTEM_CONTEXT_RENDER_LABEL));
7046 assert!(
7047 rendered.contains(renderable.render_text().trim()),
7048 "render seam must lower via CoreRenderable::render_text: {rendered}"
7049 );
7050 }
7051
7052 #[test]
7053 fn append_system_context_blocks_skips_duplicate_idempotency_key() {
7054 let first = PendingSystemContextAppend {
7055 content: crate::lifecycle::run_primitive::CoreRenderable::text(
7056 "Authoritative peer token is birch seventeen.".to_string(),
7057 ),
7058 source: Some("peer_response_terminal:analyst:req-1".to_string()),
7059 idempotency_key: Some("req-1".to_string()),
7060 source_kind: SystemContextSource::Normal,
7061 peer_response_terminal: None,
7062 accepted_at: SystemTime::UNIX_EPOCH,
7063 };
7064 let duplicate = PendingSystemContextAppend {
7065 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
7066 ..first.clone()
7067 };
7068 let mut session = Session::new();
7069
7070 session.append_system_context_blocks(std::slice::from_ref(&first));
7071 session.append_system_context_blocks(std::slice::from_ref(&duplicate));
7072
7073 let state = session
7074 .system_context_state()
7075 .expect("append should persist typed context state");
7076 assert_eq!(state.applied, vec![first]);
7077 let system_prompt = session
7078 .messages()
7079 .first()
7080 .and_then(|message| match message {
7081 Message::System(system) => Some(system.content.as_str()),
7082 _ => None,
7083 })
7084 .unwrap_or_default();
7085 assert_eq!(
7086 system_prompt
7087 .matches("Authoritative peer token is birch seventeen.")
7088 .count(),
7089 1
7090 );
7091 }
7092
7093 #[test]
7094 fn append_system_context_blocks_skips_conflicting_duplicate_idempotency_key() {
7095 let first = PendingSystemContextAppend {
7096 content: crate::lifecycle::run_primitive::CoreRenderable::text(
7097 "Authoritative peer token is birch seventeen.".to_string(),
7098 ),
7099 source: Some("peer_response_terminal:analyst:req-1".to_string()),
7100 idempotency_key: Some("req-1".to_string()),
7101 source_kind: SystemContextSource::Normal,
7102 peer_response_terminal: None,
7103 accepted_at: SystemTime::UNIX_EPOCH,
7104 };
7105 let conflicting = PendingSystemContextAppend {
7106 content: crate::lifecycle::run_primitive::CoreRenderable::text(
7107 "Conflicting peer token should not reach the prompt.".to_string(),
7108 ),
7109 accepted_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
7110 ..first.clone()
7111 };
7112 let mut session = Session::new();
7113
7114 session.append_system_context_blocks(std::slice::from_ref(&first));
7115 session.append_system_context_blocks(std::slice::from_ref(&conflicting));
7116
7117 let state = session
7118 .system_context_state()
7119 .expect("append should persist typed context state");
7120 assert_eq!(state.applied, vec![first]);
7121 let system_prompt = session
7122 .messages()
7123 .first()
7124 .and_then(|message| match message {
7125 Message::System(system) => Some(system.content.as_str()),
7126 _ => None,
7127 })
7128 .unwrap_or_default();
7129 assert!(system_prompt.contains("Authoritative peer token is birch seventeen."));
7130 assert!(!system_prompt.contains("Conflicting peer token should not reach the prompt."));
7131 }
7132
7133 #[test]
7145 fn realtime_transcript_assistant_transcript_delta_materializes_transcript_block() {
7146 let mut session = Session::new();
7147
7148 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7149 response_id: "resp_spoken".to_string(),
7150 delta_id: "evt_delta_spoken_1".to_string(),
7151 item_id: "item_spoken".to_string(),
7152 previous_item_id: None,
7153 content_index: 0,
7154 delta: "I said hi".to_string(),
7155 };
7156 assert!(
7157 session.append_realtime_transcript_event(delta).is_inert(),
7158 "delta alone is inert until turn-completed flushes"
7159 );
7160
7161 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
7162 response_id: "resp_spoken".to_string(),
7163 stop_reason: StopReason::EndTurn,
7164 usage: Usage::default(),
7165 };
7166 let outcome = session.append_realtime_transcript_event(terminal);
7167 assert_eq!(outcome.materialized_messages.len(), 1);
7168
7169 let messages = session.messages();
7171 assert_eq!(messages.len(), 1);
7172 match &messages[0] {
7173 Message::BlockAssistant(assistant) => {
7174 assert_eq!(assistant.blocks.len(), 1);
7175 match &assistant.blocks[0] {
7176 AssistantBlock::Transcript { text, source, .. } => {
7177 assert_eq!(text, "I said hi");
7178 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
7179 }
7180 other => unreachable!(
7181 "AssistantTranscriptDelta must materialize as AssistantBlock::Transcript, got {other:?}"
7182 ),
7183 }
7184 }
7185 other => unreachable!("expected BlockAssistant message, got {other:?}"),
7186 }
7187 }
7188
7189 #[test]
7190 fn round4_cc4_in_flight_response_ids_lists_distinct_unmaterialized_responses() {
7191 let mut session = Session::new();
7197
7198 for (i, response_id) in [
7202 ("resp_a", "resp_a"),
7203 ("resp_a_extra", "resp_a"),
7204 ("resp_b", "resp_b"),
7205 ("resp_c", "resp_c"),
7206 ]
7207 .iter()
7208 .enumerate()
7209 {
7210 let event = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7211 response_id: response_id.1.to_string(),
7212 delta_id: format!("delta_{i}"),
7213 item_id: response_id.0.to_string(),
7214 previous_item_id: None,
7215 content_index: 0,
7216 delta: "x".to_string(),
7217 };
7218 let _ = session.append_realtime_transcript_event(event);
7219 }
7220
7221 let _ = session.append_realtime_transcript_event(
7223 RealtimeTranscriptEvent::AssistantTurnInterrupted {
7224 response_id: "resp_c".to_string(),
7225 },
7226 );
7227
7228 let _ = session.append_realtime_transcript_event(
7231 RealtimeTranscriptEvent::UserTranscriptFinal {
7232 item_id: "u_item".to_string(),
7233 previous_item_id: None,
7234 content_index: 0,
7235 text: "hi".to_string(),
7236 },
7237 );
7238
7239 let in_flight = session.in_flight_realtime_assistant_response_ids();
7240 assert!(in_flight.contains(&"resp_a".to_string()), "{in_flight:?}");
7241 assert!(in_flight.contains(&"resp_b".to_string()), "{in_flight:?}");
7242 assert!(
7243 !in_flight.contains(&"resp_c".to_string()),
7244 "discarded response must not appear in in_flight: {in_flight:?}"
7245 );
7246 assert_eq!(
7248 in_flight.iter().filter(|r| *r == "resp_a").count(),
7249 1,
7250 "distinct response_ids only: {in_flight:?}"
7251 );
7252 }
7253
7254 #[test]
7255 fn round4_cc2_assistant_turn_completed_after_transcript_deltas_materializes_transcript() {
7256 let mut session = Session::new();
7263
7264 let delta = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7265 response_id: "resp_cc2".to_string(),
7266 delta_id: "delta_cc2_1".to_string(),
7267 item_id: "item_cc2".to_string(),
7268 previous_item_id: None,
7269 content_index: 0,
7270 delta: "hello world".to_string(),
7271 };
7272 assert!(session.append_realtime_transcript_event(delta).is_inert());
7273
7274 assert_eq!(
7276 session.in_flight_realtime_assistant_response_ids(),
7277 vec!["resp_cc2".to_string()]
7278 );
7279
7280 let outcome = session.append_realtime_transcript_event(
7281 RealtimeTranscriptEvent::AssistantTurnCompleted {
7282 response_id: "resp_cc2".to_string(),
7283 stop_reason: StopReason::EndTurn,
7284 usage: Usage::default(),
7285 },
7286 );
7287 assert_eq!(outcome.materialized_messages.len(), 1);
7288
7289 assert!(
7291 session
7292 .in_flight_realtime_assistant_response_ids()
7293 .is_empty(),
7294 "materialized items must not appear in in_flight_realtime_assistant_response_ids"
7295 );
7296
7297 let messages = session.messages();
7298 let assistant = messages.iter().find_map(|m| match m {
7299 Message::BlockAssistant(a) => Some(a),
7300 _ => None,
7301 });
7302 let assistant = assistant.expect("assistant block message expected");
7303 assert_eq!(assistant.blocks.len(), 1);
7304 assert!(matches!(
7305 &assistant.blocks[0],
7306 AssistantBlock::Transcript {
7307 source: crate::types::TranscriptSource::Spoken,
7308 ..
7309 }
7310 ));
7311 }
7312
7313 #[test]
7314 fn realtime_transcript_assistant_text_delta_still_materializes_text_block() {
7315 let mut session = Session::new();
7319
7320 let delta = RealtimeTranscriptEvent::AssistantTextDelta {
7321 response_id: "resp_display".to_string(),
7322 delta_id: "evt_delta_display_1".to_string(),
7323 item_id: "item_display".to_string(),
7324 previous_item_id: None,
7325 content_index: 0,
7326 delta: "I wrote".to_string(),
7327 };
7328 let _ = session.append_realtime_transcript_event(delta);
7329
7330 let terminal = RealtimeTranscriptEvent::AssistantTurnCompleted {
7331 response_id: "resp_display".to_string(),
7332 stop_reason: StopReason::EndTurn,
7333 usage: Usage::default(),
7334 };
7335 let outcome = session.append_realtime_transcript_event(terminal);
7336 assert_eq!(outcome.materialized_messages.len(), 1);
7337
7338 let messages = session.messages();
7339 match &messages[0] {
7340 Message::BlockAssistant(assistant) => match &assistant.blocks[0] {
7341 AssistantBlock::Text { text, .. } => assert_eq!(text, "I wrote"),
7342 other => unreachable!(
7343 "AssistantTextDelta must keep materializing AssistantBlock::Text, got {other:?}"
7344 ),
7345 },
7346 other => unreachable!("expected BlockAssistant message, got {other:?}"),
7347 }
7348 }
7349
7350 #[test]
7351 fn round4_cc7_mixed_response_persists_text_and_transcript_in_order() {
7352 let mut session = Session::new();
7370
7371 let display_a = RealtimeTranscriptEvent::AssistantTextDelta {
7373 response_id: "resp_mixed_1".to_string(),
7374 delta_id: "delta_disp_1".to_string(),
7375 item_id: "item_display".to_string(),
7376 previous_item_id: None,
7377 content_index: 0,
7378 delta: "Here's the report:".to_string(),
7379 };
7380 assert!(
7381 session
7382 .append_realtime_transcript_event(display_a)
7383 .is_inert()
7384 );
7385
7386 let display_b = RealtimeTranscriptEvent::AssistantTextDelta {
7387 response_id: "resp_mixed_1".to_string(),
7388 delta_id: "delta_disp_2".to_string(),
7389 item_id: "item_display".to_string(),
7390 previous_item_id: None,
7391 content_index: 0,
7392 delta: " (still writing)".to_string(),
7393 };
7394 assert!(
7395 session
7396 .append_realtime_transcript_event(display_b)
7397 .is_inert()
7398 );
7399
7400 let spoken_a = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7405 response_id: "resp_mixed_1".to_string(),
7406 delta_id: "delta_spoken_1".to_string(),
7407 item_id: "item_spoken".to_string(),
7408 previous_item_id: Some("item_display".to_string()),
7409 content_index: 0,
7410 delta: "I'm reading the report aloud:".to_string(),
7411 };
7412 assert!(
7413 session
7414 .append_realtime_transcript_event(spoken_a)
7415 .is_inert()
7416 );
7417
7418 let spoken_b = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7419 response_id: "resp_mixed_1".to_string(),
7420 delta_id: "delta_spoken_2".to_string(),
7421 item_id: "item_spoken".to_string(),
7422 previous_item_id: Some("item_display".to_string()),
7423 content_index: 0,
7424 delta: " sentence two.".to_string(),
7425 };
7426 assert!(
7427 session
7428 .append_realtime_transcript_event(spoken_b)
7429 .is_inert()
7430 );
7431
7432 let outcome = session.append_realtime_transcript_event(
7435 RealtimeTranscriptEvent::AssistantTurnCompleted {
7436 response_id: "resp_mixed_1".to_string(),
7437 stop_reason: StopReason::EndTurn,
7438 usage: Usage {
7439 input_tokens: 11,
7440 output_tokens: 22,
7441 cache_creation_tokens: None,
7442 cache_read_tokens: None,
7443 },
7444 },
7445 );
7446 assert_eq!(outcome.materialized_messages.len(), 2);
7448
7449 let messages = session.messages();
7452 let assistants: Vec<&BlockAssistantMessage> = messages
7453 .iter()
7454 .filter_map(|m| match m {
7455 Message::BlockAssistant(a) => Some(a),
7456 _ => None,
7457 })
7458 .collect();
7459 assert_eq!(
7460 assistants.len(),
7461 1,
7462 "mixed display+spoken response under one response_id must produce exactly ONE BlockAssistant message, got: {assistants:?}"
7463 );
7464 let assistant = assistants[0];
7465 assert_eq!(
7466 assistant.blocks.len(),
7467 2,
7468 "mixed response message must carry both blocks: {:?}",
7469 assistant.blocks
7470 );
7471
7472 match &assistant.blocks[0] {
7474 AssistantBlock::Text { text, .. } => {
7475 assert_eq!(text, "Here's the report: (still writing)");
7476 }
7477 other => unreachable!(
7478 "first block must be AssistantBlock::Text (display lane), got {other:?}"
7479 ),
7480 }
7481 match &assistant.blocks[1] {
7483 AssistantBlock::Transcript { text, source, .. } => {
7484 assert_eq!(text, "I'm reading the report aloud: sentence two.");
7485 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
7486 }
7487 other => unreachable!(
7488 "second block must be AssistantBlock::Transcript {{ source: Spoken }}, got {other:?}"
7489 ),
7490 }
7491
7492 assert_eq!(session.usage.input_tokens, 11);
7494 assert_eq!(session.usage.output_tokens, 22);
7495 }
7496
7497 #[test]
7498 fn round5_r55_mixed_response_barge_in_preserves_display_drops_spoken() {
7499 let mut session = Session::new();
7515
7516 let display = RealtimeTranscriptEvent::AssistantTextDelta {
7517 response_id: "resp_mixed_2".to_string(),
7518 delta_id: "delta_disp_1".to_string(),
7519 item_id: "item_display_2".to_string(),
7520 previous_item_id: None,
7521 content_index: 0,
7522 delta: "Working on the report...".to_string(),
7523 };
7524 let _ = session.append_realtime_transcript_event(display);
7525
7526 let spoken = RealtimeTranscriptEvent::AssistantTranscriptDelta {
7527 response_id: "resp_mixed_2".to_string(),
7528 delta_id: "delta_spoken_1".to_string(),
7529 item_id: "item_spoken_2".to_string(),
7530 previous_item_id: Some("item_display_2".to_string()),
7531 content_index: 0,
7532 delta: "I'm reading the report".to_string(),
7533 };
7534 let _ = session.append_realtime_transcript_event(spoken);
7535
7536 let outcome = session.append_realtime_transcript_event(
7540 RealtimeTranscriptEvent::AssistantTurnInterrupted {
7541 response_id: "resp_mixed_2".to_string(),
7542 },
7543 );
7544 assert_eq!(
7545 outcome.materialized_messages.len(),
7546 1,
7547 "Display lane item must materialize on Interrupted: {outcome:?}"
7548 );
7549
7550 let late_completion = session.append_realtime_transcript_event(
7554 RealtimeTranscriptEvent::AssistantTurnCompleted {
7555 response_id: "resp_mixed_2".to_string(),
7556 stop_reason: StopReason::Cancelled,
7557 usage: Usage::default(),
7558 },
7559 );
7560 assert_eq!(
7561 late_completion.materialized_messages.len(),
7562 0,
7563 "post-barge-in TurnCompleted must not resurrect anything"
7564 );
7565
7566 let messages = session.messages();
7569 let assistants: Vec<&BlockAssistantMessage> = messages
7570 .iter()
7571 .filter_map(|m| match m {
7572 Message::BlockAssistant(a) => Some(a),
7573 _ => None,
7574 })
7575 .collect();
7576 assert_eq!(
7577 assistants.len(),
7578 1,
7579 "barge-in must commit exactly one BlockAssistant containing the Display lane: {assistants:?}"
7580 );
7581 let assistant = assistants[0];
7582 assert_eq!(assistant.blocks.len(), 1, "blocks: {:?}", assistant.blocks);
7583 match &assistant.blocks[0] {
7584 AssistantBlock::Text { text, .. } => {
7585 assert_eq!(text, "Working on the report...");
7586 }
7587 other => {
7588 unreachable!("Display lane must materialize as AssistantBlock::Text, got {other:?}")
7589 }
7590 }
7591 assert!(
7593 !assistant
7594 .blocks
7595 .iter()
7596 .any(|b| matches!(b, AssistantBlock::Transcript { .. })),
7597 "Spoken lane must be dropped on barge-in"
7598 );
7599
7600 assert!(
7603 !session
7604 .in_flight_realtime_assistant_response_ids()
7605 .contains(&"resp_mixed_2".to_string()),
7606 "barged-in response must not appear in in_flight_realtime_assistant_response_ids"
7607 );
7608 }
7609
7610 #[test]
7611 fn round5_r55_barge_in_preserves_display_lane_drops_spoken() {
7612 let mut session = Session::new();
7616
7617 let _ =
7618 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
7619 response_id: "resp_a".to_string(),
7620 delta_id: "delta_d_1".to_string(),
7621 item_id: "item_display".to_string(),
7622 previous_item_id: None,
7623 content_index: 0,
7624 delta: "display-text".to_string(),
7625 });
7626 let _ = session.append_realtime_transcript_event(
7627 RealtimeTranscriptEvent::AssistantTranscriptDelta {
7628 response_id: "resp_a".to_string(),
7629 delta_id: "delta_s_1".to_string(),
7630 item_id: "item_spoken".to_string(),
7631 previous_item_id: None,
7632 content_index: 0,
7633 delta: "spoken-transcript".to_string(),
7634 },
7635 );
7636
7637 let outcome = session.append_realtime_transcript_event(
7638 RealtimeTranscriptEvent::AssistantTurnInterrupted {
7639 response_id: "resp_a".to_string(),
7640 },
7641 );
7642 assert_eq!(outcome.materialized_messages.len(), 1);
7644
7645 let messages = session.messages();
7646 let assistants: Vec<&BlockAssistantMessage> = messages
7647 .iter()
7648 .filter_map(|m| match m {
7649 Message::BlockAssistant(a) => Some(a),
7650 _ => None,
7651 })
7652 .collect();
7653 assert_eq!(assistants.len(), 1);
7654 assert_eq!(assistants[0].blocks.len(), 1);
7656 match &assistants[0].blocks[0] {
7657 AssistantBlock::Text { text, .. } => assert_eq!(text, "display-text"),
7658 other => unreachable!("expected Text, got {other:?}"),
7659 }
7660 }
7661
7662 #[test]
7663 fn round5_r55_barge_in_finalizes_retained_display_into_committed_block() {
7664 let mut session = Session::new();
7669
7670 let _ =
7671 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
7672 response_id: "resp_a".to_string(),
7673 delta_id: "delta_d_1".to_string(),
7674 item_id: "item_display".to_string(),
7675 previous_item_id: None,
7676 content_index: 0,
7677 delta: "committed-display-text".to_string(),
7678 });
7679
7680 assert!(session.messages().is_empty());
7682
7683 let outcome = session.append_realtime_transcript_event(
7684 RealtimeTranscriptEvent::AssistantTurnInterrupted {
7685 response_id: "resp_a".to_string(),
7686 },
7687 );
7688 assert_eq!(
7689 outcome.materialized_messages.len(),
7690 1,
7691 "Interrupted must finalize retained Display lane immediately"
7692 );
7693
7694 let messages = session.messages();
7696 assert_eq!(messages.len(), 1);
7697 match &messages[0] {
7698 Message::BlockAssistant(assistant) => {
7699 assert_eq!(assistant.blocks.len(), 1);
7700 match &assistant.blocks[0] {
7701 AssistantBlock::Text { text, .. } => {
7702 assert_eq!(text, "committed-display-text");
7703 }
7704 other => unreachable!("expected Text, got {other:?}"),
7705 }
7706 }
7707 other => unreachable!("expected BlockAssistant, got {other:?}"),
7708 }
7709 }
7710
7711 #[test]
7712 fn round5_r56_truncation_promotes_default_lane_item_to_spoken() {
7713 let mut session = Session::new();
7720
7721 let _ = session.append_realtime_transcript_event(
7722 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
7723 response_id: "resp_a".to_string(),
7724 item_id: "item_a".to_string(),
7725 content_index: 0,
7726 text: "what was actually heard".to_string(),
7727 },
7728 );
7729
7730 let outcome = session.append_realtime_transcript_event(
7731 RealtimeTranscriptEvent::AssistantTurnCompleted {
7732 response_id: "resp_a".to_string(),
7733 stop_reason: StopReason::EndTurn,
7734 usage: Usage::default(),
7735 },
7736 );
7737 assert_eq!(outcome.materialized_messages.len(), 1);
7738
7739 assert_eq!(session.messages().len(), 1);
7740 match &session.messages()[0] {
7741 Message::BlockAssistant(assistant) => {
7742 assert_eq!(assistant.blocks.len(), 1);
7743 match &assistant.blocks[0] {
7744 AssistantBlock::Transcript { text, source, .. } => {
7745 assert_eq!(text, "what was actually heard");
7746 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
7747 }
7748 other => unreachable!(
7749 "truncation-only path must materialize as AssistantBlock::Transcript, got {other:?}"
7750 ),
7751 }
7752 }
7753 other => unreachable!("expected BlockAssistant, got {other:?}"),
7754 }
7755 }
7756
7757 #[test]
7758 fn round5_r56_truncation_after_display_delta_is_no_op_keeping_display_content() {
7759 let mut session = Session::new();
7767
7768 let _ =
7769 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
7770 response_id: "resp_a".to_string(),
7771 delta_id: "delta_d_1".to_string(),
7772 item_id: "item_a".to_string(),
7773 previous_item_id: None,
7774 content_index: 0,
7775 delta: "display-text-from-delta".to_string(),
7776 });
7777
7778 let _ = session.append_realtime_transcript_event(
7779 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
7780 response_id: "resp_a".to_string(),
7781 item_id: "item_a".to_string(),
7782 content_index: 0,
7783 text: "spoken-truncation-text".to_string(),
7784 },
7785 );
7786
7787 let _ = session.append_realtime_transcript_event(
7788 RealtimeTranscriptEvent::AssistantTurnCompleted {
7789 response_id: "resp_a".to_string(),
7790 stop_reason: StopReason::EndTurn,
7791 usage: Usage::default(),
7792 },
7793 );
7794
7795 assert_eq!(session.messages().len(), 1);
7798 match &session.messages()[0] {
7799 Message::BlockAssistant(assistant) => {
7800 assert_eq!(assistant.blocks.len(), 1);
7801 match &assistant.blocks[0] {
7802 AssistantBlock::Text { text, .. } => {
7803 assert_eq!(text, "display-text-from-delta");
7804 }
7805 other => unreachable!(
7806 "Display content must survive misrouted truncation, got {other:?}"
7807 ),
7808 }
7809 }
7810 other => unreachable!("expected BlockAssistant, got {other:?}"),
7811 }
7812 }
7813
7814 #[test]
7822 fn round5_r56_sibling_display_delta_skipped_on_spoken_item() {
7823 let mut session = Session::new();
7824
7825 let _ = session.append_realtime_transcript_event(
7827 RealtimeTranscriptEvent::AssistantTranscriptTruncated {
7828 response_id: "resp_a".to_string(),
7829 item_id: "item_a".to_string(),
7830 content_index: 0,
7831 text: "what was actually heard".to_string(),
7832 },
7833 );
7834
7835 let _ =
7838 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
7839 response_id: "resp_a".to_string(),
7840 delta_id: "delta_d_1".to_string(),
7841 item_id: "item_a".to_string(),
7842 previous_item_id: None,
7843 content_index: 0,
7844 delta: "should-not-appear".to_string(),
7845 });
7846
7847 let _ = session.append_realtime_transcript_event(
7848 RealtimeTranscriptEvent::AssistantTurnCompleted {
7849 response_id: "resp_a".to_string(),
7850 stop_reason: StopReason::EndTurn,
7851 usage: Usage::default(),
7852 },
7853 );
7854
7855 assert_eq!(session.messages().len(), 1);
7858 match &session.messages()[0] {
7859 Message::BlockAssistant(assistant) => {
7860 assert_eq!(assistant.blocks.len(), 1);
7861 match &assistant.blocks[0] {
7862 AssistantBlock::Transcript { text, source, .. } => {
7863 assert_eq!(text, "what was actually heard");
7864 assert_eq!(*source, crate::types::TranscriptSource::Spoken);
7865 }
7866 other => unreachable!(
7867 "Spoken-locked item must materialize as Transcript, got {other:?}"
7868 ),
7869 }
7870 }
7871 other => unreachable!("expected BlockAssistant, got {other:?}"),
7872 }
7873 }
7874
7875 #[test]
7882 fn round5_r56_sibling_spoken_delta_skipped_on_display_item() {
7883 let mut session = Session::new();
7884
7885 let _ =
7887 session.append_realtime_transcript_event(RealtimeTranscriptEvent::AssistantTextDelta {
7888 response_id: "resp_a".to_string(),
7889 delta_id: "delta_d_1".to_string(),
7890 item_id: "item_a".to_string(),
7891 previous_item_id: None,
7892 content_index: 0,
7893 delta: "display-locked-text".to_string(),
7894 });
7895
7896 let _ = session.append_realtime_transcript_event(
7899 RealtimeTranscriptEvent::AssistantTranscriptDelta {
7900 response_id: "resp_a".to_string(),
7901 delta_id: "delta_s_1".to_string(),
7902 item_id: "item_a".to_string(),
7903 previous_item_id: None,
7904 content_index: 0,
7905 delta: "should-not-appear".to_string(),
7906 },
7907 );
7908
7909 let _ = session.append_realtime_transcript_event(
7910 RealtimeTranscriptEvent::AssistantTurnCompleted {
7911 response_id: "resp_a".to_string(),
7912 stop_reason: StopReason::EndTurn,
7913 usage: Usage::default(),
7914 },
7915 );
7916
7917 assert_eq!(session.messages().len(), 1);
7919 match &session.messages()[0] {
7920 Message::BlockAssistant(assistant) => {
7921 assert_eq!(assistant.blocks.len(), 1);
7922 match &assistant.blocks[0] {
7923 AssistantBlock::Text { text, .. } => {
7924 assert_eq!(text, "display-locked-text");
7925 }
7926 other => {
7927 unreachable!("Display-locked item must materialize as Text, got {other:?}")
7928 }
7929 }
7930 }
7931 other => unreachable!("expected BlockAssistant, got {other:?}"),
7932 }
7933 }
7934
7935 #[test]
7943 fn round5_r57_late_final_text_after_turn_completed_warns_and_skips() {
7944 let mut session = Session::new();
7945
7946 let _ = session.append_realtime_transcript_event(
7948 RealtimeTranscriptEvent::AssistantTranscriptDelta {
7949 response_id: "resp_a".to_string(),
7950 delta_id: "delta_s_1".to_string(),
7951 item_id: "item_a".to_string(),
7952 previous_item_id: None,
7953 content_index: 0,
7954 delta: "delta-accumulated".to_string(),
7955 },
7956 );
7957
7958 let commit_outcome = session.append_realtime_transcript_event(
7960 RealtimeTranscriptEvent::AssistantTurnCompleted {
7961 response_id: "resp_a".to_string(),
7962 stop_reason: StopReason::EndTurn,
7963 usage: Usage::default(),
7964 },
7965 );
7966 assert_eq!(commit_outcome.materialized_messages.len(), 1);
7967
7968 let late_outcome = session.append_realtime_transcript_event(
7972 RealtimeTranscriptEvent::AssistantTranscriptFinalText {
7973 response_id: "resp_a".to_string(),
7974 item_id: "item_a".to_string(),
7975 content_index: 0,
7976 text: "authoritative-final-that-must-not-land".to_string(),
7977 },
7978 );
7979 assert!(
7980 late_outcome.is_inert(),
7981 "late FinalText after materialization must produce inert outcome"
7982 );
7983
7984 assert_eq!(session.messages().len(), 1);
7987 match &session.messages()[0] {
7988 Message::BlockAssistant(assistant) => {
7989 assert_eq!(assistant.blocks.len(), 1);
7990 match &assistant.blocks[0] {
7991 AssistantBlock::Transcript { text, .. } => {
7992 assert_eq!(
7993 text, "delta-accumulated",
7994 "canonical message must preserve delta-accumulated text; \
7995 append-only history forbids late FinalText repair"
7996 );
7997 }
7998 other => unreachable!("expected Transcript, got {other:?}"),
7999 }
8000 }
8001 other => unreachable!("expected BlockAssistant, got {other:?}"),
8002 }
8003 }
8004}