Skip to main content

meerkat_core/
session_store.rs

//! SessionStore trait — canonical session persistence contract.
//!
//! This trait lives in `meerkat-core` so that custom storage implementations
//! (Postgres, DynamoDB, etc.) can be written without depending on `meerkat-store`.
//!
//! # Snapshot = projection
//!
//! The `Session` row a `SessionStore` persists is a **projection of the
//! canonical event log**. The event log (`EventStore`) is append-only at
//! the trait level; the snapshot is a rebuildable materialization of
//! replaying that log. Deleting a `.rkat/sessions/<id>/session.json` and
//! replaying the event store produces an identical snapshot (the
//! `CLAUDE.md` invariant).
//!
//! Wave-c C-H1 (F1 closure from the state-scope-audit) makes the
//! append-only nature of that projection enforceable at the
//! `SessionStore::save` boundary — see the trait docs on
//! [`SessionStore`] and the [`append_only_save_guard`] helper.
19
20use async_trait::async_trait;
21use sha2::{Digest, Sha256};
22
23use crate::session::{SYSTEM_CONTEXT_SEPARATOR, SessionMeta};
24use crate::time_compat::SystemTime;
25use crate::types::{Message, SessionId, SystemMessage};
26use crate::{
27    Session, TranscriptHistoryState, TranscriptRewriteCommit, TranscriptRewriteSelection,
28    transcript_messages_digest,
29};
30
31/// Filter for listing sessions.
32#[derive(Debug, Clone, Default)]
33pub struct SessionFilter {
34    /// Only sessions created after this time.
35    pub created_after: Option<SystemTime>,
36    /// Only sessions updated after this time.
37    pub updated_after: Option<SystemTime>,
38    /// Maximum number of results.
39    pub limit: Option<usize>,
40    /// Offset for pagination.
41    pub offset: Option<usize>,
42}
43
44/// Errors from session store operations.
45///
46/// Backend-specific details (rusqlite, filesystem, etc.) are erased to strings
47/// so that the trait contract carries no I/O dependencies.
48#[derive(Debug, thiserror::Error)]
49pub enum SessionStoreError {
50    #[error("IO error: {0}")]
51    Io(#[from] std::io::Error),
52
53    #[error("Serialization error: {0}")]
54    Serialization(String),
55
56    #[error("Session not found: {0}")]
57    NotFound(SessionId),
58
59    #[error("Session corrupted: {0}")]
60    Corrupted(SessionId),
61
62    #[error(
63        "session {id} save rejected: new message count {new_len} is shorter than previously \
64         persisted {prev_len} without transcript-continuity proof"
65    )]
66    MonotonicityViolation {
67        id: SessionId,
68        prev_len: usize,
69        new_len: usize,
70    },
71
72    #[error(
73        "session {id} save rejected: incoming transcript is not a continuation of persisted revision {previous_revision}"
74    )]
75    TranscriptContinuityViolation {
76        id: SessionId,
77        previous_revision: String,
78        incoming_revision: String,
79        reason: String,
80    },
81
82    #[error(
83        "session {id} rewrite rejected: previous transcript revision {actual} did not match commit parent {expected}"
84    )]
85    TranscriptRevisionConflict {
86        id: SessionId,
87        expected: String,
88        actual: String,
89    },
90
91    #[error("session {id} rewrite rejected: {reason}")]
92    InvalidTranscriptRewrite { id: SessionId, reason: String },
93
94    #[error("Internal error: {0}")]
95    Internal(String),
96}
97
98/// Stable compare token for a full persisted session projection row.
99pub fn session_projection_cas_token(session: &Session) -> Result<String, SessionStoreError> {
100    let bytes = serde_json::to_vec(session).map_err(|err| {
101        SessionStoreError::Serialization(format!(
102            "failed to serialize session projection CAS token: {err}"
103        ))
104    })?;
105    Ok(format!("row-sha256:{:x}", Sha256::digest(bytes)))
106}
107
108/// Shared append-only guard for `SessionStore::save` implementations.
109///
110/// Backends call this at the top of their `save` method with the new
111/// session and the previously persisted row (or `None` if no prior row
112/// exists). Returns
113/// [`SessionStoreError::MonotonicityViolation`] when the new row's
114/// message count is strictly smaller than the previously persisted one
115/// without a transcript graph edge that proves a core-owned mutation.
116///
117/// The guard also rejects equal/longer saves whose retained prefix no longer
118/// matches the persisted transcript. A plain save may append or update
119/// metadata; same-session replacement must go through
120/// [`transcript_rewrite_save_guard`].
121pub fn append_only_save_guard(
122    incoming: &Session,
123    previous: Option<&Session>,
124) -> Result<(), SessionStoreError> {
125    incoming
126        .validate_transcript_history_state()
127        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
128            id: incoming.id().clone(),
129            reason: format!("incoming transcript history state is malformed: {err}"),
130        })?;
131    let incoming_revision =
132        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
133    let incoming_state = incoming.transcript_history_state().map_err(|err| {
134        SessionStoreError::InvalidTranscriptRewrite {
135            id: incoming.id().clone(),
136            reason: format!("incoming transcript history state is malformed: {err}"),
137        }
138    })?;
139    if let Some(state) = incoming_state.as_ref()
140        && state.head != incoming_revision
141    {
142        return Err(SessionStoreError::InvalidTranscriptRewrite {
143            id: incoming.id().clone(),
144            reason: format!(
145                "incoming transcript graph head {} does not match current message digest {incoming_revision}",
146                state.head
147            ),
148        });
149    }
150
151    let Some(previous) = previous else {
152        if incoming_state.is_some() {
153            return Err(SessionStoreError::InvalidTranscriptRewrite {
154                id: incoming.id().clone(),
155                reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
156                    .to_string(),
157            });
158        }
159        validate_plain_save_transcript_history_preservation(
160            incoming,
161            None,
162            None,
163            incoming_state.as_ref(),
164        )?;
165        return Ok(());
166    };
167    let previous_state = previous.transcript_history_state().map_err(|err| {
168        SessionStoreError::InvalidTranscriptRewrite {
169            id: incoming.id().clone(),
170            reason: format!("previous transcript history state is malformed: {err}"),
171        }
172    })?;
173    let previous_had_history = previous_state.is_some();
174    let incoming_has_history = incoming_state.is_some();
175    if previous_had_history && !incoming_has_history {
176        return Err(SessionStoreError::InvalidTranscriptRewrite {
177            id: incoming.id().clone(),
178            reason: "incoming save would erase retained transcript history state".to_string(),
179        });
180    }
181    let previous_revision =
182        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
183    if previous_revision == incoming_revision {
184        validate_plain_save_transcript_history_preservation(
185            incoming,
186            Some(previous),
187            previous_state.as_ref(),
188            incoming_state.as_ref(),
189        )?;
190        return Ok(());
191    }
192
193    let prev_len = previous.messages().len();
194    let new_len = incoming.messages().len();
195    if new_len >= prev_len {
196        let incoming_prefix_revision = transcript_messages_digest(&incoming.messages()[..prev_len])
197            .map_err(SessionStoreError::from)?;
198        if incoming_prefix_revision == previous_revision {
199            validate_plain_save_transcript_history_preservation(
200                incoming,
201                Some(previous),
202                previous_state.as_ref(),
203                incoming_state.as_ref(),
204            )?;
205            return Ok(());
206        }
207    }
208    if incoming_preserves_conversation_tail_with_system_context_append(incoming, previous)? {
209        validate_plain_save_transcript_history_preservation(
210            incoming,
211            Some(previous),
212            previous_state.as_ref(),
213            incoming_state.as_ref(),
214        )?;
215        return Ok(());
216    }
217    if incoming_preserves_prefix_after_transient_notice_cleanup(incoming, previous)? {
218        validate_plain_save_transcript_history_preservation(
219            incoming,
220            Some(previous),
221            previous_state.as_ref(),
222            incoming_state.as_ref(),
223        )?;
224        return Ok(());
225    }
226    if new_len < prev_len {
227        return Err(SessionStoreError::MonotonicityViolation {
228            id: incoming.id().clone(),
229            prev_len,
230            new_len,
231        });
232    }
233
234    Err(SessionStoreError::TranscriptContinuityViolation {
235        id: incoming.id().clone(),
236        previous_revision,
237        incoming_revision,
238        reason: "incoming transcript neither preserves the persisted prefix nor records a graph edge from the persisted head".to_string(),
239    })
240}
241
242fn validate_plain_save_transcript_history_preservation(
243    incoming: &Session,
244    previous: Option<&Session>,
245    previous_state: Option<&TranscriptHistoryState>,
246    incoming_state: Option<&TranscriptHistoryState>,
247) -> Result<(), SessionStoreError> {
248    let Some(previous) = previous else {
249        if incoming_state.is_some() {
250            return Err(SessionStoreError::InvalidTranscriptRewrite {
251                id: incoming.id().clone(),
252                reason: "incoming first save would seed transcript history state outside the rewrite/audit path"
253                    .to_string(),
254            });
255        }
256        return Ok(());
257    };
258    if previous_state.is_none() && incoming_state.is_some() {
259        return Err(SessionStoreError::InvalidTranscriptRewrite {
260            id: incoming.id().clone(),
261            reason: "incoming append-only save would seed transcript history state outside the rewrite/audit path"
262                .to_string(),
263        });
264    }
265    let Some(previous_state) = previous_state else {
266        return Ok(());
267    };
268    let Some(incoming_state) = incoming_state else {
269        return Err(SessionStoreError::InvalidTranscriptRewrite {
270            id: incoming.id().clone(),
271            reason: "incoming append-only save would erase retained transcript history state"
272                .to_string(),
273        });
274    };
275    let previous_commits = previous_state.commits.as_slice();
276    let incoming_commits = incoming_state.commits.as_slice();
277    if incoming_commits != previous_commits {
278        return Err(SessionStoreError::InvalidTranscriptRewrite {
279            id: incoming.id().clone(),
280            reason: "incoming append-only save would change retained transcript rewrite commits"
281                .to_string(),
282        });
283    }
284    let retained_revisions_preserved =
285        transcript_revision_bodies_preserved(previous_state, incoming_state)?;
286    if retained_revisions_preserved
287        && incoming_state.revisions.len() == previous_state.revisions.len()
288        && incoming_state.head == previous_state.head
289    {
290        return Ok(());
291    }
292    if incoming_state.revisions.len() != previous_state.revisions.len() + 1
293        || !retained_revisions_preserved
294    {
295        return Err(SessionStoreError::InvalidTranscriptRewrite {
296            id: incoming.id().clone(),
297            reason: "incoming append-only save would change retained transcript revision graph"
298                .to_string(),
299        });
300    }
301    let incoming_revision =
302        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
303    let previous_revision =
304        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
305    if previous_state.head != previous_revision {
306        return Err(SessionStoreError::InvalidTranscriptRewrite {
307            id: incoming.id().clone(),
308            reason: "previous transcript history head does not match persisted message digest"
309                .to_string(),
310        });
311    }
312    let added = &incoming_state.revisions[previous_state.revisions.len()];
313    if incoming_state.head != incoming_revision
314        || added.revision != incoming_revision
315        || added.parent_revision.as_deref() != Some(previous_state.head.as_str())
316        || transcript_messages_digest(&added.messages).map_err(SessionStoreError::from)?
317            != incoming_revision
318    {
319        return Err(SessionStoreError::InvalidTranscriptRewrite {
320            id: incoming.id().clone(),
321            reason: "incoming append-only save would add a transcript revision body that is not the current append"
322                .to_string(),
323        });
324    }
325    Ok(())
326}
327
328fn transcript_revision_bodies_preserved(
329    previous_state: &TranscriptHistoryState,
330    incoming_state: &TranscriptHistoryState,
331) -> Result<bool, SessionStoreError> {
332    if incoming_state.revisions.len() < previous_state.revisions.len() {
333        return Ok(false);
334    }
335    previous_state
336        .revisions
337        .iter()
338        .zip(incoming_state.revisions.iter())
339        .map(|(previous, incoming)| {
340            Ok(previous.revision == incoming.revision
341                && previous.parent_revision == incoming.parent_revision
342                && previous.created_at == incoming.created_at
343                && transcript_messages_digest(&previous.messages)
344                    .map_err(SessionStoreError::from)?
345                    == transcript_messages_digest(&incoming.messages)
346                        .map_err(SessionStoreError::from)?)
347        })
348        .try_fold(true, |acc, preserved| {
349            preserved.map(|preserved| acc && preserved)
350        })
351}
352
353fn validate_rewrite_save_retains_previous_commits(
354    incoming: &Session,
355    previous: &Session,
356    incoming_state: &TranscriptHistoryState,
357) -> Result<(), SessionStoreError> {
358    let previous_state = previous.transcript_history_state().map_err(|err| {
359        SessionStoreError::InvalidTranscriptRewrite {
360            id: incoming.id().clone(),
361            reason: format!("previous transcript history state is malformed: {err}"),
362        }
363    })?;
364    let Some(previous_state) = previous_state.as_ref() else {
365        return Ok(());
366    };
367    if incoming_state.commits.len() < previous_state.commits.len()
368        || incoming_state.commits[..previous_state.commits.len()] != previous_state.commits
369    {
370        return Err(SessionStoreError::InvalidTranscriptRewrite {
371            id: incoming.id().clone(),
372            reason: "incoming rewrite save would drop retained transcript rewrite commits"
373                .to_string(),
374        });
375    }
376    Ok(())
377}
378
379/// Validate that an authoritative projection write still targets the row that
380/// the caller proved continuity against.
381pub fn authoritative_projection_current_revision_guard(
382    incoming: &Session,
383    previous: Option<&Session>,
384    expected_current_revision: Option<&str>,
385) -> Result<(), SessionStoreError> {
386    let previous_token = previous.map(session_projection_cas_token).transpose()?;
387    if previous_token.as_deref() == expected_current_revision {
388        return Ok(());
389    }
390    let incoming_revision =
391        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
392    Err(SessionStoreError::TranscriptContinuityViolation {
393        id: incoming.id().clone(),
394        previous_revision: previous_token.unwrap_or_else(|| "<missing>".to_string()),
395        incoming_revision,
396        reason: format!(
397            "authoritative projection expected persisted projection token {}, but current row has diverged",
398            expected_current_revision.unwrap_or("<missing>")
399        ),
400    })
401}
402
403fn incoming_preserves_conversation_tail_with_system_context_append(
404    incoming: &Session,
405    previous: &Session,
406) -> Result<bool, SessionStoreError> {
407    messages_preserve_conversation_tail_with_system_context_append(
408        incoming.messages(),
409        previous.messages(),
410    )
411}
412
413fn messages_preserve_conversation_tail_with_system_context_append(
414    incoming: &[Message],
415    previous: &[Message],
416) -> Result<bool, SessionStoreError> {
417    let (previous_system, previous_tail) = split_single_leading_system(previous);
418    let (incoming_system, incoming_tail) = split_single_leading_system(incoming);
419    let Some(incoming_system) = incoming_system else {
420        return Ok(false);
421    };
422    if !system_context_is_append(previous_system, incoming_system)? {
423        return Ok(false);
424    }
425    if incoming_tail.len() < previous_tail.len() {
426        return Ok(false);
427    }
428    let previous_tail_revision =
429        transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
430    let incoming_tail_prefix_revision =
431        transcript_messages_digest(&incoming_tail[..previous_tail.len()])
432            .map_err(SessionStoreError::from)?;
433    Ok(previous_tail_revision == incoming_tail_prefix_revision)
434}
435
436fn split_single_leading_system(messages: &[Message]) -> (Option<&SystemMessage>, &[Message]) {
437    match messages.first() {
438        Some(Message::System(system)) => (Some(system), &messages[1..]),
439        _ => (None, messages),
440    }
441}
442
443/// Decide whether `incoming` is a continuation of `previous` produced by a
444/// runtime system-context append.
445///
446/// The structural part — identical content, or `incoming = previous +
447/// separator + suffix` — is a transcript-continuity proof (content equality of
448/// the retained prefix), not classification. The SEMANTIC append-admission
449/// verdict ("is this incoming persisted prompt an admissible
450/// runtime-context-append continuation of the persisted one") is owned by the
451/// canonical [`SessionDocumentMachine`] — the same machine the staging path
452/// already drives for the four-way append disposition — not a handwritten shell
453/// reducer. This function extracts only the pure structural observations plus
454/// the typed [`SystemPromptMutationKind`] runtime-context-append marker, drives
455/// the machine's `ResolveSystemContextPersistAppendAdmission` input, and mirrors
456/// the emitted verdict (`Admit` -> `true`, `Reject` -> `false`). It fails closed
457/// if the machine refuses or emits no verdict.
458fn system_context_is_append(
459    previous: Option<&SystemMessage>,
460    incoming: &SystemMessage,
461) -> Result<bool, SessionStoreError> {
462    // Pure structural observations the shell computes; NO semantic decision.
463    let has_previous = previous.is_some();
464    let content_identical = previous.is_some_and(|previous| incoming.content == previous.content);
465    let content_extends_previous =
466        previous.is_some_and(|previous| incoming.content.starts_with(&previous.content));
467    let appended_starts_with_separator = previous.is_some_and(|previous| {
468        incoming
469            .content
470            .get(previous.content.len()..)
471            .is_some_and(|appended| appended.starts_with(SYSTEM_CONTEXT_SEPARATOR))
472    });
473    let incoming_is_runtime_context_append = incoming.mutation_kind.is_runtime_context_append();
474
475    let mut authority = crate::session_document::SessionDocumentMachineAuthority::new();
476    let effects = authority
477        .resolve_system_context_persist_append_admission(
478            has_previous,
479            content_identical,
480            content_extends_previous,
481            appended_starts_with_separator,
482            incoming_is_runtime_context_append,
483        )
484        .map_err(|err| {
485            SessionStoreError::Internal(format!(
486                "session document authority refused persist-time system-context append admission: {err}"
487            ))
488        })?;
489    effects
490        .into_iter()
491        .find_map(|effect| match effect {
492            crate::session_document::SessionDocumentEffect::SystemContextPersistAppendAdmissionResolved {
493                admission,
494            } => Some(matches!(
495                admission,
496                crate::session_document::SystemContextPersistAppendAdmission::Admit
497            )),
498            _ => None,
499        })
500        .ok_or_else(|| {
501            SessionStoreError::Internal(
502                "session document authority emitted no persist-time system-context append admission verdict".to_string(),
503            )
504        })
505}
506
507fn incoming_preserves_prefix_after_transient_notice_cleanup(
508    incoming: &Session,
509    previous: &Session,
510) -> Result<bool, SessionStoreError> {
511    let previous_without_transient = previous
512        .messages()
513        .iter()
514        .filter(|message| !is_transient_system_notice(message))
515        .cloned()
516        .collect::<Vec<_>>();
517    if previous_without_transient.len() == previous.messages().len()
518        || incoming.messages().len() < previous_without_transient.len()
519    {
520        return Ok(false);
521    }
522    let previous_revision =
523        transcript_messages_digest(&previous_without_transient).map_err(SessionStoreError::from)?;
524    let incoming_prefix_revision =
525        transcript_messages_digest(&incoming.messages()[..previous_without_transient.len()])
526            .map_err(SessionStoreError::from)?;
527    Ok(previous_revision == incoming_prefix_revision)
528}
529
530fn is_transient_system_notice(message: &Message) -> bool {
531    let Message::SystemNotice(notice) = message else {
532        return false;
533    };
534    notice.kind == crate::types::SystemNoticeKind::McpPending
535        && notice.blocks.iter().all(|block| {
536            matches!(
537                block,
538                crate::types::SystemNoticeBlock::Mcp {
539                    persisted: false,
540                    ..
541                }
542            )
543        })
544}
545
546/// Validate a runtime run-boundary snapshot.
547///
548/// Runtime turns normally append to the transcript, but core-owned turn
549/// mechanics such as compaction can also produce an audited internal rewrite.
550/// Runtime stores use this guard inside their atomic boundary commit: plain
551/// replacement is rejected, while an incoming snapshot carrying a typed rewrite
552/// commit from the currently persisted head is accepted through the same
553/// rewrite validator as [`SessionStore::save_transcript_rewrite`].
554pub fn run_boundary_snapshot_save_guard(
555    incoming: &Session,
556    previous: Option<&Session>,
557) -> Result<(), SessionStoreError> {
558    match append_only_save_guard(incoming, previous) {
559        Ok(()) => Ok(()),
560        Err(append_error) => {
561            if run_boundary_commitless_history_projection_save_guard(incoming, previous)? {
562                return Ok(());
563            }
564            let Some(previous) = previous else {
565                return Err(append_error);
566            };
567            let incoming_revision =
568                transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
569            let Some(state) = incoming.transcript_history_state().map_err(|err| {
570                SessionStoreError::InvalidTranscriptRewrite {
571                    id: incoming.id().clone(),
572                    reason: format!("incoming transcript history state is malformed: {err}"),
573                }
574            })?
575            else {
576                return Err(append_error);
577            };
578            validate_rewrite_save_retains_previous_commits(incoming, previous, &state)?;
579            let commits = find_transcript_rewrite_commit_chain_extending_session(
580                &state,
581                previous,
582                &incoming_revision,
583            )?;
584            if commits.is_none()
585                && run_boundary_context_summary_tail_projection_save_guard(
586                    incoming, previous, &state,
587                )?
588            {
589                return Ok(());
590            }
591            let Some(commits) = commits else {
592                return Err(append_error);
593            };
594            let Some(commit) = commits.first() else {
595                if state.commits.is_empty() {
596                    return Err(append_error);
597                }
598                for commit in &state.commits {
599                    validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
600                }
601                return Ok(());
602            };
603            transcript_rewrite_bridge_save_guard(incoming, commit, &state, &incoming_revision)?;
604            for commit in commits.iter().skip(1) {
605                validate_transcript_rewrite_commit_bodies(incoming, commit, &state)?;
606            }
607            Ok(())
608        }
609    }
610}
611
612fn run_boundary_commitless_history_projection_save_guard(
613    incoming: &Session,
614    previous: Option<&Session>,
615) -> Result<bool, SessionStoreError> {
616    let Some(state) = incoming.transcript_history_state().map_err(|err| {
617        SessionStoreError::InvalidTranscriptRewrite {
618            id: incoming.id().clone(),
619            reason: format!("incoming transcript history state is malformed: {err}"),
620        }
621    })?
622    else {
623        return Ok(false);
624    };
625    if !state.commits.is_empty() {
626        return Ok(false);
627    }
628
629    let incoming_revision =
630        transcript_messages_digest(incoming.messages()).map_err(SessionStoreError::from)?;
631    if state.head != incoming_revision
632        || !state
633            .revisions
634            .iter()
635            .any(|body| body.revision == incoming_revision)
636    {
637        return Ok(false);
638    }
639
640    let mut projection_without_history = incoming.clone();
641    projection_without_history.clear_transcript_history_state();
642    if append_only_save_guard(&projection_without_history, previous).is_err() {
643        return Ok(false);
644    }
645
646    let Some(previous) = previous else {
647        return Ok(state.commits.is_empty());
648    };
649    if previous
650        .transcript_history_state()
651        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
652            id: incoming.id().clone(),
653            reason: format!("previous transcript history state is malformed: {err}"),
654        })?
655        .is_some()
656    {
657        return Ok(false);
658    }
659
660    let previous_revision =
661        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
662    Ok(incoming_revision == previous_revision
663        || transcript_history_revision_extends(&state, &incoming_revision, &previous_revision))
664}
665
666fn run_boundary_context_summary_tail_projection_save_guard(
667    incoming: &Session,
668    previous: &Session,
669    state: &TranscriptHistoryState,
670) -> Result<bool, SessionStoreError> {
671    if state.commits.is_empty() {
672        return Ok(false);
673    }
674    incoming
675        .validate_transcript_history_state()
676        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
677            id: incoming.id().clone(),
678            reason: format!("incoming transcript history state is malformed: {err}"),
679        })?;
680
681    let (incoming_system, incoming_tail) = match incoming.messages().split_first() {
682        Some((Message::System(system), tail)) => (Some(system), tail),
683        _ => (None, incoming.messages()),
684    };
685    let (previous_system, previous_tail) = match previous.messages().split_first() {
686        Some((Message::System(system), tail)) => (Some(system), tail),
687        _ => (None, previous.messages()),
688    };
689    if incoming_system.is_some() != previous_system.is_some()
690        || incoming_tail.len() <= previous_tail.len()
691    {
692        return Ok(false);
693    }
694    let Some(Message::User(summary)) = incoming_tail.first() else {
695        return Ok(false);
696    };
697    // Typed marker, not content classification: the runtime compaction producer
698    // stamps the rebuilt-transcript boundary message with the
699    // `CompactionSummary` transcript role. The save-guard admits the divergent
700    // rewrite parent only when that typed fact is present.
701    if !summary.transcript_role.is_compaction_summary() {
702        return Ok(false);
703    }
704
705    let retained_end = 1 + previous_tail.len();
706    let retained = &incoming_tail[1..retained_end];
707    let retained_revision =
708        transcript_messages_digest(retained).map_err(SessionStoreError::from)?;
709    let previous_revision =
710        transcript_messages_digest(previous_tail).map_err(SessionStoreError::from)?;
711    if retained_revision != previous_revision {
712        return Ok(false);
713    }
714
715    for commit in &state.commits {
716        validate_transcript_rewrite_commit_bodies(incoming, commit, state)?;
717    }
718    Ok(true)
719}
720
721/// Find the rewrite commit that authorizes replacing `previous_revision`,
722/// allowing the incoming head to extend the rewrite via normal append bodies.
723pub fn find_transcript_rewrite_commit_extending<'a>(
724    state: &'a TranscriptHistoryState,
725    previous_revision: &str,
726    incoming_revision: &str,
727) -> Option<&'a TranscriptRewriteCommit> {
728    find_transcript_rewrite_commit_chain_extending(state, previous_revision, incoming_revision)
729        .and_then(|commits| commits.into_iter().next())
730}
731
732/// Find the contiguous rewrite commits that connect `previous_revision` to the
733/// incoming head, allowing normal append bodies after the final rewrite.
734pub fn find_transcript_rewrite_commit_chain_extending<'a>(
735    state: &'a TranscriptHistoryState,
736    previous_revision: &str,
737    incoming_revision: &str,
738) -> Option<Vec<&'a TranscriptRewriteCommit>> {
739    let mut chain = Vec::new();
740    let mut cursor = previous_revision;
741    let mut visited = std::collections::BTreeSet::new();
742    loop {
743        if incoming_revision == cursor {
744            return Some(chain);
745        }
746        if !visited.insert(cursor.to_string()) {
747            return None;
748        }
749        let commit = state.commits.iter().find(|commit| {
750            (commit.parent_revision == cursor
751                || transcript_history_revision_extends(state, &commit.parent_revision, cursor))
752                && transcript_history_revision_extends(state, incoming_revision, &commit.revision)
753        });
754        let Some(commit) = commit else {
755            return transcript_history_revision_extends(state, incoming_revision, cursor)
756                .then_some(chain);
757        };
758        cursor = &commit.revision;
759        chain.push(commit);
760    }
761}
762
763/// Find a rewrite chain whose first parent may be an append-only continuation
764/// of a previously persisted snapshot.
765///
766/// Runtime-backed sessions can append messages in the runtime store before a
767/// core-owned compaction rewrite is checkpointed to the compatibility
768/// `SessionStore`. In that case the first rewrite commit's parent revision is
769/// not equal to the persisted row's digest, but its retained parent body proves
770/// a normal append path from that persisted row.
771pub fn find_transcript_rewrite_commit_chain_extending_session<'a>(
772    state: &'a TranscriptHistoryState,
773    previous: &Session,
774    incoming_revision: &str,
775) -> Result<Option<Vec<&'a TranscriptRewriteCommit>>, SessionStoreError> {
776    let previous_revision =
777        transcript_messages_digest(previous.messages()).map_err(SessionStoreError::from)?;
778    let mut chain = Vec::new();
779    let mut cursor = previous_revision.as_str();
780    let mut visited = std::collections::BTreeSet::new();
781    loop {
782        if incoming_revision == cursor {
783            return Ok(Some(chain));
784        }
785        if !visited.insert(cursor.to_string()) {
786            return Ok(None);
787        }
788
789        let Some(cursor_messages) = transcript_history_messages_for_revision(
790            state,
791            cursor,
792            &previous_revision,
793            previous.messages(),
794        ) else {
795            return Ok(None);
796        };
797        let mut selected = None;
798        for commit in &state.commits {
799            if !transcript_history_revision_extends(state, incoming_revision, &commit.revision) {
800                continue;
801            }
802            let parent_extends_cursor = commit.parent_revision == cursor
803                || revision_body_preserves_append_continuation_prefix(
804                    state,
805                    &commit.parent_revision,
806                    cursor_messages,
807                    cursor,
808                )?;
809            if parent_extends_cursor {
810                selected = Some(commit);
811                break;
812            }
813        }
814
815        let Some(commit) = selected else {
816            if revision_body_preserves_append_continuation_prefix(
817                state,
818                incoming_revision,
819                cursor_messages,
820                cursor,
821            )? {
822                return Ok(Some(chain));
823            }
824            return Ok(None);
825        };
826        cursor = &commit.revision;
827        chain.push(commit);
828    }
829}
830
831fn transcript_history_messages_for_revision<'a>(
832    state: &'a TranscriptHistoryState,
833    revision: &str,
834    previous_revision: &str,
835    previous_messages: &'a [Message],
836) -> Option<&'a [Message]> {
837    if revision == previous_revision {
838        return Some(previous_messages);
839    }
840    state
841        .revisions
842        .iter()
843        .find(|body| body.revision == revision)
844        .map(|body| body.messages.as_slice())
845}
846
847fn revision_body_preserves_append_continuation_prefix(
848    state: &TranscriptHistoryState,
849    revision: &str,
850    ancestor_messages: &[Message],
851    ancestor_revision: &str,
852) -> Result<bool, SessionStoreError> {
853    if revision == ancestor_revision {
854        return Ok(true);
855    }
856    let Some(body) = state
857        .revisions
858        .iter()
859        .find(|body| body.revision == revision)
860    else {
861        return Ok(false);
862    };
863    if body.messages.len() >= ancestor_messages.len() {
864        let prefix_revision = transcript_messages_digest(&body.messages[..ancestor_messages.len()])
865            .map_err(SessionStoreError::from)?;
866        if prefix_revision == ancestor_revision {
867            return Ok(true);
868        }
869    }
870    Ok(
871        messages_preserve_conversation_tail_with_system_context_append(
872            &body.messages,
873            ancestor_messages,
874        )? || messages_preserve_tail_after_leading_system_refresh(
875            &body.messages,
876            ancestor_messages,
877        )?,
878    )
879}
880
881fn messages_preserve_tail_after_leading_system_refresh(
882    incoming: &[Message],
883    previous: &[Message],
884) -> Result<bool, SessionStoreError> {
885    let (Some(Message::System(_)), Some(Message::System(_))) = (incoming.first(), previous.first())
886    else {
887        return Ok(false);
888    };
889    if incoming.len() < previous.len() {
890        return Ok(false);
891    }
892    let previous_tail_len = previous.len().saturating_sub(1);
893    if previous_tail_len == 0 {
894        return Ok(true);
895    }
896    let previous_tail_revision =
897        transcript_messages_digest(&previous[1..]).map_err(SessionStoreError::from)?;
898    let incoming_tail = &incoming[1..];
899    if incoming_tail.len() < previous_tail_len {
900        return Ok(false);
901    }
902    let incoming_tail_prefix_revision =
903        transcript_messages_digest(&incoming_tail[..previous_tail_len])
904            .map_err(SessionStoreError::from)?;
905    Ok(incoming_tail_prefix_revision == previous_tail_revision)
906}
907
908fn transcript_history_revision_extends(
909    state: &TranscriptHistoryState,
910    descendant: &str,
911    ancestor: &str,
912) -> bool {
913    if descendant == ancestor {
914        return true;
915    }
916    let mut cursor = descendant;
917    while let Some(body) = state.revisions.iter().find(|body| body.revision == cursor) {
918        let Some(parent) = body.parent_revision.as_deref() else {
919            return false;
920        };
921        if parent == ancestor {
922            return true;
923        }
924        cursor = parent;
925    }
926    false
927}
928
929fn transcript_rewrite_bridge_save_guard(
930    incoming: &Session,
931    commit: &TranscriptRewriteCommit,
932    incoming_state: &TranscriptHistoryState,
933    incoming_message_digest: &str,
934) -> Result<(), SessionStoreError> {
935    validate_transcript_rewrite_commit_bodies(incoming, commit, incoming_state)?;
936    if incoming_state.head != incoming_message_digest {
937        return Err(SessionStoreError::InvalidTranscriptRewrite {
938            id: incoming.id().clone(),
939            reason: format!(
940                "incoming transcript graph head {} does not match current message digest {incoming_message_digest}",
941                incoming_state.head
942            ),
943        });
944    }
945    if !transcript_history_revision_extends(
946        incoming_state,
947        incoming_message_digest,
948        &commit.revision,
949    ) {
950        return Err(SessionStoreError::InvalidTranscriptRewrite {
951            id: incoming.id().clone(),
952            reason: format!(
953                "incoming transcript head {incoming_message_digest} does not extend rewrite revision {}",
954                commit.revision
955            ),
956        });
957    }
958    Ok(())
959}
960
961/// Validate that a same-session shrink/replace save is backed by a typed
962/// transcript rewrite commit.
963pub fn transcript_rewrite_save_guard(
964    incoming: &Session,
965    previous: Option<&Session>,
966    commit: &TranscriptRewriteCommit,
967) -> Result<(), SessionStoreError> {
968    incoming
969        .validate_transcript_history_state()
970        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
971            id: incoming.id().clone(),
972            reason: format!("incoming transcript history state is malformed: {err}"),
973        })?;
974    let Some(previous) = previous else {
975        return Err(SessionStoreError::InvalidTranscriptRewrite {
976            id: incoming.id().clone(),
977            reason: "rewrite target has no previously persisted session".to_string(),
978        });
979    };
980    if incoming.id() != previous.id() {
981        return Err(SessionStoreError::InvalidTranscriptRewrite {
982            id: incoming.id().clone(),
983            reason: format!(
984                "incoming session id {} differs from previous session id {}",
985                incoming.id(),
986                previous.id()
987            ),
988        });
989    }
990    let previous_revision = previous.transcript_revision().map_err(|err| {
991        SessionStoreError::InvalidTranscriptRewrite {
992            id: incoming.id().clone(),
993            reason: format!("previous transcript revision is malformed: {err}"),
994        }
995    })?;
996    if previous_revision != commit.parent_revision {
997        return Err(SessionStoreError::TranscriptRevisionConflict {
998            id: incoming.id().clone(),
999            expected: commit.parent_revision.clone(),
1000            actual: previous_revision,
1001        });
1002    }
1003    let previous_message_digest =
1004        transcript_messages_digest(previous.messages()).map_err(|err| {
1005            SessionStoreError::InvalidTranscriptRewrite {
1006                id: incoming.id().clone(),
1007                reason: format!("previous current transcript is not digestible: {err}"),
1008            }
1009        })?;
1010    if previous_message_digest != commit.parent_revision {
1011        return Err(SessionStoreError::InvalidTranscriptRewrite {
1012            id: incoming.id().clone(),
1013            reason: format!(
1014                "previous current transcript digest {previous_message_digest} does not match commit parent {}",
1015                commit.parent_revision
1016            ),
1017        });
1018    }
1019    let incoming_revision = incoming.transcript_revision().map_err(|err| {
1020        SessionStoreError::InvalidTranscriptRewrite {
1021            id: incoming.id().clone(),
1022            reason: format!("incoming transcript revision is malformed: {err}"),
1023        }
1024    })?;
1025    if incoming_revision != commit.revision {
1026        return Err(SessionStoreError::InvalidTranscriptRewrite {
1027            id: incoming.id().clone(),
1028            reason: format!(
1029                "incoming transcript revision {incoming_revision} does not match commit revision {}",
1030                commit.revision
1031            ),
1032        });
1033    }
1034    let incoming_message_digest =
1035        transcript_messages_digest(incoming.messages()).map_err(|err| {
1036            SessionStoreError::InvalidTranscriptRewrite {
1037                id: incoming.id().clone(),
1038                reason: format!("incoming current transcript is not digestible: {err}"),
1039            }
1040        })?;
1041    if incoming_message_digest != commit.revision {
1042        return Err(SessionStoreError::InvalidTranscriptRewrite {
1043            id: incoming.id().clone(),
1044            reason: format!(
1045                "incoming current transcript digest {incoming_message_digest} does not match commit revision {}",
1046                commit.revision
1047            ),
1048        });
1049    }
1050    let Some(incoming_state) = incoming.transcript_history_state().map_err(|err| {
1051        SessionStoreError::InvalidTranscriptRewrite {
1052            id: incoming.id().clone(),
1053            reason: format!("incoming transcript history state is malformed: {err}"),
1054        }
1055    })?
1056    else {
1057        return Err(SessionStoreError::InvalidTranscriptRewrite {
1058            id: incoming.id().clone(),
1059            reason: "incoming rewrite did not persist a transcript revision graph".to_string(),
1060        });
1061    };
1062    validate_rewrite_save_retains_previous_commits(incoming, previous, &incoming_state)?;
1063    validate_transcript_rewrite_commit_bodies(incoming, commit, &incoming_state)
1064}
1065
1066fn validate_transcript_rewrite_commit_bodies(
1067    incoming: &Session,
1068    commit: &TranscriptRewriteCommit,
1069    incoming_state: &TranscriptHistoryState,
1070) -> Result<(), SessionStoreError> {
1071    if !incoming_state
1072        .commits
1073        .iter()
1074        .any(|persisted| persisted == commit)
1075    {
1076        return Err(SessionStoreError::InvalidTranscriptRewrite {
1077            id: incoming.id().clone(),
1078            reason: format!(
1079                "incoming rewrite did not persist the rewrite commit in the transcript graph (wanted {} -> {}, graph commits: {:?})",
1080                commit.parent_revision,
1081                commit.revision,
1082                incoming_state
1083                    .commits
1084                    .iter()
1085                    .map(|commit| (&commit.parent_revision, &commit.revision))
1086                    .collect::<Vec<_>>()
1087            ),
1088        });
1089    }
1090    let Some(parent_body) = incoming_state
1091        .revisions
1092        .iter()
1093        .find(|body| body.revision == commit.parent_revision)
1094    else {
1095        return Err(SessionStoreError::InvalidTranscriptRewrite {
1096            id: incoming.id().clone(),
1097            reason: format!(
1098                "incoming rewrite omitted parent revision body {}",
1099                commit.parent_revision
1100            ),
1101        });
1102    };
1103    let Some(revision_body) = incoming_state
1104        .revisions
1105        .iter()
1106        .find(|body| body.revision == commit.revision)
1107    else {
1108        return Err(SessionStoreError::InvalidTranscriptRewrite {
1109            id: incoming.id().clone(),
1110            reason: format!(
1111                "incoming rewrite omitted new revision body {}",
1112                commit.revision
1113            ),
1114        });
1115    };
1116    if parent_body.messages.len() != commit.messages_before
1117        || revision_body.messages.len() != commit.messages_after
1118    {
1119        return Err(SessionStoreError::InvalidTranscriptRewrite {
1120            id: incoming.id().clone(),
1121            reason: format!(
1122                "commit message counts {} -> {} do not match persisted rewrite {} -> {}",
1123                commit.messages_before,
1124                commit.messages_after,
1125                parent_body.messages.len(),
1126                revision_body.messages.len()
1127            ),
1128        });
1129    }
1130    let parent_body_revision =
1131        transcript_messages_digest(&parent_body.messages).map_err(|err| {
1132            SessionStoreError::InvalidTranscriptRewrite {
1133                id: incoming.id().clone(),
1134                reason: format!("parent revision body is not digestible: {err}"),
1135            }
1136        })?;
1137    if parent_body_revision != commit.parent_revision {
1138        return Err(SessionStoreError::InvalidTranscriptRewrite {
1139            id: incoming.id().clone(),
1140            reason: format!(
1141                "parent revision body digest {parent_body_revision} does not match commit parent {}",
1142                commit.parent_revision
1143            ),
1144        });
1145    }
1146    let (start, end) = match &commit.selection {
1147        TranscriptRewriteSelection::MessageRange { start, end } => (*start, *end),
1148    };
1149    if start > end || end > parent_body.messages.len() {
1150        return Err(SessionStoreError::InvalidTranscriptRewrite {
1151            id: incoming.id().clone(),
1152            reason: format!(
1153                "commit selection {start}..{end} is invalid for parent revision with {} messages",
1154                parent_body.messages.len()
1155            ),
1156        });
1157    }
1158    let original_span_digest = transcript_messages_digest(&parent_body.messages[start..end])
1159        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1160            id: incoming.id().clone(),
1161            reason: format!("original span body is not digestible: {err}"),
1162        })?;
1163    if original_span_digest != commit.original_span_digest {
1164        return Err(SessionStoreError::InvalidTranscriptRewrite {
1165            id: incoming.id().clone(),
1166            reason: format!(
1167                "original span digest {original_span_digest} does not match commit digest {}",
1168                commit.original_span_digest
1169            ),
1170        });
1171    }
1172    let revision_body_digest =
1173        transcript_messages_digest(&revision_body.messages).map_err(|err| {
1174            SessionStoreError::InvalidTranscriptRewrite {
1175                id: incoming.id().clone(),
1176                reason: format!("new revision body is not digestible: {err}"),
1177            }
1178        })?;
1179    if revision_body_digest != commit.revision {
1180        return Err(SessionStoreError::InvalidTranscriptRewrite {
1181            id: incoming.id().clone(),
1182            reason: format!(
1183                "new revision body digest {revision_body_digest} does not match commit revision {}",
1184                commit.revision
1185            ),
1186        });
1187    }
1188    let removed_len = end - start;
1189    let retained_len = commit
1190        .messages_before
1191        .checked_sub(removed_len)
1192        .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1193            id: incoming.id().clone(),
1194            reason: "commit removed more messages than it recorded before rewrite".to_string(),
1195        })?;
1196    let replacement_len = commit
1197        .messages_after
1198        .checked_sub(retained_len)
1199        .ok_or_else(|| SessionStoreError::InvalidTranscriptRewrite {
1200            id: incoming.id().clone(),
1201            reason: "commit message counts cannot describe a replacement span".to_string(),
1202        })?;
1203    let replacement_end = start.checked_add(replacement_len).ok_or_else(|| {
1204        SessionStoreError::InvalidTranscriptRewrite {
1205            id: incoming.id().clone(),
1206            reason: "replacement span end overflowed".to_string(),
1207        }
1208    })?;
1209    if replacement_end > revision_body.messages.len() {
1210        return Err(SessionStoreError::InvalidTranscriptRewrite {
1211            id: incoming.id().clone(),
1212            reason: format!(
1213                "replacement span {start}..{replacement_end} is invalid for revision with {} messages",
1214                revision_body.messages.len()
1215            ),
1216        });
1217    }
1218    let parent_prefix_digest =
1219        transcript_messages_digest(&parent_body.messages[..start]).map_err(|err| {
1220            SessionStoreError::InvalidTranscriptRewrite {
1221                id: incoming.id().clone(),
1222                reason: format!("parent prefix body is not digestible: {err}"),
1223            }
1224        })?;
1225    let revision_prefix_digest = transcript_messages_digest(&revision_body.messages[..start])
1226        .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1227            id: incoming.id().clone(),
1228            reason: format!("revision prefix body is not digestible: {err}"),
1229        })?;
1230    if parent_prefix_digest != revision_prefix_digest {
1231        return Err(SessionStoreError::InvalidTranscriptRewrite {
1232            id: incoming.id().clone(),
1233            reason: "rewrite revision changed messages before the selected span".to_string(),
1234        });
1235    }
1236    let parent_suffix_digest =
1237        transcript_messages_digest(&parent_body.messages[end..]).map_err(|err| {
1238            SessionStoreError::InvalidTranscriptRewrite {
1239                id: incoming.id().clone(),
1240                reason: format!("parent suffix body is not digestible: {err}"),
1241            }
1242        })?;
1243    let revision_suffix_digest =
1244        transcript_messages_digest(&revision_body.messages[replacement_end..]).map_err(|err| {
1245            SessionStoreError::InvalidTranscriptRewrite {
1246                id: incoming.id().clone(),
1247                reason: format!("revision suffix body is not digestible: {err}"),
1248            }
1249        })?;
1250    if parent_suffix_digest != revision_suffix_digest {
1251        return Err(SessionStoreError::InvalidTranscriptRewrite {
1252            id: incoming.id().clone(),
1253            reason: "rewrite revision changed messages after the selected span".to_string(),
1254        });
1255    }
1256    let replacement_digest = transcript_messages_digest(
1257        &revision_body.messages[start..replacement_end],
1258    )
1259    .map_err(|err| SessionStoreError::InvalidTranscriptRewrite {
1260        id: incoming.id().clone(),
1261        reason: format!("replacement span body is not digestible: {err}"),
1262    })?;
1263    if replacement_digest != commit.replacement_digest {
1264        return Err(SessionStoreError::InvalidTranscriptRewrite {
1265            id: incoming.id().clone(),
1266            reason: format!(
1267                "replacement span digest {replacement_digest} does not match commit digest {}",
1268                commit.replacement_digest
1269            ),
1270        });
1271    }
1272    Ok(())
1273}
1274
1275impl From<serde_json::Error> for SessionStoreError {
1276    fn from(e: serde_json::Error) -> Self {
1277        Self::Serialization(e.to_string())
1278    }
1279}
1280
/// Abstraction over session storage backends.
///
/// All methods take `&self` — implementations must handle interior mutability.
/// Object-safe: consumed as `Arc<dyn SessionStore>` throughout the system.
///
/// Required methods are [`save`](Self::save), [`load`](Self::load),
/// [`list`](Self::list), [`delete`](Self::delete) and
/// [`delete_if_current_revision`](Self::delete_if_current_revision). The
/// remaining methods have default implementations, documented on each method.
///
/// # Append-only contract (F1 closure, wave-c C-H1)
///
/// The snapshot written by [`save`](Self::save) is a **projection of the
/// canonical event log** ([`crate::session_store`] doc: "snapshot =
/// projection"). Implementations that persist across calls MUST enforce
/// that the message vector stored for a given `SessionId` is monotonically
/// non-shrinking — a subsequent `save()` for the same id must not have a
/// smaller `messages().len()` than the previously persisted row.
///
/// Callers that need to produce a session with a shorter history must go
/// through [`Session::fork_at`], which rotates `SessionId` — a fork is a
/// new identity on a new event log, not a same-session truncation. The only
/// same-id path allowed to replace or shrink the message projection is
/// [`save_transcript_rewrite`](Self::save_transcript_rewrite), which requires
/// a typed rewrite commit.
///
/// Backends are encouraged to assert this invariant in their `save`
/// implementation and return
/// [`SessionStoreError::MonotonicityViolation`] when a caller tries to
/// shrink a snapshot. The default implementations in `meerkat-store`
/// (`SqliteSessionStore`, `JsonlStore`, `MemoryStore`) all go through
/// the [`append_only_save_guard`] helper.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionStore: Send + Sync {
    /// Save a session (create or extend).
    ///
    /// Implementations MUST reject a save whose message history is
    /// shorter than the previously persisted row for the same `SessionId`
    /// — see the trait-level doc on the append-only contract.
    async fn save(&self, session: &Session) -> Result<(), SessionStoreError>;

    /// Save a same-SessionId transcript rewrite.
    ///
    /// This is the only `SessionStore` path allowed to replace or shrink the
    /// current message projection. Implementations must validate `commit`
    /// against the previously persisted head before writing `session`.
    ///
    /// # Errors
    ///
    /// The default implementation does not write anything: it ignores both
    /// arguments and always returns [`SessionStoreError::Internal`] stating
    /// that the operation is unsupported. Backends that support rewrites must
    /// override it.
    async fn save_transcript_rewrite(
        &self,
        session: &Session,
        commit: &TranscriptRewriteCommit,
    ) -> Result<(), SessionStoreError> {
        // The unsupported default has no use for its arguments.
        let _ = (session, commit);
        Err(SessionStoreError::Internal(
            "save_transcript_rewrite is not supported by this SessionStore".to_string(),
        ))
    }

    /// Save a compatibility projection after a separate authority has already
    /// committed the session snapshot.
    ///
    /// This method is for runtime-backed services only: the runtime snapshot
    /// has already accepted the semantic mutation, and the `SessionStore` row is
    /// a rebuildable projection. Normal callers must use [`SessionStore::save`]
    /// or [`SessionStore::save_transcript_rewrite`] so the store boundary keeps
    /// enforcing append-only/CAS semantics.
    ///
    /// The default implementation simply forwards to [`SessionStore::save`],
    /// so it returns whatever `save` returns for the same session.
    async fn save_authoritative_projection(
        &self,
        session: &Session,
    ) -> Result<(), SessionStoreError> {
        self.save(session).await
    }

    /// Save an authoritative projection only if the persisted row is still the
    /// revision that the caller already validated.
    ///
    /// NOTE(review): `expected_current_revision` is an `Option`; presumably
    /// `None` means "no row is expected to exist yet" — confirm against the
    /// backend implementations.
    ///
    /// # Errors
    ///
    /// The default implementation does not write anything: it ignores both
    /// arguments and always returns [`SessionStoreError::Internal`] stating
    /// that the operation is unsupported.
    async fn save_authoritative_projection_if_current_revision(
        &self,
        session: &Session,
        expected_current_revision: Option<String>,
    ) -> Result<(), SessionStoreError> {
        // The unsupported default has no use for its arguments.
        let _ = (session, expected_current_revision);
        Err(SessionStoreError::Internal(
            "save_authoritative_projection_if_current_revision is not supported by this SessionStore"
                .to_string(),
        ))
    }

    /// Load a session by ID.
    ///
    /// The `Option` in the return type lets a backend report an absent row
    /// without raising an error; [`exists`](Self::exists) relies on that.
    async fn load(&self, id: &SessionId) -> Result<Option<Session>, SessionStoreError>;

    /// List sessions matching filter.
    ///
    /// Returns metadata ([`SessionMeta`]) rather than full sessions.
    async fn list(&self, filter: SessionFilter) -> Result<Vec<SessionMeta>, SessionStoreError>;

    /// Delete a session.
    async fn delete(&self, id: &SessionId) -> Result<(), SessionStoreError>;

    /// Delete a compatibility projection only if it is still the revision that
    /// the caller already validated as unsafe to expose.
    ///
    /// NOTE(review): the meaning of the returned `bool` is not stated here —
    /// presumably `true` when a row was deleted and `false` when the persisted
    /// revision no longer matched; confirm against the backend implementations.
    async fn delete_if_current_revision(
        &self,
        id: &SessionId,
        expected_current_revision: &str,
    ) -> Result<bool, SessionStoreError>;

    /// Check if a session exists.
    ///
    /// The default implementation performs a full [`load`](Self::load) and
    /// reports whether it returned a session; any load error is propagated.
    async fn exists(&self, id: &SessionId) -> Result<bool, SessionStoreError> {
        Ok(self.load(id).await?.is_some())
    }
}
1382
1383#[cfg(test)]
1384mod tests {
1385    use super::*;
1386    use crate::types::{
1387        AssistantBlock, BlockAssistantMessage, StopReason, SystemMessage, SystemNoticeBlock,
1388        SystemNoticeKind, SystemNoticeMessage, UserMessage,
1389    };
1390
1391    /// FOLD C: the canonical SessionDocumentMachine — not a handwritten shell
1392    /// boolean reducer — owns the live-vs-durable session-document authority
1393    /// verdict, the precedence (archived > uncommitted transcript > runtime
1394    /// system-context > stored transcript-revision), and the typed reason. This
1395    /// drives the classifier directly and asserts every authority/reason outcome
1396    /// and the precedence ordering.
1397    #[test]
1398    #[allow(clippy::expect_used)]
1399    fn classify_live_session_authority_is_decided_by_machine() {
1400        use crate::session_document::{
1401            LiveSessionAuthorityKind, LiveSessionAuthorityReason, SessionDocumentEffect,
1402            SessionDocumentMachineAuthority,
1403        };
1404
1405        fn classify(
1406            stored_transcript_diverged: bool,
1407            live_has_uncommitted_transcript: bool,
1408            runtime_system_context_diverged: bool,
1409            stored_is_archived: bool,
1410        ) -> (LiveSessionAuthorityKind, LiveSessionAuthorityReason) {
1411            let mut authority = SessionDocumentMachineAuthority::new();
1412            let effects = authority
1413                .classify_live_session_authority(
1414                    stored_transcript_diverged,
1415                    live_has_uncommitted_transcript,
1416                    runtime_system_context_diverged,
1417                    stored_is_archived,
1418                )
1419                .expect("classifier must resolve a verdict");
1420            effects
1421                .iter()
1422                .find_map(|effect| match effect {
1423                    SessionDocumentEffect::LiveSessionAuthorityClassified { authority, reason } => {
1424                        Some((*authority, *reason))
1425                    }
1426                    _ => None,
1427                })
1428                .expect("classifier must emit a verdict")
1429        }
1430
1431        // All four false -> LiveAuthoritative.
1432        let (kind, _) = classify(false, false, false, false);
1433        assert_eq!(kind, LiveSessionAuthorityKind::LiveAuthoritative);
1434
1435        // Each divergence (in isolation) -> DurableAuthoritative with its reason.
1436        assert_eq!(
1437            classify(true, false, false, false),
1438            (
1439                LiveSessionAuthorityKind::DurableAuthoritative,
1440                LiveSessionAuthorityReason::StoredTranscriptRevisionDiverged
1441            ),
1442        );
1443        assert_eq!(
1444            classify(false, true, false, false),
1445            (
1446                LiveSessionAuthorityKind::DurableAuthoritative,
1447                LiveSessionAuthorityReason::LiveUncommittedTranscript
1448            ),
1449        );
1450        assert_eq!(
1451            classify(false, false, true, false),
1452            (
1453                LiveSessionAuthorityKind::DurableAuthoritative,
1454                LiveSessionAuthorityReason::RuntimeSystemContextDiverged
1455            ),
1456        );
1457        assert_eq!(
1458            classify(false, false, false, true),
1459            (
1460                LiveSessionAuthorityKind::DurableAuthoritative,
1461                LiveSessionAuthorityReason::StoredArchived
1462            ),
1463        );
1464
1465        // Precedence: archived > uncommitted > system-context > revision.
1466        // When ALL four diverge, archived wins.
1467        assert_eq!(
1468            classify(true, true, true, true),
1469            (
1470                LiveSessionAuthorityKind::DurableAuthoritative,
1471                LiveSessionAuthorityReason::StoredArchived
1472            ),
1473        );
1474        // Not archived, but uncommitted + system-context + revision -> uncommitted.
1475        assert_eq!(
1476            classify(true, true, true, false),
1477            (
1478                LiveSessionAuthorityKind::DurableAuthoritative,
1479                LiveSessionAuthorityReason::LiveUncommittedTranscript
1480            ),
1481        );
1482        // Not archived, not uncommitted, but system-context + revision -> system-context.
1483        assert_eq!(
1484            classify(true, false, true, false),
1485            (
1486                LiveSessionAuthorityKind::DurableAuthoritative,
1487                LiveSessionAuthorityReason::RuntimeSystemContextDiverged
1488            ),
1489        );
1490    }
1491
1492    #[test]
1493    fn append_only_guard_rejects_leading_system_message_replacement() {
1494        let mut previous = Session::new();
1495        previous.push(Message::System(SystemMessage::new("original system")));
1496        previous.push(Message::User(UserMessage::text("hello".to_string())));
1497
1498        let mut incoming = previous.clone();
1499        let rewrite_result = incoming.replace_messages_internal(
1500            vec![
1501                Message::System(SystemMessage::new("rewritten system")),
1502                Message::User(UserMessage::text("hello".to_string())),
1503            ],
1504            crate::TranscriptRewriteReason::new("unit-test"),
1505        );
1506        assert!(
1507            rewrite_result.is_ok(),
1508            "typed rewrite should be constructible: {rewrite_result:?}"
1509        );
1510
1511        assert!(matches!(
1512            append_only_save_guard(&incoming, Some(&previous)),
1513            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1514        ));
1515    }
1516
1517    #[test]
1518    fn append_only_guard_accepts_runtime_system_context_append()
1519    -> Result<(), Box<dyn std::error::Error>> {
1520        let mut previous = Session::new();
1521        previous.push(Message::System(SystemMessage::new("base system")));
1522        previous.push(Message::User(UserMessage::text("hello".to_string())));
1523
1524        let mut incoming = previous.clone();
1525        // The typed runtime-context-append producer stamps the system message's
1526        // mutation_kind so the save-guard admits the divergence from a typed
1527        // field, not the rendered `[Runtime System Context]` label.
1528        incoming.set_system_prompt_with_source(
1529            format!(
1530                "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
1531            ),
1532            crate::session_durable_config_authority::SessionSystemPromptSource::RuntimeContextAppend,
1533        )?;
1534
1535        assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1536        Ok(())
1537    }
1538
1539    #[test]
1540    fn append_only_guard_rejects_append_shaped_prompt_without_runtime_context_marker() {
1541        let mut previous = Session::new();
1542        previous.push(Message::System(SystemMessage::new("base system")));
1543        previous.push(Message::User(UserMessage::text("hello".to_string())));
1544
1545        // Same rendered shape as a runtime context append, but produced via a
1546        // direct mutation (mutation_kind != RuntimeContextAppend). The typed
1547        // gate must reject it — content prefix alone is not authority.
1548        let mut incoming = previous.clone();
1549        incoming.set_system_prompt(format!(
1550            "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: forged\n\nextra context"
1551        ));
1552
1553        assert!(matches!(
1554            append_only_save_guard(&incoming, Some(&previous)),
1555            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1556        ));
1557    }
1558
1559    #[test]
1560    fn append_only_guard_accepts_system_timestamp_refresh_without_content_change() {
1561        let mut previous = Session::new();
1562        previous.push(Message::System(SystemMessage::new("base system")));
1563
1564        let mut incoming = previous.clone();
1565        incoming.set_system_prompt("base system".to_string());
1566
1567        assert!(append_only_save_guard(&incoming, Some(&previous)).is_ok());
1568    }
1569
1570    #[test]
1571    fn run_boundary_guard_accepts_compaction_after_uncheckpointed_runtime_append()
1572    -> Result<(), Box<dyn std::error::Error>> {
1573        let mut previous = Session::new();
1574        previous.push(Message::System(SystemMessage::new("base system")));
1575        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1576        previous.push(Message::BlockAssistant(BlockAssistantMessage {
1577            blocks: vec![AssistantBlock::Text {
1578                text: "answer one".to_string(),
1579                meta: None,
1580            }],
1581            stop_reason: StopReason::EndTurn,
1582            created_at: crate::types::message_timestamp_now(),
1583        }));
1584
1585        let mut parent = previous.clone();
1586        parent.set_system_prompt("refreshed runtime system projection".to_string());
1587        parent.push(Message::User(UserMessage::text(
1588            "runtime-only turn".to_string(),
1589        )));
1590        parent.push(Message::BlockAssistant(BlockAssistantMessage {
1591            blocks: vec![AssistantBlock::Text {
1592                text: "runtime-only answer".to_string(),
1593                meta: None,
1594            }],
1595            stop_reason: StopReason::EndTurn,
1596            created_at: crate::types::message_timestamp_now(),
1597        }));
1598        let parent_revision = parent.transcript_revision()?;
1599
1600        let mut incoming = parent.clone();
1601        let mut replacement = vec![
1602            parent.messages()[0].clone(),
1603            Message::User(UserMessage::compaction_summary(
1604                "[Context compacted] summary".to_string(),
1605            )),
1606        ];
1607        replacement.extend_from_slice(&parent.messages()[1..]);
1608        incoming.commit_transcript_rewrite(
1609            TranscriptRewriteSelection::MessageRange {
1610                start: 0,
1611                end: parent.messages().len(),
1612            },
1613            replacement,
1614            crate::TranscriptRewriteReason::new("compaction"),
1615            Some("meerkat-core".to_string()),
1616            Some(parent_revision),
1617        )?;
1618
1619        assert!(matches!(
1620            append_only_save_guard(&incoming, Some(&previous)),
1621            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1622        ));
1623        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn run_boundary_guard_accepts_compaction_with_retained_tail_window()
1629    -> Result<(), Box<dyn std::error::Error>> {
1630        let mut previous = Session::new();
1631        previous.push(Message::System(SystemMessage::new("base system")));
1632        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1633        previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
1634            vec![crate::types::AssistantBlock::Text {
1635                text: "answer one".to_string(),
1636                meta: None,
1637            }],
1638            StopReason::EndTurn,
1639        )));
1640
1641        let mut parent = previous.clone();
1642        parent.set_system_prompt("refreshed runtime system projection".to_string());
1643        parent.push(Message::SystemNotice(SystemNoticeMessage::new(
1644            SystemNoticeKind::Comms,
1645            "peer response queued",
1646        )));
1647        let parent_revision = parent.transcript_revision()?;
1648
1649        let mut incoming = parent.clone();
1650        let mut replacement = vec![
1651            parent.messages()[0].clone(),
1652            Message::User(UserMessage::compaction_summary(
1653                "[Context compacted] summary".to_string(),
1654            )),
1655        ];
1656        replacement.extend_from_slice(&parent.messages()[1..]);
1657        incoming.commit_transcript_rewrite(
1658            TranscriptRewriteSelection::MessageRange {
1659                start: 0,
1660                end: parent.messages().len(),
1661            },
1662            replacement,
1663            crate::TranscriptRewriteReason::new("compaction"),
1664            Some("meerkat-core".to_string()),
1665            Some(parent_revision),
1666        )?;
1667
1668        assert!(matches!(
1669            append_only_save_guard(&incoming, Some(&previous)),
1670            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1671        ));
1672        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
1673        Ok(())
1674    }
1675
1676    #[test]
1677    fn run_boundary_guard_rejects_commitless_history_parent_edge()
1678    -> Result<(), Box<dyn std::error::Error>> {
1679        let mut previous = Session::new();
1680        previous.push(Message::System(SystemMessage::new("base system")));
1681        previous.push(Message::User(UserMessage::text("turn one".to_string())));
1682        let previous_revision = previous.transcript_revision()?;
1683
1684        let mut incoming = previous.clone();
1685        incoming.set_system_prompt("forged replacement system".to_string());
1686        let incoming_revision = incoming.transcript_revision()?;
1687        let history = TranscriptHistoryState {
1688            head: incoming_revision.clone(),
1689            commits: Vec::new(),
1690            revisions: vec![
1691                crate::TranscriptRevisionBody {
1692                    revision: previous_revision,
1693                    parent_revision: None,
1694                    messages: previous.messages().to_vec(),
1695                    created_at: previous.updated_at(),
1696                },
1697                crate::TranscriptRevisionBody {
1698                    revision: incoming_revision,
1699                    parent_revision: Some(previous.transcript_revision()?),
1700                    messages: incoming.messages().to_vec(),
1701                    created_at: incoming.updated_at(),
1702                },
1703            ],
1704        };
1705        incoming.set_metadata_unchecked_for_test(
1706            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1707            serde_json::to_value(history)?,
1708        );
1709
1710        assert!(matches!(
1711            append_only_save_guard(&incoming, Some(&previous)),
1712            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1713        ));
1714        assert!(matches!(
1715            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1716            Err(SessionStoreError::TranscriptContinuityViolation { .. }
1717                | SessionStoreError::MonotonicityViolation { .. })
1718        ));
1719        Ok(())
1720    }
1721
1722    #[test]
1723    fn append_only_guard_rejects_history_head_that_does_not_match_current_messages()
1724    -> Result<(), Box<dyn std::error::Error>> {
1725        let mut previous = Session::new();
1726        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1727
1728        let mut incoming = previous.clone();
1729        incoming.push(Message::User(UserMessage::text("append".to_string())));
1730        let poisoned_messages = vec![Message::User(UserMessage::text(
1731            "unrelated poisoned history".to_string(),
1732        ))];
1733        let poisoned_revision = transcript_messages_digest(&poisoned_messages)?;
1734        incoming.set_metadata_unchecked_for_test(
1735            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1736            serde_json::to_value(TranscriptHistoryState {
1737                head: poisoned_revision.clone(),
1738                commits: Vec::new(),
1739                revisions: vec![crate::TranscriptRevisionBody {
1740                    revision: poisoned_revision,
1741                    parent_revision: None,
1742                    messages: poisoned_messages,
1743                    created_at: incoming.updated_at(),
1744                }],
1745            })?,
1746        );
1747
1748        assert!(matches!(
1749            append_only_save_guard(&incoming, Some(&previous)),
1750            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1751        ));
1752        assert!(matches!(
1753            append_only_save_guard(&incoming, None),
1754            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1755        ));
1756        assert!(matches!(
1757            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
1758            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1759        ));
1760        Ok(())
1761    }
1762
1763    #[test]
1764    fn append_only_guard_rejects_new_rewrite_commits_on_plain_append()
1765    -> Result<(), Box<dyn std::error::Error>> {
1766        let mut previous = Session::new();
1767        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1768        let previous_revision = previous.transcript_revision()?;
1769
1770        let mut incoming = previous.clone();
1771        let appended = Message::BlockAssistant(BlockAssistantMessage {
1772            blocks: vec![AssistantBlock::Text {
1773                text: "plain append".to_string(),
1774                meta: None,
1775            }],
1776            stop_reason: StopReason::EndTurn,
1777            created_at: crate::types::message_timestamp_now(),
1778        });
1779        incoming.commit_transcript_rewrite(
1780            TranscriptRewriteSelection::MessageRange { start: 1, end: 1 },
1781            vec![appended],
1782            crate::TranscriptRewriteReason::new("forged-append"),
1783            Some("unit-test".to_string()),
1784            Some(previous_revision),
1785        )?;
1786
1787        assert!(matches!(
1788            append_only_save_guard(&incoming, Some(&previous)),
1789            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1790        ));
1791        Ok(())
1792    }
1793
1794    #[test]
1795    fn append_only_guard_rejects_first_save_with_rewrite_commits()
1796    -> Result<(), Box<dyn std::error::Error>> {
1797        let mut incoming = Session::new();
1798        incoming.push(Message::User(UserMessage::text("seed".to_string())));
1799        let parent_messages = incoming.messages().to_vec();
1800        let parent_updated_at = incoming.updated_at();
1801        let parent_revision = incoming.transcript_revision()?;
1802        let commit = incoming.commit_transcript_rewrite(
1803            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1804            vec![Message::User(UserMessage::text(
1805                "compacted seed".to_string(),
1806            ))],
1807            crate::TranscriptRewriteReason::new("compaction"),
1808            Some("meerkat-core".to_string()),
1809            Some(parent_revision),
1810        )?;
1811        let incoming_revision = incoming.transcript_revision()?;
1812        let commit_parent_revision = commit.parent_revision.clone();
1813        incoming.set_metadata_unchecked_for_test(
1814            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1815            serde_json::to_value(TranscriptHistoryState {
1816                head: incoming_revision.clone(),
1817                commits: vec![commit],
1818                revisions: vec![
1819                    crate::TranscriptRevisionBody {
1820                        revision: commit_parent_revision.clone(),
1821                        parent_revision: None,
1822                        messages: parent_messages,
1823                        created_at: parent_updated_at,
1824                    },
1825                    crate::TranscriptRevisionBody {
1826                        revision: incoming_revision,
1827                        parent_revision: Some(commit_parent_revision),
1828                        messages: incoming.messages().to_vec(),
1829                        created_at: incoming.updated_at(),
1830                    },
1831                ],
1832            })?,
1833        );
1834
1835        assert!(matches!(
1836            append_only_save_guard(&incoming, None),
1837            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1838        ));
1839        Ok(())
1840    }
1841
1842    #[test]
1843    fn transcript_rewrite_guard_rejects_poisoned_history_graph()
1844    -> Result<(), Box<dyn std::error::Error>> {
1845        let mut previous = Session::new();
1846        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1847        let parent_revision = previous.transcript_revision()?;
1848
1849        let mut first = previous.clone();
1850        let first_commit = first.commit_transcript_rewrite(
1851            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1852            vec![Message::User(UserMessage::text(
1853                "compacted persisted".to_string(),
1854            ))],
1855            crate::TranscriptRewriteReason::new("compaction"),
1856            Some("unit-test".to_string()),
1857            Some(parent_revision),
1858        )?;
1859        let first_snapshot = first.clone();
1860
1861        first.commit_transcript_rewrite(
1862            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1863            vec![Message::User(UserMessage::text(
1864                "uncommitted poisoned fork".to_string(),
1865            ))],
1866            crate::TranscriptRewriteReason::new("poison"),
1867            Some("unit-test".to_string()),
1868            Some(first_commit.revision.clone()),
1869        )?;
1870        let mut poisoned_state = first
1871            .transcript_history_state()?
1872            .ok_or_else(|| "second rewrite should retain history state".to_string())?;
1873        poisoned_state.head = first_commit.revision.clone();
1874
1875        let mut poisoned = first_snapshot;
1876        poisoned.set_metadata_unchecked_for_test(
1877            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1878            serde_json::to_value(poisoned_state)?,
1879        );
1880
1881        assert!(matches!(
1882            transcript_rewrite_save_guard(&poisoned, Some(&previous), &first_commit),
1883            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1884                if reason.contains("incoming transcript history state is malformed")
1885        ));
1886        Ok(())
1887    }
1888
1889    #[test]
1890    fn authoritative_projection_guard_rejects_changed_persisted_revision()
1891    -> Result<(), Box<dyn std::error::Error>> {
1892        let mut previous = Session::new();
1893        previous.push(Message::User(UserMessage::text("persisted A".to_string())));
1894        let expected_revision = previous.transcript_revision()?;
1895
1896        let mut current = previous.clone();
1897        current.push(Message::BlockAssistant(BlockAssistantMessage {
1898            blocks: vec![AssistantBlock::Text {
1899                text: "persisted B".to_string(),
1900                meta: None,
1901            }],
1902            stop_reason: StopReason::EndTurn,
1903            created_at: crate::types::message_timestamp_now(),
1904        }));
1905        let mut incoming = previous.clone();
1906        incoming.push(Message::User(UserMessage::text(
1907            "incoming from A".to_string(),
1908        )));
1909
1910        assert!(matches!(
1911            authoritative_projection_current_revision_guard(
1912                &incoming,
1913                Some(&current),
1914                Some(&expected_revision)
1915            ),
1916            Err(SessionStoreError::TranscriptContinuityViolation { .. })
1917        ));
1918        Ok(())
1919    }
1920
1921    #[test]
1922    fn append_only_guard_rejects_rewrite_commits_on_first_save()
1923    -> Result<(), Box<dyn std::error::Error>> {
1924        let mut incoming = Session::new();
1925        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1926        let parent_revision = incoming.transcript_revision()?;
1927        incoming.commit_transcript_rewrite(
1928            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
1929            vec![Message::User(UserMessage::text("rewritten".to_string()))],
1930            crate::TranscriptRewriteReason::new("forged-first-save"),
1931            Some("unit-test".to_string()),
1932            Some(parent_revision),
1933        )?;
1934
1935        assert!(matches!(
1936            append_only_save_guard(&incoming, None),
1937            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
1938        ));
1939        Ok(())
1940    }
1941
1942    #[test]
1943    fn append_only_guard_rejects_commitless_history_on_first_save()
1944    -> Result<(), Box<dyn std::error::Error>> {
1945        let mut incoming = Session::new();
1946        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
1947        let incoming_revision = incoming.transcript_revision()?;
1948        incoming.set_metadata_unchecked_for_test(
1949            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1950            serde_json::to_value(TranscriptHistoryState {
1951                head: incoming_revision.clone(),
1952                commits: Vec::new(),
1953                revisions: vec![crate::TranscriptRevisionBody {
1954                    revision: incoming_revision,
1955                    parent_revision: None,
1956                    messages: incoming.messages().to_vec(),
1957                    created_at: incoming.updated_at(),
1958                }],
1959            })?,
1960        );
1961
1962        assert!(matches!(
1963            append_only_save_guard(&incoming, None),
1964            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
1965                if reason.contains("first save would seed transcript history state")
1966        ));
1967        Ok(())
1968    }
1969
1970    #[test]
1971    fn append_only_guard_rejects_commitless_history_seed_on_plain_append()
1972    -> Result<(), Box<dyn std::error::Error>> {
1973        let mut previous = Session::new();
1974        previous.push(Message::User(UserMessage::text("persisted".to_string())));
1975        let previous_revision = previous.transcript_revision()?;
1976
1977        let mut incoming = previous.clone();
1978        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
1979            blocks: vec![AssistantBlock::Text {
1980                text: "plain append".to_string(),
1981                meta: None,
1982            }],
1983            stop_reason: StopReason::EndTurn,
1984            created_at: crate::types::message_timestamp_now(),
1985        }));
1986        let incoming_revision = incoming.transcript_revision()?;
1987        incoming.set_metadata_unchecked_for_test(
1988            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
1989            serde_json::to_value(TranscriptHistoryState {
1990                head: incoming_revision.clone(),
1991                commits: Vec::new(),
1992                revisions: vec![
1993                    crate::TranscriptRevisionBody {
1994                        revision: previous_revision,
1995                        parent_revision: None,
1996                        messages: previous.messages().to_vec(),
1997                        created_at: previous.updated_at(),
1998                    },
1999                    crate::TranscriptRevisionBody {
2000                        revision: incoming_revision,
2001                        parent_revision: Some(previous.transcript_revision()?),
2002                        messages: incoming.messages().to_vec(),
2003                        created_at: incoming.updated_at(),
2004                    },
2005                ],
2006            })?,
2007        );
2008
2009        assert!(matches!(
2010            append_only_save_guard(&incoming, Some(&previous)),
2011            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2012                if reason.contains("append-only save would seed transcript history state")
2013        ));
2014        Ok(())
2015    }
2016
2017    #[test]
2018    fn run_boundary_guard_accepts_commitless_history_seed_on_plain_append()
2019    -> Result<(), Box<dyn std::error::Error>> {
2020        let mut previous = Session::new();
2021        previous.push(Message::User(UserMessage::text("persisted".to_string())));
2022        let previous_revision = previous.transcript_revision()?;
2023
2024        let mut incoming = previous.clone();
2025        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2026            blocks: vec![AssistantBlock::Text {
2027                text: "plain append".to_string(),
2028                meta: None,
2029            }],
2030            stop_reason: StopReason::EndTurn,
2031            created_at: crate::types::message_timestamp_now(),
2032        }));
2033        let incoming_revision = incoming.transcript_revision()?;
2034        incoming.set_metadata_unchecked_for_test(
2035            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2036            serde_json::to_value(TranscriptHistoryState {
2037                head: incoming_revision.clone(),
2038                commits: Vec::new(),
2039                revisions: vec![
2040                    crate::TranscriptRevisionBody {
2041                        revision: previous_revision.clone(),
2042                        parent_revision: None,
2043                        messages: previous.messages().to_vec(),
2044                        created_at: previous.updated_at(),
2045                    },
2046                    crate::TranscriptRevisionBody {
2047                        revision: incoming_revision,
2048                        parent_revision: Some(previous_revision),
2049                        messages: incoming.messages().to_vec(),
2050                        created_at: incoming.updated_at(),
2051                    },
2052                ],
2053            })?,
2054        );
2055
2056        assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2057        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2058        Ok(())
2059    }
2060
2061    #[test]
2062    fn run_boundary_guard_accepts_retained_history_seed_on_plain_append()
2063    -> Result<(), Box<dyn std::error::Error>> {
2064        let mut original = Session::new();
2065        original.push(Message::User(UserMessage::text("verbose seed".to_string())));
2066        let original_revision = original.transcript_revision()?;
2067
2068        let mut previous = original.clone();
2069        previous.commit_transcript_rewrite(
2070            TranscriptRewriteSelection::MessageRange { start: 0, end: 1 },
2071            vec![Message::User(UserMessage::text(
2072                "compacted seed".to_string(),
2073            ))],
2074            crate::TranscriptRewriteReason::new("compaction"),
2075            Some("meerkat-core".to_string()),
2076            Some(original_revision),
2077        )?;
2078        let previous_with_history = previous.clone();
2079        previous.clear_transcript_history_state();
2080
2081        let mut incoming = previous_with_history;
2082        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2083            blocks: vec![AssistantBlock::Text {
2084                text: "plain append after retained history".to_string(),
2085                meta: None,
2086            }],
2087            stop_reason: StopReason::EndTurn,
2088            created_at: crate::types::message_timestamp_now(),
2089        }));
2090
2091        assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2092        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2093        Ok(())
2094    }
2095
2096    #[test]
2097    fn run_boundary_guard_accepts_commitless_history_seed_on_first_snapshot()
2098    -> Result<(), Box<dyn std::error::Error>> {
2099        let mut incoming = Session::new();
2100        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
2101        let incoming_revision = incoming.transcript_revision()?;
2102        incoming.set_metadata_unchecked_for_test(
2103            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2104            serde_json::to_value(TranscriptHistoryState {
2105                head: incoming_revision.clone(),
2106                commits: Vec::new(),
2107                revisions: vec![crate::TranscriptRevisionBody {
2108                    revision: incoming_revision,
2109                    parent_revision: None,
2110                    messages: incoming.messages().to_vec(),
2111                    created_at: incoming.updated_at(),
2112                }],
2113            })?,
2114        );
2115
2116        assert!(append_only_save_guard(&incoming, None).is_err());
2117        assert!(run_boundary_snapshot_save_guard(&incoming, None).is_ok());
2118        Ok(())
2119    }
2120
2121    #[test]
2122    fn run_boundary_guard_accepts_commitless_history_seed_on_initial_multi_revision_snapshot()
2123    -> Result<(), Box<dyn std::error::Error>> {
2124        let mut base = Session::new();
2125        base.push(Message::User(UserMessage::text("first".to_string())));
2126        let base_revision = base.transcript_revision()?;
2127
2128        let mut incoming = base.clone();
2129        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2130            blocks: vec![AssistantBlock::Text {
2131                text: "second".to_string(),
2132                meta: None,
2133            }],
2134            stop_reason: StopReason::EndTurn,
2135            created_at: crate::types::message_timestamp_now(),
2136        }));
2137        let incoming_revision = incoming.transcript_revision()?;
2138        incoming.set_metadata_unchecked_for_test(
2139            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2140            serde_json::to_value(TranscriptHistoryState {
2141                head: incoming_revision.clone(),
2142                commits: Vec::new(),
2143                revisions: vec![
2144                    crate::TranscriptRevisionBody {
2145                        revision: base_revision.clone(),
2146                        parent_revision: None,
2147                        messages: base.messages().to_vec(),
2148                        created_at: base.updated_at(),
2149                    },
2150                    crate::TranscriptRevisionBody {
2151                        revision: incoming_revision,
2152                        parent_revision: Some(base_revision),
2153                        messages: incoming.messages().to_vec(),
2154                        created_at: incoming.updated_at(),
2155                    },
2156                ],
2157            })?,
2158        );
2159
2160        assert!(append_only_save_guard(&incoming, None).is_err());
2161        assert!(run_boundary_snapshot_save_guard(&incoming, None).is_ok());
2162        Ok(())
2163    }
2164
2165    #[test]
2166    fn append_only_guard_rejects_new_rewrite_commits_on_system_context_append()
2167    -> Result<(), Box<dyn std::error::Error>> {
2168        let mut previous = Session::new();
2169        previous.push(Message::System(SystemMessage::new("base system")));
2170        previous.push(Message::User(UserMessage::text("persisted".to_string())));
2171        let mut incoming = previous.clone();
2172        incoming.set_system_prompt_with_source(
2173            format!(
2174                "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nsource: unit-test\n\nextra context"
2175            ),
2176            crate::session_durable_config_authority::SessionSystemPromptSource::RuntimeContextAppend,
2177        )?;
2178        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2179            blocks: vec![AssistantBlock::Text {
2180                text: "plain append".to_string(),
2181                meta: None,
2182            }],
2183            stop_reason: StopReason::EndTurn,
2184            created_at: crate::types::message_timestamp_now(),
2185        }));
2186        let incoming_revision = incoming.transcript_revision()?;
2187        incoming.set_metadata_unchecked_for_test(
2188            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2189            serde_json::to_value(TranscriptHistoryState {
2190                head: incoming_revision.clone(),
2191                commits: vec![TranscriptRewriteCommit {
2192                    parent_revision: previous.transcript_revision()?,
2193                    revision: incoming_revision.clone(),
2194                    selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
2195                    original_span_digest: transcript_messages_digest(&[])?,
2196                    replacement_digest: transcript_messages_digest(&[])?,
2197                    messages_before: previous.messages().len(),
2198                    messages_after: incoming.messages().len(),
2199                    reason: crate::TranscriptRewriteReason::new("forged"),
2200                    actor: Some("unit-test".to_string()),
2201                    committed_at: incoming.updated_at(),
2202                }],
2203                revisions: vec![crate::TranscriptRevisionBody {
2204                    revision: incoming_revision,
2205                    parent_revision: None,
2206                    messages: incoming.messages().to_vec(),
2207                    created_at: incoming.updated_at(),
2208                }],
2209            })?,
2210        );
2211
2212        assert!(matches!(
2213            append_only_save_guard(&incoming, Some(&previous)),
2214            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2215        ));
2216        Ok(())
2217    }
2218
2219    #[test]
2220    fn append_only_guard_rejects_new_rewrite_commits_on_transient_notice_cleanup()
2221    -> Result<(), Box<dyn std::error::Error>> {
2222        let mut previous = Session::new();
2223        previous.push(Message::SystemNotice(SystemNoticeMessage::new(
2224            SystemNoticeKind::Comms,
2225            "transient peer delivery notice",
2226        )));
2227        previous.push(Message::User(UserMessage::text("persisted".to_string())));
2228
2229        let mut incoming = Session::new();
2230        incoming.push(Message::User(UserMessage::text("persisted".to_string())));
2231        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2232            blocks: vec![AssistantBlock::Text {
2233                text: "plain append after notice cleanup".to_string(),
2234                meta: None,
2235            }],
2236            stop_reason: StopReason::EndTurn,
2237            created_at: crate::types::message_timestamp_now(),
2238        }));
2239        let incoming_revision = incoming.transcript_revision()?;
2240        incoming.set_metadata_unchecked_for_test(
2241            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2242            serde_json::to_value(TranscriptHistoryState {
2243                head: incoming_revision.clone(),
2244                commits: vec![TranscriptRewriteCommit {
2245                    parent_revision: previous.transcript_revision()?,
2246                    revision: incoming_revision.clone(),
2247                    selection: TranscriptRewriteSelection::MessageRange { start: 0, end: 0 },
2248                    original_span_digest: transcript_messages_digest(&[])?,
2249                    replacement_digest: transcript_messages_digest(&[])?,
2250                    messages_before: previous.messages().len(),
2251                    messages_after: incoming.messages().len(),
2252                    reason: crate::TranscriptRewriteReason::new("forged"),
2253                    actor: Some("unit-test".to_string()),
2254                    committed_at: incoming.updated_at(),
2255                }],
2256                revisions: vec![crate::TranscriptRevisionBody {
2257                    revision: incoming_revision,
2258                    parent_revision: None,
2259                    messages: incoming.messages().to_vec(),
2260                    created_at: incoming.updated_at(),
2261                }],
2262            })?,
2263        );
2264
2265        assert!(matches!(
2266            append_only_save_guard(&incoming, Some(&previous)),
2267            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2268        ));
2269        Ok(())
2270    }
2271
2272    #[test]
2273    fn run_boundary_guard_accepts_generated_context_summary_before_retained_tail()
2274    -> Result<(), Box<dyn std::error::Error>> {
2275        let mut previous = Session::new();
2276        previous.push(Message::System(SystemMessage::new(
2277            "runtime system before context refresh",
2278        )));
2279        previous.push(Message::User(UserMessage::text(
2280            "Turn 1 request".to_string(),
2281        )));
2282        previous.push(Message::BlockAssistant(BlockAssistantMessage {
2283            blocks: vec![AssistantBlock::Text {
2284                text: "Turn 1 answer".to_string(),
2285                meta: None,
2286            }],
2287            stop_reason: StopReason::EndTurn,
2288            created_at: crate::types::message_timestamp_now(),
2289        }));
2290
2291        let mut incoming = Session::with_id(previous.id().clone());
2292        incoming.push(Message::System(SystemMessage::new(
2293            "runtime system after context refresh",
2294        )));
2295        incoming.push(Message::User(UserMessage::text(
2296            "Verbose context that will be compacted".to_string(),
2297        )));
2298        for message in previous.messages()[1..].iter().cloned() {
2299            incoming.push(message);
2300        }
2301        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2302            blocks: vec![AssistantBlock::Text {
2303                text: "Turn 2 generated answer".to_string(),
2304                meta: None,
2305            }],
2306            stop_reason: StopReason::EndTurn,
2307            created_at: crate::types::message_timestamp_now(),
2308        }));
2309        let parent_revision = incoming.transcript_revision()?;
2310        incoming.commit_transcript_rewrite(
2311            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2312            vec![Message::User(UserMessage::compaction_summary(
2313                "[Context compacted] Earlier runtime context".to_string(),
2314            ))],
2315            crate::TranscriptRewriteReason::new("compaction"),
2316            Some("meerkat-core".to_string()),
2317            Some(parent_revision),
2318        )?;
2319
2320        assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2321        assert!(run_boundary_snapshot_save_guard(&incoming, Some(&previous)).is_ok());
2322        Ok(())
2323    }
2324
2325    #[test]
2326    fn run_boundary_guard_rejects_context_summary_tail_without_compaction_summary_marker()
2327    -> Result<(), Box<dyn std::error::Error>> {
2328        let mut previous = Session::new();
2329        previous.push(Message::System(SystemMessage::new(
2330            "runtime system before context refresh",
2331        )));
2332        previous.push(Message::User(UserMessage::text(
2333            "Turn 1 request".to_string(),
2334        )));
2335        previous.push(Message::BlockAssistant(BlockAssistantMessage {
2336            blocks: vec![AssistantBlock::Text {
2337                text: "Turn 1 answer".to_string(),
2338                meta: None,
2339            }],
2340            stop_reason: StopReason::EndTurn,
2341            created_at: crate::types::message_timestamp_now(),
2342        }));
2343
2344        let mut incoming = Session::with_id(previous.id().clone());
2345        incoming.push(Message::System(SystemMessage::new(
2346            "runtime system after context refresh",
2347        )));
2348        incoming.push(Message::User(UserMessage::text(
2349            "Verbose context that will be compacted".to_string(),
2350        )));
2351        for message in previous.messages()[1..].iter().cloned() {
2352            incoming.push(message);
2353        }
2354        incoming.push(Message::BlockAssistant(BlockAssistantMessage {
2355            blocks: vec![AssistantBlock::Text {
2356                text: "Turn 2 generated answer".to_string(),
2357                meta: None,
2358            }],
2359            stop_reason: StopReason::EndTurn,
2360            created_at: crate::types::message_timestamp_now(),
2361        }));
2362        let parent_revision = incoming.transcript_revision()?;
2363        // Same rendered shape (content begins with `[Context compacted]`) but the
2364        // summary message uses the ordinary conversational role. The typed gate
2365        // must reject it: rendered content alone is not authority.
2366        incoming.commit_transcript_rewrite(
2367            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2368            vec![Message::User(UserMessage::text(
2369                "[Context compacted] Earlier runtime context".to_string(),
2370            ))],
2371            crate::TranscriptRewriteReason::new("compaction"),
2372            Some("meerkat-core".to_string()),
2373            Some(parent_revision),
2374        )?;
2375
2376        assert!(append_only_save_guard(&incoming, Some(&previous)).is_err());
2377        assert!(matches!(
2378            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2379            Err(SessionStoreError::TranscriptContinuityViolation { .. }
2380                | SessionStoreError::MonotonicityViolation { .. })
2381        ));
2382        Ok(())
2383    }
2384
2385    #[test]
2386    fn run_boundary_guard_rejects_runtime_parent_with_inserted_message_before_tail()
2387    -> Result<(), Box<dyn std::error::Error>> {
2388        let mut previous = Session::new();
2389        previous.push(Message::System(SystemMessage::new("base system")));
2390        previous.push(Message::User(UserMessage::text("turn one".to_string())));
2391        previous.push(Message::BlockAssistant(BlockAssistantMessage {
2392            blocks: vec![AssistantBlock::Text {
2393                text: "answer one".to_string(),
2394                meta: None,
2395            }],
2396            stop_reason: StopReason::EndTurn,
2397            created_at: crate::types::message_timestamp_now(),
2398        }));
2399
2400        let parent_messages = vec![
2401            Message::System(SystemMessage::new("refreshed runtime system projection")),
2402            Message::User(UserMessage::text(
2403                "injected before retained tail".to_string(),
2404            )),
2405            previous.messages()[1].clone(),
2406            previous.messages()[2].clone(),
2407        ];
2408        let parent_revision = transcript_messages_digest(&parent_messages)?;
2409        let mut parent = previous.clone();
2410        parent.apply_transcript_history_state(TranscriptHistoryState {
2411            head: parent_revision.clone(),
2412            commits: Vec::new(),
2413            revisions: vec![crate::TranscriptRevisionBody {
2414                revision: parent_revision,
2415                parent_revision: None,
2416                messages: parent_messages,
2417                created_at: parent.updated_at(),
2418            }],
2419        })?;
2420        let parent_revision = parent.transcript_revision()?;
2421
2422        let mut incoming = parent.clone();
2423        incoming.commit_transcript_rewrite(
2424            TranscriptRewriteSelection::MessageRange {
2425                start: 0,
2426                end: parent.messages().len(),
2427            },
2428            vec![Message::User(UserMessage::text(
2429                "[Context compacted] summary".to_string(),
2430            ))],
2431            crate::TranscriptRewriteReason::new("compaction"),
2432            Some("meerkat-core".to_string()),
2433            Some(parent_revision),
2434        )?;
2435
2436        assert!(matches!(
2437            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2438            Err(SessionStoreError::TranscriptContinuityViolation { .. }
2439                | SessionStoreError::MonotonicityViolation { .. })
2440        ));
2441        Ok(())
2442    }
2443
2444    #[test]
2445    fn run_boundary_guard_rejects_forged_parent_edge_before_real_rewrite_commit()
2446    -> Result<(), Box<dyn std::error::Error>> {
2447        let mut previous = Session::new();
2448        previous.push(Message::System(SystemMessage::new("base system")));
2449        previous.push(Message::User(UserMessage::text("turn one".to_string())));
2450        previous.push(Message::BlockAssistant(BlockAssistantMessage {
2451            blocks: vec![AssistantBlock::Text {
2452                text: "answer one".to_string(),
2453                meta: None,
2454            }],
2455            stop_reason: StopReason::EndTurn,
2456            created_at: crate::types::message_timestamp_now(),
2457        }));
2458        let previous_revision = previous.transcript_revision()?;
2459
2460        let forged_parent_messages = vec![
2461            Message::System(SystemMessage::new("refreshed runtime system projection")),
2462            Message::User(UserMessage::text(
2463                "forged insertion before retained tail".to_string(),
2464            )),
2465            previous.messages()[1].clone(),
2466            previous.messages()[2].clone(),
2467        ];
2468        let forged_parent_revision = transcript_messages_digest(&forged_parent_messages)?;
2469        let mut forged_parent = previous.clone();
2470        forged_parent.apply_transcript_history_state(TranscriptHistoryState {
2471            head: forged_parent_revision.clone(),
2472            commits: Vec::new(),
2473            revisions: vec![
2474                crate::TranscriptRevisionBody {
2475                    revision: previous_revision.clone(),
2476                    parent_revision: None,
2477                    messages: previous.messages().to_vec(),
2478                    created_at: previous.updated_at(),
2479                },
2480                crate::TranscriptRevisionBody {
2481                    revision: forged_parent_revision.clone(),
2482                    parent_revision: Some(previous_revision),
2483                    messages: forged_parent_messages,
2484                    created_at: forged_parent.updated_at(),
2485                },
2486            ],
2487        })?;
2488
2489        let mut incoming = forged_parent.clone();
2490        incoming.commit_transcript_rewrite(
2491            TranscriptRewriteSelection::MessageRange {
2492                start: 0,
2493                end: forged_parent.messages().len(),
2494            },
2495            vec![Message::User(UserMessage::text(
2496                "[Context compacted] forged branch".to_string(),
2497            ))],
2498            crate::TranscriptRewriteReason::new("compaction"),
2499            Some("meerkat-core".to_string()),
2500            Some(forged_parent_revision),
2501        )?;
2502
2503        assert!(matches!(
2504            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2505            Err(SessionStoreError::TranscriptContinuityViolation { .. }
2506                | SessionStoreError::MonotonicityViolation { .. })
2507        ));
2508        Ok(())
2509    }
2510
2511    #[test]
2512    fn append_only_guard_rejects_transient_mcp_pending_notice_cleanup_with_unaudited_commit()
2513    -> Result<(), crate::TranscriptEditError> {
2514        let mut previous = Session::new();
2515        previous.push(Message::User(UserMessage::text("hello".to_string())));
2516        previous.push(Message::SystemNotice(SystemNoticeMessage {
2517            kind: SystemNoticeKind::McpPending,
2518            body: Some("connecting".to_string()),
2519            blocks: vec![SystemNoticeBlock::Mcp {
2520                server_id: None,
2521                operation: None,
2522                phase: None,
2523                persisted: false,
2524                detail: Some("connecting".to_string()),
2525                pending_sources: vec!["test-server".to_string()],
2526            }],
2527            created_at: crate::types::message_timestamp_now(),
2528        }));
2529        previous.push(Message::BlockAssistant(BlockAssistantMessage::new(
2530            vec![crate::types::AssistantBlock::Text {
2531                text: "answer".to_string(),
2532                meta: None,
2533            }],
2534            StopReason::EndTurn,
2535        )));
2536
2537        let mut incoming = previous.clone();
2538        incoming.replace_messages_internal(
2539            previous
2540                .messages()
2541                .iter()
2542                .filter(|message| !matches!(message, Message::SystemNotice(_)))
2543                .cloned()
2544                .collect(),
2545            crate::TranscriptRewriteReason::new("unit-test"),
2546        )?;
2547        incoming.push(Message::User(UserMessage::text("again".to_string())));
2548
2549        assert!(matches!(
2550            append_only_save_guard(&incoming, Some(&previous)),
2551            Err(SessionStoreError::InvalidTranscriptRewrite { .. })
2552        ));
2553        Ok::<(), crate::TranscriptEditError>(())
2554    }
2555
2556    #[test]
2557    fn rewrite_chain_finder_crosses_normal_append_between_rewrites()
2558    -> Result<(), Box<dyn std::error::Error>> {
2559        let mut session = Session::new();
2560        session.push(Message::User(UserMessage::text("first".to_string())));
2561        session.push(Message::BlockAssistant(BlockAssistantMessage {
2562            blocks: vec![AssistantBlock::Text {
2563                text: "verbose first answer".to_string(),
2564                meta: None,
2565            }],
2566            stop_reason: StopReason::EndTurn,
2567            created_at: crate::types::message_timestamp_now(),
2568        }));
2569
2570        let original = session.transcript_revision()?;
2571        let first = session.commit_transcript_rewrite(
2572            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2573            vec![Message::BlockAssistant(BlockAssistantMessage {
2574                blocks: vec![AssistantBlock::Text {
2575                    text: "compact first answer".to_string(),
2576                    meta: None,
2577                }],
2578                stop_reason: StopReason::EndTurn,
2579                created_at: crate::types::message_timestamp_now(),
2580            })],
2581            crate::TranscriptRewriteReason::new("compaction"),
2582            Some("unit-test".to_string()),
2583            Some(original.clone()),
2584        )?;
2585
2586        session.push(Message::User(UserMessage::text("second".to_string())));
2587        session.push(Message::BlockAssistant(BlockAssistantMessage {
2588            blocks: vec![AssistantBlock::Text {
2589                text: "verbose second answer".to_string(),
2590                meta: None,
2591            }],
2592            stop_reason: StopReason::EndTurn,
2593            created_at: crate::types::message_timestamp_now(),
2594        }));
2595        let bridge = session.transcript_revision()?;
2596        assert_ne!(bridge, first.revision);
2597
2598        let second = session.commit_transcript_rewrite(
2599            TranscriptRewriteSelection::MessageRange { start: 3, end: 4 },
2600            vec![Message::BlockAssistant(BlockAssistantMessage {
2601                blocks: vec![AssistantBlock::Text {
2602                    text: "compact second answer".to_string(),
2603                    meta: None,
2604                }],
2605                stop_reason: StopReason::EndTurn,
2606                created_at: crate::types::message_timestamp_now(),
2607            })],
2608            crate::TranscriptRewriteReason::new("compaction"),
2609            Some("unit-test".to_string()),
2610            Some(bridge),
2611        )?;
2612        let state = session
2613            .transcript_history_state()?
2614            .ok_or_else(|| std::io::Error::other("missing transcript history state"))?;
2615
2616        let chain =
2617            find_transcript_rewrite_commit_chain_extending(&state, &original, &second.revision)
2618                .ok_or_else(|| {
2619                    std::io::Error::other(
2620                        "rewrite chain should extend through normal append bridge",
2621                    )
2622                })?;
2623        assert_eq!(chain.len(), 2);
2624        assert_eq!(chain[0].revision, first.revision);
2625        assert_eq!(chain[1].revision, second.revision);
2626        Ok(())
2627    }
2628
2629    #[test]
2630    fn run_boundary_guard_rejects_dropped_retained_rewrite_commits()
2631    -> Result<(), Box<dyn std::error::Error>> {
2632        let mut base = Session::new();
2633        base.push(Message::User(UserMessage::text("turn one".to_string())));
2634        base.push(Message::BlockAssistant(BlockAssistantMessage {
2635            blocks: vec![AssistantBlock::Text {
2636                text: "verbose answer".to_string(),
2637                meta: None,
2638            }],
2639            stop_reason: StopReason::EndTurn,
2640            created_at: crate::types::message_timestamp_now(),
2641        }));
2642        let base_revision = base.transcript_revision()?;
2643
2644        let mut previous = base.clone();
2645        let _retained_commit = previous.commit_transcript_rewrite(
2646            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2647            vec![Message::BlockAssistant(BlockAssistantMessage {
2648                blocks: vec![AssistantBlock::Text {
2649                    text: "first compact answer".to_string(),
2650                    meta: None,
2651                }],
2652                stop_reason: StopReason::EndTurn,
2653                created_at: crate::types::message_timestamp_now(),
2654            })],
2655            crate::TranscriptRewriteReason::new("compaction"),
2656            Some("unit-test".to_string()),
2657            Some(base_revision),
2658        )?;
2659        let previous_revision = previous.transcript_revision()?;
2660
2661        let mut incoming = previous.clone();
2662        let new_commit = incoming.commit_transcript_rewrite(
2663            TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
2664            vec![Message::BlockAssistant(BlockAssistantMessage {
2665                blocks: vec![AssistantBlock::Text {
2666                    text: "second compact answer".to_string(),
2667                    meta: None,
2668                }],
2669                stop_reason: StopReason::EndTurn,
2670                created_at: crate::types::message_timestamp_now(),
2671            })],
2672            crate::TranscriptRewriteReason::new("compaction"),
2673            Some("unit-test".to_string()),
2674            Some(previous_revision),
2675        )?;
2676        let mut state = incoming
2677            .transcript_history_state()?
2678            .ok_or_else(|| std::io::Error::other("incoming rewrite should retain history"))?;
2679        state.commits = vec![new_commit];
2680        incoming.set_metadata_unchecked_for_test(
2681            crate::session::SESSION_TRANSCRIPT_HISTORY_STATE_KEY,
2682            serde_json::to_value(state)?,
2683        );
2684
2685        assert!(matches!(
2686            run_boundary_snapshot_save_guard(&incoming, Some(&previous)),
2687            Err(SessionStoreError::InvalidTranscriptRewrite { reason, .. })
2688                if reason.contains("drop retained transcript rewrite commits")
2689        ));
2690        Ok(())
2691    }
2692
2693    // ------------------------------------------------------------------
2694    // FOLD 2: the persist-time system-context append-admission decision routes
2695    // through SessionDocumentMachine ResolveSystemContextPersistAppendAdmission
2696    // (the SAME machine the staging path drives). These tests pin that the
2697    // persist-time verdict matches a direct machine call for every shape, and
2698    // that the four admission cases behave exactly as the retired shell reducer.
2699    // ------------------------------------------------------------------
2700
2701    fn runtime_append_system(content: &str) -> SystemMessage {
2702        let mut system = SystemMessage::new(content);
2703        system.mutation_kind = crate::types::SystemPromptMutationKind::RuntimeContextAppend;
2704        system
2705    }
2706
2707    /// Direct machine call mirroring the persist-time observation extraction —
2708    /// the persist-time path MUST agree with this for every input shape.
2709    #[allow(clippy::expect_used)]
2710    fn machine_persist_append_admits(
2711        previous: Option<&SystemMessage>,
2712        incoming: &SystemMessage,
2713    ) -> bool {
2714        let has_previous = previous.is_some();
2715        let content_identical =
2716            previous.is_some_and(|previous| incoming.content == previous.content);
2717        let content_extends_previous =
2718            previous.is_some_and(|previous| incoming.content.starts_with(&previous.content));
2719        let appended_starts_with_separator = previous.is_some_and(|previous| {
2720            incoming
2721                .content
2722                .get(previous.content.len()..)
2723                .is_some_and(|appended| appended.starts_with(SYSTEM_CONTEXT_SEPARATOR))
2724        });
2725        let incoming_is_runtime_context_append = incoming.mutation_kind.is_runtime_context_append();
2726        let mut authority = crate::session_document::SessionDocumentMachineAuthority::new();
2727        let effects = authority
2728            .resolve_system_context_persist_append_admission(
2729                has_previous,
2730                content_identical,
2731                content_extends_previous,
2732                appended_starts_with_separator,
2733                incoming_is_runtime_context_append,
2734            )
2735            .expect("machine resolves persist-append admission");
2736        effects.into_iter().any(|effect| {
2737            matches!(
2738                effect,
2739                crate::session_document::SessionDocumentEffect::SystemContextPersistAppendAdmissionResolved {
2740                    admission: crate::session_document::SystemContextPersistAppendAdmission::Admit,
2741                }
2742            )
2743        })
2744    }
2745
2746    #[allow(clippy::expect_used)]
2747    fn assert_persist_append_matches_machine(
2748        previous: Option<&SystemMessage>,
2749        incoming: &SystemMessage,
2750        expected: bool,
2751    ) {
2752        let verdict =
2753            system_context_is_append(previous, incoming).expect("persist-time admission resolves");
2754        assert_eq!(verdict, expected, "persist-time verdict mismatch");
2755        assert_eq!(
2756            verdict,
2757            machine_persist_append_admits(previous, incoming),
2758            "persist-time verdict diverges from direct machine call"
2759        );
2760    }
2761
2762    #[test]
2763    fn persist_append_identical_content_admits() {
2764        let previous = SystemMessage::new("base system");
2765        let incoming = SystemMessage::new("base system");
2766        assert_persist_append_matches_machine(Some(&previous), &incoming, true);
2767    }
2768
2769    #[test]
2770    fn persist_append_separator_append_with_marker_admits() {
2771        let previous = SystemMessage::new("base system");
2772        let incoming = runtime_append_system(&format!(
2773            "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nextra"
2774        ));
2775        assert_persist_append_matches_machine(Some(&previous), &incoming, true);
2776    }
2777
2778    #[test]
2779    fn persist_append_shaped_without_marker_rejects() {
2780        let previous = SystemMessage::new("base system");
2781        // Append-shaped content but no runtime-context-append provenance marker.
2782        let incoming = SystemMessage::new(format!(
2783            "base system{SYSTEM_CONTEXT_SEPARATOR}[Runtime System Context]\nextra"
2784        ));
2785        assert_persist_append_matches_machine(Some(&previous), &incoming, false);
2786    }
2787
2788    #[test]
2789    fn persist_append_divergent_content_rejects() {
2790        let previous = SystemMessage::new("base system");
2791        let incoming = runtime_append_system("totally different");
2792        assert_persist_append_matches_machine(Some(&previous), &incoming, false);
2793    }
2794
2795    #[test]
2796    fn persist_append_no_previous_admits_only_with_marker() {
2797        let with_marker = runtime_append_system("brand new context");
2798        assert_persist_append_matches_machine(None, &with_marker, true);
2799
2800        let without_marker = SystemMessage::new("brand new context");
2801        assert_persist_append_matches_machine(None, &without_marker, false);
2802    }
2803}