Skip to main content

meerkat_core/
session_store.rs

1//! SessionStore trait — canonical session persistence contract.
2//!
3//! This trait lives in `meerkat-core` so that custom storage implementations
4//! (Postgres, DynamoDB, etc.) can be written without depending on `meerkat-store`.
5//!
6//! # Snapshot = projection
7//!
8//! The `Session` row a `SessionStore` persists is a **projection of the
9//! canonical event log**. The event log (`EventStore`) is append-only at
10//! the trait level; the snapshot is a rebuildable materialization of
11//! replaying that log. Deleting a `.rkat/sessions/<id>/session.json` and
12//! replaying the event store produces an identical snapshot (the
13//! `CLAUDE.md` invariant).
14//!
15//! Wave-c C-H1 (F1 closure from the state-scope-audit) makes the
16//! append-only nature of that projection enforceable at the
17//! `SessionStore::save` boundary — see the trait docs on
18//! [`SessionStore`] and the [`append_only_save_guard`] helper.
19
20use async_trait::async_trait;
21use sha2::{Digest, Sha256};
22
23use crate::session::{SYSTEM_CONTEXT_SEPARATOR, SessionMeta};
24use crate::time_compat::SystemTime;
25use crate::types::{Message, SessionId};
26use crate::{
27    Session, TranscriptHistoryState, TranscriptRewriteCommit, TranscriptRewriteSelection,
28    transcript_messages_digest,
29};
30
/// Filter for listing sessions.
///
/// Every field is optional; the derived `Default` leaves all of them `None`.
#[derive(Debug, Clone, Default)]
pub struct SessionFilter {
    /// Only sessions created after this time.
    pub created_after: Option<SystemTime>,
    /// Only sessions updated after this time.
    pub updated_after: Option<SystemTime>,
    /// Maximum number of results.
    pub limit: Option<usize>,
    /// Offset for pagination.
    pub offset: Option<usize>,
}
43
/// Errors from session store operations.
///
/// Backend-specific details (rusqlite, filesystem, etc.) are erased to strings
/// so that the trait contract carries no I/O dependencies.
#[derive(Debug, thiserror::Error)]
pub enum SessionStoreError {
    /// An I/O failure; `#[from]` lets `?` convert a `std::io::Error` directly.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization failed; the payload is the stringified cause
    /// (for example from [`session_projection_cas_token`]).
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// The session with this id was not found.
    #[error("Session not found: {0}")]
    NotFound(SessionId),

    /// The session with this id is corrupted.
    #[error("Session corrupted: {0}")]
    Corrupted(SessionId),

    /// A save carried fewer messages than the persisted row and no accepted
    /// continuity shape applied (see [`append_only_save_guard`]).
    #[error(
        "session {id} save rejected: new message count {new_len} is shorter than previously \
         persisted {prev_len} without transcript-continuity proof"
    )]
    MonotonicityViolation {
        id: SessionId,
        prev_len: usize,
        new_len: usize,
    },

    /// The incoming transcript does not continue the persisted one.
    ///
    /// Only `id` and `previous_revision` appear in the rendered message;
    /// `incoming_revision` and `reason` are carried as fields for callers.
    #[error(
        "session {id} save rejected: incoming transcript is not a continuation of persisted revision {previous_revision}"
    )]
    TranscriptContinuityViolation {
        id: SessionId,
        previous_revision: String,
        incoming_revision: String,
        reason: String,
    },

    /// A rewrite commit named a parent revision (`expected`) that differs from
    /// the revision of the previously persisted transcript (`actual`).
    #[error(
        "session {id} rewrite rejected: previous transcript revision {actual} did not match commit parent {expected}"
    )]
    TranscriptRevisionConflict {
        id: SessionId,
        expected: String,
        actual: String,
    },

    /// A save or rewrite was rejected for the free-form `reason` given.
    #[error("session {id} rewrite rejected: {reason}")]
    InvalidTranscriptRewrite { id: SessionId, reason: String },

    /// Any other failure, described by the contained string.
    #[error("Internal error: {0}")]
    Internal(String),
}
97
98/// Stable compare token for a full persisted session projection row.
99pub fn session_projection_cas_token(session: &Session) -> Result<String, SessionStoreError> {
100    let bytes = serde_json::to_vec(session).map_err(|err| {
101        SessionStoreError::Serialization(format!(
102            "failed to serialize session projection CAS token: {err}"
103        ))
104    })?;
105    Ok(format!("row-sha256:{:x}", Sha256::digest(bytes)))
106}
107
108/// Shared append-only guard for `SessionStore::save` implementations.
109///
110/// Backends call this at the top of their `save` method with the new
111/// session and the previously persisted row (or `None` if no prior row
112/// exists). Returns
113/// [`SessionStoreError::MonotonicityViolation`] when the new row's
114/// message count is strictly smaller than the previously persisted one
115/// without a transcript graph edge that proves a core-owned mutation.
116///
117/// The guard also rejects equal/longer saves whose retained prefix no longer
118/// matches the persisted transcript. A plain save may append or update
119/// metadata; same-session replacement must go through
120/// [`transcript_rewrite_save_guard`].
121pub fn append_only_save_guard(
122    incoming: &Session,
123    previous: Option<&Session>,
124) -> Result<(), SessionStoreError> {
125    incoming
126        .validate_transcript_history_state()
127        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
128            id: incoming.id().clone(),
129            reason: format!("incoming transcript history state is malformed: {err}"),
130        })?;
131    let incoming_revision =
132        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
133    let incoming_state = incoming.transcript_history_state().map_err(|err| {
134        SessionStoreError::InvalidTranscriptRewrite {
135            id: incoming.id().clone(),
136            reason: format!("incoming transcript history state is malformed: {err}"),
137        }
138    })?;
139    if let Some(state) = incoming_state.as_ref()
140        && state.head != incoming_revision
141    {
142        return Err(SessionStoreError::InvalidTranscriptRewrite {
143            id: incoming.id().clone(),
144            reason: format!(
145                "incoming transcript graph head {} does not match current message digest {incoming_revision}",
146                state.head
147            ),
148        });
149    }
150
151    let Some(previous) = previous else {
152        if incoming_state.is_some() {
153            return Err(SessionStoreError::InvalidTranscriptRewrite {
154                id: incoming.id().clone(),
155                reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
156                    .to_string(),
157            });
158        }
159        validate_plain_save_transcript_history_preservation(
160            incoming,
161            None,
162            None,
163            incoming_state.as_ref(),
164        )?;
165        return Ok(());
166    };
167    let previous_state = previous.transcript_history_state().map_err(|err| {
168        SessionStoreError::InvalidTranscriptRewrite {
169            id: incoming.id().clone(),
170            reason: format!("previous transcript history state is malformed: {err}"),
171        }
172    })?;
173    let previous_had_history = previous_state.is_some();
174    let incoming_has_history = incoming_state.is_some();
175    if previous_had_history && !incoming_has_history {
176        return Err(SessionStoreError::InvalidTranscriptRewrite {
177            id: incoming.id().clone(),
178            reason: "incoming save would erase retained transcript history state".to_string(),
179        });
180    }
181    let previous_revision =
182        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
183    if previous_revision == incoming_revision {
184        validate_plain_save_transcript_history_preservation(
185            incoming,
186            Some(previous),
187            previous_state.as_ref(),
188            incoming_state.as_ref(),
189        )?;
190        return Ok(());
191    }
192
193    let prev_len = previous.messages().len();
194    let new_len = incoming.messages().len();
195    if new_len >= prev_len {
196        let incoming_prefix_revision = transcript_messages_digest(&incoming.messages()[..prev_len])
197            .map_err(SessionStoreError::from)?;
198        if incoming_prefix_revision == previous_revision {
199            validate_plain_save_transcript_history_preservation(
200                incoming,
201                Some(previous),
202                previous_state.as_ref(),
203                incoming_state.as_ref(),
204            )?;
205            return Ok(());
206        }
207    }
208    if incoming_preserves_conversation_tail_with_system_context_append(incoming, previous)? {
209        validate_plain_save_transcript_history_preservation(
210            incoming,
211            Some(previous),
212            previous_state.as_ref(),
213            incoming_state.as_ref(),
214        )?;
215        return Ok(());
216    }
217    if incoming_preserves_prefix_after_transient_notice_cleanup(incoming, previous)? {
218        validate_plain_save_transcript_history_preservation(
219            incoming,
220            Some(previous),
221            previous_state.as_ref(),
222            incoming_state.as_ref(),
223        )?;
224        return Ok(());
225    }
226    if new_len < prev_len {
227        return Err(SessionStoreError::MonotonicityViolation {
228            id: incoming.id().clone(),
229            prev_len,
230            new_len,
231        });
232    }
233
234    Err(SessionStoreError::TranscriptContinuityViolation {
235        id: incoming.id().clone(),
236        previous_revision,
237        incoming_revision,
238        reason: "incoming transcript neither preserves the persisted prefix nor records a graph edge from the persisted head".to_string(),
239    })
240}
241
242fn validate_plain_save_transcript_history_preservation(
243    incoming: &Session,
244    previous: Option<&Session>,
245    previous_state: Option<&TranscriptHistoryState>,
246    incoming_state: Option<&TranscriptHistoryState>,
247) -> Result<(), SessionStoreError> {
248    let Some(previous) = previous else {
249        if incoming_state.is_some() {
250            return Err(SessionStoreError::InvalidTranscriptRewrite {
251                id: incoming.id().clone(),
252                reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
253                    .to_string(),
254            });
255        }
256        return Ok(());
257    };
258    if previous_state.is_none() && incoming_state.is_some() {
259        return Err(SessionStoreError::InvalidTranscriptRewrite {
260            id: incoming.id().clone(),
261            reason: "incoming append-only save would seed transcript history state outside the rewrite/audit path"
262                .to_string(),
263        });
264    }
265    let Some(previous_state) = previous_state else {
266        return Ok(());
267    };
268    let Some(incoming_state) = incoming_state else {
269        return Err(SessionStoreError::InvalidTranscriptRewrite {
270            id: incoming.id().clone(),
271            reason: "incoming append-only save would erase retained transcript history state"
272                .to_string(),
273        });
274    };
275    let previous_commits = previous_state.commits.as_slice();
276    let incoming_commits = incoming_state.commits.as_slice();
277    if incoming_commits != previous_commits {
278        return Err(SessionStoreError::InvalidTranscriptRewrite {
279            id: incoming.id().clone(),
280            reason: "incoming append-only save would change retained transcript rewrite commits"
281                .to_string(),
282        });
283    }
284    let retained_revisions_preserved =
285        transcript_revision_bodies_preserved(previous_state, incoming_state)?;
286    if retained_revisions_preserved
287        && incoming_state.revisions.len() == previous_state.revisions.len()
288        && incoming_state.head == previous_state.head
289    {
290        return Ok(());
291    }
292    if incoming_state.revisions.len() != previous_state.revisions.len() + 1
293        || !retained_revisions_preserved
294    {
295        return Err(SessionStoreError::InvalidTranscriptRewrite {
296            id: incoming.id().clone(),
297            reason: "incoming append-only save would change retained transcript revision graph"
298                .to_string(),
299        });
300    }
301    let incoming_revision =
302        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
303    let previous_revision =
304        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
305    if previous_state.head != previous_revision {
306        return Err(SessionStoreError::InvalidTranscriptRewrite {
307            id: incoming.id().clone(),
308            reason: "previous transcript history head does not match persisted message digest"
309                .to_string(),
310        });
311    }
312    let added = &incoming_state.revisions[previous_state.revisions.len()];
313    if incoming_state.head != incoming_revision
314        || added.revision != incoming_revision
315        || added.parent_revision.as_deref() != Some(previous_state.head.as_str())
316        || transcript_messages_digest(&added.messages).map_err(SessionStoreError::from)?
317            != incoming_revision
318    {
319        return Err(SessionStoreError::InvalidTranscriptRewrite {
320            id: incoming.id().clone(),
321            reason: "incoming append-only save would add a transcript revision body that is not the current append"
322                .to_string(),
323        });
324    }
325    Ok(())
326}
327
328fn transcript_revision_bodies_preserved(
329    previous_state: &TranscriptHistoryState,
330    incoming_state: &TranscriptHistoryState,
331) -> Result<bool, SessionStoreError> {
332    if incoming_state.revisions.len() < previous_state.revisions.len() {
333        return Ok(false);
334    }
335    previous_state
336        .revisions
337        .iter()
338        .zip(incoming_state.revisions.iter())
339        .map(|(previous, incoming)| {
340            Ok(previous.revision == incoming.revision
341                && previous.parent_revision == incoming.parent_revision
342                && previous.created_at == incoming.created_at
343                && transcript_messages_digest(&previous.messages)
344                    .map_err(SessionStoreError::from)?
345                    == transcript_messages_digest(&incoming.messages)
346                        .map_err(SessionStoreError::from)?)
347        })
348        .try_fold(true, |acc, preserved| {
349            preserved.map(|preserved| acc && preserved)
350        })
351}
352
353fn validate_rewrite_save_retains_previous_commits(
354    incoming: &Session,
355    previous: &Session,
356    incoming_state: &TranscriptHistoryState,
357) -> Result<(), SessionStoreError> {
358    let previous_state = previous.transcript_history_state().map_err(|err| {
359        SessionStoreError::InvalidTranscriptRewrite {
360            id: incoming.id().clone(),
361            reason: format!("previous transcript history state is malformed: {err}"),
362        }
363    })?;
364    let Some(previous_state) = previous_state.as_ref() else {
365        return Ok(());
366    };
367    if incoming_state.commits.len() < previous_state.commits.len()
368        || incoming_state.commits[..previous_state.commits.len()] != previous_state.commits
369    {
370        return Err(SessionStoreError::InvalidTranscriptRewrite {
371            id: incoming.id().clone(),
372            reason: "incoming rewrite save would drop retained transcript rewrite commits"
373                .to_string(),
374        });
375    }
376    Ok(())
377}
378
379/// Validate that an authoritative projection write still targets the row that
380/// the caller proved continuity against.
381pub fn authoritative_projection_current_revision_guard(
382    incoming: &Session,
383    previous: Option<&Session>,
384    expected_current_revision: Option<&str>,
385) -> Result<(), SessionStoreError> {
386    let previous_token = previous.map(session_projection_cas_token).transpose()?;
387    if previous_token.as_deref() == expected_current_revision {
388        return Ok(());
389    }
390    let incoming_revision =
391        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
392    Err(SessionStoreError::TranscriptContinuityViolation {
393        id: incoming.id().clone(),
394        previous_revision: previous_token.unwrap_or_else(|| "<missing>".to_string()),
395        incoming_revision,
396        reason: format!(
397            "authoritative projection expected persisted projection token {}, but current row has diverged",
398            expected_current_revision.unwrap_or("<missing>")
399        ),
400    })
401}
402
403fn incoming_preserves_conversation_tail_with_system_context_append(
404    incoming: &Session,
405    previous: &Session,
406) -> Result<bool, SessionStoreError> {
407    messages_preserve_conversation_tail_with_system_context_append(
408        incoming.messages(),
409        previous.messages(),
410    )
411}
412
413fn messages_preserve_conversation_tail_with_system_context_append(
414    incoming: &[Message],
415    previous: &[Message],
416) -> Result<bool, SessionStoreError> {
417    let (previous_system, previous_tail) = split_single_leading_system(previous);
418    let (incoming_system, incoming_tail) = split_single_leading_system(incoming);
419    let Some(incoming_system) = incoming_system else {
420        return Ok(false);
421    };
422    if !system_context_is_append(previous_system, incoming_system) {
423        return Ok(false);
424    }
425    if incoming_tail.len() < previous_tail.len() {
426        return Ok(false);
427    }
428    let previous_tail_revision =
429        transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
430    let incoming_tail_prefix_revision =
431        transcript_messages_digest(&incoming_tail[..previous_tail.len()])
432            .map_err(SessionStoreError::from)?;
433    Ok(previous_tail_revision == incoming_tail_prefix_revision)
434}
435
436fn split_single_leading_system(messages: &[Message]) -> (Option<&str>, &[Message]) {
437    match messages.first() {
438        Some(Message::System(system)) => (Some(system.content.as_str()), &messages[1..]),
439        _ => (None, messages),
440    }
441}
442
443fn system_context_is_append(previous: Option<&str>, incoming: &str) -> bool {
444    let appended = match previous {
445        Some(previous) if incoming == previous => return true,
446        Some(previous) if incoming.starts_with(previous) => {
447            let appended = &incoming[previous.len()..];
448            appended.strip_prefix(SYSTEM_CONTEXT_SEPARATOR)
449        }
450        Some(_) => None,
451        None => Some(incoming),
452    };
453    appended.is_some_and(|appended| appended.starts_with("[Runtime System Context]"))
454}
455
456fn incoming_preserves_prefix_after_transient_notice_cleanup(
457    incoming: &Session,
458    previous: &Session,
459) -> Result<bool, SessionStoreError> {
460    let previous_without_transient = previous
461        .messages()
462        .iter()
463        .filter(|message| !is_transient_system_notice(message))
464        .cloned()
465        .collect::<Vec<_>>();
466    if previous_without_transient.len() == previous.messages().len()
467        || incoming.messages().len() < previous_without_transient.len()
468    {
469        return Ok(false);
470    }
471    let previous_revision =
472        transcript_messages_digest(&previous_without_transient).map_err(SessionStoreError::from)?;
473    let incoming_prefix_revision =
474        transcript_messages_digest(&incoming.messages()[..previous_without_transient.len()])
475            .map_err(SessionStoreError::from)?;
476    Ok(previous_revision == incoming_prefix_revision)
477}
478
479fn is_transient_system_notice(message: &Message) -> bool {
480    let Message::SystemNotice(notice) = message else {
481        return false;
482    };
483    notice.kind == crate::types::SystemNoticeKind::McpPending
484        && notice.blocks.iter().all(|block| {
485            matches!(
486                block,
487                crate::types::SystemNoticeBlock::Mcp {
488                    persisted: false,
489                    ..
490                }
491            )
492        })
493}
494
495/// Validate a runtime run-boundary snapshot.
496///
497/// Runtime turns normally append to the transcript, but core-owned turn
498/// mechanics such as compaction can also produce an audited internal rewrite.
499/// Runtime stores use this guard inside their atomic boundary commit: plain
500/// replacement is rejected, while an incoming snapshot carrying a typed rewrite
501/// commit from the currently persisted head is accepted through the same
502/// rewrite validator as [`SessionStore::save_transcript_rewrite`].
503pub fn run_boundary_snapshot_save_guard(
504    incoming: &Session,
505    previous: Option<&Session>,
506) -> Result<(), SessionStoreError> {
507    match append_only_save_guard(incoming, previous) {
508        Ok(()) => Ok(()),
509        Err(append_error) => {
510            let Some(previous) = previous else {
511                return Err(append_error);
512            };
513            let incoming_revision =
514                transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
515            let Some(state) = incoming.transcript_history_state().map_err(|err| {
516                SessionStoreError::InvalidTranscriptRewrite {
517                    id: incoming.id().clone(),
518                    reason: format!("incoming transcript history state is malformed: {err}"),
519                }
520            })?
521            else {
522                return Err(append_error);
523            };
524            validate_rewrite_save_retains_previous_commits(incoming, previous, &state)?;
525            let commits = find_transcript_rewrite_commit_chain_extending_session(
526                &state,
527                previous,
528                &incoming_revision,
529            )?;
530            let Some(commits) = commits else {
531                return Err(append_error);
532            };
533            let Some(commit) = commits.first() else {
534                return Err(append_error);
535            };
536            transcript_rewrite_bridge_save_guard(incoming, commit, &state, &incoming_revision)?;
537            for commit in commits.iter().skip(1) {
538                validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
539            }
540            Ok(())
541        }
542    }
543}
544
545/// Find the rewrite commit that authorizes replacing `previous_revision`,
546/// allowing the incoming head to extend the rewrite via normal append bodies.
547pub fn find_transcript_rewrite_commit_extending<'a>(
548    state: &'a TranscriptHistoryState,
549    previous_revision: &str,
550    incoming_revision: &str,
551) -> Option<&'a TranscriptRewriteCommit> {
552    find_transcript_rewrite_commit_chain_extending(state, previous_revision, incoming_revision)
553        .and_then(|commits| commits.into_iter().next())
554}
555
556/// Find the contiguous rewrite commits that connect `previous_revision` to the
557/// incoming head, allowing normal append bodies after the final rewrite.
558pub fn find_transcript_rewrite_commit_chain_extending<'a>(
559    state: &'a TranscriptHistoryState,
560    previous_revision: &str,
561    incoming_revision: &str,
562) -> Option<Vec<&'a TranscriptRewriteCommit>> {
563    let mut chain = Vec::new();
564    let mut cursor = previous_revision;
565    let mut visited = std::collections::BTreeSet::new();
566    loop {
567        if incoming_revision == cursor {
568            return Some(chain);
569        }
570        if !visited.insert(cursor.to_string()) {
571            return None;
572        }
573        let commit = state.commits.iter().find(|commit| {
574            (commit.parent_revision == cursor
575                || transcript_history_revision_extends(state, &commit.parent_revision, cursor))
576                && transcript_history_revision_extends(state, incoming_revision, &commit.revision)
577        });
578        let Some(commit) = commit else {
579            return transcript_history_revision_extends(state, incoming_revision, cursor)
580                .then_some(chain);
581        };
582        cursor = &commit.revision;
583        chain.push(commit);
584    }
585}
586
587/// Find a rewrite chain whose first parent may be an append-only continuation
588/// of a previously persisted snapshot.
589///
590/// Runtime-backed sessions can append messages in the runtime store before a
591/// core-owned compaction rewrite is checkpointed to the compatibility
592/// `SessionStore`. In that case the first rewrite commit's parent revision is
593/// not equal to the persisted row's digest, but its retained parent body proves
594/// a normal append path from that persisted row.
595pub fn find_transcript_rewrite_commit_chain_extending_session<'a>(
596    state: &'a TranscriptHistoryState,
597    previous: &Session,
598    incoming_revision: &str,
599) -> Result<Option<Vec<&'a TranscriptRewriteCommit>>, SessionStoreError> {
600    let previous_revision =
601        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
602    let mut chain = Vec::new();
603    let mut cursor = previous_revision.as_str();
604    let mut visited = std::collections::BTreeSet::new();
605    loop {
606        if incoming_revision == cursor {
607            return Ok(Some(chain));
608        }
609        if !visited.insert(cursor.to_string()) {
610            return Ok(None);
611        }
612
613        let Some(cursor_messages) = transcript_history_messages_for_revision(
614            state,
615            cursor,
616            &previous_revision,
617            previous.messages(),
618        ) else {
619            return Ok(None);
620        };
621        let mut selected = None;
622        for commit in &state.commits {
623            if !transcript_history_revision_extends(state, incoming_revision, &commit.revision) {
624                continue;
625            }
626            let parent_extends_cursor = commit.parent_revision == cursor
627                || revision_body_preserves_append_continuation_prefix(
628                    state,
629                    &commit.parent_revision,
630                    cursor_messages,
631                    cursor,
632                )?;
633            if parent_extends_cursor {
634                selected = Some(commit);
635                break;
636            }
637        }
638
639        let Some(commit) = selected else {
640            if revision_body_preserves_append_continuation_prefix(
641                state,
642                incoming_revision,
643                cursor_messages,
644                cursor,
645            )? {
646                return Ok(Some(chain));
647            }
648            return Ok(None);
649        };
650        cursor = &commit.revision;
651        chain.push(commit);
652    }
653}
654
655fn transcript_history_messages_for_revision<'a>(
656    state: &'a TranscriptHistoryState,
657    revision: &str,
658    previous_revision: &str,
659    previous_messages: &'a [Message],
660) -> Option<&'a [Message]> {
661    if revision == previous_revision {
662        return Some(previous_messages);
663    }
664    state
665        .revisions
666        .iter()
667        .find(|body| body.revision == revision)
668        .map(|body| body.messages.as_slice())
669}
670
671fn revision_body_preserves_append_continuation_prefix(
672    state: &TranscriptHistoryState,
673    revision: &str,
674    ancestor_messages: &[Message],
675    ancestor_revision: &str,
676) -> Result<bool, SessionStoreError> {
677    if revision == ancestor_revision {
678        return Ok(true);
679    }
680    let Some(body) = state
681        .revisions
682        .iter()
683        .find(|body| body.revision == revision)
684    else {
685        return Ok(false);
686    };
687    if body.messages.len() >= ancestor_messages.len() {
688        let prefix_revision = transcript_messages_digest(&body.messages[..ancestor_messages.len()])
689            .map_err(SessionStoreError::from)?;
690        if prefix_revision == ancestor_revision {
691            return Ok(true);
692        }
693    }
694    Ok(
695        messages_preserve_conversation_tail_with_system_context_append(
696            &body.messages,
697            ancestor_messages,
698        )? || messages_preserve_tail_after_leading_system_refresh(
699            &body.messages,
700            ancestor_messages,
701        )?,
702    )
703}
704
705fn messages_preserve_tail_after_leading_system_refresh(
706    incoming: &[Message],
707    previous: &[Message],
708) -> Result<bool, SessionStoreError> {
709    let (Some(Message::System(_)), Some(Message::System(_))) = (incoming.first(), previous.first())
710    else {
711        return Ok(false);
712    };
713    if incoming.len() < previous.len() {
714        return Ok(false);
715    }
716    let previous_tail_len = previous.len().saturating_sub(1);
717    if previous_tail_len == 0 {
718        return Ok(true);
719    }
720    let previous_tail_revision =
721        transcript_messages_digest(&previous[1..]).map_err(SessionStoreError::from)?;
722    let incoming_tail = &incoming[1..];
723    if incoming_tail.len() < previous_tail_len {
724        return Ok(false);
725    }
726    let incoming_tail_prefix_revision =
727        transcript_messages_digest(&incoming_tail[..previous_tail_len])
728            .map_err(SessionStoreError::from)?;
729    Ok(incoming_tail_prefix_revision == previous_tail_revision)
730}
731
732fn transcript_history_revision_extends(
733    state: &TranscriptHistoryState,
734    descendant: &str,
735    ancestor: &str,
736) -> bool {
737    if descendant == ancestor {
738        return true;
739    }
740    let mut cursor = descendant;
741    while let Some(body) = state.revisions.iter().find(|body| body.revision == cursor) {
742        let Some(parent) = body.parent_revision.as_deref() else {
743            return false;
744        };
745        if parent == ancestor {
746            return true;
747        }
748        cursor = parent;
749    }
750    false
751}
752
753fn transcript_rewrite_bridge_save_guard(
754    incoming: &Session,
755    commit: &TranscriptRewriteCommit,
756    incoming_state: &TranscriptHistoryState,
757    incoming_message_digest: &str,
758) -> Result<(), SessionStoreError> {
759    validate_transcript_rewrite_commit_bodies(incoming, commit, incoming_state)?;
760    if incoming_state.head != incoming_message_digest {
761        return Err(SessionStoreError::InvalidTranscriptRewrite {
762            id: incoming.id().clone(),
763            reason: format!(
764                "incoming transcript graph head {} does not match current message digest {incoming_message_digest}",
765                incoming_state.head
766            ),
767        });
768    }
769    if !transcript_history_revision_extends(
770        incoming_state,
771        incoming_message_digest,
772        &commit.revision,
773    ) {
774        return Err(SessionStoreError::InvalidTranscriptRewrite {
775            id: incoming.id().clone(),
776            reason: format!(
777                "incoming transcript head {incoming_message_digest} does not extend rewrite revision {}",
778                commit.revision
779            ),
780        });
781    }
782    Ok(())
783}
784
785/// Validate that a same-session shrink/replace save is backed by a typed
786/// transcript rewrite commit.
787pub fn transcript_rewrite_save_guard(
788    incoming: &Session,
789    previous: Option<&Session>,
790    commit: &TranscriptRewriteCommit,
791) -> Result<(), SessionStoreError> {
792    incoming
793        .validate_transcript_history_state()
794        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
795            id: incoming.id().clone(),
796            reason: format!("incoming transcript history state is malformed: {err}"),
797        })?;
798    let Some(previous) = previous else {
799        return Err(SessionStoreError::InvalidTranscriptRewrite {
800            id: incoming.id().clone(),
801            reason: "rewrite target has no previously persisted session".to_string(),
802        });
803    };
804    if incoming.id() != previous.id() {
805        return Err(SessionStoreError::InvalidTranscriptRewrite {
806            id: incoming.id().clone(),
807            reason: format!(
808                "incoming session id {} differs from previous session id {}",
809                incoming.id(),
810                previous.id()
811            ),
812        });
813    }
814    let previous_revision = previous.transcript_revision().map_err(|err| {
815        SessionStoreError::InvalidTranscriptRewrite {
816            id: incoming.id().clone(),
817            reason: format!("previous transcript revision is malformed: {err}"),
818        }
819    })?;
820    if previous_revision != commit.parent_revision {
821        return Err(SessionStoreError::TranscriptRevisionConflict {
822            id: incoming.id().clone(),
823            expected: commit.parent_revision.clone(),
824            actual: previous_revision,
825        });
826    }
827    let previous_message_digest =
828        transcript_messages_digest(previous.messages()).map_err(|err| {
829            SessionStoreError::InvalidTranscriptRewrite {
830                id: incoming.id().clone(),
831                reason: format!("previous current transcript is not digestible: {err}"),
832            }
833        })?;
834    if previous_message_digest != commit.parent_revision {
835        return Err(SessionStoreError::InvalidTranscriptRewrite {
836            id: incoming.id().clone(),
837            reason: format!(
838                "previous current transcript digest {previous_message_digest} does not match commit parent {}",
839                commit.parent_revision
840            ),
841        });
842    }
843    let incoming_revision = incoming.transcript_revision().map_err(|err| {
844        SessionStoreError::InvalidTranscriptRewrite {
845            id: incoming.id().clone(),
846            reason: format!("incoming transcript revision is malformed: {err}"),
847        }
848    })?;
849    if incoming_revision != commit.revision {
850        return Err(SessionStoreError::InvalidTranscriptRewrite {
851            id: incoming.id().clone(),
852            reason: format!(
853                "incoming transcript revision {incoming_revision} does not match commit revision {}",
854                commit.revision
855            ),
856        });
857    }
858    let incoming_message_digest =
859        transcript_messages_digest(incoming.messages()).map_err(|err| {
860            SessionStoreError::InvalidTranscriptRewrite {
861                id: incoming.id().clone(),
862                reason: format!("incoming current transcript is not digestible: {err}"),
863            }
864        })?;
865    if incoming_message_digest != commit.revision {
866        return Err(SessionStoreError::InvalidTranscriptRewrite {
867            id: incoming.id().clone(),
868            reason: format!(
869                "incoming current transcript digest {incoming_message_digest} does not match commit revision {}",
870                commit.revision
871            ),
872        });
873    }
874    let Some(incoming_state) = incoming.transcript_history_state().map_err(|err| {
875        SessionStoreError::InvalidTranscriptRewrite {
876            id: incoming.id().clone(),
877            reason: format!("incoming transcript history state is malformed: {err}"),
878        }
879    })?
880    else {
881        return Err(SessionStoreError::InvalidTranscriptRewrite {
882            id: incoming.id().clone(),
883            reason: "incoming rewrite did not persist a transcript revision graph".to_string(),
884        });
885    };
886    validate_rewrite_save_retains_previous_commits(incoming, previous, &incoming_state)?;
887    validate_transcript_rewrite_commit_bodies(incoming, commit, &incoming_state)
888}
889
890fn validate_transcript_rewrite_commit_bodies(
891    incoming: &Session,
892    commit: &TranscriptRewriteCommit,
893    incoming_state: &TranscriptHistoryState,
894) -> Result<(), SessionStoreError> {
895    if !incoming_state
896        .commits
897        .iter()
898        .any(|persisted| persisted == commit)
899    {
900        return Err(SessionStoreError::InvalidTranscriptRewrite {
901            id: incoming.id().clone(),
902            reason: format!(
903                "incoming rewrite did not persist the rewrite commit in the transcript graph (wanted {} -> {}, graph commits: {:?})",
904                commit.parent_revision,
905                commit.revision,
906                incoming_state
907                    .commits
908                    .iter()
909                    .map(|commit| (&commit.parent_revision, &commit.revision))
910                    .collect::<Vec<_>>()
911            ),
912        });
913    }
914    let Some(parent_body) = incoming_state
915        .revisions
916        .iter()
917        .find(|body| body.revision == commit.parent_revision)
918    else {
919        return Err(SessionStoreError::InvalidTranscriptRewrite {
920            id: incoming.id().clone(),
921            reason: format!(
922                "incoming rewrite omitted parent revision body {}",
923                commit.parent_revision
924            ),
925        });
926    };
927    let Some(revision_body) = incoming_state
928        .revisions
929        .iter()
930        .find(|body| body.revision == commit.revision)
931    else {
932        return Err(SessionStoreError::InvalidTranscriptRewrite {
933            id: incoming.id().clone(),
934            reason: format!(
935                "incoming rewrite omitted new revision body {}",
936                commit.revision
937            ),
938        });
939    };
940    if parent_body.messages.len() != commit.messages_before
941        || revision_body.messages.len() != commit.messages_after
942    {
943        return Err(SessionStoreError::InvalidTranscriptRewrite {
944            id: incoming.id().clone(),
945            reason: format!(
946                "commit message counts {} -> {} do not match persisted rewrite {} -> {}",
947                commit.messages_before,
948                commit.messages_after,
949                parent_body.messages.len(),
950                revision_body.messages.len()
951            ),
952        });
953    }
954    let parent_body_revision =
955        transcript_messages_digest(&parent_body.messages).map_err(|err| {
956            SessionStoreError::InvalidTranscriptRewrite {
957                id: incoming.id().clone(),
958                reason: format!("parent revision body is not digestible: {err}"),
959            }
960        })?;
961    if parent_body_revision != commit.parent_revision {
962        return Err(SessionStoreError::InvalidTranscriptRewrite {
963            id: incoming.id().clone(),
964            reason: format!(
965                "parent revision body digest {parent_body_revision} does not match commit parent {}",
966                commit.parent_revision
967            ),
968        });
969    }
970    let (start, end) = match &commit.selection {
971        TranscriptRewriteSelection::MessageRange { start, end } => (*start, *end),
972    };
973    if start > end || end > parent_body.messages.len() {
974        return Err(SessionStoreError::InvalidTranscriptRewrite {
975            id: incoming.id().clone(),
976            reason: format!(
977                "commit selection {start}..{end} is invalid for parent revision with {} messages",
978                parent_body.messages.len()
979            ),
980        });
981    }
982    let original_span_digest = transcript_messages_digest(&parent_body.messages[start..end])
983        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
984            id: incoming.id().clone(),
985            reason: format!("original span body is not digestible: {err}"),
986        })?;
987    if original_span_digest != commit.original_span_digest {
988        return Err(SessionStoreError::InvalidTranscriptRewrite {
989            id: incoming.id().clone(),
990            reason: format!(
991                "original span digest {original_span_digest} does not match commit digest {}",
992                commit.original_span_digest
993            ),
994        });
995    }
996    let revision_body_digest =
997        transcript_messages_digest(&revision_body.messages).map_err(|err| {
998            SessionStoreError::InvalidTranscriptRewrite {
999                id: incoming.id().clone(),
1000                reason: format!("new revision body is not digestible: {err}"),
1001            }
1002        })?;
1003    if revision_body_digest != commit.revision {
1004        return Err(SessionStoreError::InvalidTranscriptRewrite {
1005            id: incoming.id().clone(),
1006            reason: format!(
1007                "new revision body digest {revision_body_digest} does not match commit revision {}",
1008                commit.revision
1009            ),
1010        });
1011    }
1012    let removed_len = end - start;
1013    let retained_len = commit
1014        .messages_before
1015        .checked_sub(removed_len)
1016        .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1017            id: incoming.id().clone(),
1018            reason: "commit removed more messages than it recorded before rewrite".to_string(),
1019        })?;
1020    let replacement_len = commit
1021        .messages_after
1022        .checked_sub(retained_len)
1023        .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1024            id: incoming.id().clone(),
1025            reason: "commit message counts cannot describe a replacement span".to_string(),
1026        })?;
1027    let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
1028        SessionStoreError::InvalidTranscriptRewrite {
1029            id: incoming.id().clone(),
1030            reason: "replacement span end overflowed".to_string(),
1031        }
1032    })?;
1033    if replacement_end > revision_body.messages.len() {
1034        return Err(SessionStoreError::InvalidTranscriptRewrite {
1035            id: incoming.id().clone(),
1036            reason: format!(
1037                "replacement span {start}..{replacement_end} is invalid for revision with {} messages",
1038                revision_body.messages.len()
1039            ),
1040        });
1041    }
1042    let parent_prefix_digest =
1043        transcript_messages_digest(&parent_body.messages[..start]).map_err(|err| {
1044            SessionStoreError::InvalidTranscriptRewrite {
1045                id: incoming.id().clone(),
1046                reason: format!("parent prefix body is not digestible: {err}"),
1047            }
1048        })?;
1049    let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
1050        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1051            id: incoming.id().clone(),
1052            reason: format!("revision prefix body is not digestible: {err}"),
1053        })?;
1054    if parent_prefix_digest != revision_prefix_digest {
1055        return Err(SessionStoreError::InvalidTranscriptRewrite {
1056            id: incoming.id().clone(),
1057            reason: "rewrite revision changed messages before the selected span".to_string(),
1058        });
1059    }
1060    let parent_suffix_digest =
1061        transcript_messages_digest(&parent_body.messages[end..]).map_err(|err| {
1062            SessionStoreError::InvalidTranscriptRewrite {
1063                id: incoming.id().clone(),
1064                reason: format!("parent suffix body is not digestible: {err}"),
1065            }
1066        })?;
1067    let revision_suffix_digest =
1068        transcript_messages_digest(&revision_body.messages[replacement_end..]).map_err(|err| {
1069            SessionStoreError::InvalidTranscriptRewrite {
1070                id: incoming.id().clone(),
1071                reason: format!("revision suffix body is not digestible: {err}"),
1072            }
1073        })?;
1074    if parent_suffix_digest != revision_suffix_digest {
1075        return Err(SessionStoreError::InvalidTranscriptRewrite {
1076            id: incoming.id().clone(),
1077            reason: "rewrite revision changed messages after the selected span".to_string(),
1078        });
1079    }
1080    let replacement_digest = transcript_messages_digest(
1081        &revision_body.messages[start..replacement_end],
1082    )
1083    .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1084        id: incoming.id().clone(),
1085        reason: format!("replacement span body is not digestible: {err}"),
1086    })?;
1087    if replacement_digest != commit.replacement_digest {
1088        return Err(SessionStoreError::InvalidTranscriptRewrite {
1089            id: incoming.id().clone(),
1090            reason: format!(
1091                "replacement span digest {replacement_digest} does not match commit digest {}",
1092                commit.replacement_digest
1093            ),
1094        });
1095    }
1096    Ok(())
1097}
1098
1099impl From<serde_json::Error> for SessionStoreError {
1100    fn from(e: serde_json::Error) -> Self {
1101        Self::Serialization(e.to_string())
1102    }
1103}
1104
1105/// Abstraction over session storage backends.
1106///
1107/// All methods take `&self` — implementations must handle interior mutability.
1108/// Object-safe: consumed as `Arc<dyn SessionStore>` throughout the system.
1109///
1110/// # Append-only contract (F1 closure, wave-c C-H1)
1111///
1112/// The snapshot written by [`save`](Self::save) is a **projection of the
1113/// canonical event log** ([`crate::session_store`] doc: "snapshot =
1114/// projection"). Implementations that persist across calls MUST enforce
1115/// that the message vector stored for a given `SessionId` is monotonically
1116/// non-shrinking — a subsequent `save()` for the same id must not have a
1117/// smaller `messages().len()` than the previously persisted row.
1118///
1119/// Callers that need to produce a session with a shorter history must go
1120/// through [`Session::fork_at`], which rotates `SessionId` — a fork is a
1121/// new identity on a new event log, not a same-session truncation.
1122///
1123/// Backends are encouraged to assert this invariant in their `save`
1124/// implementation and return
1125/// [`SessionStoreError::MonotonicityViolation`] when a caller tries to
1126/// shrink a snapshot. The default implementations in `meerkat-store`
1127/// (`SqliteSessionStore`, `JsonlStore`, `MemoryStore`) all go through
1128/// the [`append_only_save_guard`] helper.
1129#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1130#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1131pub trait SessionStore: Send + Sync {
1132    /// Save a session (create or extend).
1133    ///
1134    /// Implementations MUST reject a save whose message history is
1135    /// shorter than the previously persisted row for the same `SessionId`
1136    /// — see the trait-level doc on the append-only contract.
1137    async fn save(&self, session: &Session) -> Result<(), SessionStoreError>;
1138
1139    /// Save a same-SessionId transcript rewrite.
1140    ///
1141    /// This is the only `SessionStore` path allowed to replace or shrink the
1142    /// current message projection. Implementations must validate `commit`
1143    /// against the previously persisted head before writing `session`.
1144    async fn save_transcript_rewrite(
1145        &self,
1146        session: &Session,
1147        commit: &TranscriptRewriteCommit,
1148    ) -> Result<(), SessionStoreError> {
1149        let _ = (session, commit);
1150        Err(SessionStoreError::Internal(
1151            "save_transcript_rewrite is not supported by this SessionStore".to_string(),
1152        ))
1153    }
1154
1155    /// Save a compatibility projection after a separate authority has already
1156    /// committed the session snapshot.
1157    ///
1158    /// This method is for runtime-backed services only: the runtime snapshot
1159    /// has already accepted the semantic mutation, and the `SessionStore` row is
1160    /// a rebuildable projection. Normal callers must use [`SessionStore::save`]
1161    /// or [`SessionStore::save_transcript_rewrite`] so the store boundary keeps
1162    /// enforcing append-only/CAS semantics.
1163    async fn save_authoritative_projection(
1164        &self,
1165        session: &Session,
1166    ) -> Result<(), SessionStoreError> {
1167        self.save(session).await
1168    }
1169
1170    /// Save an authoritative projection only if the persisted row is still the
1171    /// revision that the caller already validated.
1172    async fn save_authoritative_projection_if_current_revision(
1173        &self,
1174        session: &Session,
1175        expected_current_revision: Option<String>,
1176    ) -> Result<(), SessionStoreError> {
1177        let _ = (session, expected_current_revision);
1178        Err(SessionStoreError::Internal(
1179            "save_authoritative_projection_if_current_revision is not supported by this SessionStore"
1180                .to_string(),
1181        ))
1182    }
1183
1184    /// Load a session by ID.
1185    async fn load(&self, id: &SessionId) -> Result<Option<Session>, SessionStoreError>;
1186
1187    /// List sessions matching filter.
1188    async fn list(&self, filter: SessionFilter) -> Result<Vec<SessionMeta>, SessionStoreError>;
1189
1190    /// Delete a session.
1191    async fn delete(&self, id: &SessionId) -> Result<(), SessionStoreError>;
1192
1193    /// Delete a compatibility projection only if it is still the revision that
1194    /// the caller already validated as unsafe to expose.
1195    async fn delete_if_current_revision(
1196        &self,
1197        id: &SessionId,
1198        expected_current_revision: &str,
1199    ) -> Result<bool, SessionStoreError>;
1200
1201    /// Check if a session exists.
1202    async fn exists(&self, id: &SessionId) -> Result<bool, SessionStoreError> {
1203        Ok(self.load(id).await?.is_some())
1204    }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209    use super::*;
1210    use crate::types::{
1211        AssistantMessage, BlockAssistantMessage, StopReason, SystemMessage, SystemNoticeBlock,
1212        SystemNoticeKind, SystemNoticeMessage, Usage, UserMessage,
1213    };
1214
1215    #[test]
1216    fn append_only_guard_rejects_leading_system_message_replacement() {
1217        let mut previous = Session::new();
1218        previous.push(Message::System(SystemMessage::new("original system")));
1219        previous.push(Message::User(UserMessage::text("hello".to_string())));
1220
1221        let mut incoming = previous.clone();
1222        let rewrite_result = incoming.replace_messages_internal(
1223            vec![
1224                Message::System(SystemMessage::new("rewritten system")),
1225                Message::User(UserMessage::text("hello".to_string())),
1226            ],
1227            crate::TranscriptRewriteReason::new("unit-test"),
1228        );
1229        assert!(
1230            rewrite_result.is_ok(),
1231            "typed rewrite should be constructible: {rewrite_result:?}"
1232        );
1233
1234        assert!(matches!(
1235            append_only_save_guard(&incoming, Some(&previous)),
1236            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1237        ));
1238    }
1239
1240    #[test]
1241    fn append_only_guard_accepts_runtime_system_context_append() {
1242        let mut previous = Session::new();
1243        previous.push(Message::System(SystemMessage::new("base system")));
1244        previous.push(Message::User(UserMessage::text("hello".to_string())));
1245
1246        let mut incoming = previous.clone();
1247        incoming.set_system_prompt(format!(
1248            "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1249        ));
1250
1251        assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1252    }
1253
1254    #[test]
1255    fn append_only_guard_accepts_system_timestamp_refresh_without_content_change() {
1256        let mut previous = Session::new();
1257        previous.push(Message::System(SystemMessage::new("base system")));
1258
1259        let mut incoming = previous.clone();
1260        incoming.set_system_prompt("base system".to_string());
1261
1262        assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1263    }
1264
1265    #[test]
1266    fn run_boundary_guard_accepts_compaction_after_uncheckpointed_runtime_append()
1267    -> Result<(), Box<dyn std::error::Error>> {
1268        let mut previous = Session::new();
1269        previous.push(Message::System(SystemMessage::new("base system")));
1270        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1271        previous.push(Message::Assistant(AssistantMessage {
1272            content: "answer one".to_string(),
1273            tool_calls: Vec::new(),
1274            stop_reason: StopReason::EndTurn,
1275            usage: Usage::default(),
1276            created_at: crate::types::message_timestamp_now(),
1277        }));
1278
1279        let mut parent = previous.clone();
1280        parent.set_system_prompt("refreshed runtime system projection".to_string());
1281        parent.push(Message::User(UserMessage::text(
1282            "runtime-only turn".to_string(),
1283        )));
1284        parent.push(Message::Assistant(AssistantMessage {
1285            content: "runtime-only answer".to_string(),
1286            tool_calls: Vec::new(),
1287            stop_reason: StopReason::EndTurn,
1288            usage: Usage::default(),
1289            created_at: crate::types::message_timestamp_now(),
1290        }));
1291        let parent_revision = parent.transcript_revision()?;
1292
1293        let mut incoming = parent.clone();
1294        let mut replacement = vec![
1295            parent.messages()[0].clone(),
1296            Message::User(UserMessage::text("[Context compacted] summary".to_string())),
1297        ];
1298        replacement.extend_from_slice(&parent.messages()[1..]);
1299        incoming.commit_transcript_rewrite(
1300            TranscriptRewriteSelection::MessageRange {
1301                start: 0,
1302                end: parent.messages().len(),
1303            },
1304            replacement,
1305            crate::TranscriptRewriteReason::new("compaction"),
1306            Some("meerkat-core".to_string()),
1307            Some(parent_revision),
1308        )?;
1309
1310        assert!(matches!(
1311            append_only_save_guard(&incoming, Some(&previous)),
1312            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1313        ));
1314        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1315        Ok(())
1316    }
1317
1318    #[test]
1319    fn run_boundary_guard_accepts_compaction_with_retained_tail_window()
1320    -> Result<(), Box<dyn std::error::Error>> {
1321        let mut previous = Session::new();
1322        previous.push(Message::System(SystemMessage::new("base system")));
1323        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1324        previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1325            vec![crate::types::AssistantBlock::Text {
1326                text: "answer one".to_string(),
1327                meta: None,
1328            }],
1329            StopReason::EndTurn,
1330        )));
1331
1332        let mut parent = previous.clone();
1333        parent.set_system_prompt("refreshed runtime system projection".to_string());
1334        parent.push(Message::SystemNotice(SystemNoticeMessage::new(
1335            SystemNoticeKind::Comms,
1336            "peer response queued",
1337        )));
1338        let parent_revision = parent.transcript_revision()?;
1339
1340        let mut incoming = parent.clone();
1341        let mut replacement = vec![
1342            parent.messages()[0].clone(),
1343            Message::User(UserMessage::text("[Context compacted] summary".to_string())),
1344        ];
1345        replacement.extend_from_slice(&parent.messages()[1..]);
1346        incoming.commit_transcript_rewrite(
1347            TranscriptRewriteSelection::MessageRange {
1348                start: 0,
1349                end: parent.messages().len(),
1350            },
1351            replacement,
1352            crate::TranscriptRewriteReason::new("compaction"),
1353            Some("meerkat-core".to_string()),
1354            Some(parent_revision),
1355        )?;
1356
1357        assert!(matches!(
1358            append_only_save_guard(&incoming, Some(&previous)),
1359            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1360        ));
1361        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1362        Ok(())
1363    }
1364
1365    #[test]
1366    fn run_boundary_guard_rejects_commitless_history_parent_edge()
1367    -> Result<(), Box<dyn std::error::Error>> {
1368        let mut previous = Session::new();
1369        previous.push(Message::System(SystemMessage::new("base system")));
1370        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1371        let previous_revision = previous.transcript_revision()?;
1372
1373        let mut incoming = previous.clone();
1374        incoming.set_system_prompt("forged replacement system".to_string());
1375        let incoming_revision = incoming.transcript_revision()?;
1376        let history = TranscriptHistoryState {
1377            head: incoming_revision.clone(),
1378            commits: Vec::new(),
1379            revisions: vec![
1380                crate::TranscriptRevisionBody {
1381                    revision: previous_revision,
1382                    parent_revision: None,
1383                    messages: previous.messages().to_vec(),
1384                    created_at: previous.updated_at(),
1385                },
1386                crate::TranscriptRevisionBody {
1387                    revision: incoming_revision,
1388                    parent_revision: Some(previous.transcript_revision()?),
1389                    messages: incoming.messages().to_vec(),
1390                    created_at: incoming.updated_at(),
1391                },
1392            ],
1393        };
1394        incoming.set_metadata(
1395            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1396            serde_json::to_value(history)?,
1397        );
1398
1399        assert!(matches!(
1400            append_only_save_guard(&incoming, Some(&previous)),
1401            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1402        ));
1403        assert!(matches!(
1404            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1405            Err(SessionStoreError::TranscriptContinuityViolation { .. }
1406                | SessionStoreError::MonotonicityViolation { .. })
1407        ));
1408        Ok(())
1409    }
1410
1411    #[test]
1412    fn append_only_guard_rejects_history_head_that_does_not_match_current_messages()
1413    -> Result<(), Box<dyn std::error::Error>> {
1414        let mut previous = Session::new();
1415        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1416
1417        let mut incoming = previous.clone();
1418        incoming.push(Message::User(UserMessage::text("append".to_string())));
1419        let poisoned_messages = vec![Message::User(UserMessage::text(
1420            "unrelated poisoned history".to_string(),
1421        ))];
1422        let poisoned_revision = transcript_messages_digest(&poisoned_messages)?;
1423        incoming.set_metadata(
1424            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1425            serde_json::to_value(TranscriptHistoryState {
1426                head: poisoned_revision.clone(),
1427                commits: Vec::new(),
1428                revisions: vec![crate::TranscriptRevisionBody {
1429                    revision: poisoned_revision,
1430                    parent_revision: None,
1431                    messages: poisoned_messages,
1432                    created_at: incoming.updated_at(),
1433                }],
1434            })?,
1435        );
1436
1437        assert!(matches!(
1438            append_only_save_guard(&incoming, Some(&previous)),
1439            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1440        ));
1441        assert!(matches!(
1442            append_only_save_guard(&incoming, None),
1443            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1444        ));
1445        assert!(matches!(
1446            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1447            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1448        ));
1449        Ok(())
1450    }
1451
1452    #[test]
1453    fn append_only_guard_rejects_new_rewrite_commits_on_plain_append()
1454    -> Result<(), Box<dyn std::error::Error>> {
1455        let mut previous = Session::new();
1456        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1457        let previous_revision = previous.transcript_revision()?;
1458
1459        let mut incoming = previous.clone();
1460        let appended = Message::Assistant(AssistantMessage {
1461            content: "plain append".to_string(),
1462            tool_calls: Vec::new(),
1463            stop_reason: StopReason::EndTurn,
1464            usage: Usage::default(),
1465            created_at: crate::types::message_timestamp_now(),
1466        });
1467        incoming.commit_transcript_rewrite(
1468            TranscriptRewriteSelection::MessageRange { start: 1, end: 1 },
1469            vec![appended],
1470            crate::TranscriptRewriteReason::new("forged-append"),
1471            Some("unit-test".to_string()),
1472            Some(previous_revision),
1473        )?;
1474
1475        assert!(matches!(
1476            append_only_save_guard(&incoming, Some(&previous)),
1477            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1478        ));
1479        Ok(())
1480    }
1481
1482    #[test]
1483    fn append_only_guard_rejects_first_save_with_rewrite_commits()
1484    -> Result<(), Box<dyn std::error::Error>> {
1485        let mut incoming = Session::new();
1486        incoming.push(Message::User(UserMessage::text("seed".to_string())));
1487        let parent_messages = incoming.messages().to_vec();
1488        let parent_updated_at = incoming.updated_at();
1489        let parent_revision = incoming.transcript_revision()?;
1490        let commit = incoming.commit_transcript_rewrite(
1491            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1492            vec![Message::User(UserMessage::text(
1493                "compacted seed".to_string(),
1494            ))],
1495            crate::TranscriptRewriteReason::new("compaction"),
1496            Some("meerkat-core".to_string()),
1497            Some(parent_revision),
1498        )?;
1499        let incoming_revision = incoming.transcript_revision()?;
1500        let commit_parent_revision = commit.parent_revision.clone();
1501        incoming.set_metadata(
1502            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1503            serde_json::to_value(TranscriptHistoryState {
1504                head: incoming_revision.clone(),
1505                commits: vec![commit],
1506                revisions: vec![
1507                    crate::TranscriptRevisionBody {
1508                        revision: commit_parent_revision.clone(),
1509                        parent_revision: None,
1510                        messages: parent_messages,
1511                        created_at: parent_updated_at,
1512                    },
1513                    crate::TranscriptRevisionBody {
1514                        revision: incoming_revision,
1515                        parent_revision: Some(commit_parent_revision),
1516                        messages: incoming.messages().to_vec(),
1517                        created_at: incoming.updated_at(),
1518                    },
1519                ],
1520            })?,
1521        );
1522
1523        assert!(matches!(
1524            append_only_save_guard(&incoming, None),
1525            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1526        ));
1527        Ok(())
1528    }
1529
1530    #[test]
1531    fn transcript_rewrite_guard_rejects_poisoned_history_graph()
1532    -> Result<(), Box<dyn std::error::Error>> {
1533        let mut previous = Session::new();
1534        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1535        let parent_revision = previous.transcript_revision()?;
1536
1537        let mut first = previous.clone();
1538        let first_commit = first.commit_transcript_rewrite(
1539            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1540            vec![Message::User(UserMessage::text(
1541                "compacted persisted".to_string(),
1542            ))],
1543            crate::TranscriptRewriteReason::new("compaction"),
1544            Some("unit-test".to_string()),
1545            Some(parent_revision),
1546        )?;
1547        let first_snapshot = first.clone();
1548
1549        first.commit_transcript_rewrite(
1550            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1551            vec![Message::User(UserMessage::text(
1552                "uncommitted poisoned fork".to_string(),
1553            ))],
1554            crate::TranscriptRewriteReason::new("poison"),
1555            Some("unit-test".to_string()),
1556            Some(first_commit.revision.clone()),
1557        )?;
1558        let mut poisoned_state = first
1559            .transcript_history_state()?
1560            .ok_or_else(|| "second rewrite should retain history state".to_string())?;
1561        poisoned_state.head = first_commit.revision.clone();
1562
1563        let mut poisoned = first_snapshot;
1564        poisoned.set_metadata(
1565            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1566            serde_json::to_value(poisoned_state)?,
1567        );
1568
1569        assert!(matches!(
1570            transcript_rewrite_save_guard(&poisoned, Some(&previous), &first_commit),
1571            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1572                if reason.contains("incoming transcript history state is malformed")
1573        ));
1574        Ok(())
1575    }
1576
1577    #[test]
1578    fn authoritative_projection_guard_rejects_changed_persisted_revision()
1579    -> Result<(), Box<dyn std::error::Error>> {
1580        let mut previous = Session::new();
1581        previous.push(Message::User(UserMessage::text("persisted A".to_string())));
1582        let expected_revision = previous.transcript_revision()?;
1583
1584        let mut current = previous.clone();
1585        current.push(Message::Assistant(AssistantMessage {
1586            content: "persisted B".to_string(),
1587            tool_calls: Vec::new(),
1588            stop_reason: StopReason::EndTurn,
1589            usage: Usage::default(),
1590            created_at: crate::types::message_timestamp_now(),
1591        }));
1592        let mut incoming = previous.clone();
1593        incoming.push(Message::User(UserMessage::text(
1594            "incoming from A".to_string(),
1595        )));
1596
1597        assert!(matches!(
1598            authoritative_projection_current_revision_guard(
1599                &incoming,
1600                Some(&current),
1601                Some(&expected_revision)
1602            ),
1603            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1604        ));
1605        Ok(())
1606    }
1607
1608    #[test]
1609    fn append_only_guard_rejects_rewrite_commits_on_first_save()
1610    -> Result<(), Box<dyn std::error::Error>> {
1611        let mut incoming = Session::new();
1612        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1613        let parent_revision = incoming.transcript_revision()?;
1614        incoming.commit_transcript_rewrite(
1615            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1616            vec![Message::User(UserMessage::text("rewritten".to_string()))],
1617            crate::TranscriptRewriteReason::new("forged-first-save"),
1618            Some("unit-test".to_string()),
1619            Some(parent_revision),
1620        )?;
1621
1622        assert!(matches!(
1623            append_only_save_guard(&incoming, None),
1624            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1625        ));
1626        Ok(())
1627    }
1628
1629    #[test]
1630    fn append_only_guard_rejects_commitless_history_on_first_save()
1631    -> Result<(), Box<dyn std::error::Error>> {
1632        let mut incoming = Session::new();
1633        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1634        let incoming_revision = incoming.transcript_revision()?;
1635        incoming.set_metadata(
1636            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1637            serde_json::to_value(TranscriptHistoryState {
1638                head: incoming_revision.clone(),
1639                commits: Vec::new(),
1640                revisions: vec![crate::TranscriptRevisionBody {
1641                    revision: incoming_revision,
1642                    parent_revision: None,
1643                    messages: incoming.messages().to_vec(),
1644                    created_at: incoming.updated_at(),
1645                }],
1646            })?,
1647        );
1648
1649        assert!(matches!(
1650            append_only_save_guard(&incoming, None),
1651            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1652                if reason.contains("first save would seed transcript history state")
1653        ));
1654        Ok(())
1655    }
1656
1657    #[test]
1658    fn append_only_guard_rejects_commitless_history_seed_on_plain_append()
1659    -> Result<(), Box<dyn std::error::Error>> {
1660        let mut previous = Session::new();
1661        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1662        let previous_revision = previous.transcript_revision()?;
1663
1664        let mut incoming = previous.clone();
1665        incoming.push(Message::Assistant(AssistantMessage {
1666            content: "plain append".to_string(),
1667            tool_calls: Vec::new(),
1668            stop_reason: StopReason::EndTurn,
1669            usage: Usage::default(),
1670            created_at: crate::types::message_timestamp_now(),
1671        }));
1672        let incoming_revision = incoming.transcript_revision()?;
1673        incoming.set_metadata(
1674            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1675            serde_json::to_value(TranscriptHistoryState {
1676                head: incoming_revision.clone(),
1677                commits: Vec::new(),
1678                revisions: vec![
1679                    crate::TranscriptRevisionBody {
1680                        revision: previous_revision,
1681                        parent_revision: None,
1682                        messages: previous.messages().to_vec(),
1683                        created_at: previous.updated_at(),
1684                    },
1685                    crate::TranscriptRevisionBody {
1686                        revision: incoming_revision,
1687                        parent_revision: Some(previous.transcript_revision()?),
1688                        messages: incoming.messages().to_vec(),
1689                        created_at: incoming.updated_at(),
1690                    },
1691                ],
1692            })?,
1693        );
1694
1695        assert!(matches!(
1696            append_only_save_guard(&incoming, Some(&previous)),
1697            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1698                if reason.contains("append-only save would seed transcript history state")
1699        ));
1700        Ok(())
1701    }
1702
1703    #[test]
1704    fn append_only_guard_rejects_new_rewrite_commits_on_system_context_append()
1705    -> Result<(), Box<dyn std::error::Error>> {
1706        let mut previous = Session::new();
1707        previous.push(Message::System(SystemMessage::new("base system")));
1708        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1709        let mut incoming = previous.clone();
1710        incoming.set_system_prompt(format!(
1711            "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1712        ));
1713        incoming.push(Message::Assistant(AssistantMessage {
1714            content: "plain append".to_string(),
1715            tool_calls: Vec::new(),
1716            stop_reason: StopReason::EndTurn,
1717            usage: Usage::default(),
1718            created_at: crate::types::message_timestamp_now(),
1719        }));
1720        let incoming_revision = incoming.transcript_revision()?;
1721        incoming.set_metadata(
1722            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1723            serde_json::to_value(TranscriptHistoryState {
1724                head: incoming_revision.clone(),
1725                commits: vec![TranscriptRewriteCommit {
1726                    parent_revision: previous.transcript_revision()?,
1727                    revision: incoming_revision.clone(),
1728                    selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
1729                    original_span_digest: transcript_messages_digest(&[])?,
1730                    replacement_digest: transcript_messages_digest(&[])?,
1731                    messages_before: previous.messages().len(),
1732                    messages_after: incoming.messages().len(),
1733                    reason: crate::TranscriptRewriteReason::new("forged"),
1734                    actor: Some("unit-test".to_string()),
1735                    committed_at: incoming.updated_at(),
1736                }],
1737                revisions: vec![crate::TranscriptRevisionBody {
1738                    revision: incoming_revision,
1739                    parent_revision: None,
1740                    messages: incoming.messages().to_vec(),
1741                    created_at: incoming.updated_at(),
1742                }],
1743            })?,
1744        );
1745
1746        assert!(matches!(
1747            append_only_save_guard(&incoming, Some(&previous)),
1748            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1749        ));
1750        Ok(())
1751    }
1752
1753    #[test]
1754    fn append_only_guard_rejects_new_rewrite_commits_on_transient_notice_cleanup()
1755    -> Result<(), Box<dyn std::error::Error>> {
1756        let mut previous = Session::new();
1757        previous.push(Message::SystemNotice(SystemNoticeMessage::new(
1758            SystemNoticeKind::Comms,
1759            "transient peer delivery notice",
1760        )));
1761        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1762
1763        let mut incoming = Session::new();
1764        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1765        incoming.push(Message::Assistant(AssistantMessage {
1766            content: "plain append after notice cleanup".to_string(),
1767            tool_calls: Vec::new(),
1768            stop_reason: StopReason::EndTurn,
1769            usage: Usage::default(),
1770            created_at: crate::types::message_timestamp_now(),
1771        }));
1772        let incoming_revision = incoming.transcript_revision()?;
1773        incoming.set_metadata(
1774            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1775            serde_json::to_value(TranscriptHistoryState {
1776                head: incoming_revision.clone(),
1777                commits: vec![TranscriptRewriteCommit {
1778                    parent_revision: previous.transcript_revision()?,
1779                    revision: incoming_revision.clone(),
1780                    selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
1781                    original_span_digest: transcript_messages_digest(&[])?,
1782                    replacement_digest: transcript_messages_digest(&[])?,
1783                    messages_before: previous.messages().len(),
1784                    messages_after: incoming.messages().len(),
1785                    reason: crate::TranscriptRewriteReason::new("forged"),
1786                    actor: Some("unit-test".to_string()),
1787                    committed_at: incoming.updated_at(),
1788                }],
1789                revisions: vec![crate::TranscriptRevisionBody {
1790                    revision: incoming_revision,
1791                    parent_revision: None,
1792                    messages: incoming.messages().to_vec(),
1793                    created_at: incoming.updated_at(),
1794                }],
1795            })?,
1796        );
1797
1798        assert!(matches!(
1799            append_only_save_guard(&incoming, Some(&previous)),
1800            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1801        ));
1802        Ok(())
1803    }
1804
1805    #[test]
1806    fn run_boundary_guard_rejects_runtime_parent_with_inserted_message_before_tail()
1807    -> Result<(), Box<dyn std::error::Error>> {
1808        let mut previous = Session::new();
1809        previous.push(Message::System(SystemMessage::new("base system")));
1810        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1811        previous.push(Message::Assistant(AssistantMessage {
1812            content: "answer one".to_string(),
1813            tool_calls: Vec::new(),
1814            stop_reason: StopReason::EndTurn,
1815            usage: Usage::default(),
1816            created_at: crate::types::message_timestamp_now(),
1817        }));
1818
1819        let parent_messages = vec![
1820            Message::System(SystemMessage::new("refreshed runtime system projection")),
1821            Message::User(UserMessage::text(
1822                "injected before retained tail".to_string(),
1823            )),
1824            previous.messages()[1].clone(),
1825            previous.messages()[2].clone(),
1826        ];
1827        let parent_revision = transcript_messages_digest(&parent_messages)?;
1828        let mut parent = previous.clone();
1829        parent.apply_transcript_history_state(TranscriptHistoryState {
1830            head: parent_revision.clone(),
1831            commits: Vec::new(),
1832            revisions: vec![crate::TranscriptRevisionBody {
1833                revision: parent_revision,
1834                parent_revision: None,
1835                messages: parent_messages,
1836                created_at: parent.updated_at(),
1837            }],
1838        })?;
1839        let parent_revision = parent.transcript_revision()?;
1840
1841        let mut incoming = parent.clone();
1842        incoming.commit_transcript_rewrite(
1843            TranscriptRewriteSelection::MessageRange {
1844                start: 0,
1845                end: parent.messages().len(),
1846            },
1847            vec![Message::User(UserMessage::text(
1848                "[Context compacted] summary".to_string(),
1849            ))],
1850            crate::TranscriptRewriteReason::new("compaction"),
1851            Some("meerkat-core".to_string()),
1852            Some(parent_revision),
1853        )?;
1854
1855        assert!(matches!(
1856            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1857            Err(SessionStoreError::TranscriptContinuityViolation { .. }
1858                | SessionStoreError::MonotonicityViolation { .. })
1859        ));
1860        Ok(())
1861    }
1862
1863    #[test]
1864    fn run_boundary_guard_rejects_forged_parent_edge_before_real_rewrite_commit()
1865    -> Result<(), Box<dyn std::error::Error>> {
1866        let mut previous = Session::new();
1867        previous.push(Message::System(SystemMessage::new("base system")));
1868        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1869        previous.push(Message::Assistant(AssistantMessage {
1870            content: "answer one".to_string(),
1871            tool_calls: Vec::new(),
1872            stop_reason: StopReason::EndTurn,
1873            usage: Usage::default(),
1874            created_at: crate::types::message_timestamp_now(),
1875        }));
1876        let previous_revision = previous.transcript_revision()?;
1877
1878        let forged_parent_messages = vec![
1879            Message::System(SystemMessage::new("refreshed runtime system projection")),
1880            Message::User(UserMessage::text(
1881                "forged insertion before retained tail".to_string(),
1882            )),
1883            previous.messages()[1].clone(),
1884            previous.messages()[2].clone(),
1885        ];
1886        let forged_parent_revision = transcript_messages_digest(&forged_parent_messages)?;
1887        let mut forged_parent = previous.clone();
1888        forged_parent.apply_transcript_history_state(TranscriptHistoryState {
1889            head: forged_parent_revision.clone(),
1890            commits: Vec::new(),
1891            revisions: vec![
1892                crate::TranscriptRevisionBody {
1893                    revision: previous_revision.clone(),
1894                    parent_revision: None,
1895                    messages: previous.messages().to_vec(),
1896                    created_at: previous.updated_at(),
1897                },
1898                crate::TranscriptRevisionBody {
1899                    revision: forged_parent_revision.clone(),
1900                    parent_revision: Some(previous_revision),
1901                    messages: forged_parent_messages,
1902                    created_at: forged_parent.updated_at(),
1903                },
1904            ],
1905        })?;
1906
1907        let mut incoming = forged_parent.clone();
1908        incoming.commit_transcript_rewrite(
1909            TranscriptRewriteSelection::MessageRange {
1910                start: 0,
1911                end: forged_parent.messages().len(),
1912            },
1913            vec![Message::User(UserMessage::text(
1914                "[Context compacted] forged branch".to_string(),
1915            ))],
1916            crate::TranscriptRewriteReason::new("compaction"),
1917            Some("meerkat-core".to_string()),
1918            Some(forged_parent_revision),
1919        )?;
1920
1921        assert!(matches!(
1922            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1923            Err(SessionStoreError::TranscriptContinuityViolation { .. }
1924                | SessionStoreError::MonotonicityViolation { .. })
1925        ));
1926        Ok(())
1927    }
1928
1929    #[test]
1930    fn append_only_guard_rejects_transient_mcp_pending_notice_cleanup_with_unaudited_commit()
1931    -> Result<(), crate::TranscriptEditError> {
1932        let mut previous = Session::new();
1933        previous.push(Message::User(UserMessage::text("hello".to_string())));
1934        previous.push(Message::SystemNotice(SystemNoticeMessage {
1935            kind: SystemNoticeKind::McpPending,
1936            body: Some("connecting".to_string()),
1937            blocks: vec![SystemNoticeBlock::Mcp {
1938                server_id: None,
1939                operation: None,
1940                phase: None,
1941                persisted: false,
1942                detail: Some("connecting".to_string()),
1943                pending_sources: vec!["test-server".to_string()],
1944            }],
1945            created_at: crate::types::message_timestamp_now(),
1946        }));
1947        previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1948            vec![crate::types::AssistantBlock::Text {
1949                text: "answer".to_string(),
1950                meta: None,
1951            }],
1952            StopReason::EndTurn,
1953        )));
1954
1955        let mut incoming = previous.clone();
1956        incoming.replace_messages_internal(
1957            previous
1958                .messages()
1959                .iter()
1960                .filter(|message| !matches!(message, Message::SystemNotice(_)))
1961                .cloned()
1962                .collect(),
1963            crate::TranscriptRewriteReason::new("unit-test"),
1964        )?;
1965        incoming.push(Message::User(UserMessage::text("again".to_string())));
1966
1967        assert!(matches!(
1968            append_only_save_guard(&incoming, Some(&previous)),
1969            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1970        ));
1971        Ok::<(), crate::TranscriptEditError>(())
1972    }
1973
1974    #[test]
1975    fn rewrite_chain_finder_crosses_normal_append_between_rewrites()
1976    -> Result<(), Box<dyn std::error::Error>> {
1977        let mut session = Session::new();
1978        session.push(Message::User(UserMessage::text("first".to_string())));
1979        session.push(Message::Assistant(AssistantMessage {
1980            content: "verbose first answer".to_string(),
1981            tool_calls: Vec::new(),
1982            stop_reason: StopReason::EndTurn,
1983            usage: Usage::default(),
1984            created_at: crate::types::message_timestamp_now(),
1985        }));
1986
1987        let original = session.transcript_revision()?;
1988        let first = session.commit_transcript_rewrite(
1989            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
1990            vec![Message::Assistant(AssistantMessage {
1991                content: "compact first answer".to_string(),
1992                tool_calls: Vec::new(),
1993                stop_reason: StopReason::EndTurn,
1994                usage: Usage::default(),
1995                created_at: crate::types::message_timestamp_now(),
1996            })],
1997            crate::TranscriptRewriteReason::new("compaction"),
1998            Some("unit-test".to_string()),
1999            Some(original.clone()),
2000        )?;
2001
2002        session.push(Message::User(UserMessage::text("second".to_string())));
2003        session.push(Message::Assistant(AssistantMessage {
2004            content: "verbose second answer".to_string(),
2005            tool_calls: Vec::new(),
2006            stop_reason: StopReason::EndTurn,
2007            usage: Usage::default(),
2008            created_at: crate::types::message_timestamp_now(),
2009        }));
2010        let bridge = session.transcript_revision()?;
2011        assert_ne!(bridge, first.revision);
2012
2013        let second = session.commit_transcript_rewrite(
2014            TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
2015            vec![Message::Assistant(AssistantMessage {
2016                content: "compact second answer".to_string(),
2017                tool_calls: Vec::new(),
2018                stop_reason: StopReason::EndTurn,
2019                usage: Usage::default(),
2020                created_at: crate::types::message_timestamp_now(),
2021            })],
2022            crate::TranscriptRewriteReason::new("compaction"),
2023            Some("unit-test".to_string()),
2024            Some(bridge),
2025        )?;
2026        let state = session
2027            .transcript_history_state()?
2028            .ok_or_else(|| std::io::Error::other("missing transcript history state"))?;
2029
2030        let chain =
2031            find_transcript_rewrite_commit_chain_extending(&state, &original, &second.revision)
2032                .ok_or_else(|| {
2033                    std::io::Error::other(
2034                        "rewrite chain should extend through normal append bridge",
2035                    )
2036                })?;
2037        assert_eq!(chain.len(), 2);
2038        assert_eq!(chain[0].revision, first.revision);
2039        assert_eq!(chain[1].revision, second.revision);
2040        Ok(())
2041    }
2042
2043    #[test]
2044    fn run_boundary_guard_rejects_dropped_retained_rewrite_commits()
2045    -> Result<(), Box<dyn std::error::Error>> {
2046        let mut base = Session::new();
2047        base.push(Message::User(UserMessage::text("turn one".to_string())));
2048        base.push(Message::Assistant(AssistantMessage {
2049            content: "verbose answer".to_string(),
2050            tool_calls: Vec::new(),
2051            stop_reason: StopReason::EndTurn,
2052            usage: Usage::default(),
2053            created_at: crate::types::message_timestamp_now(),
2054        }));
2055        let base_revision = base.transcript_revision()?;
2056
2057        let mut previous = base.clone();
2058        let _retained_commit = previous.commit_transcript_rewrite(
2059            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2060            vec![Message::Assistant(AssistantMessage {
2061                content: "first compact answer".to_string(),
2062                tool_calls: Vec::new(),
2063                stop_reason: StopReason::EndTurn,
2064                usage: Usage::default(),
2065                created_at: crate::types::message_timestamp_now(),
2066            })],
2067            crate::TranscriptRewriteReason::new("compaction"),
2068            Some("unit-test".to_string()),
2069            Some(base_revision),
2070        )?;
2071        let previous_revision = previous.transcript_revision()?;
2072
2073        let mut incoming = previous.clone();
2074        let new_commit = incoming.commit_transcript_rewrite(
2075            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2076            vec![Message::Assistant(AssistantMessage {
2077                content: "second compact answer".to_string(),
2078                tool_calls: Vec::new(),
2079                stop_reason: StopReason::EndTurn,
2080                usage: Usage::default(),
2081                created_at: crate::types::message_timestamp_now(),
2082            })],
2083            crate::TranscriptRewriteReason::new("compaction"),
2084            Some("unit-test".to_string()),
2085            Some(previous_revision),
2086        )?;
2087        let mut state = incoming
2088            .transcript_history_state()?
2089            .ok_or_else(|| std::io::Error::other("incoming rewrite should retain history"))?;
2090        state.commits = vec![new_commit];
2091        incoming.set_metadata(
2092            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2093            serde_json::to_value(state)?,
2094        );
2095
2096        assert!(matches!(
2097            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2098            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2099                if reason.contains("drop retained transcript rewrite commits")
2100        ));
2101        Ok(())
2102    }
2103}