1use async_trait::async_trait;
21use sha2::{Digest, Sha256};
22
23use crate::session::{SYSTEM_CONTEXT_SEPARATOR, SessionMeta};
24use crate::time_compat::SystemTime;
25use crate::types::{Message, SessionId, SystemMessage};
26use crate::{
27 Session, TranscriptHistoryState, TranscriptRewriteCommit, TranscriptRewriteSelection,
28 transcript_messages_digest,
29};
30
31#[derive(Debug, Clone, Default)]
33pub struct SessionFilter {
34 pub created_after: Option<SystemTime>,
36 pub updated_after: Option<SystemTime>,
38 pub limit: Option<usize>,
40 pub offset: Option<usize>,
42}
43
44#[derive(Debug, thiserror::Error)]
49pub enum SessionStoreError {
50 #[error("IO error: {0}")]
51 Io(#[from] std::io::Error),
52
53 #[error("Serialization error: {0}")]
54 Serialization(String),
55
56 #[error("Session not found: {0}")]
57 NotFound(SessionId),
58
59 #[error("Session corrupted: {0}")]
60 Corrupted(SessionId),
61
62 #[error(
63 "session {id} save rejected: new message count {new_len} is shorter than previously \
64 persisted {prev_len} without transcript-continuity proof"
65 )]
66 MonotonicityViolation {
67 id: SessionId,
68 prev_len: usize,
69 new_len: usize,
70 },
71
72 #[error(
73 "session {id} save rejected: incoming transcript is not a continuation of persisted revision {previous_revision}"
74 )]
75 TranscriptContinuityViolation {
76 id: SessionId,
77 previous_revision: String,
78 incoming_revision: String,
79 reason: String,
80 },
81
82 #[error(
83 "session {id} rewrite rejected: previous transcript revision {actual} did not match commit parent {expected}"
84 )]
85 TranscriptRevisionConflict {
86 id: SessionId,
87 expected: String,
88 actual: String,
89 },
90
91 #[error("session {id} rewrite rejected: {reason}")]
92 InvalidTranscriptRewrite { id: SessionId, reason: String },
93
94 #[error("Internal error: {0}")]
95 Internal(String),
96}
97
98pub fn session_projection_cas_token(session: &Session) -> Result<String, SessionStoreError> {
100 let bytes = serde_json::to_vec(session).map_err(|err| {
101 SessionStoreError::Serialization(format!(
102 "failed to serialize session projection CAS token: {err}"
103 ))
104 })?;
105 Ok(format!("row-sha256:{:x}", Sha256::digest(bytes)))
106}
107
108pub fn append_only_save_guard(
122 incoming: &Session,
123 previous: Option<&Session>,
124) -> Result<(), SessionStoreError> {
125 incoming
126 .validate_transcript_history_state()
127 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
128 id: incoming.id().clone(),
129 reason: format!("incoming transcript history state is malformed: {err}"),
130 })?;
131 let incoming_revision =
132 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
133 let incoming_state = incoming.transcript_history_state().map_err(|err| {
134 SessionStoreError::InvalidTranscriptRewrite {
135 id: incoming.id().clone(),
136 reason: format!("incoming transcript history state is malformed: {err}"),
137 }
138 })?;
139 if let Some(state) = incoming_state.as_ref()
140 && state.head != incoming_revision
141 {
142 return Err(SessionStoreError::InvalidTranscriptRewrite {
143 id: incoming.id().clone(),
144 reason: format!(
145 "incoming transcript graph head {} does not match current message digest {incoming_revision}",
146 state.head
147 ),
148 });
149 }
150
151 let Some(previous) = previous else {
152 if incoming_state.is_some() {
153 return Err(SessionStoreError::InvalidTranscriptRewrite {
154 id: incoming.id().clone(),
155 reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
156 .to_string(),
157 });
158 }
159 validate_plain_save_transcript_history_preservation(
160 incoming,
161 None,
162 None,
163 incoming_state.as_ref(),
164 )?;
165 return Ok(());
166 };
167 let previous_state = previous.transcript_history_state().map_err(|err| {
168 SessionStoreError::InvalidTranscriptRewrite {
169 id: incoming.id().clone(),
170 reason: format!("previous transcript history state is malformed: {err}"),
171 }
172 })?;
173 let previous_had_history = previous_state.is_some();
174 let incoming_has_history = incoming_state.is_some();
175 if previous_had_history && !incoming_has_history {
176 return Err(SessionStoreError::InvalidTranscriptRewrite {
177 id: incoming.id().clone(),
178 reason: "incoming save would erase retained transcript history state".to_string(),
179 });
180 }
181 let previous_revision =
182 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
183 if previous_revision == incoming_revision {
184 validate_plain_save_transcript_history_preservation(
185 incoming,
186 Some(previous),
187 previous_state.as_ref(),
188 incoming_state.as_ref(),
189 )?;
190 return Ok(());
191 }
192
193 let prev_len = previous.messages().len();
194 let new_len = incoming.messages().len();
195 if new_len >= prev_len {
196 let incoming_prefix_revision = transcript_messages_digest(&incoming.messages()[..prev_len])
197 .map_err(SessionStoreError::from)?;
198 if incoming_prefix_revision == previous_revision {
199 validate_plain_save_transcript_history_preservation(
200 incoming,
201 Some(previous),
202 previous_state.as_ref(),
203 incoming_state.as_ref(),
204 )?;
205 return Ok(());
206 }
207 }
208 if incoming_preserves_conversation_tail_with_system_context_append(incoming, previous)? {
209 validate_plain_save_transcript_history_preservation(
210 incoming,
211 Some(previous),
212 previous_state.as_ref(),
213 incoming_state.as_ref(),
214 )?;
215 return Ok(());
216 }
217 if incoming_preserves_prefix_after_transient_notice_cleanup(incoming, previous)? {
218 validate_plain_save_transcript_history_preservation(
219 incoming,
220 Some(previous),
221 previous_state.as_ref(),
222 incoming_state.as_ref(),
223 )?;
224 return Ok(());
225 }
226 if new_len < prev_len {
227 return Err(SessionStoreError::MonotonicityViolation {
228 id: incoming.id().clone(),
229 prev_len,
230 new_len,
231 });
232 }
233
234 Err(SessionStoreError::TranscriptContinuityViolation {
235 id: incoming.id().clone(),
236 previous_revision,
237 incoming_revision,
238 reason: "incoming transcript neither preserves the persisted prefix nor records a graph edge from the persisted head".to_string(),
239 })
240}
241
242fn validate_plain_save_transcript_history_preservation(
243 incoming: &Session,
244 previous: Option<&Session>,
245 previous_state: Option<&TranscriptHistoryState>,
246 incoming_state: Option<&TranscriptHistoryState>,
247) -> Result<(), SessionStoreError> {
248 let Some(previous) = previous else {
249 if incoming_state.is_some() {
250 return Err(SessionStoreError::InvalidTranscriptRewrite {
251 id: incoming.id().clone(),
252 reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
253 .to_string(),
254 });
255 }
256 return Ok(());
257 };
258 if previous_state.is_none() && incoming_state.is_some() {
259 return Err(SessionStoreError::InvalidTranscriptRewrite {
260 id: incoming.id().clone(),
261 reason: "incoming append-only save would seed transcript history state outside the rewrite/audit path"
262 .to_string(),
263 });
264 }
265 let Some(previous_state) = previous_state else {
266 return Ok(());
267 };
268 let Some(incoming_state) = incoming_state else {
269 return Err(SessionStoreError::InvalidTranscriptRewrite {
270 id: incoming.id().clone(),
271 reason: "incoming append-only save would erase retained transcript history state"
272 .to_string(),
273 });
274 };
275 let previous_commits = previous_state.commits.as_slice();
276 let incoming_commits = incoming_state.commits.as_slice();
277 if incoming_commits != previous_commits {
278 return Err(SessionStoreError::InvalidTranscriptRewrite {
279 id: incoming.id().clone(),
280 reason: "incoming append-only save would change retained transcript rewrite commits"
281 .to_string(),
282 });
283 }
284 let retained_revisions_preserved =
285 transcript_revision_bodies_preserved(previous_state, incoming_state)?;
286 if retained_revisions_preserved
287 && incoming_state.revisions.len() == previous_state.revisions.len()
288 && incoming_state.head == previous_state.head
289 {
290 return Ok(());
291 }
292 if incoming_state.revisions.len() != previous_state.revisions.len() + 1
293 || !retained_revisions_preserved
294 {
295 return Err(SessionStoreError::InvalidTranscriptRewrite {
296 id: incoming.id().clone(),
297 reason: "incoming append-only save would change retained transcript revision graph"
298 .to_string(),
299 });
300 }
301 let incoming_revision =
302 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
303 let previous_revision =
304 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
305 if previous_state.head != previous_revision {
306 return Err(SessionStoreError::InvalidTranscriptRewrite {
307 id: incoming.id().clone(),
308 reason: "previous transcript history head does not match persisted message digest"
309 .to_string(),
310 });
311 }
312 let added = &incoming_state.revisions[previous_state.revisions.len()];
313 if incoming_state.head != incoming_revision
314 || added.revision != incoming_revision
315 || added.parent_revision.as_deref() != Some(previous_state.head.as_str())
316 || transcript_messages_digest(&added.messages).map_err(SessionStoreError::from)?
317 != incoming_revision
318 {
319 return Err(SessionStoreError::InvalidTranscriptRewrite {
320 id: incoming.id().clone(),
321 reason: "incoming append-only save would add a transcript revision body that is not the current append"
322 .to_string(),
323 });
324 }
325 Ok(())
326}
327
328fn transcript_revision_bodies_preserved(
329 previous_state: &TranscriptHistoryState,
330 incoming_state: &TranscriptHistoryState,
331) -> Result<bool, SessionStoreError> {
332 if incoming_state.revisions.len() < previous_state.revisions.len() {
333 return Ok(false);
334 }
335 previous_state
336 .revisions
337 .iter()
338 .zip(incoming_state.revisions.iter())
339 .map(|(previous, incoming)| {
340 Ok(previous.revision == incoming.revision
341 && previous.parent_revision == incoming.parent_revision
342 && previous.created_at == incoming.created_at
343 && transcript_messages_digest(&previous.messages)
344 .map_err(SessionStoreError::from)?
345 == transcript_messages_digest(&incoming.messages)
346 .map_err(SessionStoreError::from)?)
347 })
348 .try_fold(true, |acc, preserved| {
349 preserved.map(|preserved| acc && preserved)
350 })
351}
352
353fn validate_rewrite_save_retains_previous_commits(
354 incoming: &Session,
355 previous: &Session,
356 incoming_state: &TranscriptHistoryState,
357) -> Result<(), SessionStoreError> {
358 let previous_state = previous.transcript_history_state().map_err(|err| {
359 SessionStoreError::InvalidTranscriptRewrite {
360 id: incoming.id().clone(),
361 reason: format!("previous transcript history state is malformed: {err}"),
362 }
363 })?;
364 let Some(previous_state) = previous_state.as_ref() else {
365 return Ok(());
366 };
367 if incoming_state.commits.len() < previous_state.commits.len()
368 || incoming_state.commits[..previous_state.commits.len()] != previous_state.commits
369 {
370 return Err(SessionStoreError::InvalidTranscriptRewrite {
371 id: incoming.id().clone(),
372 reason: "incoming rewrite save would drop retained transcript rewrite commits"
373 .to_string(),
374 });
375 }
376 Ok(())
377}
378
379pub fn authoritative_projection_current_revision_guard(
382 incoming: &Session,
383 previous: Option<&Session>,
384 expected_current_revision: Option<&str>,
385) -> Result<(), SessionStoreError> {
386 let previous_token = previous.map(session_projection_cas_token).transpose()?;
387 if previous_token.as_deref() == expected_current_revision {
388 return Ok(());
389 }
390 let incoming_revision =
391 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
392 Err(SessionStoreError::TranscriptContinuityViolation {
393 id: incoming.id().clone(),
394 previous_revision: previous_token.unwrap_or_else(|| "<missing>".to_string()),
395 incoming_revision,
396 reason: format!(
397 "authoritative projection expected persisted projection token {}, but current row has diverged",
398 expected_current_revision.unwrap_or("<missing>")
399 ),
400 })
401}
402
403fn incoming_preserves_conversation_tail_with_system_context_append(
404 incoming: &Session,
405 previous: &Session,
406) -> Result<bool, SessionStoreError> {
407 messages_preserve_conversation_tail_with_system_context_append(
408 incoming.messages(),
409 previous.messages(),
410 )
411}
412
413fn messages_preserve_conversation_tail_with_system_context_append(
414 incoming: &[Message],
415 previous: &[Message],
416) -> Result<bool, SessionStoreError> {
417 let (previous_system, previous_tail) = split_single_leading_system(previous);
418 let (incoming_system, incoming_tail) = split_single_leading_system(incoming);
419 let Some(incoming_system) = incoming_system else {
420 return Ok(false);
421 };
422 if !system_context_is_append(previous_system, incoming_system)? {
423 return Ok(false);
424 }
425 if incoming_tail.len() < previous_tail.len() {
426 return Ok(false);
427 }
428 let previous_tail_revision =
429 transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
430 let incoming_tail_prefix_revision =
431 transcript_messages_digest(&incoming_tail[..previous_tail.len()])
432 .map_err(SessionStoreError::from)?;
433 Ok(previous_tail_revision == incoming_tail_prefix_revision)
434}
435
436fn split_single_leading_system(messages: &[Message]) -> (Option<&SystemMessage>, &[Message]) {
437 match messages.first() {
438 Some(Message::System(system)) => (Some(system), &messages[1..]),
439 _ => (None, messages),
440 }
441}
442
443fn system_context_is_append(
459 previous: Option<&SystemMessage>,
460 incoming: &SystemMessage,
461) -> Result<bool, SessionStoreError> {
462 let has_previous = previous.is_some();
464 let content_identical = previous.is_some_and(|previous| incoming.content == previous.content);
465 let content_extends_previous =
466 previous.is_some_and(|previous| incoming.content.starts_with(&previous.content));
467 let appended_starts_with_separator = previous.is_some_and(|previous| {
468 incoming
469 .content
470 .get(previous.content.len()..)
471 .is_some_and(|appended| appended.starts_with(SYSTEM_CONTEXT_SEPARATOR))
472 });
473 let incoming_is_runtime_context_append = incoming.mutation_kind.is_runtime_context_append();
474
475 let mut authority = crate::session_document::SessionDocumentMachineAuthority::new();
476 let effects = authority
477 .resolve_system_context_persist_append_admission(
478 has_previous,
479 content_identical,
480 content_extends_previous,
481 appended_starts_with_separator,
482 incoming_is_runtime_context_append,
483 )
484 .map_err(|err| {
485 SessionStoreError::Internal(format!(
486 "session document authority refused persist-time system-context append admission: {err}"
487 ))
488 })?;
489 effects
490 .into_iter()
491 .find_map(|effect| match effect {
492 crate::session_document::SessionDocumentEffect::SystemContextPersistAppendAdmissionResolved {
493 admission,
494 } => Some(matches!(
495 admission,
496 crate::session_document::SystemContextPersistAppendAdmission::Admit
497 )),
498 _ => None,
499 })
500 .ok_or_else(|| {
501 SessionStoreError::Internal(
502 "session document authority emitted no persist-time system-context append admission verdict".to_string(),
503 )
504 })
505}
506
507fn incoming_preserves_prefix_after_transient_notice_cleanup(
508 incoming: &Session,
509 previous: &Session,
510) -> Result<bool, SessionStoreError> {
511 let previous_without_transient = previous
512 .messages()
513 .iter()
514 .filter(|message| !is_transient_system_notice(message))
515 .cloned()
516 .collect::<Vec<_>>();
517 if previous_without_transient.len() == previous.messages().len()
518 || incoming.messages().len() < previous_without_transient.len()
519 {
520 return Ok(false);
521 }
522 let previous_revision =
523 transcript_messages_digest(&previous_without_transient).map_err(SessionStoreError::from)?;
524 let incoming_prefix_revision =
525 transcript_messages_digest(&incoming.messages()[..previous_without_transient.len()])
526 .map_err(SessionStoreError::from)?;
527 Ok(previous_revision == incoming_prefix_revision)
528}
529
530fn is_transient_system_notice(message: &Message) -> bool {
531 let Message::SystemNotice(notice) = message else {
532 return false;
533 };
534 notice.kind == crate::types::SystemNoticeKind::McpPending
535 && notice.blocks.iter().all(|block| {
536 matches!(
537 block,
538 crate::types::SystemNoticeBlock::Mcp {
539 persisted: false,
540 ..
541 }
542 )
543 })
544}
545
546pub fn run_boundary_snapshot_save_guard(
555 incoming: &Session,
556 previous: Option<&Session>,
557) -> Result<(), SessionStoreError> {
558 match append_only_save_guard(incoming, previous) {
559 Ok(()) => Ok(()),
560 Err(append_error) => {
561 if run_boundary_commitless_history_projection_save_guard(incoming, previous)? {
562 return Ok(());
563 }
564 let Some(previous) = previous else {
565 return Err(append_error);
566 };
567 let incoming_revision =
568 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
569 let Some(state) = incoming.transcript_history_state().map_err(|err| {
570 SessionStoreError::InvalidTranscriptRewrite {
571 id: incoming.id().clone(),
572 reason: format!("incoming transcript history state is malformed: {err}"),
573 }
574 })?
575 else {
576 return Err(append_error);
577 };
578 validate_rewrite_save_retains_previous_commits(incoming, previous, &state)?;
579 let commits = find_transcript_rewrite_commit_chain_extending_session(
580 &state,
581 previous,
582 &incoming_revision,
583 )?;
584 if commits.is_none()
585 && run_boundary_context_summary_tail_projection_save_guard(
586 incoming, previous, &state,
587 )?
588 {
589 return Ok(());
590 }
591 let Some(commits) = commits else {
592 return Err(append_error);
593 };
594 let Some(commit) = commits.first() else {
595 if state.commits.is_empty() {
596 return Err(append_error);
597 }
598 for commit in &state.commits {
599 validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
600 }
601 return Ok(());
602 };
603 transcript_rewrite_bridge_save_guard(incoming, commit, &state, &incoming_revision)?;
604 for commit in commits.iter().skip(1) {
605 validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
606 }
607 Ok(())
608 }
609 }
610}
611
612fn run_boundary_commitless_history_projection_save_guard(
613 incoming: &Session,
614 previous: Option<&Session>,
615) -> Result<bool, SessionStoreError> {
616 let Some(state) = incoming.transcript_history_state().map_err(|err| {
617 SessionStoreError::InvalidTranscriptRewrite {
618 id: incoming.id().clone(),
619 reason: format!("incoming transcript history state is malformed: {err}"),
620 }
621 })?
622 else {
623 return Ok(false);
624 };
625 if !state.commits.is_empty() {
626 return Ok(false);
627 }
628
629 let incoming_revision =
630 transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
631 if state.head != incoming_revision
632 || !state
633 .revisions
634 .iter()
635 .any(|body| body.revision == incoming_revision)
636 {
637 return Ok(false);
638 }
639
640 let mut projection_without_history = incoming.clone();
641 projection_without_history.clear_transcript_history_state();
642 if append_only_save_guard(&projection_without_history, previous).is_err() {
643 return Ok(false);
644 }
645
646 let Some(previous) = previous else {
647 return Ok(state.commits.is_empty());
648 };
649 if previous
650 .transcript_history_state()
651 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
652 id: incoming.id().clone(),
653 reason: format!("previous transcript history state is malformed: {err}"),
654 })?
655 .is_some()
656 {
657 return Ok(false);
658 }
659
660 let previous_revision =
661 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
662 Ok(incoming_revision == previous_revision
663 || transcript_history_revision_extends(&state, &incoming_revision, &previous_revision))
664}
665
666fn run_boundary_context_summary_tail_projection_save_guard(
667 incoming: &Session,
668 previous: &Session,
669 state: &TranscriptHistoryState,
670) -> Result<bool, SessionStoreError> {
671 if state.commits.is_empty() {
672 return Ok(false);
673 }
674 incoming
675 .validate_transcript_history_state()
676 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
677 id: incoming.id().clone(),
678 reason: format!("incoming transcript history state is malformed: {err}"),
679 })?;
680
681 let (incoming_system, incoming_tail) = match incoming.messages().split_first() {
682 Some((Message::System(system), tail)) => (Some(system), tail),
683 _ => (None, incoming.messages()),
684 };
685 let (previous_system, previous_tail) = match previous.messages().split_first() {
686 Some((Message::System(system), tail)) => (Some(system), tail),
687 _ => (None, previous.messages()),
688 };
689 if incoming_system.is_some() != previous_system.is_some()
690 || incoming_tail.len() <= previous_tail.len()
691 {
692 return Ok(false);
693 }
694 let Some(Message::User(summary)) = incoming_tail.first() else {
695 return Ok(false);
696 };
697 if !summary.transcript_role.is_compaction_summary() {
702 return Ok(false);
703 }
704
705 let retained_end = 1 + previous_tail.len();
706 let retained = &incoming_tail[1..retained_end];
707 let retained_revision =
708 transcript_messages_digest(retained).map_err(SessionStoreError::from)?;
709 let previous_revision =
710 transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
711 if retained_revision != previous_revision {
712 return Ok(false);
713 }
714
715 for commit in &state.commits {
716 validate_transcript_rewrite_commit_bodies(incoming, commit, state)?;
717 }
718 Ok(true)
719}
720
721pub fn find_transcript_rewrite_commit_extending<'a>(
724 state: &'a TranscriptHistoryState,
725 previous_revision: &str,
726 incoming_revision: &str,
727) -> Option<&'a TranscriptRewriteCommit> {
728 find_transcript_rewrite_commit_chain_extending(state, previous_revision, incoming_revision)
729 .and_then(|commits| commits.into_iter().next())
730}
731
732pub fn find_transcript_rewrite_commit_chain_extending<'a>(
735 state: &'a TranscriptHistoryState,
736 previous_revision: &str,
737 incoming_revision: &str,
738) -> Option<Vec<&'a TranscriptRewriteCommit>> {
739 let mut chain = Vec::new();
740 let mut cursor = previous_revision;
741 let mut visited = std::collections::BTreeSet::new();
742 loop {
743 if incoming_revision == cursor {
744 return Some(chain);
745 }
746 if !visited.insert(cursor.to_string()) {
747 return None;
748 }
749 let commit = state.commits.iter().find(|commit| {
750 (commit.parent_revision == cursor
751 || transcript_history_revision_extends(state, &commit.parent_revision, cursor))
752 && transcript_history_revision_extends(state, incoming_revision, &commit.revision)
753 });
754 let Some(commit) = commit else {
755 return transcript_history_revision_extends(state, incoming_revision, cursor)
756 .then_some(chain);
757 };
758 cursor = &commit.revision;
759 chain.push(commit);
760 }
761}
762
763pub fn find_transcript_rewrite_commit_chain_extending_session<'a>(
772 state: &'a TranscriptHistoryState,
773 previous: &Session,
774 incoming_revision: &str,
775) -> Result<Option<Vec<&'a TranscriptRewriteCommit>>, SessionStoreError> {
776 let previous_revision =
777 transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
778 let mut chain = Vec::new();
779 let mut cursor = previous_revision.as_str();
780 let mut visited = std::collections::BTreeSet::new();
781 loop {
782 if incoming_revision == cursor {
783 return Ok(Some(chain));
784 }
785 if !visited.insert(cursor.to_string()) {
786 return Ok(None);
787 }
788
789 let Some(cursor_messages) = transcript_history_messages_for_revision(
790 state,
791 cursor,
792 &previous_revision,
793 previous.messages(),
794 ) else {
795 return Ok(None);
796 };
797 let mut selected = None;
798 for commit in &state.commits {
799 if !transcript_history_revision_extends(state, incoming_revision, &commit.revision) {
800 continue;
801 }
802 let parent_extends_cursor = commit.parent_revision == cursor
803 || revision_body_preserves_append_continuation_prefix(
804 state,
805 &commit.parent_revision,
806 cursor_messages,
807 cursor,
808 )?;
809 if parent_extends_cursor {
810 selected = Some(commit);
811 break;
812 }
813 }
814
815 let Some(commit) = selected else {
816 if revision_body_preserves_append_continuation_prefix(
817 state,
818 incoming_revision,
819 cursor_messages,
820 cursor,
821 )? {
822 return Ok(Some(chain));
823 }
824 return Ok(None);
825 };
826 cursor = &commit.revision;
827 chain.push(commit);
828 }
829}
830
831fn transcript_history_messages_for_revision<'a>(
832 state: &'a TranscriptHistoryState,
833 revision: &str,
834 previous_revision: &str,
835 previous_messages: &'a [Message],
836) -> Option<&'a [Message]> {
837 if revision == previous_revision {
838 return Some(previous_messages);
839 }
840 state
841 .revisions
842 .iter()
843 .find(|body| body.revision == revision)
844 .map(|body| body.messages.as_slice())
845}
846
847fn revision_body_preserves_append_continuation_prefix(
848 state: &TranscriptHistoryState,
849 revision: &str,
850 ancestor_messages: &[Message],
851 ancestor_revision: &str,
852) -> Result<bool, SessionStoreError> {
853 if revision == ancestor_revision {
854 return Ok(true);
855 }
856 let Some(body) = state
857 .revisions
858 .iter()
859 .find(|body| body.revision == revision)
860 else {
861 return Ok(false);
862 };
863 if body.messages.len() >= ancestor_messages.len() {
864 let prefix_revision = transcript_messages_digest(&body.messages[..ancestor_messages.len()])
865 .map_err(SessionStoreError::from)?;
866 if prefix_revision == ancestor_revision {
867 return Ok(true);
868 }
869 }
870 Ok(
871 messages_preserve_conversation_tail_with_system_context_append(
872 &body.messages,
873 ancestor_messages,
874 )? || messages_preserve_tail_after_leading_system_refresh(
875 &body.messages,
876 ancestor_messages,
877 )?,
878 )
879}
880
881fn messages_preserve_tail_after_leading_system_refresh(
882 incoming: &[Message],
883 previous: &[Message],
884) -> Result<bool, SessionStoreError> {
885 let (Some(Message::System(_)), Some(Message::System(_))) = (incoming.first(), previous.first())
886 else {
887 return Ok(false);
888 };
889 if incoming.len() < previous.len() {
890 return Ok(false);
891 }
892 let previous_tail_len = previous.len().saturating_sub(1);
893 if previous_tail_len == 0 {
894 return Ok(true);
895 }
896 let previous_tail_revision =
897 transcript_messages_digest(&previous[1..]).map_err(SessionStoreError::from)?;
898 let incoming_tail = &incoming[1..];
899 if incoming_tail.len() < previous_tail_len {
900 return Ok(false);
901 }
902 let incoming_tail_prefix_revision =
903 transcript_messages_digest(&incoming_tail[..previous_tail_len])
904 .map_err(SessionStoreError::from)?;
905 Ok(incoming_tail_prefix_revision == previous_tail_revision)
906}
907
908fn transcript_history_revision_extends(
909 state: &TranscriptHistoryState,
910 descendant: &str,
911 ancestor: &str,
912) -> bool {
913 if descendant == ancestor {
914 return true;
915 }
916 let mut cursor = descendant;
917 while let Some(body) = state.revisions.iter().find(|body| body.revision == cursor) {
918 let Some(parent) = body.parent_revision.as_deref() else {
919 return false;
920 };
921 if parent == ancestor {
922 return true;
923 }
924 cursor = parent;
925 }
926 false
927}
928
929fn transcript_rewrite_bridge_save_guard(
930 incoming: &Session,
931 commit: &TranscriptRewriteCommit,
932 incoming_state: &TranscriptHistoryState,
933 incoming_message_digest: &str,
934) -> Result<(), SessionStoreError> {
935 validate_transcript_rewrite_commit_bodies(incoming, commit, incoming_state)?;
936 if incoming_state.head != incoming_message_digest {
937 return Err(SessionStoreError::InvalidTranscriptRewrite {
938 id: incoming.id().clone(),
939 reason: format!(
940 "incoming transcript graph head {} does not match current message digest {incoming_message_digest}",
941 incoming_state.head
942 ),
943 });
944 }
945 if !transcript_history_revision_extends(
946 incoming_state,
947 incoming_message_digest,
948 &commit.revision,
949 ) {
950 return Err(SessionStoreError::InvalidTranscriptRewrite {
951 id: incoming.id().clone(),
952 reason: format!(
953 "incoming transcript head {incoming_message_digest} does not extend rewrite revision {}",
954 commit.revision
955 ),
956 });
957 }
958 Ok(())
959}
960
961pub fn transcript_rewrite_save_guard(
964 incoming: &Session,
965 previous: Option<&Session>,
966 commit: &TranscriptRewriteCommit,
967) -> Result<(), SessionStoreError> {
968 incoming
969 .validate_transcript_history_state()
970 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
971 id: incoming.id().clone(),
972 reason: format!("incoming transcript history state is malformed: {err}"),
973 })?;
974 let Some(previous) = previous else {
975 return Err(SessionStoreError::InvalidTranscriptRewrite {
976 id: incoming.id().clone(),
977 reason: "rewrite target has no previously persisted session".to_string(),
978 });
979 };
980 if incoming.id() != previous.id() {
981 return Err(SessionStoreError::InvalidTranscriptRewrite {
982 id: incoming.id().clone(),
983 reason: format!(
984 "incoming session id {} differs from previous session id {}",
985 incoming.id(),
986 previous.id()
987 ),
988 });
989 }
990 let previous_revision = previous.transcript_revision().map_err(|err| {
991 SessionStoreError::InvalidTranscriptRewrite {
992 id: incoming.id().clone(),
993 reason: format!("previous transcript revision is malformed: {err}"),
994 }
995 })?;
996 if previous_revision != commit.parent_revision {
997 return Err(SessionStoreError::TranscriptRevisionConflict {
998 id: incoming.id().clone(),
999 expected: commit.parent_revision.clone(),
1000 actual: previous_revision,
1001 });
1002 }
1003 let previous_message_digest =
1004 transcript_messages_digest(previous.messages()).map_err(|err| {
1005 SessionStoreError::InvalidTranscriptRewrite {
1006 id: incoming.id().clone(),
1007 reason: format!("previous current transcript is not digestible: {err}"),
1008 }
1009 })?;
1010 if previous_message_digest != commit.parent_revision {
1011 return Err(SessionStoreError::InvalidTranscriptRewrite {
1012 id: incoming.id().clone(),
1013 reason: format!(
1014 "previous current transcript digest {previous_message_digest} does not match commit parent {}",
1015 commit.parent_revision
1016 ),
1017 });
1018 }
1019 let incoming_revision = incoming.transcript_revision().map_err(|err| {
1020 SessionStoreError::InvalidTranscriptRewrite {
1021 id: incoming.id().clone(),
1022 reason: format!("incoming transcript revision is malformed: {err}"),
1023 }
1024 })?;
1025 if incoming_revision != commit.revision {
1026 return Err(SessionStoreError::InvalidTranscriptRewrite {
1027 id: incoming.id().clone(),
1028 reason: format!(
1029 "incoming transcript revision {incoming_revision} does not match commit revision {}",
1030 commit.revision
1031 ),
1032 });
1033 }
1034 let incoming_message_digest =
1035 transcript_messages_digest(incoming.messages()).map_err(|err| {
1036 SessionStoreError::InvalidTranscriptRewrite {
1037 id: incoming.id().clone(),
1038 reason: format!("incoming current transcript is not digestible: {err}"),
1039 }
1040 })?;
1041 if incoming_message_digest != commit.revision {
1042 return Err(SessionStoreError::InvalidTranscriptRewrite {
1043 id: incoming.id().clone(),
1044 reason: format!(
1045 "incoming current transcript digest {incoming_message_digest} does not match commit revision {}",
1046 commit.revision
1047 ),
1048 });
1049 }
1050 let Some(incoming_state) = incoming.transcript_history_state().map_err(|err| {
1051 SessionStoreError::InvalidTranscriptRewrite {
1052 id: incoming.id().clone(),
1053 reason: format!("incoming transcript history state is malformed: {err}"),
1054 }
1055 })?
1056 else {
1057 return Err(SessionStoreError::InvalidTranscriptRewrite {
1058 id: incoming.id().clone(),
1059 reason: "incoming rewrite did not persist a transcript revision graph".to_string(),
1060 });
1061 };
1062 validate_rewrite_save_retains_previous_commits(incoming, previous, &incoming_state)?;
1063 validate_transcript_rewrite_commit_bodies(incoming, commit, &incoming_state)
1064}
1065
1066fn validate_transcript_rewrite_commit_bodies(
1067 incoming: &Session,
1068 commit: &TranscriptRewriteCommit,
1069 incoming_state: &TranscriptHistoryState,
1070) -> Result<(), SessionStoreError> {
1071 if !incoming_state
1072 .commits
1073 .iter()
1074 .any(|persisted| persisted == commit)
1075 {
1076 return Err(SessionStoreError::InvalidTranscriptRewrite {
1077 id: incoming.id().clone(),
1078 reason: format!(
1079 "incoming rewrite did not persist the rewrite commit in the transcript graph (wanted {} -> {}, graph commits: {:?})",
1080 commit.parent_revision,
1081 commit.revision,
1082 incoming_state
1083 .commits
1084 .iter()
1085 .map(|commit| (&commit.parent_revision, &commit.revision))
1086 .collect::<Vec<_>>()
1087 ),
1088 });
1089 }
1090 let Some(parent_body) = incoming_state
1091 .revisions
1092 .iter()
1093 .find(|body| body.revision == commit.parent_revision)
1094 else {
1095 return Err(SessionStoreError::InvalidTranscriptRewrite {
1096 id: incoming.id().clone(),
1097 reason: format!(
1098 "incoming rewrite omitted parent revision body {}",
1099 commit.parent_revision
1100 ),
1101 });
1102 };
1103 let Some(revision_body) = incoming_state
1104 .revisions
1105 .iter()
1106 .find(|body| body.revision == commit.revision)
1107 else {
1108 return Err(SessionStoreError::InvalidTranscriptRewrite {
1109 id: incoming.id().clone(),
1110 reason: format!(
1111 "incoming rewrite omitted new revision body {}",
1112 commit.revision
1113 ),
1114 });
1115 };
1116 if parent_body.messages.len() != commit.messages_before
1117 || revision_body.messages.len() != commit.messages_after
1118 {
1119 return Err(SessionStoreError::InvalidTranscriptRewrite {
1120 id: incoming.id().clone(),
1121 reason: format!(
1122 "commit message counts {} -> {} do not match persisted rewrite {} -> {}",
1123 commit.messages_before,
1124 commit.messages_after,
1125 parent_body.messages.len(),
1126 revision_body.messages.len()
1127 ),
1128 });
1129 }
1130 let parent_body_revision =
1131 transcript_messages_digest(&parent_body.messages).map_err(|err| {
1132 SessionStoreError::InvalidTranscriptRewrite {
1133 id: incoming.id().clone(),
1134 reason: format!("parent revision body is not digestible: {err}"),
1135 }
1136 })?;
1137 if parent_body_revision != commit.parent_revision {
1138 return Err(SessionStoreError::InvalidTranscriptRewrite {
1139 id: incoming.id().clone(),
1140 reason: format!(
1141 "parent revision body digest {parent_body_revision} does not match commit parent {}",
1142 commit.parent_revision
1143 ),
1144 });
1145 }
1146 let (start, end) = match &commit.selection {
1147 TranscriptRewriteSelection::MessageRange { start, end } => (*start, *end),
1148 };
1149 if start > end || end > parent_body.messages.len() {
1150 return Err(SessionStoreError::InvalidTranscriptRewrite {
1151 id: incoming.id().clone(),
1152 reason: format!(
1153 "commit selection {start}..{end} is invalid for parent revision with {} messages",
1154 parent_body.messages.len()
1155 ),
1156 });
1157 }
1158 let original_span_digest = transcript_messages_digest(&parent_body.messages[start..end])
1159 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1160 id: incoming.id().clone(),
1161 reason: format!("original span body is not digestible: {err}"),
1162 })?;
1163 if original_span_digest != commit.original_span_digest {
1164 return Err(SessionStoreError::InvalidTranscriptRewrite {
1165 id: incoming.id().clone(),
1166 reason: format!(
1167 "original span digest {original_span_digest} does not match commit digest {}",
1168 commit.original_span_digest
1169 ),
1170 });
1171 }
1172 let revision_body_digest =
1173 transcript_messages_digest(&revision_body.messages).map_err(|err| {
1174 SessionStoreError::InvalidTranscriptRewrite {
1175 id: incoming.id().clone(),
1176 reason: format!("new revision body is not digestible: {err}"),
1177 }
1178 })?;
1179 if revision_body_digest != commit.revision {
1180 return Err(SessionStoreError::InvalidTranscriptRewrite {
1181 id: incoming.id().clone(),
1182 reason: format!(
1183 "new revision body digest {revision_body_digest} does not match commit revision {}",
1184 commit.revision
1185 ),
1186 });
1187 }
1188 let removed_len = end - start;
1189 let retained_len = commit
1190 .messages_before
1191 .checked_sub(removed_len)
1192 .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1193 id: incoming.id().clone(),
1194 reason: "commit removed more messages than it recorded before rewrite".to_string(),
1195 })?;
1196 let replacement_len = commit
1197 .messages_after
1198 .checked_sub(retained_len)
1199 .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1200 id: incoming.id().clone(),
1201 reason: "commit message counts cannot describe a replacement span".to_string(),
1202 })?;
1203 let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
1204 SessionStoreError::InvalidTranscriptRewrite {
1205 id: incoming.id().clone(),
1206 reason: "replacement span end overflowed".to_string(),
1207 }
1208 })?;
1209 if replacement_end > revision_body.messages.len() {
1210 return Err(SessionStoreError::InvalidTranscriptRewrite {
1211 id: incoming.id().clone(),
1212 reason: format!(
1213 "replacement span {start}..{replacement_end} is invalid for revision with {} messages",
1214 revision_body.messages.len()
1215 ),
1216 });
1217 }
1218 let parent_prefix_digest =
1219 transcript_messages_digest(&parent_body.messages[..start]).map_err(|err| {
1220 SessionStoreError::InvalidTranscriptRewrite {
1221 id: incoming.id().clone(),
1222 reason: format!("parent prefix body is not digestible: {err}"),
1223 }
1224 })?;
1225 let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
1226 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1227 id: incoming.id().clone(),
1228 reason: format!("revision prefix body is not digestible: {err}"),
1229 })?;
1230 if parent_prefix_digest != revision_prefix_digest {
1231 return Err(SessionStoreError::InvalidTranscriptRewrite {
1232 id: incoming.id().clone(),
1233 reason: "rewrite revision changed messages before the selected span".to_string(),
1234 });
1235 }
1236 let parent_suffix_digest =
1237 transcript_messages_digest(&parent_body.messages[end..]).map_err(|err| {
1238 SessionStoreError::InvalidTranscriptRewrite {
1239 id: incoming.id().clone(),
1240 reason: format!("parent suffix body is not digestible: {err}"),
1241 }
1242 })?;
1243 let revision_suffix_digest =
1244 transcript_messages_digest(&revision_body.messages[replacement_end..]).map_err(|err| {
1245 SessionStoreError::InvalidTranscriptRewrite {
1246 id: incoming.id().clone(),
1247 reason: format!("revision suffix body is not digestible: {err}"),
1248 }
1249 })?;
1250 if parent_suffix_digest != revision_suffix_digest {
1251 return Err(SessionStoreError::InvalidTranscriptRewrite {
1252 id: incoming.id().clone(),
1253 reason: "rewrite revision changed messages after the selected span".to_string(),
1254 });
1255 }
1256 let replacement_digest = transcript_messages_digest(
1257 &revision_body.messages[start..replacement_end],
1258 )
1259 .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1260 id: incoming.id().clone(),
1261 reason: format!("replacement span body is not digestible: {err}"),
1262 })?;
1263 if replacement_digest != commit.replacement_digest {
1264 return Err(SessionStoreError::InvalidTranscriptRewrite {
1265 id: incoming.id().clone(),
1266 reason: format!(
1267 "replacement span digest {replacement_digest} does not match commit digest {}",
1268 commit.replacement_digest
1269 ),
1270 });
1271 }
1272 Ok(())
1273}
1274
1275impl From<serde_json::Error> for SessionStoreError {
1276 fn from(e: serde_json::Error) -> Self {
1277 Self::Serialization(e.to_string())
1278 }
1279}
1280
1281#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1306#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1307pub trait SessionStore: Send + Sync {
1308 async fn save(&self, session: &Session) -> Result<(), SessionStoreError>;
1314
1315 async fn save_transcript_rewrite(
1321 &self,
1322 session: &Session,
1323 commit: &TranscriptRewriteCommit,
1324 ) -> Result<(), SessionStoreError> {
1325 let _ = (session, commit);
1326 Err(SessionStoreError::Internal(
1327 "save_transcript_rewrite is not supported by this SessionStore".to_string(),
1328 ))
1329 }
1330
1331 async fn save_authoritative_projection(
1340 &self,
1341 session: &Session,
1342 ) -> Result<(), SessionStoreError> {
1343 self.save(session).await
1344 }
1345
1346 async fn save_authoritative_projection_if_current_revision(
1349 &self,
1350 session: &Session,
1351 expected_current_revision: Option<String>,
1352 ) -> Result<(), SessionStoreError> {
1353 let _ = (session, expected_current_revision);
1354 Err(SessionStoreError::Internal(
1355 "save_authoritative_projection_if_current_revision is not supported by this SessionStore"
1356 .to_string(),
1357 ))
1358 }
1359
1360 async fn load(&self, id: &SessionId) -> Result<Option<Session>, SessionStoreError>;
1362
1363 async fn list(&self, filter: SessionFilter) -> Result<Vec<SessionMeta>, SessionStoreError>;
1365
1366 async fn delete(&self, id: &SessionId) -> Result<(), SessionStoreError>;
1368
1369 async fn delete_if_current_revision(
1372 &self,
1373 id: &SessionId,
1374 expected_current_revision: &str,
1375 ) -> Result<bool, SessionStoreError>;
1376
1377 async fn exists(&self, id: &SessionId) -> Result<bool, SessionStoreError> {
1379 Ok(self.load(id).await?.is_some())
1380 }
1381}
1382
1383#[cfg(test)]
1384mod tests {
1385 use super::*;
1386 use crate::types::{
1387 AssistantBlock, BlockAssistantMessage, StopReason, SystemMessage, SystemNoticeBlock,
1388 SystemNoticeKind, SystemNoticeMessage, UserMessage,
1389 };
1390
1391 #[test]
1398 #[allow(clippy::expect_used)]
1399 fn classify_live_session_authority_is_decided_by_machine() {
1400 use crate::session_document::{
1401 LiveSessionAuthorityKind, LiveSessionAuthorityReason, SessionDocumentEffect,
1402 SessionDocumentMachineAuthority,
1403 };
1404
1405 fn classify(
1406 stored_transcript_diverged: bool,
1407 live_has_uncommitted_transcript: bool,
1408 runtime_system_context_diverged: bool,
1409 stored_is_archived: bool,
1410 ) -> (LiveSessionAuthorityKind, LiveSessionAuthorityReason) {
1411 let mut authority = SessionDocumentMachineAuthority::new();
1412 let effects = authority
1413 .classify_live_session_authority(
1414 stored_transcript_diverged,
1415 live_has_uncommitted_transcript,
1416 runtime_system_context_diverged,
1417 stored_is_archived,
1418 )
1419 .expect("classifier must resolve a verdict");
1420 effects
1421 .iter()
1422 .find_map(|effect| match effect {
1423 SessionDocumentEffect::LiveSessionAuthorityClassified { authority, reason } => {
1424 Some((*authority, *reason))
1425 }
1426 _ => None,
1427 })
1428 .expect("classifier must emit a verdict")
1429 }
1430
1431 let (kind, _) = classify(false, false, false, false);
1433 assert_eq!(kind, LiveSessionAuthorityKind::LiveAuthoritative);
1434
1435 assert_eq!(
1437 classify(true, false, false, false),
1438 (
1439 LiveSessionAuthorityKind::DurableAuthoritative,
1440 LiveSessionAuthorityReason::StoredTranscriptRevisionDiverged
1441 ),
1442 );
1443 assert_eq!(
1444 classify(false, true, false, false),
1445 (
1446 LiveSessionAuthorityKind::DurableAuthoritative,
1447 LiveSessionAuthorityReason::LiveUncommittedTranscript
1448 ),
1449 );
1450 assert_eq!(
1451 classify(false, false, true, false),
1452 (
1453 LiveSessionAuthorityKind::DurableAuthoritative,
1454 LiveSessionAuthorityReason::RuntimeSystemContextDiverged
1455 ),
1456 );
1457 assert_eq!(
1458 classify(false, false, false, true),
1459 (
1460 LiveSessionAuthorityKind::DurableAuthoritative,
1461 LiveSessionAuthorityReason::StoredArchived
1462 ),
1463 );
1464
1465 assert_eq!(
1468 classify(true, true, true, true),
1469 (
1470 LiveSessionAuthorityKind::DurableAuthoritative,
1471 LiveSessionAuthorityReason::StoredArchived
1472 ),
1473 );
1474 assert_eq!(
1476 classify(true, true, true, false),
1477 (
1478 LiveSessionAuthorityKind::DurableAuthoritative,
1479 LiveSessionAuthorityReason::LiveUncommittedTranscript
1480 ),
1481 );
1482 assert_eq!(
1484 classify(true, false, true, false),
1485 (
1486 LiveSessionAuthorityKind::DurableAuthoritative,
1487 LiveSessionAuthorityReason::RuntimeSystemContextDiverged
1488 ),
1489 );
1490 }
1491
1492 #[test]
1493 fn append_only_guard_rejects_leading_system_message_replacement() {
1494 let mut previous = Session::new();
1495 previous.push(Message::System(SystemMessage::new("original system")));
1496 previous.push(Message::User(UserMessage::text("hello".to_string())));
1497
1498 let mut incoming = previous.clone();
1499 let rewrite_result = incoming.replace_messages_internal(
1500 vec![
1501 Message::System(SystemMessage::new("rewritten system")),
1502 Message::User(UserMessage::text("hello".to_string())),
1503 ],
1504 crate::TranscriptRewriteReason::new("unit-test"),
1505 );
1506 assert!(
1507 rewrite_result.is_ok(),
1508 "typed rewrite should be constructible: {rewrite_result:?}"
1509 );
1510
1511 assert!(matches!(
1512 append_only_save_guard(&incoming, Some(&previous)),
1513 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1514 ));
1515 }
1516
1517 #[test]
1518 fn append_only_guard_accepts_runtime_system_context_append()
1519 -> Result<(), Box<dyn std::error::Error>> {
1520 let mut previous = Session::new();
1521 previous.push(Message::System(SystemMessage::new("base system")));
1522 previous.push(Message::User(UserMessage::text("hello".to_string())));
1523
1524 let mut incoming = previous.clone();
1525 incoming.set_system_prompt_with_source(
1529 format!(
1530 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1531 ),
1532 crate::session_durable_config_authority::SessionSystemPromptSource::RuntimeContextAppend,
1533 )?;
1534
1535 assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1536 Ok(())
1537 }
1538
1539 #[test]
1540 fn append_only_guard_rejects_append_shaped_prompt_without_runtime_context_marker() {
1541 let mut previous = Session::new();
1542 previous.push(Message::System(SystemMessage::new("base system")));
1543 previous.push(Message::User(UserMessage::text("hello".to_string())));
1544
1545 let mut incoming = previous.clone();
1549 incoming.set_system_prompt(format!(
1550 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: forged\n\nextra context"
1551 ));
1552
1553 assert!(matches!(
1554 append_only_save_guard(&incoming, Some(&previous)),
1555 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1556 ));
1557 }
1558
1559 #[test]
1560 fn append_only_guard_accepts_system_timestamp_refresh_without_content_change() {
1561 let mut previous = Session::new();
1562 previous.push(Message::System(SystemMessage::new("base system")));
1563
1564 let mut incoming = previous.clone();
1565 incoming.set_system_prompt("base system".to_string());
1566
1567 assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1568 }
1569
1570 #[test]
1571 fn run_boundary_guard_accepts_compaction_after_uncheckpointed_runtime_append()
1572 -> Result<(), Box<dyn std::error::Error>> {
1573 let mut previous = Session::new();
1574 previous.push(Message::System(SystemMessage::new("base system")));
1575 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1576 previous.push(Message::BlockAssistant(BlockAssistantMessage {
1577 blocks: vec![AssistantBlock::Text {
1578 text: "answer one".to_string(),
1579 meta: None,
1580 }],
1581 stop_reason: StopReason::EndTurn,
1582 created_at: crate::types::message_timestamp_now(),
1583 }));
1584
1585 let mut parent = previous.clone();
1586 parent.set_system_prompt("refreshed runtime system projection".to_string());
1587 parent.push(Message::User(UserMessage::text(
1588 "runtime-only turn".to_string(),
1589 )));
1590 parent.push(Message::BlockAssistant(BlockAssistantMessage {
1591 blocks: vec![AssistantBlock::Text {
1592 text: "runtime-only answer".to_string(),
1593 meta: None,
1594 }],
1595 stop_reason: StopReason::EndTurn,
1596 created_at: crate::types::message_timestamp_now(),
1597 }));
1598 let parent_revision = parent.transcript_revision()?;
1599
1600 let mut incoming = parent.clone();
1601 let mut replacement = vec![
1602 parent.messages()[0].clone(),
1603 Message::User(UserMessage::compaction_summary(
1604 "[Context compacted] summary".to_string(),
1605 )),
1606 ];
1607 replacement.extend_from_slice(&parent.messages()[1..]);
1608 incoming.commit_transcript_rewrite(
1609 TranscriptRewriteSelection::MessageRange {
1610 start: 0,
1611 end: parent.messages().len(),
1612 },
1613 replacement,
1614 crate::TranscriptRewriteReason::new("compaction"),
1615 Some("meerkat-core".to_string()),
1616 Some(parent_revision),
1617 )?;
1618
1619 assert!(matches!(
1620 append_only_save_guard(&incoming, Some(&previous)),
1621 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1622 ));
1623 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1624 Ok(())
1625 }
1626
1627 #[test]
1628 fn run_boundary_guard_accepts_compaction_with_retained_tail_window()
1629 -> Result<(), Box<dyn std::error::Error>> {
1630 let mut previous = Session::new();
1631 previous.push(Message::System(SystemMessage::new("base system")));
1632 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1633 previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1634 vec![crate::types::AssistantBlock::Text {
1635 text: "answer one".to_string(),
1636 meta: None,
1637 }],
1638 StopReason::EndTurn,
1639 )));
1640
1641 let mut parent = previous.clone();
1642 parent.set_system_prompt("refreshed runtime system projection".to_string());
1643 parent.push(Message::SystemNotice(SystemNoticeMessage::new(
1644 SystemNoticeKind::Comms,
1645 "peer response queued",
1646 )));
1647 let parent_revision = parent.transcript_revision()?;
1648
1649 let mut incoming = parent.clone();
1650 let mut replacement = vec![
1651 parent.messages()[0].clone(),
1652 Message::User(UserMessage::compaction_summary(
1653 "[Context compacted] summary".to_string(),
1654 )),
1655 ];
1656 replacement.extend_from_slice(&parent.messages()[1..]);
1657 incoming.commit_transcript_rewrite(
1658 TranscriptRewriteSelection::MessageRange {
1659 start: 0,
1660 end: parent.messages().len(),
1661 },
1662 replacement,
1663 crate::TranscriptRewriteReason::new("compaction"),
1664 Some("meerkat-core".to_string()),
1665 Some(parent_revision),
1666 )?;
1667
1668 assert!(matches!(
1669 append_only_save_guard(&incoming, Some(&previous)),
1670 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1671 ));
1672 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1673 Ok(())
1674 }
1675
1676 #[test]
1677 fn run_boundary_guard_rejects_commitless_history_parent_edge()
1678 -> Result<(), Box<dyn std::error::Error>> {
1679 let mut previous = Session::new();
1680 previous.push(Message::System(SystemMessage::new("base system")));
1681 previous.push(Message::User(UserMessage::text("turn one".to_string())));
1682 let previous_revision = previous.transcript_revision()?;
1683
1684 let mut incoming = previous.clone();
1685 incoming.set_system_prompt("forged replacement system".to_string());
1686 let incoming_revision = incoming.transcript_revision()?;
1687 let history = TranscriptHistoryState {
1688 head: incoming_revision.clone(),
1689 commits: Vec::new(),
1690 revisions: vec![
1691 crate::TranscriptRevisionBody {
1692 revision: previous_revision,
1693 parent_revision: None,
1694 messages: previous.messages().to_vec(),
1695 created_at: previous.updated_at(),
1696 },
1697 crate::TranscriptRevisionBody {
1698 revision: incoming_revision,
1699 parent_revision: Some(previous.transcript_revision()?),
1700 messages: incoming.messages().to_vec(),
1701 created_at: incoming.updated_at(),
1702 },
1703 ],
1704 };
1705 incoming.set_metadata_unchecked_for_test(
1706 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1707 serde_json::to_value(history)?,
1708 );
1709
1710 assert!(matches!(
1711 append_only_save_guard(&incoming, Some(&previous)),
1712 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1713 ));
1714 assert!(matches!(
1715 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1716 Err(SessionStoreError::TranscriptContinuityViolation { .. }
1717 | SessionStoreError::MonotonicityViolation { .. })
1718 ));
1719 Ok(())
1720 }
1721
1722 #[test]
1723 fn append_only_guard_rejects_history_head_that_does_not_match_current_messages()
1724 -> Result<(), Box<dyn std::error::Error>> {
1725 let mut previous = Session::new();
1726 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1727
1728 let mut incoming = previous.clone();
1729 incoming.push(Message::User(UserMessage::text("append".to_string())));
1730 let poisoned_messages = vec![Message::User(UserMessage::text(
1731 "unrelated poisoned history".to_string(),
1732 ))];
1733 let poisoned_revision = transcript_messages_digest(&poisoned_messages)?;
1734 incoming.set_metadata_unchecked_for_test(
1735 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1736 serde_json::to_value(TranscriptHistoryState {
1737 head: poisoned_revision.clone(),
1738 commits: Vec::new(),
1739 revisions: vec![crate::TranscriptRevisionBody {
1740 revision: poisoned_revision,
1741 parent_revision: None,
1742 messages: poisoned_messages,
1743 created_at: incoming.updated_at(),
1744 }],
1745 })?,
1746 );
1747
1748 assert!(matches!(
1749 append_only_save_guard(&incoming, Some(&previous)),
1750 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1751 ));
1752 assert!(matches!(
1753 append_only_save_guard(&incoming, None),
1754 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1755 ));
1756 assert!(matches!(
1757 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1758 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1759 ));
1760 Ok(())
1761 }
1762
1763 #[test]
1764 fn append_only_guard_rejects_new_rewrite_commits_on_plain_append()
1765 -> Result<(), Box<dyn std::error::Error>> {
1766 let mut previous = Session::new();
1767 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1768 let previous_revision = previous.transcript_revision()?;
1769
1770 let mut incoming = previous.clone();
1771 let appended = Message::BlockAssistant(BlockAssistantMessage {
1772 blocks: vec![AssistantBlock::Text {
1773 text: "plain append".to_string(),
1774 meta: None,
1775 }],
1776 stop_reason: StopReason::EndTurn,
1777 created_at: crate::types::message_timestamp_now(),
1778 });
1779 incoming.commit_transcript_rewrite(
1780 TranscriptRewriteSelection::MessageRange { start: 1, end: 1 },
1781 vec![appended],
1782 crate::TranscriptRewriteReason::new("forged-append"),
1783 Some("unit-test".to_string()),
1784 Some(previous_revision),
1785 )?;
1786
1787 assert!(matches!(
1788 append_only_save_guard(&incoming, Some(&previous)),
1789 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1790 ));
1791 Ok(())
1792 }
1793
1794 #[test]
1795 fn append_only_guard_rejects_first_save_with_rewrite_commits()
1796 -> Result<(), Box<dyn std::error::Error>> {
1797 let mut incoming = Session::new();
1798 incoming.push(Message::User(UserMessage::text("seed".to_string())));
1799 let parent_messages = incoming.messages().to_vec();
1800 let parent_updated_at = incoming.updated_at();
1801 let parent_revision = incoming.transcript_revision()?;
1802 let commit = incoming.commit_transcript_rewrite(
1803 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1804 vec![Message::User(UserMessage::text(
1805 "compacted seed".to_string(),
1806 ))],
1807 crate::TranscriptRewriteReason::new("compaction"),
1808 Some("meerkat-core".to_string()),
1809 Some(parent_revision),
1810 )?;
1811 let incoming_revision = incoming.transcript_revision()?;
1812 let commit_parent_revision = commit.parent_revision.clone();
1813 incoming.set_metadata_unchecked_for_test(
1814 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1815 serde_json::to_value(TranscriptHistoryState {
1816 head: incoming_revision.clone(),
1817 commits: vec![commit],
1818 revisions: vec![
1819 crate::TranscriptRevisionBody {
1820 revision: commit_parent_revision.clone(),
1821 parent_revision: None,
1822 messages: parent_messages,
1823 created_at: parent_updated_at,
1824 },
1825 crate::TranscriptRevisionBody {
1826 revision: incoming_revision,
1827 parent_revision: Some(commit_parent_revision),
1828 messages: incoming.messages().to_vec(),
1829 created_at: incoming.updated_at(),
1830 },
1831 ],
1832 })?,
1833 );
1834
1835 assert!(matches!(
1836 append_only_save_guard(&incoming, None),
1837 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1838 ));
1839 Ok(())
1840 }
1841
1842 #[test]
1843 fn transcript_rewrite_guard_rejects_poisoned_history_graph()
1844 -> Result<(), Box<dyn std::error::Error>> {
1845 let mut previous = Session::new();
1846 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1847 let parent_revision = previous.transcript_revision()?;
1848
1849 let mut first = previous.clone();
1850 let first_commit = first.commit_transcript_rewrite(
1851 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1852 vec![Message::User(UserMessage::text(
1853 "compacted persisted".to_string(),
1854 ))],
1855 crate::TranscriptRewriteReason::new("compaction"),
1856 Some("unit-test".to_string()),
1857 Some(parent_revision),
1858 )?;
1859 let first_snapshot = first.clone();
1860
1861 first.commit_transcript_rewrite(
1862 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1863 vec![Message::User(UserMessage::text(
1864 "uncommitted poisoned fork".to_string(),
1865 ))],
1866 crate::TranscriptRewriteReason::new("poison"),
1867 Some("unit-test".to_string()),
1868 Some(first_commit.revision.clone()),
1869 )?;
1870 let mut poisoned_state = first
1871 .transcript_history_state()?
1872 .ok_or_else(|| "second rewrite should retain history state".to_string())?;
1873 poisoned_state.head = first_commit.revision.clone();
1874
1875 let mut poisoned = first_snapshot;
1876 poisoned.set_metadata_unchecked_for_test(
1877 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1878 serde_json::to_value(poisoned_state)?,
1879 );
1880
1881 assert!(matches!(
1882 transcript_rewrite_save_guard(&poisoned, Some(&previous), &first_commit),
1883 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1884 if reason.contains("incoming transcript history state is malformed")
1885 ));
1886 Ok(())
1887 }
1888
1889 #[test]
1890 fn authoritative_projection_guard_rejects_changed_persisted_revision()
1891 -> Result<(), Box<dyn std::error::Error>> {
1892 let mut previous = Session::new();
1893 previous.push(Message::User(UserMessage::text("persisted A".to_string())));
1894 let expected_revision = previous.transcript_revision()?;
1895
1896 let mut current = previous.clone();
1897 current.push(Message::BlockAssistant(BlockAssistantMessage {
1898 blocks: vec![AssistantBlock::Text {
1899 text: "persisted B".to_string(),
1900 meta: None,
1901 }],
1902 stop_reason: StopReason::EndTurn,
1903 created_at: crate::types::message_timestamp_now(),
1904 }));
1905 let mut incoming = previous.clone();
1906 incoming.push(Message::User(UserMessage::text(
1907 "incoming from A".to_string(),
1908 )));
1909
1910 assert!(matches!(
1911 authoritative_projection_current_revision_guard(
1912 &incoming,
1913 Some(¤t),
1914 Some(&expected_revision)
1915 ),
1916 Err(SessionStoreError::TranscriptContinuityViolation { .. })
1917 ));
1918 Ok(())
1919 }
1920
1921 #[test]
1922 fn append_only_guard_rejects_rewrite_commits_on_first_save()
1923 -> Result<(), Box<dyn std::error::Error>> {
1924 let mut incoming = Session::new();
1925 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1926 let parent_revision = incoming.transcript_revision()?;
1927 incoming.commit_transcript_rewrite(
1928 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1929 vec![Message::User(UserMessage::text("rewritten".to_string()))],
1930 crate::TranscriptRewriteReason::new("forged-first-save"),
1931 Some("unit-test".to_string()),
1932 Some(parent_revision),
1933 )?;
1934
1935 assert!(matches!(
1936 append_only_save_guard(&incoming, None),
1937 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1938 ));
1939 Ok(())
1940 }
1941
1942 #[test]
1943 fn append_only_guard_rejects_commitless_history_on_first_save()
1944 -> Result<(), Box<dyn std::error::Error>> {
1945 let mut incoming = Session::new();
1946 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1947 let incoming_revision = incoming.transcript_revision()?;
1948 incoming.set_metadata_unchecked_for_test(
1949 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1950 serde_json::to_value(TranscriptHistoryState {
1951 head: incoming_revision.clone(),
1952 commits: Vec::new(),
1953 revisions: vec![crate::TranscriptRevisionBody {
1954 revision: incoming_revision,
1955 parent_revision: None,
1956 messages: incoming.messages().to_vec(),
1957 created_at: incoming.updated_at(),
1958 }],
1959 })?,
1960 );
1961
1962 assert!(matches!(
1963 append_only_save_guard(&incoming, None),
1964 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1965 if reason.contains("first save would seed transcript history state")
1966 ));
1967 Ok(())
1968 }
1969
1970 #[test]
1971 fn append_only_guard_rejects_commitless_history_seed_on_plain_append()
1972 -> Result<(), Box<dyn std::error::Error>> {
1973 let mut previous = Session::new();
1974 previous.push(Message::User(UserMessage::text("persisted".to_string())));
1975 let previous_revision = previous.transcript_revision()?;
1976
1977 let mut incoming = previous.clone();
1978 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
1979 blocks: vec![AssistantBlock::Text {
1980 text: "plain append".to_string(),
1981 meta: None,
1982 }],
1983 stop_reason: StopReason::EndTurn,
1984 created_at: crate::types::message_timestamp_now(),
1985 }));
1986 let incoming_revision = incoming.transcript_revision()?;
1987 incoming.set_metadata_unchecked_for_test(
1988 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1989 serde_json::to_value(TranscriptHistoryState {
1990 head: incoming_revision.clone(),
1991 commits: Vec::new(),
1992 revisions: vec![
1993 crate::TranscriptRevisionBody {
1994 revision: previous_revision,
1995 parent_revision: None,
1996 messages: previous.messages().to_vec(),
1997 created_at: previous.updated_at(),
1998 },
1999 crate::TranscriptRevisionBody {
2000 revision: incoming_revision,
2001 parent_revision: Some(previous.transcript_revision()?),
2002 messages: incoming.messages().to_vec(),
2003 created_at: incoming.updated_at(),
2004 },
2005 ],
2006 })?,
2007 );
2008
2009 assert!(matches!(
2010 append_only_save_guard(&incoming, Some(&previous)),
2011 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2012 if reason.contains("append-only save would seed transcript history state")
2013 ));
2014 Ok(())
2015 }
2016
2017 #[test]
2018 fn run_boundary_guard_accepts_commitless_history_seed_on_plain_append()
2019 -> Result<(), Box<dyn std::error::Error>> {
2020 let mut previous = Session::new();
2021 previous.push(Message::User(UserMessage::text("persisted".to_string())));
2022 let previous_revision = previous.transcript_revision()?;
2023
2024 let mut incoming = previous.clone();
2025 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2026 blocks: vec![AssistantBlock::Text {
2027 text: "plain append".to_string(),
2028 meta: None,
2029 }],
2030 stop_reason: StopReason::EndTurn,
2031 created_at: crate::types::message_timestamp_now(),
2032 }));
2033 let incoming_revision = incoming.transcript_revision()?;
2034 incoming.set_metadata_unchecked_for_test(
2035 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2036 serde_json::to_value(TranscriptHistoryState {
2037 head: incoming_revision.clone(),
2038 commits: Vec::new(),
2039 revisions: vec![
2040 crate::TranscriptRevisionBody {
2041 revision: previous_revision.clone(),
2042 parent_revision: None,
2043 messages: previous.messages().to_vec(),
2044 created_at: previous.updated_at(),
2045 },
2046 crate::TranscriptRevisionBody {
2047 revision: incoming_revision,
2048 parent_revision: Some(previous_revision),
2049 messages: incoming.messages().to_vec(),
2050 created_at: incoming.updated_at(),
2051 },
2052 ],
2053 })?,
2054 );
2055
2056 assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2057 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2058 Ok(())
2059 }
2060
2061 #[test]
2062 fn run_boundary_guard_accepts_retained_history_seed_on_plain_append()
2063 -> Result<(), Box<dyn std::error::Error>> {
2064 let mut original = Session::new();
2065 original.push(Message::User(UserMessage::text("verbose seed".to_string())));
2066 let original_revision = original.transcript_revision()?;
2067
2068 let mut previous = original.clone();
2069 previous.commit_transcript_rewrite(
2070 TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
2071 vec![Message::User(UserMessage::text(
2072 "compacted seed".to_string(),
2073 ))],
2074 crate::TranscriptRewriteReason::new("compaction"),
2075 Some("meerkat-core".to_string()),
2076 Some(original_revision),
2077 )?;
2078 let previous_with_history = previous.clone();
2079 previous.clear_transcript_history_state();
2080
2081 let mut incoming = previous_with_history;
2082 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2083 blocks: vec![AssistantBlock::Text {
2084 text: "plain append after retained history".to_string(),
2085 meta: None,
2086 }],
2087 stop_reason: StopReason::EndTurn,
2088 created_at: crate::types::message_timestamp_now(),
2089 }));
2090
2091 assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2092 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2093 Ok(())
2094 }
2095
2096 #[test]
2097 fn run_boundary_guard_accepts_commitless_history_seed_on_first_snapshot()
2098 -> Result<(), Box<dyn std::error::Error>> {
2099 let mut incoming = Session::new();
2100 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
2101 let incoming_revision = incoming.transcript_revision()?;
2102 incoming.set_metadata_unchecked_for_test(
2103 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2104 serde_json::to_value(TranscriptHistoryState {
2105 head: incoming_revision.clone(),
2106 commits: Vec::new(),
2107 revisions: vec![crate::TranscriptRevisionBody {
2108 revision: incoming_revision,
2109 parent_revision: None,
2110 messages: incoming.messages().to_vec(),
2111 created_at: incoming.updated_at(),
2112 }],
2113 })?,
2114 );
2115
2116 assert!(append_only_save_guard(&incoming, None).is_err());
2117 assert!(run_boundary_snapshot_save_guard(&incoming, None).is_ok());
2118 Ok(())
2119 }
2120
2121 #[test]
2122 fn run_boundary_guard_accepts_commitless_history_seed_on_initial_multi_revision_snapshot()
2123 -> Result<(), Box<dyn std::error::Error>> {
2124 let mut base = Session::new();
2125 base.push(Message::User(UserMessage::text("first".to_string())));
2126 let base_revision = base.transcript_revision()?;
2127
2128 let mut incoming = base.clone();
2129 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2130 blocks: vec![AssistantBlock::Text {
2131 text: "second".to_string(),
2132 meta: None,
2133 }],
2134 stop_reason: StopReason::EndTurn,
2135 created_at: crate::types::message_timestamp_now(),
2136 }));
2137 let incoming_revision = incoming.transcript_revision()?;
2138 incoming.set_metadata_unchecked_for_test(
2139 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2140 serde_json::to_value(TranscriptHistoryState {
2141 head: incoming_revision.clone(),
2142 commits: Vec::new(),
2143 revisions: vec![
2144 crate::TranscriptRevisionBody {
2145 revision: base_revision.clone(),
2146 parent_revision: None,
2147 messages: base.messages().to_vec(),
2148 created_at: base.updated_at(),
2149 },
2150 crate::TranscriptRevisionBody {
2151 revision: incoming_revision,
2152 parent_revision: Some(base_revision),
2153 messages: incoming.messages().to_vec(),
2154 created_at: incoming.updated_at(),
2155 },
2156 ],
2157 })?,
2158 );
2159
2160 assert!(append_only_save_guard(&incoming, None).is_err());
2161 assert!(run_boundary_snapshot_save_guard(&incoming, None).is_ok());
2162 Ok(())
2163 }
2164
2165 #[test]
2166 fn append_only_guard_rejects_new_rewrite_commits_on_system_context_append()
2167 -> Result<(), Box<dyn std::error::Error>> {
2168 let mut previous = Session::new();
2169 previous.push(Message::System(SystemMessage::new("base system")));
2170 previous.push(Message::User(UserMessage::text("persisted".to_string())));
2171 let mut incoming = previous.clone();
2172 incoming.set_system_prompt_with_source(
2173 format!(
2174 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
2175 ),
2176 crate::session_durable_config_authority::SessionSystemPromptSource::RuntimeContextAppend,
2177 )?;
2178 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2179 blocks: vec![AssistantBlock::Text {
2180 text: "plain append".to_string(),
2181 meta: None,
2182 }],
2183 stop_reason: StopReason::EndTurn,
2184 created_at: crate::types::message_timestamp_now(),
2185 }));
2186 let incoming_revision = incoming.transcript_revision()?;
2187 incoming.set_metadata_unchecked_for_test(
2188 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2189 serde_json::to_value(TranscriptHistoryState {
2190 head: incoming_revision.clone(),
2191 commits: vec![TranscriptRewriteCommit {
2192 parent_revision: previous.transcript_revision()?,
2193 revision: incoming_revision.clone(),
2194 selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
2195 original_span_digest: transcript_messages_digest(&[])?,
2196 replacement_digest: transcript_messages_digest(&[])?,
2197 messages_before: previous.messages().len(),
2198 messages_after: incoming.messages().len(),
2199 reason: crate::TranscriptRewriteReason::new("forged"),
2200 actor: Some("unit-test".to_string()),
2201 committed_at: incoming.updated_at(),
2202 }],
2203 revisions: vec![crate::TranscriptRevisionBody {
2204 revision: incoming_revision,
2205 parent_revision: None,
2206 messages: incoming.messages().to_vec(),
2207 created_at: incoming.updated_at(),
2208 }],
2209 })?,
2210 );
2211
2212 assert!(matches!(
2213 append_only_save_guard(&incoming, Some(&previous)),
2214 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2215 ));
2216 Ok(())
2217 }
2218
2219 #[test]
2220 fn append_only_guard_rejects_new_rewrite_commits_on_transient_notice_cleanup()
2221 -> Result<(), Box<dyn std::error::Error>> {
2222 let mut previous = Session::new();
2223 previous.push(Message::SystemNotice(SystemNoticeMessage::new(
2224 SystemNoticeKind::Comms,
2225 "transient peer delivery notice",
2226 )));
2227 previous.push(Message::User(UserMessage::text("persisted".to_string())));
2228
2229 let mut incoming = Session::new();
2230 incoming.push(Message::User(UserMessage::text("persisted".to_string())));
2231 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2232 blocks: vec![AssistantBlock::Text {
2233 text: "plain append after notice cleanup".to_string(),
2234 meta: None,
2235 }],
2236 stop_reason: StopReason::EndTurn,
2237 created_at: crate::types::message_timestamp_now(),
2238 }));
2239 let incoming_revision = incoming.transcript_revision()?;
2240 incoming.set_metadata_unchecked_for_test(
2241 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2242 serde_json::to_value(TranscriptHistoryState {
2243 head: incoming_revision.clone(),
2244 commits: vec![TranscriptRewriteCommit {
2245 parent_revision: previous.transcript_revision()?,
2246 revision: incoming_revision.clone(),
2247 selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
2248 original_span_digest: transcript_messages_digest(&[])?,
2249 replacement_digest: transcript_messages_digest(&[])?,
2250 messages_before: previous.messages().len(),
2251 messages_after: incoming.messages().len(),
2252 reason: crate::TranscriptRewriteReason::new("forged"),
2253 actor: Some("unit-test".to_string()),
2254 committed_at: incoming.updated_at(),
2255 }],
2256 revisions: vec![crate::TranscriptRevisionBody {
2257 revision: incoming_revision,
2258 parent_revision: None,
2259 messages: incoming.messages().to_vec(),
2260 created_at: incoming.updated_at(),
2261 }],
2262 })?,
2263 );
2264
2265 assert!(matches!(
2266 append_only_save_guard(&incoming, Some(&previous)),
2267 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2268 ));
2269 Ok(())
2270 }
2271
2272 #[test]
2273 fn run_boundary_guard_accepts_generated_context_summary_before_retained_tail()
2274 -> Result<(), Box<dyn std::error::Error>> {
2275 let mut previous = Session::new();
2276 previous.push(Message::System(SystemMessage::new(
2277 "runtime system before context refresh",
2278 )));
2279 previous.push(Message::User(UserMessage::text(
2280 "Turn 1 request".to_string(),
2281 )));
2282 previous.push(Message::BlockAssistant(BlockAssistantMessage {
2283 blocks: vec![AssistantBlock::Text {
2284 text: "Turn 1 answer".to_string(),
2285 meta: None,
2286 }],
2287 stop_reason: StopReason::EndTurn,
2288 created_at: crate::types::message_timestamp_now(),
2289 }));
2290
2291 let mut incoming = Session::with_id(previous.id().clone());
2292 incoming.push(Message::System(SystemMessage::new(
2293 "runtime system after context refresh",
2294 )));
2295 incoming.push(Message::User(UserMessage::text(
2296 "Verbose context that will be compacted".to_string(),
2297 )));
2298 for message in previous.messages()[1..].iter().cloned() {
2299 incoming.push(message);
2300 }
2301 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2302 blocks: vec![AssistantBlock::Text {
2303 text: "Turn 2 generated answer".to_string(),
2304 meta: None,
2305 }],
2306 stop_reason: StopReason::EndTurn,
2307 created_at: crate::types::message_timestamp_now(),
2308 }));
2309 let parent_revision = incoming.transcript_revision()?;
2310 incoming.commit_transcript_rewrite(
2311 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2312 vec![Message::User(UserMessage::compaction_summary(
2313 "[Context compacted] Earlier runtime context".to_string(),
2314 ))],
2315 crate::TranscriptRewriteReason::new("compaction"),
2316 Some("meerkat-core".to_string()),
2317 Some(parent_revision),
2318 )?;
2319
2320 assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2321 assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2322 Ok(())
2323 }
2324
2325 #[test]
2326 fn run_boundary_guard_rejects_context_summary_tail_without_compaction_summary_marker()
2327 -> Result<(), Box<dyn std::error::Error>> {
2328 let mut previous = Session::new();
2329 previous.push(Message::System(SystemMessage::new(
2330 "runtime system before context refresh",
2331 )));
2332 previous.push(Message::User(UserMessage::text(
2333 "Turn 1 request".to_string(),
2334 )));
2335 previous.push(Message::BlockAssistant(BlockAssistantMessage {
2336 blocks: vec![AssistantBlock::Text {
2337 text: "Turn 1 answer".to_string(),
2338 meta: None,
2339 }],
2340 stop_reason: StopReason::EndTurn,
2341 created_at: crate::types::message_timestamp_now(),
2342 }));
2343
2344 let mut incoming = Session::with_id(previous.id().clone());
2345 incoming.push(Message::System(SystemMessage::new(
2346 "runtime system after context refresh",
2347 )));
2348 incoming.push(Message::User(UserMessage::text(
2349 "Verbose context that will be compacted".to_string(),
2350 )));
2351 for message in previous.messages()[1..].iter().cloned() {
2352 incoming.push(message);
2353 }
2354 incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2355 blocks: vec![AssistantBlock::Text {
2356 text: "Turn 2 generated answer".to_string(),
2357 meta: None,
2358 }],
2359 stop_reason: StopReason::EndTurn,
2360 created_at: crate::types::message_timestamp_now(),
2361 }));
2362 let parent_revision = incoming.transcript_revision()?;
2363 incoming.commit_transcript_rewrite(
2367 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2368 vec![Message::User(UserMessage::text(
2369 "[Context compacted] Earlier runtime context".to_string(),
2370 ))],
2371 crate::TranscriptRewriteReason::new("compaction"),
2372 Some("meerkat-core".to_string()),
2373 Some(parent_revision),
2374 )?;
2375
2376 assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2377 assert!(matches!(
2378 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2379 Err(SessionStoreError::TranscriptContinuityViolation { .. }
2380 | SessionStoreError::MonotonicityViolation { .. })
2381 ));
2382 Ok(())
2383 }
2384
2385 #[test]
2386 fn run_boundary_guard_rejects_runtime_parent_with_inserted_message_before_tail()
2387 -> Result<(), Box<dyn std::error::Error>> {
2388 let mut previous = Session::new();
2389 previous.push(Message::System(SystemMessage::new("base system")));
2390 previous.push(Message::User(UserMessage::text("turn one".to_string())));
2391 previous.push(Message::BlockAssistant(BlockAssistantMessage {
2392 blocks: vec![AssistantBlock::Text {
2393 text: "answer one".to_string(),
2394 meta: None,
2395 }],
2396 stop_reason: StopReason::EndTurn,
2397 created_at: crate::types::message_timestamp_now(),
2398 }));
2399
2400 let parent_messages = vec![
2401 Message::System(SystemMessage::new("refreshed runtime system projection")),
2402 Message::User(UserMessage::text(
2403 "injected before retained tail".to_string(),
2404 )),
2405 previous.messages()[1].clone(),
2406 previous.messages()[2].clone(),
2407 ];
2408 let parent_revision = transcript_messages_digest(&parent_messages)?;
2409 let mut parent = previous.clone();
2410 parent.apply_transcript_history_state(TranscriptHistoryState {
2411 head: parent_revision.clone(),
2412 commits: Vec::new(),
2413 revisions: vec![crate::TranscriptRevisionBody {
2414 revision: parent_revision,
2415 parent_revision: None,
2416 messages: parent_messages,
2417 created_at: parent.updated_at(),
2418 }],
2419 })?;
2420 let parent_revision = parent.transcript_revision()?;
2421
2422 let mut incoming = parent.clone();
2423 incoming.commit_transcript_rewrite(
2424 TranscriptRewriteSelection::MessageRange {
2425 start: 0,
2426 end: parent.messages().len(),
2427 },
2428 vec![Message::User(UserMessage::text(
2429 "[Context compacted] summary".to_string(),
2430 ))],
2431 crate::TranscriptRewriteReason::new("compaction"),
2432 Some("meerkat-core".to_string()),
2433 Some(parent_revision),
2434 )?;
2435
2436 assert!(matches!(
2437 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2438 Err(SessionStoreError::TranscriptContinuityViolation { .. }
2439 | SessionStoreError::MonotonicityViolation { .. })
2440 ));
2441 Ok(())
2442 }
2443
2444 #[test]
2445 fn run_boundary_guard_rejects_forged_parent_edge_before_real_rewrite_commit()
2446 -> Result<(), Box<dyn std::error::Error>> {
2447 let mut previous = Session::new();
2448 previous.push(Message::System(SystemMessage::new("base system")));
2449 previous.push(Message::User(UserMessage::text("turn one".to_string())));
2450 previous.push(Message::BlockAssistant(BlockAssistantMessage {
2451 blocks: vec![AssistantBlock::Text {
2452 text: "answer one".to_string(),
2453 meta: None,
2454 }],
2455 stop_reason: StopReason::EndTurn,
2456 created_at: crate::types::message_timestamp_now(),
2457 }));
2458 let previous_revision = previous.transcript_revision()?;
2459
2460 let forged_parent_messages = vec![
2461 Message::System(SystemMessage::new("refreshed runtime system projection")),
2462 Message::User(UserMessage::text(
2463 "forged insertion before retained tail".to_string(),
2464 )),
2465 previous.messages()[1].clone(),
2466 previous.messages()[2].clone(),
2467 ];
2468 let forged_parent_revision = transcript_messages_digest(&forged_parent_messages)?;
2469 let mut forged_parent = previous.clone();
2470 forged_parent.apply_transcript_history_state(TranscriptHistoryState {
2471 head: forged_parent_revision.clone(),
2472 commits: Vec::new(),
2473 revisions: vec![
2474 crate::TranscriptRevisionBody {
2475 revision: previous_revision.clone(),
2476 parent_revision: None,
2477 messages: previous.messages().to_vec(),
2478 created_at: previous.updated_at(),
2479 },
2480 crate::TranscriptRevisionBody {
2481 revision: forged_parent_revision.clone(),
2482 parent_revision: Some(previous_revision),
2483 messages: forged_parent_messages,
2484 created_at: forged_parent.updated_at(),
2485 },
2486 ],
2487 })?;
2488
2489 let mut incoming = forged_parent.clone();
2490 incoming.commit_transcript_rewrite(
2491 TranscriptRewriteSelection::MessageRange {
2492 start: 0,
2493 end: forged_parent.messages().len(),
2494 },
2495 vec![Message::User(UserMessage::text(
2496 "[Context compacted] forged branch".to_string(),
2497 ))],
2498 crate::TranscriptRewriteReason::new("compaction"),
2499 Some("meerkat-core".to_string()),
2500 Some(forged_parent_revision),
2501 )?;
2502
2503 assert!(matches!(
2504 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2505 Err(SessionStoreError::TranscriptContinuityViolation { .. }
2506 | SessionStoreError::MonotonicityViolation { .. })
2507 ));
2508 Ok(())
2509 }
2510
2511 #[test]
2512 fn append_only_guard_rejects_transient_mcp_pending_notice_cleanup_with_unaudited_commit()
2513 -> Result<(), crate::TranscriptEditError> {
2514 let mut previous = Session::new();
2515 previous.push(Message::User(UserMessage::text("hello".to_string())));
2516 previous.push(Message::SystemNotice(SystemNoticeMessage {
2517 kind: SystemNoticeKind::McpPending,
2518 body: Some("connecting".to_string()),
2519 blocks: vec![SystemNoticeBlock::Mcp {
2520 server_id: None,
2521 operation: None,
2522 phase: None,
2523 persisted: false,
2524 detail: Some("connecting".to_string()),
2525 pending_sources: vec!["test-server".to_string()],
2526 }],
2527 created_at: crate::types::message_timestamp_now(),
2528 }));
2529 previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
2530 vec![crate::types::AssistantBlock::Text {
2531 text: "answer".to_string(),
2532 meta: None,
2533 }],
2534 StopReason::EndTurn,
2535 )));
2536
2537 let mut incoming = previous.clone();
2538 incoming.replace_messages_internal(
2539 previous
2540 .messages()
2541 .iter()
2542 .filter(|message| !matches!(message, Message::SystemNotice(_)))
2543 .cloned()
2544 .collect(),
2545 crate::TranscriptRewriteReason::new("unit-test"),
2546 )?;
2547 incoming.push(Message::User(UserMessage::text("again".to_string())));
2548
2549 assert!(matches!(
2550 append_only_save_guard(&incoming, Some(&previous)),
2551 Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2552 ));
2553 Ok::<(), crate::TranscriptEditError>(())
2554 }
2555
2556 #[test]
2557 fn rewrite_chain_finder_crosses_normal_append_between_rewrites()
2558 -> Result<(), Box<dyn std::error::Error>> {
2559 let mut session = Session::new();
2560 session.push(Message::User(UserMessage::text("first".to_string())));
2561 session.push(Message::BlockAssistant(BlockAssistantMessage {
2562 blocks: vec![AssistantBlock::Text {
2563 text: "verbose first answer".to_string(),
2564 meta: None,
2565 }],
2566 stop_reason: StopReason::EndTurn,
2567 created_at: crate::types::message_timestamp_now(),
2568 }));
2569
2570 let original = session.transcript_revision()?;
2571 let first = session.commit_transcript_rewrite(
2572 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2573 vec![Message::BlockAssistant(BlockAssistantMessage {
2574 blocks: vec![AssistantBlock::Text {
2575 text: "compact first answer".to_string(),
2576 meta: None,
2577 }],
2578 stop_reason: StopReason::EndTurn,
2579 created_at: crate::types::message_timestamp_now(),
2580 })],
2581 crate::TranscriptRewriteReason::new("compaction"),
2582 Some("unit-test".to_string()),
2583 Some(original.clone()),
2584 )?;
2585
2586 session.push(Message::User(UserMessage::text("second".to_string())));
2587 session.push(Message::BlockAssistant(BlockAssistantMessage {
2588 blocks: vec![AssistantBlock::Text {
2589 text: "verbose second answer".to_string(),
2590 meta: None,
2591 }],
2592 stop_reason: StopReason::EndTurn,
2593 created_at: crate::types::message_timestamp_now(),
2594 }));
2595 let bridge = session.transcript_revision()?;
2596 assert_ne!(bridge, first.revision);
2597
2598 let second = session.commit_transcript_rewrite(
2599 TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
2600 vec![Message::BlockAssistant(BlockAssistantMessage {
2601 blocks: vec![AssistantBlock::Text {
2602 text: "compact second answer".to_string(),
2603 meta: None,
2604 }],
2605 stop_reason: StopReason::EndTurn,
2606 created_at: crate::types::message_timestamp_now(),
2607 })],
2608 crate::TranscriptRewriteReason::new("compaction"),
2609 Some("unit-test".to_string()),
2610 Some(bridge),
2611 )?;
2612 let state = session
2613 .transcript_history_state()?
2614 .ok_or_else(|| std::io::Error::other("missing transcript history state"))?;
2615
2616 let chain =
2617 find_transcript_rewrite_commit_chain_extending(&state, &original, &second.revision)
2618 .ok_or_else(|| {
2619 std::io::Error::other(
2620 "rewrite chain should extend through normal append bridge",
2621 )
2622 })?;
2623 assert_eq!(chain.len(), 2);
2624 assert_eq!(chain[0].revision, first.revision);
2625 assert_eq!(chain[1].revision, second.revision);
2626 Ok(())
2627 }
2628
2629 #[test]
2630 fn run_boundary_guard_rejects_dropped_retained_rewrite_commits()
2631 -> Result<(), Box<dyn std::error::Error>> {
2632 let mut base = Session::new();
2633 base.push(Message::User(UserMessage::text("turn one".to_string())));
2634 base.push(Message::BlockAssistant(BlockAssistantMessage {
2635 blocks: vec![AssistantBlock::Text {
2636 text: "verbose answer".to_string(),
2637 meta: None,
2638 }],
2639 stop_reason: StopReason::EndTurn,
2640 created_at: crate::types::message_timestamp_now(),
2641 }));
2642 let base_revision = base.transcript_revision()?;
2643
2644 let mut previous = base.clone();
2645 let _retained_commit = previous.commit_transcript_rewrite(
2646 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2647 vec![Message::BlockAssistant(BlockAssistantMessage {
2648 blocks: vec![AssistantBlock::Text {
2649 text: "first compact answer".to_string(),
2650 meta: None,
2651 }],
2652 stop_reason: StopReason::EndTurn,
2653 created_at: crate::types::message_timestamp_now(),
2654 })],
2655 crate::TranscriptRewriteReason::new("compaction"),
2656 Some("unit-test".to_string()),
2657 Some(base_revision),
2658 )?;
2659 let previous_revision = previous.transcript_revision()?;
2660
2661 let mut incoming = previous.clone();
2662 let new_commit = incoming.commit_transcript_rewrite(
2663 TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2664 vec![Message::BlockAssistant(BlockAssistantMessage {
2665 blocks: vec![AssistantBlock::Text {
2666 text: "second compact answer".to_string(),
2667 meta: None,
2668 }],
2669 stop_reason: StopReason::EndTurn,
2670 created_at: crate::types::message_timestamp_now(),
2671 })],
2672 crate::TranscriptRewriteReason::new("compaction"),
2673 Some("unit-test".to_string()),
2674 Some(previous_revision),
2675 )?;
2676 let mut state = incoming
2677 .transcript_history_state()?
2678 .ok_or_else(|| std::io::Error::other("incoming rewrite should retain history"))?;
2679 state.commits = vec![new_commit];
2680 incoming.set_metadata_unchecked_for_test(
2681 crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2682 serde_json::to_value(state)?,
2683 );
2684
2685 assert!(matches!(
2686 run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2687 Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2688 if reason.contains("drop retained transcript rewrite commits")
2689 ));
2690 Ok(())
2691 }
2692
2693 fn runtime_append_system(content: &str) -> SystemMessage {
2702 let mut system = SystemMessage::new(content);
2703 system.mutation_kind = crate::types::SystemPromptMutationKind::RuntimeContextAppend;
2704 system
2705 }
2706
2707 #[allow(clippy::expect_used)]
2710 fn machine_persist_append_admits(
2711 previous: Option<&SystemMessage>,
2712 incoming: &SystemMessage,
2713 ) -> bool {
2714 let has_previous = previous.is_some();
2715 let content_identical =
2716 previous.is_some_and(|previous| incoming.content == previous.content);
2717 let content_extends_previous =
2718 previous.is_some_and(|previous| incoming.content.starts_with(&previous.content));
2719 let appended_starts_with_separator = previous.is_some_and(|previous| {
2720 incoming
2721 .content
2722 .get(previous.content.len()..)
2723 .is_some_and(|appended| appended.starts_with(SYSTEM_CONTEXT_SEPARATOR))
2724 });
2725 let incoming_is_runtime_context_append = incoming.mutation_kind.is_runtime_context_append();
2726 let mut authority = crate::session_document::SessionDocumentMachineAuthority::new();
2727 let effects = authority
2728 .resolve_system_context_persist_append_admission(
2729 has_previous,
2730 content_identical,
2731 content_extends_previous,
2732 appended_starts_with_separator,
2733 incoming_is_runtime_context_append,
2734 )
2735 .expect("machine resolves persist-append admission");
2736 effects.into_iter().any(|effect| {
2737 matches!(
2738 effect,
2739 crate::session_document::SessionDocumentEffect::SystemContextPersistAppendAdmissionResolved {
2740 admission: crate::session_document::SystemContextPersistAppendAdmission::Admit,
2741 }
2742 )
2743 })
2744 }
2745
2746 #[allow(clippy::expect_used)]
2747 fn assert_persist_append_matches_machine(
2748 previous: Option<&SystemMessage>,
2749 incoming: &SystemMessage,
2750 expected: bool,
2751 ) {
2752 let verdict =
2753 system_context_is_append(previous, incoming).expect("persist-time admission resolves");
2754 assert_eq!(verdict, expected, "persist-time verdict mismatch");
2755 assert_eq!(
2756 verdict,
2757 machine_persist_append_admits(previous, incoming),
2758 "persist-time verdict diverges from direct machine call"
2759 );
2760 }
2761
2762 #[test]
2763 fn persist_append_identical_content_admits() {
2764 let previous = SystemMessage::new("base system");
2765 let incoming = SystemMessage::new("base system");
2766 assert_persist_append_matches_machine(Some(&previous), &incoming, true);
2767 }
2768
2769 #[test]
2770 fn persist_append_separator_append_with_marker_admits() {
2771 let previous = SystemMessage::new("base system");
2772 let incoming = runtime_append_system(&format!(
2773 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nextra"
2774 ));
2775 assert_persist_append_matches_machine(Some(&previous), &incoming, true);
2776 }
2777
2778 #[test]
2779 fn persist_append_shaped_without_marker_rejects() {
2780 let previous = SystemMessage::new("base system");
2781 let incoming = SystemMessage::new(format!(
2783 "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nextra"
2784 ));
2785 assert_persist_append_matches_machine(Some(&previous), &incoming, false);
2786 }
2787
2788 #[test]
2789 fn persist_append_divergent_content_rejects() {
2790 let previous = SystemMessage::new("base system");
2791 let incoming = runtime_append_system("totally different");
2792 assert_persist_append_matches_machine(Some(&previous), &incoming, false);
2793 }
2794
2795 #[test]
2796 fn persist_append_no_previous_admits_only_with_marker() {
2797 let with_marker = runtime_append_system("brand new context");
2798 assert_persist_append_matches_machine(None, &with_marker, true);
2799
2800 let without_marker = SystemMessage::new("brand new context");
2801 assert_persist_append_matches_machine(None, &without_marker, false);
2802 }
2803}