1use async_trait::async_trait;
21use sha2::{Digest, Sha256};
22
23use crate::session::{SYSTEM_CONTEXT_SEPARATOR, SessionMeta};
24use crate::time_compat::SystemTime;
25use crate::types::{Message, SessionId};
26use crate::{
27 Session, TranscriptHistoryState, TranscriptRewriteCommit, TranscriptRewriteSelection,
28 transcript_messages_digest,
29};
30
31#[derive(Debug, Clone, Default)]
33pub struct SessionFilter {
34 pub created_after: Option<SystemTime>,
36 pub updated_after: Option<SystemTime>,
38 pub limit: Option<usize>,
40 pub offset: Option<usize>,
42}
43
44#[derive(Debug, thiserror::Error)]
49pub enum SessionStoreError {
50 #[error("IO error: {0}")]
51 Io(#[from] std::io::Error),
52
53 #[error("Serialization error: {0}")]
54 Serialization(String),
55
56 #[error("Session not found: {0}")]
57 NotFound(SessionId),
58
59 #[error("Session corrupted: {0}")]
60 Corrupted(SessionId),
61
62 #[error(
63 "session {id} save rejected: new message count {new_len} is shorter than previously \
64 persisted {prev_len} without transcript-continuity proof"
65 )]
66 MonotonicityViolation {
67 id: SessionId,
68 prev_len: usize,
69 new_len: usize,
70 },
71
72 #[error(
73 "session {id} save rejected: incoming transcript is not a continuation of persisted revision {previous_revision}"
74 )]
75 TranscriptContinuityViolation {
76 id: SessionId,
77 previous_revision: String,
78 incoming_revision: String,
79 reason: String,
80 },
81
82 #[error(
83 "session {id} rewrite rejected: previous transcript revision {actual} did not match commit parent {expected}"
84 )]
85 TranscriptRevisionConflict {
86 id: SessionId,
87 expected: String,
88 actual: String,
89 },
90
91 #[error("session {id} rewrite rejected: {reason}")]
92 InvalidTranscriptRewrite { id: SessionId, reason: String },
93
94 #[error("Internal error: {0}")]
95 Internal(String),
96}
97
98pub fn session_projection_cas_token(session: &Session) -> Result<String, SessionStoreError> {
100 let bytes = serde_json::to_vec(session).map_err(|err| {
101 SessionStoreError::Serialization(format!(
102 "failed to serialize session projection CAS token: {err}"
103 ))
104 })?;
105 Ok(format!("row-sha256:{:x}", Sha256::digest(bytes)))
106}
107
108pub fn append_only_save_guard(
122 incoming: &Session,
123 previous: Option<&Session>,
124) -> Result<(), SessionStoreError> {
125 incoming
126 .validate_transcript_history_state()
127 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
128 id: incoming.id().clone(),
129 reason: format!("incoming transcript history state is malformed: {err}"),
130 })?;
131 let incoming_revision =
132 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
133 let incoming_state = incoming.transcript_history_state().map_err(|err| {
134 SessionStoreError::InvalidTranscriptRewrite {
135 id: incoming.id().clone(),
136 reason: format!("incoming transcript history state is malformed: {err}"),
137 }
138 })?;
139 if let Some(state) = incoming_state.as_ref()
140 && state.head != incoming_revision
141 {
142 return Err(SessionStoreError::InvalidTranscriptRewrite {
143 id: incoming.id().clone(),
144 reason: format!(
145 "incoming transcript graph head {} does not match current message digest {incoming_revision}",
146 state.head
147 ),
148 });
149 }
150
151 let Some(previous) = previous else {
152 if incoming_state.is_some() {
153 return Err(SessionStoreError::InvalidTranscriptRewrite {
154 id: incoming.id().clone(),
155 reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
156 .to_string(),
157 });
158 }
159 validate_plain_save_transcript_history_preservation(
160 incoming,
161 None,
162 None,
163 incoming_state.as_ref(),
164 )?;
165 return Ok(());
166 };
167 let previous_state = previous.transcript_history_state().map_err(|err| {
168 SessionStoreError::InvalidTranscriptRewrite {
169 id: incoming.id().clone(),
170 reason: format!("previous transcript history state is malformed: {err}"),
171 }
172 })?;
173 let previous_had_history = previous_state.is_some();
174 let incoming_has_history = incoming_state.is_some();
175 if previous_had_history && !incoming_has_history {
176 return Err(SessionStoreError::InvalidTranscriptRewrite {
177 id: incoming.id().clone(),
178 reason: "incoming save would erase retained transcript history state".to_string(),
179 });
180 }
181 let previous_revision =
182 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
183 if previous_revision == incoming_revision {
184 validate_plain_save_transcript_history_preservation(
185 incoming,
186 Some(previous),
187 previous_state.as_ref(),
188 incoming_state.as_ref(),
189 )?;
190 return Ok(());
191 }
192
193 let prev_len = previous.messages().len();
194 let new_len = incoming.messages().len();
195 if new_len >= prev_len {
196 let incoming_prefix_revision = transcript_messages_digest(&incoming.messages()[..prev_len])
197 .map_err(SessionStoreError::from)?;
198 if incoming_prefix_revision == previous_revision {
199 validate_plain_save_transcript_history_preservation(
200 incoming,
201 Some(previous),
202 previous_state.as_ref(),
203 incoming_state.as_ref(),
204 )?;
205 return Ok(());
206 }
207 }
208 if incoming_preserves_conversation_tail_with_system_context_append(incoming, previous)? {
209 validate_plain_save_transcript_history_preservation(
210 incoming,
211 Some(previous),
212 previous_state.as_ref(),
213 incoming_state.as_ref(),
214 )?;
215 return Ok(());
216 }
217 if incoming_preserves_prefix_after_transient_notice_cleanup(incoming, previous)? {
218 validate_plain_save_transcript_history_preservation(
219 incoming,
220 Some(previous),
221 previous_state.as_ref(),
222 incoming_state.as_ref(),
223 )?;
224 return Ok(());
225 }
226 if new_len < prev_len {
227 return Err(SessionStoreError::MonotonicityViolation {
228 id: incoming.id().clone(),
229 prev_len,
230 new_len,
231 });
232 }
233
234 Err(SessionStoreError::TranscriptContinuityViolation {
235 id: incoming.id().clone(),
236 previous_revision,
237 incoming_revision,
238 reason: "incoming transcript neither preserves the persisted prefix nor records a graph edge from the persisted head".to_string(),
239 })
240}
241
242fn validate_plain_save_transcript_history_preservation(
243 incoming: &Session,
244 previous: Option<&Session>,
245 previous_state: Option<&TranscriptHistoryState>,
246 incoming_state: Option<&TranscriptHistoryState>,
247) -> Result<(), SessionStoreError> {
248 let Some(previous) = previous else {
249 if incoming_state.is_some() {
250 return Err(SessionStoreError::InvalidTranscriptRewrite {
251 id: incoming.id().clone(),
252 reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
253 .to_string(),
254 });
255 }
256 return Ok(());
257 };
258 if previous_state.is_none() && incoming_state.is_some() {
259 return Err(SessionStoreError::InvalidTranscriptRewrite {
260 id: incoming.id().clone(),
261 reason: "incoming append-only save would seed transcript history state outside the rewrite/audit path"
262 .to_string(),
263 });
264 }
265 let Some(previous_state) = previous_state else {
266 return Ok(());
267 };
268 let Some(incoming_state) = incoming_state else {
269 return Err(SessionStoreError::InvalidTranscriptRewrite {
270 id: incoming.id().clone(),
271 reason: "incoming append-only save would erase retained transcript history state"
272 .to_string(),
273 });
274 };
275 let previous_commits = previous_state.commits.as_slice();
276 let incoming_commits = incoming_state.commits.as_slice();
277 if incoming_commits != previous_commits {
278 return Err(SessionStoreError::InvalidTranscriptRewrite {
279 id: incoming.id().clone(),
280 reason: "incoming append-only save would change retained transcript rewrite commits"
281 .to_string(),
282 });
283 }
284 let retained_revisions_preserved =
285 transcript_revision_bodies_preserved(previous_state, incoming_state)?;
286 if retained_revisions_preserved
287 && incoming_state.revisions.len() == previous_state.revisions.len()
288 && incoming_state.head == previous_state.head
289 {
290 return Ok(());
291 }
292 if incoming_state.revisions.len() != previous_state.revisions.len() + 1
293 || !retained_revisions_preserved
294 {
295 return Err(SessionStoreError::InvalidTranscriptRewrite {
296 id: incoming.id().clone(),
297 reason: "incoming append-only save would change retained transcript revision graph"
298 .to_string(),
299 });
300 }
301 let incoming_revision =
302 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
303 let previous_revision =
304 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
305 if previous_state.head != previous_revision {
306 return Err(SessionStoreError::InvalidTranscriptRewrite {
307 id: incoming.id().clone(),
308 reason: "previous transcript history head does not match persisted message digest"
309 .to_string(),
310 });
311 }
312 let added = &incoming_state.revisions[previous_state.revisions.len()];
313 if incoming_state.head != incoming_revision
314 || added.revision != incoming_revision
315 || added.parent_revision.as_deref() != Some(previous_state.head.as_str())
316 || transcript_messages_digest(&added.messages).map_err(SessionStoreError::from)?
317 != incoming_revision
318 {
319 return Err(SessionStoreError::InvalidTranscriptRewrite {
320 id: incoming.id().clone(),
321 reason: "incoming append-only save would add a transcript revision body that is not the current append"
322 .to_string(),
323 });
324 }
325 Ok(())
326}
327
328fn transcript_revision_bodies_preserved(
329 previous_state: &TranscriptHistoryState,
330 incoming_state: &TranscriptHistoryState,
331) -> Result<bool, SessionStoreError> {
332 if incoming_state.revisions.len() < previous_state.revisions.len() {
333 return Ok(false);
334 }
335 previous_state
336 .revisions
337 .iter()
338 .zip(incoming_state.revisions.iter())
339 .map(|(previous, incoming)| {
340 Ok(previous.revision == incoming.revision
341 && previous.parent_revision == incoming.parent_revision
342 && previous.created_at == incoming.created_at
343 && transcript_messages_digest(&previous.messages)
344 .map_err(SessionStoreError::from)?
345 == transcript_messages_digest(&incoming.messages)
346 .map_err(SessionStoreError::from)?)
347 })
348 .try_fold(true, |acc, preserved| {
349 preserved.map(|preserved| acc && preserved)
350 })
351}
352
353fn validate_rewrite_save_retains_previous_commits(
354 incoming: &Session,
355 previous: &Session,
356 incoming_state: &TranscriptHistoryState,
357) -> Result<(), SessionStoreError> {
358 let previous_state = previous.transcript_history_state().map_err(|err| {
359 SessionStoreError::InvalidTranscriptRewrite {
360 id: incoming.id().clone(),
361 reason: format!("previous transcript history state is malformed: {err}"),
362 }
363 })?;
364 let Some(previous_state) = previous_state.as_ref() else {
365 return Ok(());
366 };
367 if incoming_state.commits.len() < previous_state.commits.len()
368 || incoming_state.commits[..previous_state.commits.len()] != previous_state.commits
369 {
370 return Err(SessionStoreError::InvalidTranscriptRewrite {
371 id: incoming.id().clone(),
372 reason: "incoming rewrite save would drop retained transcript rewrite commits"
373 .to_string(),
374 });
375 }
376 Ok(())
377}
378
379pub fn authoritative_projection_current_revision_guard(
382 incoming: &Session,
383 previous: Option<&Session>,
384 expected_current_revision: Option<&str>,
385) -> Result<(), SessionStoreError> {
386 let previous_token = previous.map(session_projection_cas_token).transpose()?;
387 if previous_token.as_deref() == expected_current_revision {
388 return Ok(());
389 }
390 let incoming_revision =
391 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
392 Err(SessionStoreError::TranscriptContinuityViolation {
393 id: incoming.id().clone(),
394 previous_revision: previous_token.unwrap_or_else(|| "<missing>".to_string()),
395 incoming_revision,
396 reason: format!(
397 "authoritative projection expected persisted projection token {}, but current row has diverged",
398 expected_current_revision.unwrap_or("<missing>")
399 ),
400 })
401}
402
403fn incoming_preserves_conversation_tail_with_system_context_append(
404 incoming: &Session,
405 previous: &Session,
406) -> Result<bool, SessionStoreError> {
407 messages_preserve_conversation_tail_with_system_context_append(
408 incoming.messages(),
409 previous.messages(),
410 )
411}
412
413fn messages_preserve_conversation_tail_with_system_context_append(
414 incoming: &[Message],
415 previous: &[Message],
416) -> Result<bool, SessionStoreError> {
417 let (previous_system, previous_tail) = split_single_leading_system(previous);
418 let (incoming_system, incoming_tail) = split_single_leading_system(incoming);
419 let Some(incoming_system) = incoming_system else {
420 return Ok(false);
421 };
422 if !system_context_is_append(previous_system, incoming_system) {
423 return Ok(false);
424 }
425 if incoming_tail.len() < previous_tail.len() {
426 return Ok(false);
427 }
428 let previous_tail_revision =
429 transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
430 let incoming_tail_prefix_revision =
431 transcript_messages_digest(&incoming_tail[..previous_tail.len()])
432 .map_err(SessionStoreError::from)?;
433 Ok(previous_tail_revision == incoming_tail_prefix_revision)
434}
435
436fn split_single_leading_system(messages: &[Message]) -> (Option<&str>, &[Message]) {
437 match messages.first() {
438 Some(Message::System(system)) => (Some(system.content.as_str()), &messages[1..]),
439 _ => (None, messages),
440 }
441}
442
443fn system_context_is_append(previous: Option<&str>, incoming: &str) -> bool {
444 let appended = match previous {
445 Some(previous) if incoming == previous => return true,
446 Some(previous) if incoming.starts_with(previous) => {
447 let appended = &incoming[previous.len()..];
448 appended.strip_prefix(SYSTEM_CONTEXT_SEPARATOR)
449 }
450 Some(_) => None,
451 None => Some(incoming),
452 };
453 appended.is_some_and(|appended| appended.starts_with("[Runtime System Context]"))
454}
455
456fn incoming_preserves_prefix_after_transient_notice_cleanup(
457 incoming: &Session,
458 previous: &Session,
459) -> Result<bool, SessionStoreError> {
460 let previous_without_transient = previous
461 .messages()
462 .iter()
463 .filter(|message| !is_transient_system_notice(message))
464 .cloned()
465 .collect::<Vec<_>>();
466 if previous_without_transient.len() == previous.messages().len()
467 || incoming.messages().len() < previous_without_transient.len()
468 {
469 return Ok(false);
470 }
471 let previous_revision =
472 transcript_messages_digest(&previous_without_transient).map_err(SessionStoreError::from)?;
473 let incoming_prefix_revision =
474 transcript_messages_digest(&incoming.messages()[..previous_without_transient.len()])
475 .map_err(SessionStoreError::from)?;
476 Ok(previous_revision == incoming_prefix_revision)
477}
478
479fn is_transient_system_notice(message: &Message) -> bool {
480 let Message::SystemNotice(notice) = message else {
481 return false;
482 };
483 notice.kind == crate::types::SystemNoticeKind::McpPending
484 && notice.blocks.iter().all(|block| {
485 matches!(
486 block,
487 crate::types::SystemNoticeBlock::Mcp {
488 persisted: false,
489 ..
490 }
491 )
492 })
493}
494
495pub fn run_boundary_snapshot_save_guard(
504 incoming: &Session,
505 previous: Option<&Session>,
506) -> Result<(), SessionStoreError> {
507 match append_only_save_guard(incoming, previous) {
508 Ok(()) => Ok(()),
509 Err(append_error) => {
510 let Some(previous) = previous else {
511 return Err(append_error);
512 };
513 let incoming_revision =
514 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
515 let Some(state) = incoming.transcript_history_state().map_err(|err| {
516 SessionStoreError::InvalidTranscriptRewrite {
517 id: incoming.id().clone(),
518 reason: format!("incoming transcript history state is malformed: {err}"),
519 }
520 })?
521 else {
522 return Err(append_error);
523 };
524 validate_rewrite_save_retains_previous_commits(incoming, previous, &state)?;
525 let commits = find_transcript_rewrite_commit_chain_extending_session(
526 &state,
527 previous,
528 &incoming_revision,
529 )?;
530 let Some(commits) = commits else {
531 return Err(append_error);
532 };
533 let Some(commit) = commits.first() else {
534 return Err(append_error);
535 };
536 transcript_rewrite_bridge_save_guard(incoming, commit, &state, &incoming_revision)?;
537 for commit in commits.iter().skip(1) {
538 validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
539 }
540 Ok(())
541 }
542 }
543}
544
545pub fn find_transcript_rewrite_commit_extending<'a>(
548 state: &'a TranscriptHistoryState,
549 previous_revision: &str,
550 incoming_revision: &str,
551) -> Option<&'a TranscriptRewriteCommit> {
552 find_transcript_rewrite_commit_chain_extending(state, previous_revision, incoming_revision)
553 .and_then(|commits| commits.into_iter().next())
554}
555
556pub fn find_transcript_rewrite_commit_chain_extending<'a>(
559 state: &'a TranscriptHistoryState,
560 previous_revision: &str,
561 incoming_revision: &str,
562) -> Option<Vec<&'a TranscriptRewriteCommit>> {
563 let mut chain = Vec::new();
564 let mut cursor = previous_revision;
565 let mut visited = std::collections::BTreeSet::new();
566 loop {
567 if incoming_revision == cursor {
568 return Some(chain);
569 }
570 if !visited.insert(cursor.to_string()) {
571 return None;
572 }
573 let commit = state.commits.iter().find(|commit| {
574 (commit.parent_revision == cursor
575 || transcript_history_revision_extends(state, &commit.parent_revision, cursor))
576 && transcript_history_revision_extends(state, incoming_revision, &commit.revision)
577 });
578 let Some(commit) = commit else {
579 return transcript_history_revision_extends(state, incoming_revision, cursor)
580 .then_some(chain);
581 };
582 cursor = &commit.revision;
583 chain.push(commit);
584 }
585}
586
587pub fn find_transcript_rewrite_commit_chain_extending_session<'a>(
596 state: &'a TranscriptHistoryState,
597 previous: &Session,
598 incoming_revision: &str,
599) -> Result<Option<Vec<&'a TranscriptRewriteCommit>>, SessionStoreError> {
600 let previous_revision =
601 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
602 let mut chain = Vec::new();
603 let mut cursor = previous_revision.as_str();
604 let mut visited = std::collections::BTreeSet::new();
605 loop {
606 if incoming_revision == cursor {
607 return Ok(Some(chain));
608 }
609 if !visited.insert(cursor.to_string()) {
610 return Ok(None);
611 }
612
613 let Some(cursor_messages) = transcript_history_messages_for_revision(
614 state,
615 cursor,
616 &previous_revision,
617 previous.messages(),
618 ) else {
619 return Ok(None);
620 };
621 let mut selected = None;
622 for commit in &state.commits {
623 if !transcript_history_revision_extends(state, incoming_revision, &commit.revision) {
624 continue;
625 }
626 let parent_extends_cursor = commit.parent_revision == cursor
627 || revision_body_preserves_append_continuation_prefix(
628 state,
629 &commit.parent_revision,
630 cursor_messages,
631 cursor,
632 )?;
633 if parent_extends_cursor {
634 selected = Some(commit);
635 break;
636 }
637 }
638
639 let Some(commit) = selected else {
640 if revision_body_preserves_append_continuation_prefix(
641 state,
642 incoming_revision,
643 cursor_messages,
644 cursor,
645 )? {
646 return Ok(Some(chain));
647 }
648 return Ok(None);
649 };
650 cursor = &commit.revision;
651 chain.push(commit);
652 }
653}
654
655fn transcript_history_messages_for_revision<'a>(
656 state: &'a TranscriptHistoryState,
657 revision: &str,
658 previous_revision: &str,
659 previous_messages: &'a [Message],
660) -> Option<&'a [Message]> {
661 if revision == previous_revision {
662 return Some(previous_messages);
663 }
664 state
665 .revisions
666 .iter()
667 .find(|body| body.revision == revision)
668 .map(|body| body.messages.as_slice())
669}
670
671fn revision_body_preserves_append_continuation_prefix(
672 state: &TranscriptHistoryState,
673 revision: &str,
674 ancestor_messages: &[Message],
675 ancestor_revision: &str,
676) -> Result<bool, SessionStoreError> {
677 if revision == ancestor_revision {
678 return Ok(true);
679 }
680 let Some(body) = state
681 .revisions
682 .iter()
683 .find(|body| body.revision == revision)
684 else {
685 return Ok(false);
686 };
687 if body.messages.len() >= ancestor_messages.len() {
688 let prefix_revision = transcript_messages_digest(&body.messages[..ancestor_messages.len()])
689 .map_err(SessionStoreError::from)?;
690 if prefix_revision == ancestor_revision {
691 return Ok(true);
692 }
693 }
694 Ok(
695 messages_preserve_conversation_tail_with_system_context_append(
696 &body.messages,
697 ancestor_messages,
698 )? || messages_preserve_tail_after_leading_system_refresh(
699 &body.messages,
700 ancestor_messages,
701 )?,
702 )
703}
704
705fn messages_preserve_tail_after_leading_system_refresh(
706 incoming: &[Message],
707 previous: &[Message],
708) -> Result<bool, SessionStoreError> {
709 let (Some(Message::System(_)), Some(Message::System(_))) = (incoming.first(), previous.first())
710 else {
711 return Ok(false);
712 };
713 if incoming.len() < previous.len() {
714 return Ok(false);
715 }
716 let previous_tail_len = previous.len().saturating_sub(1);
717 if previous_tail_len == 0 {
718 return Ok(true);
719 }
720 let previous_tail_revision =
721 transcript_messages_digest(&previous[1..]).map_err(SessionStoreError::from)?;
722 let incoming_tail = &incoming[1..];
723 if incoming_tail.len() < previous_tail_len {
724 return Ok(false);
725 }
726 let incoming_tail_prefix_revision =
727 transcript_messages_digest(&incoming_tail[..previous_tail_len])
728 .map_err(SessionStoreError::from)?;
729 Ok(incoming_tail_prefix_revision == previous_tail_revision)
730}
731
732fn transcript_history_revision_extends(
733 state: &TranscriptHistoryState,
734 descendant: &str,
735 ancestor: &str,
736) -> bool {
737 if descendant == ancestor {
738 return true;
739 }
740 let mut cursor = descendant;
741 while let Some(body) = state.revisions.iter().find(|body| body.revision == cursor) {
742 let Some(parent) = body.parent_revision.as_deref() else {
743 return false;
744 };
745 if parent == ancestor {
746 return true;
747 }
748 cursor = parent;
749 }
750 false
751}
752
753fn transcript_rewrite_bridge_save_guard(
754 incoming: &Session,
755 commit: &TranscriptRewriteCommit,
756 incoming_state: &TranscriptHistoryState,
757 incoming_message_digest: &str,
758) -> Result<(), SessionStoreError> {
759 validate_transcript_rewrite_commit_bodies(incoming, commit, incoming_state)?;
760 if incoming_state.head != incoming_message_digest {
761 return Err(SessionStoreError::InvalidTranscriptRewrite {
762 id: incoming.id().clone(),
763 reason: format!(
764 "incoming transcript graph head {} does not match current message digest {incoming_message_digest}",
765 incoming_state.head
766 ),
767 });
768 }
769 if !transcript_history_revision_extends(
770 incoming_state,
771 incoming_message_digest,
772 &commit.revision,
773 ) {
774 return Err(SessionStoreError::InvalidTranscriptRewrite {
775 id: incoming.id().clone(),
776 reason: format!(
777 "incoming transcript head {incoming_message_digest} does not extend rewrite revision {}",
778 commit.revision
779 ),
780 });
781 }
782 Ok(())
783}
784
785pub fn transcript_rewrite_save_guard(
788 incoming: &Session,
789 previous: Option<&Session>,
790 commit: &TranscriptRewriteCommit,
791) -> Result<(), SessionStoreError> {
792 incoming
793 .validate_transcript_history_state()
794 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
795 id: incoming.id().clone(),
796 reason: format!("incoming transcript history state is malformed: {err}"),
797 })?;
798 let Some(previous) = previous else {
799 return Err(SessionStoreError::InvalidTranscriptRewrite {
800 id: incoming.id().clone(),
801 reason: "rewrite target has no previously persisted session".to_string(),
802 });
803 };
804 if incoming.id() != previous.id() {
805 return Err(SessionStoreError::InvalidTranscriptRewrite {
806 id: incoming.id().clone(),
807 reason: format!(
808 "incoming session id {} differs from previous session id {}",
809 incoming.id(),
810 previous.id()
811 ),
812 });
813 }
814 let previous_revision = previous.transcript_revision().map_err(|err| {
815 SessionStoreError::InvalidTranscriptRewrite {
816 id: incoming.id().clone(),
817 reason: format!("previous transcript revision is malformed: {err}"),
818 }
819 })?;
820 if previous_revision != commit.parent_revision {
821 return Err(SessionStoreError::TranscriptRevisionConflict {
822 id: incoming.id().clone(),
823 expected: commit.parent_revision.clone(),
824 actual: previous_revision,
825 });
826 }
827 let previous_message_digest =
828 transcript_messages_digest(previous.messages()).map_err(|err| {
829 SessionStoreError::InvalidTranscriptRewrite {
830 id: incoming.id().clone(),
831 reason: format!("previous current transcript is not digestible: {err}"),
832 }
833 })?;
834 if previous_message_digest != commit.parent_revision {
835 return Err(SessionStoreError::InvalidTranscriptRewrite {
836 id: incoming.id().clone(),
837 reason: format!(
838 "previous current transcript digest {previous_message_digest} does not match commit parent {}",
839 commit.parent_revision
840 ),
841 });
842 }
843 let incoming_revision = incoming.transcript_revision().map_err(|err| {
844 SessionStoreError::InvalidTranscriptRewrite {
845 id: incoming.id().clone(),
846 reason: format!("incoming transcript revision is malformed: {err}"),
847 }
848 })?;
849 if incoming_revision != commit.revision {
850 return Err(SessionStoreError::InvalidTranscriptRewrite {
851 id: incoming.id().clone(),
852 reason: format!(
853 "incoming transcript revision {incoming_revision} does not match commit revision {}",
854 commit.revision
855 ),
856 });
857 }
858 let incoming_message_digest =
859 transcript_messages_digest(incoming.messages()).map_err(|err| {
860 SessionStoreError::InvalidTranscriptRewrite {
861 id: incoming.id().clone(),
862 reason: format!("incoming current transcript is not digestible: {err}"),
863 }
864 })?;
865 if incoming_message_digest != commit.revision {
866 return Err(SessionStoreError::InvalidTranscriptRewrite {
867 id: incoming.id().clone(),
868 reason: format!(
869 "incoming current transcript digest {incoming_message_digest} does not match commit revision {}",
870 commit.revision
871 ),
872 });
873 }
874 let Some(incoming_state) = incoming.transcript_history_state().map_err(|err| {
875 SessionStoreError::InvalidTranscriptRewrite {
876 id: incoming.id().clone(),
877 reason: format!("incoming transcript history state is malformed: {err}"),
878 }
879 })?
880 else {
881 return Err(SessionStoreError::InvalidTranscriptRewrite {
882 id: incoming.id().clone(),
883 reason: "incoming rewrite did not persist a transcript revision graph".to_string(),
884 });
885 };
886 validate_rewrite_save_retains_previous_commits(incoming, previous, &incoming_state)?;
887 validate_transcript_rewrite_commit_bodies(incoming, commit, &incoming_state)
888}
889
890fn validate_transcript_rewrite_commit_bodies(
891 incoming: &Session,
892 commit: &TranscriptRewriteCommit,
893 incoming_state: &TranscriptHistoryState,
894) -> Result<(), SessionStoreError> {
895 if !incoming_state
896 .commits
897 .iter()
898 .any(|persisted| persisted == commit)
899 {
900 return Err(SessionStoreError::InvalidTranscriptRewrite {
901 id: incoming.id().clone(),
902 reason: format!(
903 "incoming rewrite did not persist the rewrite commit in the transcript graph (wanted {} -> {}, graph commits: {:?})",
904 commit.parent_revision,
905 commit.revision,
906 incoming_state
907 .commits
908 .iter()
909 .map(|commit| (&commit.parent_revision, &commit.revision))
910 .collect::<Vec<_>>()
911 ),
912 });
913 }
914 let Some(parent_body) = incoming_state
915 .revisions
916 .iter()
917 .find(|body| body.revision == commit.parent_revision)
918 else {
919 return Err(SessionStoreError::InvalidTranscriptRewrite {
920 id: incoming.id().clone(),
921 reason: format!(
922 "incoming rewrite omitted parent revision body {}",
923 commit.parent_revision
924 ),
925 });
926 };
927 let Some(revision_body) = incoming_state
928 .revisions
929 .iter()
930 .find(|body| body.revision == commit.revision)
931 else {
932 return Err(SessionStoreError::InvalidTranscriptRewrite {
933 id: incoming.id().clone(),
934 reason: format!(
935 "incoming rewrite omitted new revision body {}",
936 commit.revision
937 ),
938 });
939 };
940 if parent_body.messages.len() != commit.messages_before
941 || revision_body.messages.len() != commit.messages_after
942 {
943 return Err(SessionStoreError::InvalidTranscriptRewrite {
944 id: incoming.id().clone(),
945 reason: format!(
946 "commit message counts {} -> {} do not match persisted rewrite {} -> {}",
947 commit.messages_before,
948 commit.messages_after,
949 parent_body.messages.len(),
950 revision_body.messages.len()
951 ),
952 });
953 }
954 let parent_body_revision =
955 transcript_messages_digest(&parent_body.messages).map_err(|err| {
956 SessionStoreError::InvalidTranscriptRewrite {
957 id: incoming.id().clone(),
958 reason: format!("parent revision body is not digestible: {err}"),
959 }
960 })?;
961 if parent_body_revision != commit.parent_revision {
962 return Err(SessionStoreError::InvalidTranscriptRewrite {
963 id: incoming.id().clone(),
964 reason: format!(
965 "parent revision body digest {parent_body_revision} does not match commit parent {}",
966 commit.parent_revision
967 ),
968 });
969 }
970 let (start, end) = match &commit.selection {
971 TranscriptRewriteSelection::MessageRange { start, end } => (*start, *end),
972 };
973 if start > end || end > parent_body.messages.len() {
974 return Err(SessionStoreError::InvalidTranscriptRewrite {
975 id: incoming.id().clone(),
976 reason: format!(
977 "commit selection {start}..{end} is invalid for parent revision with {} messages",
978 parent_body.messages.len()
979 ),
980 });
981 }
982 let original_span_digest = transcript_messages_digest(&parent_body.messages[start..end])
983 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
984 id: incoming.id().clone(),
985 reason: format!("original span body is not digestible: {err}"),
986 })?;
987 if original_span_digest != commit.original_span_digest {
988 return Err(SessionStoreError::InvalidTranscriptRewrite {
989 id: incoming.id().clone(),
990 reason: format!(
991 "original span digest {original_span_digest} does not match commit digest {}",
992 commit.original_span_digest
993 ),
994 });
995 }
996 let revision_body_digest =
997 transcript_messages_digest(&revision_body.messages).map_err(|err| {
998 SessionStoreError::InvalidTranscriptRewrite {
999 id: incoming.id().clone(),
1000 reason: format!("new revision body is not digestible: {err}"),
1001 }
1002 })?;
1003 if revision_body_digest != commit.revision {
1004 return Err(SessionStoreError::InvalidTranscriptRewrite {
1005 id: incoming.id().clone(),
1006 reason: format!(
1007 "new revision body digest {revision_body_digest} does not match commit revision {}",
1008 commit.revision
1009 ),
1010 });
1011 }
1012 let removed_len = end - start;
1013 let retained_len = commit
1014 .messages_before
1015 .checked_sub(removed_len)
1016 .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1017 id: incoming.id().clone(),
1018 reason: "commit removed more messages than it recorded before rewrite".to_string(),
1019 })?;
1020 let replacement_len = commit
1021 .messages_after
1022 .checked_sub(retained_len)
1023 .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1024 id: incoming.id().clone(),
1025 reason: "commit message counts cannot describe a replacement span".to_string(),
1026 })?;
1027 let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
1028 SessionStoreError::InvalidTranscriptRewrite {
1029 id: incoming.id().clone(),
1030 reason: "replacement span end overflowed".to_string(),
1031 }
1032 })?;
1033 if replacement_end > revision_body.messages.len() {
1034 return Err(SessionStoreError::InvalidTranscriptRewrite {
1035 id: incoming.id().clone(),
1036 reason: format!(
1037 "replacement span {start}..{replacement_end} is invalid for revision with {} messages",
1038 revision_body.messages.len()
1039 ),
1040 });
1041 }
1042 let parent_prefix_digest =
1043 transcript_messages_digest(&parent_body.messages[..start]).map_err(|err| {
1044 SessionStoreError::InvalidTranscriptRewrite {
1045 id: incoming.id().clone(),
1046 reason: format!("parent prefix body is not digestible: {err}"),
1047 }
1048 })?;
1049 let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
1050 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1051 id: incoming.id().clone(),
1052 reason: format!("revision prefix body is not digestible: {err}"),
1053 })?;
1054 if parent_prefix_digest != revision_prefix_digest {
1055 return Err(SessionStoreError::InvalidTranscriptRewrite {
1056 id: incoming.id().clone(),
1057 reason: "rewrite revision changed messages before the selected span".to_string(),
1058 });
1059 }
1060 let parent_suffix_digest =
1061 transcript_messages_digest(&parent_body.messages[end..]).map_err(|err| {
1062 SessionStoreError::InvalidTranscriptRewrite {
1063 id: incoming.id().clone(),
1064 reason: format!("parent suffix body is not digestible: {err}"),
1065 }
1066 })?;
1067 let revision_suffix_digest =
1068 transcript_messages_digest(&revision_body.messages[replacement_end..]).map_err(|err| {
1069 SessionStoreError::InvalidTranscriptRewrite {
1070 id: incoming.id().clone(),
1071 reason: format!("revision suffix body is not digestible: {err}"),
1072 }
1073 })?;
1074 if parent_suffix_digest != revision_suffix_digest {
1075 return Err(SessionStoreError::InvalidTranscriptRewrite {
1076 id: incoming.id().clone(),
1077 reason: "rewrite revision changed messages after the selected span".to_string(),
1078 });
1079 }
1080 let replacement_digest = transcript_messages_digest(
1081 &revision_body.messages[start..replacement_end],
1082 )
1083 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1084 id: incoming.id().clone(),
1085 reason: format!("replacement span body is not digestible: {err}"),
1086 })?;
1087 if replacement_digest != commit.replacement_digest {
1088 return Err(SessionStoreError::InvalidTranscriptRewrite {
1089 id: incoming.id().clone(),
1090 reason: format!(
1091 "replacement span digest {replacement_digest} does not match commit digest {}",
1092 commit.replacement_digest
1093 ),
1094 });
1095 }
1096 Ok(())
1097}
1098
1099impl From<serde_json::Error> for SessionStoreError {
1100 fn from(e: serde_json::Error) -> Self {
1101 Self::Serialization(e.to_string())
1102 }
1103}
1104
1105#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1130#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1131pub trait SessionStore: Send + Sync {
1132 async fn save(&self, session: &Session) -> Result<(), SessionStoreError>;
1138
1139 async fn save_transcript_rewrite(
1145 &self,
1146 session: &Session,
1147 commit: &TranscriptRewriteCommit,
1148 ) -> Result<(), SessionStoreError> {
1149 let _ = (session, commit);
1150 Err(SessionStoreError::Internal(
1151 "save_transcript_rewrite is not supported by this SessionStore".to_string(),
1152 ))
1153 }
1154
1155 async fn save_authoritative_projection(
1164 &self,
1165 session: &Session,
1166 ) -> Result<(), SessionStoreError> {
1167 self.save(session).await
1168 }
1169
1170 async fn save_authoritative_projection_if_current_revision(
1173 &self,
1174 session: &Session,
1175 expected_current_revision: Option<String>,
1176 ) -> Result<(), SessionStoreError> {
1177 let _ = (session, expected_current_revision);
1178 Err(SessionStoreError::Internal(
1179 "save_authoritative_projection_if_current_revision is not supported by this SessionStore"
1180 .to_string(),
1181 ))
1182 }
1183
1184 async fn load(&self, id: &SessionId) -> Result<Option<Session>, SessionStoreError>;
1186
1187 async fn list(&self, filter: SessionFilter) -> Result<Vec<SessionMeta>, SessionStoreError>;
1189
1190 async fn delete(&self, id: &SessionId) -> Result<(), SessionStoreError>;
1192
1193 async fn delete_if_current_revision(
1196 &self,
1197 id: &SessionId,
1198 expected_current_revision: &str,
1199 ) -> Result<bool, SessionStoreError>;
1200
1201 async fn exists(&self, id: &SessionId) -> Result<bool, SessionStoreError> {
1203 Ok(self.load(id).await?.is_some())
1204 }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use super::*;
1210 use crate::types::{
1211 AssistantMessage, BlockAssistantMessage, StopReason, SystemMessage, SystemNoticeBlock,
1212 SystemNoticeKind, SystemNoticeMessage, Usage, UserMessage,
1213 };
1214
1215 #[test]
1216 fn append_only_guard_rejects_leading_system_message_replacement() {
1217 let mut previous = Session::new();
1218 previous.push(Message::System(SystemMessage::new("original system")));
1219 previous.push(Message::User(UserMessage::text("hello".to_string())));
1220
1221 let mut incoming = previous.clone();
1222 let rewrite_result = incoming.replace_messages_internal(
1223 vec![
1224 Message::System(SystemMessage::new("rewritten system")),
1225 Message::User(UserMessage::text("hello".to_string())),
1226 ],
1227 crate::TranscriptRewriteReason::new("unit-test"),
1228 );
1229 assert!(
1230 rewrite_result.is_ok(),
1231 "typed rewrite should be constructible: {rewrite_result:?}"
1232 );
1233
1234 assert!(matches!(
1235 append_only_save_guard(&incoming, Some(&previous)),
1236 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1237 ));
1238 }
1239
1240 #[test]
1241 fn append_only_guard_accepts_runtime_system_context_append() {
1242 let mut previous = Session::new();
1243 previous.push(Message::System(SystemMessage::new("base system")));
1244 previous.push(Message::User(UserMessage::text("hello".to_string())));
1245
1246 let mut incoming = previous.clone();
1247 incoming.set_system_prompt(format!(
1248 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1249 ));
1250
1251 assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1252 }
1253
1254 #[test]
1255 fn append_only_guard_accepts_system_timestamp_refresh_without_content_change() {
1256 let mut previous = Session::new();
1257 previous.push(Message::System(SystemMessage::new("base system")));
1258
1259 let mut incoming = previous.clone();
1260 incoming.set_system_prompt("base system".to_string());
1261
1262 assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1263 }
1264
1265 #[test]
1266 fn run_boundary_guard_accepts_compaction_after_uncheckpointed_runtime_append()
1267 -> Result<(), Box<dyn std::error::Error>> {
1268 let mut previous = Session::new();
1269 previous.push(Message::System(SystemMessage::new("base system")));
1270 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1271 previous.push(Message::Assistant(AssistantMessage {
1272 content: "answer one".to_string(),
1273 tool_calls: Vec::new(),
1274 stop_reason: StopReason::EndTurn,
1275 usage: Usage::default(),
1276 created_at: crate::types::message_timestamp_now(),
1277 }));
1278
1279 let mut parent = previous.clone();
1280 parent.set_system_prompt("refreshed runtime system projection".to_string());
1281 parent.push(Message::User(UserMessage::text(
1282 "runtime-only turn".to_string(),
1283 )));
1284 parent.push(Message::Assistant(AssistantMessage {
1285 content: "runtime-only answer".to_string(),
1286 tool_calls: Vec::new(),
1287 stop_reason: StopReason::EndTurn,
1288 usage: Usage::default(),
1289 created_at: crate::types::message_timestamp_now(),
1290 }));
1291 let parent_revision = parent.transcript_revision()?;
1292
1293 let mut incoming = parent.clone();
1294 let mut replacement = vec![
1295 parent.messages()[0].clone(),
1296 Message::User(UserMessage::text("[Context compacted] summary".to_string())),
1297 ];
1298 replacement.extend_from_slice(&parent.messages()[1..]);
1299 incoming.commit_transcript_rewrite(
1300 TranscriptRewriteSelection::MessageRange {
1301 start: 0,
1302 end: parent.messages().len(),
1303 },
1304 replacement,
1305 crate::TranscriptRewriteReason::new("compaction"),
1306 Some("meerkat-core".to_string()),
1307 Some(parent_revision),
1308 )?;
1309
1310 assert!(matches!(
1311 append_only_save_guard(&incoming, Some(&previous)),
1312 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1313 ));
1314 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1315 Ok(())
1316 }
1317
1318 #[test]
1319 fn run_boundary_guard_accepts_compaction_with_retained_tail_window()
1320 -> Result<(), Box<dyn std::error::Error>> {
1321 let mut previous = Session::new();
1322 previous.push(Message::System(SystemMessage::new("base system")));
1323 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1324 previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1325 vec![crate::types::AssistantBlock::Text {
1326 text: "answer one".to_string(),
1327 meta: None,
1328 }],
1329 StopReason::EndTurn,
1330 )));
1331
1332 let mut parent = previous.clone();
1333 parent.set_system_prompt("refreshed runtime system projection".to_string());
1334 parent.push(Message::SystemNotice(SystemNoticeMessage::new(
1335 SystemNoticeKind::Comms,
1336 "peer response queued",
1337 )));
1338 let parent_revision = parent.transcript_revision()?;
1339
1340 let mut incoming = parent.clone();
1341 let mut replacement = vec![
1342 parent.messages()[0].clone(),
1343 Message::User(UserMessage::text("[Context compacted] summary".to_string())),
1344 ];
1345 replacement.extend_from_slice(&parent.messages()[1..]);
1346 incoming.commit_transcript_rewrite(
1347 TranscriptRewriteSelection::MessageRange {
1348 start: 0,
1349 end: parent.messages().len(),
1350 },
1351 replacement,
1352 crate::TranscriptRewriteReason::new("compaction"),
1353 Some("meerkat-core".to_string()),
1354 Some(parent_revision),
1355 )?;
1356
1357 assert!(matches!(
1358 append_only_save_guard(&incoming, Some(&previous)),
1359 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1360 ));
1361 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1362 Ok(())
1363 }
1364
1365 #[test]
1366 fn run_boundary_guard_rejects_commitless_history_parent_edge()
1367 -> Result<(), Box<dyn std::error::Error>> {
1368 let mut previous = Session::new();
1369 previous.push(Message::System(SystemMessage::new("base system")));
1370 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1371 let previous_revision = previous.transcript_revision()?;
1372
1373 let mut incoming = previous.clone();
1374 incoming.set_system_prompt("forged replacement system".to_string());
1375 let incoming_revision = incoming.transcript_revision()?;
1376 let history = TranscriptHistoryState {
1377 head: incoming_revision.clone(),
1378 commits: Vec::new(),
1379 revisions: vec![
1380 crate::TranscriptRevisionBody {
1381 revision: previous_revision,
1382 parent_revision: None,
1383 messages: previous.messages().to_vec(),
1384 created_at: previous.updated_at(),
1385 },
1386 crate::TranscriptRevisionBody {
1387 revision: incoming_revision,
1388 parent_revision: Some(previous.transcript_revision()?),
1389 messages: incoming.messages().to_vec(),
1390 created_at: incoming.updated_at(),
1391 },
1392 ],
1393 };
1394 incoming.set_metadata(
1395 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1396 serde_json::to_value(history)?,
1397 );
1398
1399 assert!(matches!(
1400 append_only_save_guard(&incoming, Some(&previous)),
1401 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1402 ));
1403 assert!(matches!(
1404 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1405 Err(SessionStoreError::TranscriptContinuityViolation { .. }
1406 | SessionStoreError::MonotonicityViolation { .. })
1407 ));
1408 Ok(())
1409 }
1410
1411 #[test]
1412 fn append_only_guard_rejects_history_head_that_does_not_match_current_messages()
1413 -> Result<(), Box<dyn std::error::Error>> {
1414 let mut previous = Session::new();
1415 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1416
1417 let mut incoming = previous.clone();
1418 incoming.push(Message::User(UserMessage::text("append".to_string())));
1419 let poisoned_messages = vec![Message::User(UserMessage::text(
1420 "unrelated poisoned history".to_string(),
1421 ))];
1422 let poisoned_revision = transcript_messages_digest(&poisoned_messages)?;
1423 incoming.set_metadata(
1424 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1425 serde_json::to_value(TranscriptHistoryState {
1426 head: poisoned_revision.clone(),
1427 commits: Vec::new(),
1428 revisions: vec![crate::TranscriptRevisionBody {
1429 revision: poisoned_revision,
1430 parent_revision: None,
1431 messages: poisoned_messages,
1432 created_at: incoming.updated_at(),
1433 }],
1434 })?,
1435 );
1436
1437 assert!(matches!(
1438 append_only_save_guard(&incoming, Some(&previous)),
1439 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1440 ));
1441 assert!(matches!(
1442 append_only_save_guard(&incoming, None),
1443 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1444 ));
1445 assert!(matches!(
1446 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1447 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1448 ));
1449 Ok(())
1450 }
1451
1452 #[test]
1453 fn append_only_guard_rejects_new_rewrite_commits_on_plain_append()
1454 -> Result<(), Box<dyn std::error::Error>> {
1455 let mut previous = Session::new();
1456 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1457 let previous_revision = previous.transcript_revision()?;
1458
1459 let mut incoming = previous.clone();
1460 let appended = Message::Assistant(AssistantMessage {
1461 content: "plain append".to_string(),
1462 tool_calls: Vec::new(),
1463 stop_reason: StopReason::EndTurn,
1464 usage: Usage::default(),
1465 created_at: crate::types::message_timestamp_now(),
1466 });
1467 incoming.commit_transcript_rewrite(
1468 TranscriptRewriteSelection::MessageRange { start: 1, end: 1 },
1469 vec![appended],
1470 crate::TranscriptRewriteReason::new("forged-append"),
1471 Some("unit-test".to_string()),
1472 Some(previous_revision),
1473 )?;
1474
1475 assert!(matches!(
1476 append_only_save_guard(&incoming, Some(&previous)),
1477 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1478 ));
1479 Ok(())
1480 }
1481
1482 #[test]
1483 fn append_only_guard_rejects_first_save_with_rewrite_commits()
1484 -> Result<(), Box<dyn std::error::Error>> {
1485 let mut incoming = Session::new();
1486 incoming.push(Message::User(UserMessage::text("seed".to_string())));
1487 let parent_messages = incoming.messages().to_vec();
1488 let parent_updated_at = incoming.updated_at();
1489 let parent_revision = incoming.transcript_revision()?;
1490 let commit = incoming.commit_transcript_rewrite(
1491 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1492 vec![Message::User(UserMessage::text(
1493 "compacted seed".to_string(),
1494 ))],
1495 crate::TranscriptRewriteReason::new("compaction"),
1496 Some("meerkat-core".to_string()),
1497 Some(parent_revision),
1498 )?;
1499 let incoming_revision = incoming.transcript_revision()?;
1500 let commit_parent_revision = commit.parent_revision.clone();
1501 incoming.set_metadata(
1502 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1503 serde_json::to_value(TranscriptHistoryState {
1504 head: incoming_revision.clone(),
1505 commits: vec![commit],
1506 revisions: vec![
1507 crate::TranscriptRevisionBody {
1508 revision: commit_parent_revision.clone(),
1509 parent_revision: None,
1510 messages: parent_messages,
1511 created_at: parent_updated_at,
1512 },
1513 crate::TranscriptRevisionBody {
1514 revision: incoming_revision,
1515 parent_revision: Some(commit_parent_revision),
1516 messages: incoming.messages().to_vec(),
1517 created_at: incoming.updated_at(),
1518 },
1519 ],
1520 })?,
1521 );
1522
1523 assert!(matches!(
1524 append_only_save_guard(&incoming, None),
1525 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1526 ));
1527 Ok(())
1528 }
1529
1530 #[test]
1531 fn transcript_rewrite_guard_rejects_poisoned_history_graph()
1532 -> Result<(), Box<dyn std::error::Error>> {
1533 let mut previous = Session::new();
1534 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1535 let parent_revision = previous.transcript_revision()?;
1536
1537 let mut first = previous.clone();
1538 let first_commit = first.commit_transcript_rewrite(
1539 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1540 vec![Message::User(UserMessage::text(
1541 "compacted persisted".to_string(),
1542 ))],
1543 crate::TranscriptRewriteReason::new("compaction"),
1544 Some("unit-test".to_string()),
1545 Some(parent_revision),
1546 )?;
1547 let first_snapshot = first.clone();
1548
1549 first.commit_transcript_rewrite(
1550 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1551 vec![Message::User(UserMessage::text(
1552 "uncommitted poisoned fork".to_string(),
1553 ))],
1554 crate::TranscriptRewriteReason::new("poison"),
1555 Some("unit-test".to_string()),
1556 Some(first_commit.revision.clone()),
1557 )?;
1558 let mut poisoned_state = first
1559 .transcript_history_state()?
1560 .ok_or_else(|| "second rewrite should retain history state".to_string())?;
1561 poisoned_state.head = first_commit.revision.clone();
1562
1563 let mut poisoned = first_snapshot;
1564 poisoned.set_metadata(
1565 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1566 serde_json::to_value(poisoned_state)?,
1567 );
1568
1569 assert!(matches!(
1570 transcript_rewrite_save_guard(&poisoned, Some(&previous), &first_commit),
1571 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1572 if reason.contains("incoming transcript history state is malformed")
1573 ));
1574 Ok(())
1575 }
1576
1577 #[test]
1578 fn authoritative_projection_guard_rejects_changed_persisted_revision()
1579 -> Result<(), Box<dyn std::error::Error>> {
1580 let mut previous = Session::new();
1581 previous.push(Message::User(UserMessage::text("persisted A".to_string())));
1582 let expected_revision = previous.transcript_revision()?;
1583
1584 let mut current = previous.clone();
1585 current.push(Message::Assistant(AssistantMessage {
1586 content: "persisted B".to_string(),
1587 tool_calls: Vec::new(),
1588 stop_reason: StopReason::EndTurn,
1589 usage: Usage::default(),
1590 created_at: crate::types::message_timestamp_now(),
1591 }));
1592 let mut incoming = previous.clone();
1593 incoming.push(Message::User(UserMessage::text(
1594 "incoming from A".to_string(),
1595 )));
1596
1597 assert!(matches!(
1598 authoritative_projection_current_revision_guard(
1599 &incoming,
1600 Some(¤t),
1601 Some(&expected_revision)
1602 ),
1603 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1604 ));
1605 Ok(())
1606 }
1607
1608 #[test]
1609 fn append_only_guard_rejects_rewrite_commits_on_first_save()
1610 -> Result<(), Box<dyn std::error::Error>> {
1611 let mut incoming = Session::new();
1612 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1613 let parent_revision = incoming.transcript_revision()?;
1614 incoming.commit_transcript_rewrite(
1615 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1616 vec![Message::User(UserMessage::text("rewritten".to_string()))],
1617 crate::TranscriptRewriteReason::new("forged-first-save"),
1618 Some("unit-test".to_string()),
1619 Some(parent_revision),
1620 )?;
1621
1622 assert!(matches!(
1623 append_only_save_guard(&incoming, None),
1624 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1625 ));
1626 Ok(())
1627 }
1628
1629 #[test]
1630 fn append_only_guard_rejects_commitless_history_on_first_save()
1631 -> Result<(), Box<dyn std::error::Error>> {
1632 let mut incoming = Session::new();
1633 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1634 let incoming_revision = incoming.transcript_revision()?;
1635 incoming.set_metadata(
1636 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1637 serde_json::to_value(TranscriptHistoryState {
1638 head: incoming_revision.clone(),
1639 commits: Vec::new(),
1640 revisions: vec![crate::TranscriptRevisionBody {
1641 revision: incoming_revision,
1642 parent_revision: None,
1643 messages: incoming.messages().to_vec(),
1644 created_at: incoming.updated_at(),
1645 }],
1646 })?,
1647 );
1648
1649 assert!(matches!(
1650 append_only_save_guard(&incoming, None),
1651 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1652 if reason.contains("first save would seed transcript history state")
1653 ));
1654 Ok(())
1655 }
1656
1657 #[test]
1658 fn append_only_guard_rejects_commitless_history_seed_on_plain_append()
1659 -> Result<(), Box<dyn std::error::Error>> {
1660 let mut previous = Session::new();
1661 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1662 let previous_revision = previous.transcript_revision()?;
1663
1664 let mut incoming = previous.clone();
1665 incoming.push(Message::Assistant(AssistantMessage {
1666 content: "plain append".to_string(),
1667 tool_calls: Vec::new(),
1668 stop_reason: StopReason::EndTurn,
1669 usage: Usage::default(),
1670 created_at: crate::types::message_timestamp_now(),
1671 }));
1672 let incoming_revision = incoming.transcript_revision()?;
1673 incoming.set_metadata(
1674 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1675 serde_json::to_value(TranscriptHistoryState {
1676 head: incoming_revision.clone(),
1677 commits: Vec::new(),
1678 revisions: vec![
1679 crate::TranscriptRevisionBody {
1680 revision: previous_revision,
1681 parent_revision: None,
1682 messages: previous.messages().to_vec(),
1683 created_at: previous.updated_at(),
1684 },
1685 crate::TranscriptRevisionBody {
1686 revision: incoming_revision,
1687 parent_revision: Some(previous.transcript_revision()?),
1688 messages: incoming.messages().to_vec(),
1689 created_at: incoming.updated_at(),
1690 },
1691 ],
1692 })?,
1693 );
1694
1695 assert!(matches!(
1696 append_only_save_guard(&incoming, Some(&previous)),
1697 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1698 if reason.contains("append-only save would seed transcript history state")
1699 ));
1700 Ok(())
1701 }
1702
1703 #[test]
1704 fn append_only_guard_rejects_new_rewrite_commits_on_system_context_append()
1705 -> Result<(), Box<dyn std::error::Error>> {
1706 let mut previous = Session::new();
1707 previous.push(Message::System(SystemMessage::new("base system")));
1708 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1709 let mut incoming = previous.clone();
1710 incoming.set_system_prompt(format!(
1711 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1712 ));
1713 incoming.push(Message::Assistant(AssistantMessage {
1714 content: "plain append".to_string(),
1715 tool_calls: Vec::new(),
1716 stop_reason: StopReason::EndTurn,
1717 usage: Usage::default(),
1718 created_at: crate::types::message_timestamp_now(),
1719 }));
1720 let incoming_revision = incoming.transcript_revision()?;
1721 incoming.set_metadata(
1722 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1723 serde_json::to_value(TranscriptHistoryState {
1724 head: incoming_revision.clone(),
1725 commits: vec![TranscriptRewriteCommit {
1726 parent_revision: previous.transcript_revision()?,
1727 revision: incoming_revision.clone(),
1728 selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
1729 original_span_digest: transcript_messages_digest(&[])?,
1730 replacement_digest: transcript_messages_digest(&[])?,
1731 messages_before: previous.messages().len(),
1732 messages_after: incoming.messages().len(),
1733 reason: crate::TranscriptRewriteReason::new("forged"),
1734 actor: Some("unit-test".to_string()),
1735 committed_at: incoming.updated_at(),
1736 }],
1737 revisions: vec![crate::TranscriptRevisionBody {
1738 revision: incoming_revision,
1739 parent_revision: None,
1740 messages: incoming.messages().to_vec(),
1741 created_at: incoming.updated_at(),
1742 }],
1743 })?,
1744 );
1745
1746 assert!(matches!(
1747 append_only_save_guard(&incoming, Some(&previous)),
1748 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1749 ));
1750 Ok(())
1751 }
1752
1753 #[test]
1754 fn append_only_guard_rejects_new_rewrite_commits_on_transient_notice_cleanup()
1755 -> Result<(), Box<dyn std::error::Error>> {
1756 let mut previous = Session::new();
1757 previous.push(Message::SystemNotice(SystemNoticeMessage::new(
1758 SystemNoticeKind::Comms,
1759 "transient peer delivery notice",
1760 )));
1761 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1762
1763 let mut incoming = Session::new();
1764 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1765 incoming.push(Message::Assistant(AssistantMessage {
1766 content: "plain append after notice cleanup".to_string(),
1767 tool_calls: Vec::new(),
1768 stop_reason: StopReason::EndTurn,
1769 usage: Usage::default(),
1770 created_at: crate::types::message_timestamp_now(),
1771 }));
1772 let incoming_revision = incoming.transcript_revision()?;
1773 incoming.set_metadata(
1774 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1775 serde_json::to_value(TranscriptHistoryState {
1776 head: incoming_revision.clone(),
1777 commits: vec![TranscriptRewriteCommit {
1778 parent_revision: previous.transcript_revision()?,
1779 revision: incoming_revision.clone(),
1780 selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
1781 original_span_digest: transcript_messages_digest(&[])?,
1782 replacement_digest: transcript_messages_digest(&[])?,
1783 messages_before: previous.messages().len(),
1784 messages_after: incoming.messages().len(),
1785 reason: crate::TranscriptRewriteReason::new("forged"),
1786 actor: Some("unit-test".to_string()),
1787 committed_at: incoming.updated_at(),
1788 }],
1789 revisions: vec![crate::TranscriptRevisionBody {
1790 revision: incoming_revision,
1791 parent_revision: None,
1792 messages: incoming.messages().to_vec(),
1793 created_at: incoming.updated_at(),
1794 }],
1795 })?,
1796 );
1797
1798 assert!(matches!(
1799 append_only_save_guard(&incoming, Some(&previous)),
1800 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1801 ));
1802 Ok(())
1803 }
1804
1805 #[test]
1806 fn run_boundary_guard_rejects_runtime_parent_with_inserted_message_before_tail()
1807 -> Result<(), Box<dyn std::error::Error>> {
1808 let mut previous = Session::new();
1809 previous.push(Message::System(SystemMessage::new("base system")));
1810 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1811 previous.push(Message::Assistant(AssistantMessage {
1812 content: "answer one".to_string(),
1813 tool_calls: Vec::new(),
1814 stop_reason: StopReason::EndTurn,
1815 usage: Usage::default(),
1816 created_at: crate::types::message_timestamp_now(),
1817 }));
1818
1819 let parent_messages = vec![
1820 Message::System(SystemMessage::new("refreshed runtime system projection")),
1821 Message::User(UserMessage::text(
1822 "injected before retained tail".to_string(),
1823 )),
1824 previous.messages()[1].clone(),
1825 previous.messages()[2].clone(),
1826 ];
1827 let parent_revision = transcript_messages_digest(&parent_messages)?;
1828 let mut parent = previous.clone();
1829 parent.apply_transcript_history_state(TranscriptHistoryState {
1830 head: parent_revision.clone(),
1831 commits: Vec::new(),
1832 revisions: vec![crate::TranscriptRevisionBody {
1833 revision: parent_revision,
1834 parent_revision: None,
1835 messages: parent_messages,
1836 created_at: parent.updated_at(),
1837 }],
1838 })?;
1839 let parent_revision = parent.transcript_revision()?;
1840
1841 let mut incoming = parent.clone();
1842 incoming.commit_transcript_rewrite(
1843 TranscriptRewriteSelection::MessageRange {
1844 start: 0,
1845 end: parent.messages().len(),
1846 },
1847 vec![Message::User(UserMessage::text(
1848 "[Context compacted] summary".to_string(),
1849 ))],
1850 crate::TranscriptRewriteReason::new("compaction"),
1851 Some("meerkat-core".to_string()),
1852 Some(parent_revision),
1853 )?;
1854
1855 assert!(matches!(
1856 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1857 Err(SessionStoreError::TranscriptContinuityViolation { .. }
1858 | SessionStoreError::MonotonicityViolation { .. })
1859 ));
1860 Ok(())
1861 }
1862
1863 #[test]
1864 fn run_boundary_guard_rejects_forged_parent_edge_before_real_rewrite_commit()
1865 -> Result<(), Box<dyn std::error::Error>> {
1866 let mut previous = Session::new();
1867 previous.push(Message::System(SystemMessage::new("base system")));
1868 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1869 previous.push(Message::Assistant(AssistantMessage {
1870 content: "answer one".to_string(),
1871 tool_calls: Vec::new(),
1872 stop_reason: StopReason::EndTurn,
1873 usage: Usage::default(),
1874 created_at: crate::types::message_timestamp_now(),
1875 }));
1876 let previous_revision = previous.transcript_revision()?;
1877
1878 let forged_parent_messages = vec![
1879 Message::System(SystemMessage::new("refreshed runtime system projection")),
1880 Message::User(UserMessage::text(
1881 "forged insertion before retained tail".to_string(),
1882 )),
1883 previous.messages()[1].clone(),
1884 previous.messages()[2].clone(),
1885 ];
1886 let forged_parent_revision = transcript_messages_digest(&forged_parent_messages)?;
1887 let mut forged_parent = previous.clone();
1888 forged_parent.apply_transcript_history_state(TranscriptHistoryState {
1889 head: forged_parent_revision.clone(),
1890 commits: Vec::new(),
1891 revisions: vec![
1892 crate::TranscriptRevisionBody {
1893 revision: previous_revision.clone(),
1894 parent_revision: None,
1895 messages: previous.messages().to_vec(),
1896 created_at: previous.updated_at(),
1897 },
1898 crate::TranscriptRevisionBody {
1899 revision: forged_parent_revision.clone(),
1900 parent_revision: Some(previous_revision),
1901 messages: forged_parent_messages,
1902 created_at: forged_parent.updated_at(),
1903 },
1904 ],
1905 })?;
1906
1907 let mut incoming = forged_parent.clone();
1908 incoming.commit_transcript_rewrite(
1909 TranscriptRewriteSelection::MessageRange {
1910 start: 0,
1911 end: forged_parent.messages().len(),
1912 },
1913 vec![Message::User(UserMessage::text(
1914 "[Context compacted] forged branch".to_string(),
1915 ))],
1916 crate::TranscriptRewriteReason::new("compaction"),
1917 Some("meerkat-core".to_string()),
1918 Some(forged_parent_revision),
1919 )?;
1920
1921 assert!(matches!(
1922 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1923 Err(SessionStoreError::TranscriptContinuityViolation { .. }
1924 | SessionStoreError::MonotonicityViolation { .. })
1925 ));
1926 Ok(())
1927 }
1928
1929 #[test]
1930 fn append_only_guard_rejects_transient_mcp_pending_notice_cleanup_with_unaudited_commit()
1931 -> Result<(), crate::TranscriptEditError> {
1932 let mut previous = Session::new();
1933 previous.push(Message::User(UserMessage::text("hello".to_string())));
1934 previous.push(Message::SystemNotice(SystemNoticeMessage {
1935 kind: SystemNoticeKind::McpPending,
1936 body: Some("connecting".to_string()),
1937 blocks: vec![SystemNoticeBlock::Mcp {
1938 server_id: None,
1939 operation: None,
1940 phase: None,
1941 persisted: false,
1942 detail: Some("connecting".to_string()),
1943 pending_sources: vec!["test-server".to_string()],
1944 }],
1945 created_at: crate::types::message_timestamp_now(),
1946 }));
1947 previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1948 vec![crate::types::AssistantBlock::Text {
1949 text: "answer".to_string(),
1950 meta: None,
1951 }],
1952 StopReason::EndTurn,
1953 )));
1954
1955 let mut incoming = previous.clone();
1956 incoming.replace_messages_internal(
1957 previous
1958 .messages()
1959 .iter()
1960 .filter(|message| !matches!(message, Message::SystemNotice(_)))
1961 .cloned()
1962 .collect(),
1963 crate::TranscriptRewriteReason::new("unit-test"),
1964 )?;
1965 incoming.push(Message::User(UserMessage::text("again".to_string())));
1966
1967 assert!(matches!(
1968 append_only_save_guard(&incoming, Some(&previous)),
1969 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1970 ));
1971 Ok::<(), crate::TranscriptEditError>(())
1972 }
1973
1974 #[test]
1975 fn rewrite_chain_finder_crosses_normal_append_between_rewrites()
1976 -> Result<(), Box<dyn std::error::Error>> {
1977 let mut session = Session::new();
1978 session.push(Message::User(UserMessage::text("first".to_string())));
1979 session.push(Message::Assistant(AssistantMessage {
1980 content: "verbose first answer".to_string(),
1981 tool_calls: Vec::new(),
1982 stop_reason: StopReason::EndTurn,
1983 usage: Usage::default(),
1984 created_at: crate::types::message_timestamp_now(),
1985 }));
1986
1987 let original = session.transcript_revision()?;
1988 let first = session.commit_transcript_rewrite(
1989 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
1990 vec![Message::Assistant(AssistantMessage {
1991 content: "compact first answer".to_string(),
1992 tool_calls: Vec::new(),
1993 stop_reason: StopReason::EndTurn,
1994 usage: Usage::default(),
1995 created_at: crate::types::message_timestamp_now(),
1996 })],
1997 crate::TranscriptRewriteReason::new("compaction"),
1998 Some("unit-test".to_string()),
1999 Some(original.clone()),
2000 )?;
2001
2002 session.push(Message::User(UserMessage::text("second".to_string())));
2003 session.push(Message::Assistant(AssistantMessage {
2004 content: "verbose second answer".to_string(),
2005 tool_calls: Vec::new(),
2006 stop_reason: StopReason::EndTurn,
2007 usage: Usage::default(),
2008 created_at: crate::types::message_timestamp_now(),
2009 }));
2010 let bridge = session.transcript_revision()?;
2011 assert_ne!(bridge, first.revision);
2012
2013 let second = session.commit_transcript_rewrite(
2014 TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
2015 vec![Message::Assistant(AssistantMessage {
2016 content: "compact second answer".to_string(),
2017 tool_calls: Vec::new(),
2018 stop_reason: StopReason::EndTurn,
2019 usage: Usage::default(),
2020 created_at: crate::types::message_timestamp_now(),
2021 })],
2022 crate::TranscriptRewriteReason::new("compaction"),
2023 Some("unit-test".to_string()),
2024 Some(bridge),
2025 )?;
2026 let state = session
2027 .transcript_history_state()?
2028 .ok_or_else(|| std::io::Error::other("missing transcript history state"))?;
2029
2030 let chain =
2031 find_transcript_rewrite_commit_chain_extending(&state, &original, &second.revision)
2032 .ok_or_else(|| {
2033 std::io::Error::other(
2034 "rewrite chain should extend through normal append bridge",
2035 )
2036 })?;
2037 assert_eq!(chain.len(), 2);
2038 assert_eq!(chain[0].revision, first.revision);
2039 assert_eq!(chain[1].revision, second.revision);
2040 Ok(())
2041 }
2042
2043 #[test]
2044 fn run_boundary_guard_rejects_dropped_retained_rewrite_commits()
2045 -> Result<(), Box<dyn std::error::Error>> {
2046 let mut base = Session::new();
2047 base.push(Message::User(UserMessage::text("turn one".to_string())));
2048 base.push(Message::Assistant(AssistantMessage {
2049 content: "verbose answer".to_string(),
2050 tool_calls: Vec::new(),
2051 stop_reason: StopReason::EndTurn,
2052 usage: Usage::default(),
2053 created_at: crate::types::message_timestamp_now(),
2054 }));
2055 let base_revision = base.transcript_revision()?;
2056
2057 let mut previous = base.clone();
2058 let _retained_commit = previous.commit_transcript_rewrite(
2059 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2060 vec![Message::Assistant(AssistantMessage {
2061 content: "first compact answer".to_string(),
2062 tool_calls: Vec::new(),
2063 stop_reason: StopReason::EndTurn,
2064 usage: Usage::default(),
2065 created_at: crate::types::message_timestamp_now(),
2066 })],
2067 crate::TranscriptRewriteReason::new("compaction"),
2068 Some("unit-test".to_string()),
2069 Some(base_revision),
2070 )?;
2071 let previous_revision = previous.transcript_revision()?;
2072
2073 let mut incoming = previous.clone();
2074 let new_commit = incoming.commit_transcript_rewrite(
2075 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2076 vec![Message::Assistant(AssistantMessage {
2077 content: "second compact answer".to_string(),
2078 tool_calls: Vec::new(),
2079 stop_reason: StopReason::EndTurn,
2080 usage: Usage::default(),
2081 created_at: crate::types::message_timestamp_now(),
2082 })],
2083 crate::TranscriptRewriteReason::new("compaction"),
2084 Some("unit-test".to_string()),
2085 Some(previous_revision),
2086 )?;
2087 let mut state = incoming
2088 .transcript_history_state()?
2089 .ok_or_else(|| std::io::Error::other("incoming rewrite should retain history"))?;
2090 state.commits = vec![new_commit];
2091 incoming.set_metadata(
2092 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2093 serde_json::to_value(state)?,
2094 );
2095
2096 assert!(matches!(
2097 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2098 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2099 if reason.contains("drop retained transcript rewrite commits")
2100 ));
2101 Ok(())
2102 }
2103}