Skip to main content

ai_memory/recover/
mod.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! `ai-memory recover-previous-session` — fail-safe recovery of agent
5//! context from host-written transcript files.
6//!
7//! Closes the substrate failure mode documented in issue
8//! [#1388](https://github.com/alphaonedev/ai-memory-mcp/issues/1388):
9//! when an AI agent session terminates ungracefully (SIGKILL, tmux
10//! lockup, host crash) between conversation turns, any decisions /
11//! plans / agreed scope from those turns are lost because the agent
12//! is `memory_store`-volunteer mode. The transcript file Claude
13//! Code / Codex CLI / Gemini CLI writes per-turn to disk SURVIVES
14//! the kill, but ai-memory has no bridge to that durable artifact —
15//! until this module.
16//!
17//! The recovery surface is **dual**: a CLI subcommand
18//! (`ai-memory recover-previous-session`) for SessionStart-hook
19//! integration, and an MCP tool (`memory_recover_previous_session`)
20//! for in-session agent self-recovery. Both call into
21//! [`recover_from_transcript`] which is the canonical handler.
22//!
23//! See issue [#1389](https://github.com/alphaonedev/ai-memory-mcp/issues/1389)
24//! for the full design + acceptance criteria; see CLAUDE.md
25//! §"Auto-capture" for operator-facing documentation.
26
27pub mod nag;
28pub mod parsers;
29pub mod transcript_paths;
30
31use std::path::{Path, PathBuf};
32use std::time::Instant;
33
34use rusqlite::OptionalExtension;
35use serde::{Deserialize, Serialize};
36
37pub use transcript_paths::{HostKind, resolve_transcript};
38
39/// Per-call recovery report. Doubles as the JSON wire shape for
40/// `ai-memory recover-previous-session --json` and as the MCP-tool
41/// return shape; field names + serialization order are the wire
42/// contract.
43#[derive(Debug, Clone, Serialize, Deserialize, Default)]
44pub struct RecoverReport {
45    /// Absolute path of the transcript file the recovery walked,
46    /// or `None` when no transcript was located (host stub had
47    /// no transcripts on disk).
48    pub transcript_path: Option<PathBuf>,
49    /// Host whose transcript was recovered. Echoes the resolver
50    /// arm that won (or `--host` flag value if explicit).
51    pub host_kind: HostKind,
52    /// Total wall-clock from `recover_from_transcript` entry to
53    /// return, in milliseconds. Pinned by the regression test
54    /// against the per-scenario budget (see #1389 perf design).
55    pub elapsed_ms: u64,
56    /// Wall-clock for path resolution + `stat(2)` only.
57    pub elapsed_ms_resolve_path: u64,
58    /// Wall-clock for the dedup-table SELECT.
59    pub elapsed_ms_dedup_query: u64,
60    /// Wall-clock for the JSONL stream parse.
61    pub elapsed_ms_parse: u64,
62    /// Wall-clock for the INSERTs into `memories` and
63    /// `transcript_line_dedup`.
64    pub elapsed_ms_writes: u64,
65    /// Total lines read from the transcript (pre-filter).
66    pub lines_total: u32,
67    /// Lines atomised into new memories this run.
68    pub lines_atomised: u32,
69    /// Lines skipped because their sha256 was already in the
70    /// dedup table from a prior recovery.
71    pub lines_skipped_dedup: u32,
72    /// Lines skipped because the `--limit` cap was reached.
73    pub lines_skipped_limit: u32,
74    /// IDs of memories created this run. Capped at the first
75    /// [`QUIET_MEMORY_ID_PREVIEW_CAP`] in `--quiet` mode to keep the
76    /// JSON payload bounded.
77    pub memories_created: Vec<String>,
78    /// Best-effort errors surfaced during recovery. A non-empty
79    /// list is NOT a hard failure — the recover verb is graceful
80    /// by design so SessionStart-hook integration can't wedge an
81    /// agent boot.
82    pub errors: Vec<String>,
83    /// `CURRENT_SCHEMA_VERSION` at the time recovery ran. Pinned
84    /// in `RecoverReport` so a future schema migration that
85    /// changes the dedup-table shape can be diagnosed from the
86    /// JSON wire payload.
87    pub schema_version_at_run: i64,
88    /// `true` when the common-case fast-path short-circuited
89    /// (transcript mtime ≤ most-recent `memory_store` write for
90    /// this agent_id). When `true`, parse + writes never ran and
91    /// the elapsed budget is the resolve-path + dedup-query sum.
92    pub fast_path_hit: bool,
93}
94
95impl RecoverReport {
96    /// New empty report scaffold; callers fill fields as work
97    /// happens. The `Instant` tracker pattern is captured in the
98    /// `RecoverTimer` helper below.
99    #[must_use]
100    pub fn new(host_kind: HostKind, schema_version: i64) -> Self {
101        Self {
102            transcript_path: None,
103            host_kind,
104            elapsed_ms: 0,
105            elapsed_ms_resolve_path: 0,
106            elapsed_ms_dedup_query: 0,
107            elapsed_ms_parse: 0,
108            elapsed_ms_writes: 0,
109            lines_total: 0,
110            lines_atomised: 0,
111            lines_skipped_dedup: 0,
112            lines_skipped_limit: 0,
113            memories_created: Vec::new(),
114            errors: Vec::new(),
115            schema_version_at_run: schema_version,
116            fast_path_hit: false,
117        }
118    }
119}
120
/// Per-phase elapsed-ms accumulator. A thin wrapper around
/// `Instant` deltas so the recovery body can record each phase's
/// wall-clock without scattering bookkeeping through the code.
///
/// All measurements saturate instead of panicking: a clock reading
/// that appears to go backwards yields `0`, and a delta too large for
/// `u64` milliseconds yields `u64::MAX`.
#[derive(Debug)]
pub struct RecoverTimer {
    /// Captured once at construction; anchor for [`Self::overall_ms`].
    overall_start: Instant,
    /// Start of the current phase; advanced by [`Self::phase_lap`].
    phase_start: Instant,
}

impl Default for RecoverTimer {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoverTimer {
    /// Start a timer whose overall and phase anchors are both "now".
    #[must_use]
    pub fn new() -> Self {
        let now = Instant::now();
        Self {
            overall_start: now,
            phase_start: now,
        }
    }

    /// Return elapsed ms since the most recent `phase_lap()` call
    /// (or, on first call, since construction). Resets the phase
    /// anchor for the next call.
    pub fn phase_lap(&mut self) -> u64 {
        let now = Instant::now();
        let ms = Self::millis_between(self.phase_start, now);
        self.phase_start = now;
        ms
    }

    /// Total wall-clock from construction, in milliseconds.
    #[must_use]
    pub fn overall_ms(&self) -> u64 {
        Self::millis_between(self.overall_start, Instant::now())
    }

    /// Whole milliseconds from `earlier` to `later`.
    ///
    /// Uses `saturating_duration_since` rather than `duration_since`
    /// / `elapsed`, whose documentation reserves the right to panic
    /// when `earlier` is later than `later`; the recovery path
    /// promises never to panic.
    fn millis_between(earlier: Instant, later: Instant) -> u64 {
        u64::try_from(later.saturating_duration_since(earlier).as_millis()).unwrap_or(u64::MAX)
    }
}
162
/// How many transcript lines one recovery run atomises by default.
/// The cap keeps gap-path work bounded so a very long transcript
/// (1000 turns, say) cannot exceed the SessionStart-hook latency
/// budget; lines beyond the cap are tallied in
/// `RecoverReport.lines_skipped_limit`.
/// [`RecoverOpts::for_session_start_hook`] uses this value, while the
/// CLI / MCP surfaces may override it through their `--limit` wire
/// field.
pub const DEFAULT_RECOVER_LIMIT: usize = 100;

/// Maximum number of IDs kept in `RecoverReport.memories_created`
/// when running in `--quiet` mode. Only the echoed ID list is
/// shortened — every recovered memory is still persisted to the DB —
/// so SessionStart-hook log capture stays small no matter how many
/// memories a gap recovery wrote.
pub const QUIET_MEMORY_ID_PREVIEW_CAP: usize = 10;
177
178/// Per-call recovery options. Both surfaces (CLI + MCP) build one
179/// of these from their respective Args / Request shapes and pass
180/// it into [`recover_from_transcript`].
181#[derive(Debug, Clone)]
182pub struct RecoverOpts {
183    /// Which host's transcript format + path-resolver arm to use.
184    pub host: HostKind,
185    /// Explicit transcript path; when `None`, the resolver walks
186    /// the per-host candidate set and picks the most-recent.
187    pub transcript_override: Option<PathBuf>,
188    /// Filter to lines whose timestamp is at or after this RFC3339
189    /// instant. When `None`, recovery starts from the
190    /// most-recent-`memory_store` watermark for this agent_id.
191    pub since_iso: Option<String>,
192    /// Namespace to land recovered memories in. When `None`,
193    /// defaults to the agent's resolved default namespace per
194    /// `AppConfig`.
195    pub namespace: Option<String>,
196    /// Maximum number of transcript lines to atomise this run.
197    /// Excess lines are skipped + counted under
198    /// `lines_skipped_limit`. Defaults to [`DEFAULT_RECOVER_LIMIT`].
199    pub limit: usize,
200    /// When `true`, parse the transcript and emit the report but
201    /// write nothing to the DB. Used by `--dry-run` for operator
202    /// inspection.
203    pub dry_run: bool,
204    /// When `true`, drop every memory_id from `memories_created`
205    /// except the first [`QUIET_MEMORY_ID_PREVIEW_CAP`]; bounds the
206    /// JSON payload for SessionStart-hook log capture.
207    pub quiet: bool,
208    /// agent_id to attribute recovered memories to. Resolved from
209    /// the calling surface (CLI flag / MCP CallerContext / config).
210    pub agent_id: String,
211}
212
213impl RecoverOpts {
214    /// Sensible defaults for SessionStart-hook integration.
215    /// Callers override `agent_id` (required) and any other field
216    /// that diverges from the hook-friendly defaults.
217    #[must_use]
218    pub fn for_session_start_hook(host: HostKind, agent_id: String) -> Self {
219        Self {
220            host,
221            transcript_override: None,
222            since_iso: None,
223            namespace: None,
224            limit: DEFAULT_RECOVER_LIMIT,
225            dry_run: false,
226            quiet: true,
227            agent_id,
228        }
229    }
230}
231
232/// Canonical recovery handler. Both the CLI subcommand and the MCP
233/// tool dispatch into this function with a `RecoverOpts` they
234/// constructed from their respective wire shapes. The function
235/// returns a populated [`RecoverReport`] which both surfaces then
236/// serialize through their respective output paths.
237///
238/// **Performance contract** (see #1389 comment 4565477566):
239///
240/// - Common case (no gap): <100 ms p95 (fast-path short-circuit).
241/// - Gap case, 100 turns: <1 s p95 (raw observation INSERTs, no LLM call).
242/// - Gap case, 1000 turns: <5 s p95 with the default
243///   [`DEFAULT_RECOVER_LIMIT`] bounding the work; remainder logged
244///   under `lines_skipped_limit`.
245///
246/// **Failure semantics**: never panics on a parse error; surfaces
247/// errors via `RecoverReport.errors` and continues. SessionStart-hook
248/// integration depends on this — a transcript-parse exception MUST
249/// NOT wedge the operator's session boot.
250///
251/// # Errors
252///
253/// Returns an error ONLY when the underlying DB connection cannot
254/// be established (treating "no transcript found" as a benign empty
255/// report, not an error). The callers' `--quiet` path catches even
256/// the DB-open error and downgrades to a stderr WARN + exit 0.
257///
258/// # Panics
259///
260/// Does not panic.
261pub fn recover_from_transcript(
262    db_path: &Path,
263    opts: &RecoverOpts,
264) -> Result<RecoverReport, RecoverError> {
265    use parsers::TranscriptParser;
266    use parsers::claude_code_jsonl::ClaudeCodeJsonlParser;
267
268    let mut timer = RecoverTimer::new();
269    let schema_version = crate::storage::migrations::current_schema_version();
270    let mut report = RecoverReport::new(opts.host, schema_version);
271
272    // The DB-open path is the ONLY hard failure: every later error is
273    // surfaced via `report.errors` so a SessionStart-hook chain can't
274    // be wedged by a bad transcript line.
275    let conn = crate::storage::open(db_path).map_err(|e| RecoverError::DbOpen(e.to_string()))?;
276
277    // Step 1 — resolve the transcript. An explicit `--transcript`
278    // override bypasses the resolver; otherwise walk the per-host
279    // candidate set for the current working directory.
280    let path = match opts.transcript_override.clone() {
281        Some(p) => Some(p),
282        None => {
283            let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
284            match resolve_transcript(opts.host, &cwd) {
285                Ok(p) => p,
286                Err(e) => {
287                    report.errors.push(format!("path resolve failed: {e}"));
288                    None
289                }
290            }
291        }
292    };
293    let Some(path) = path else {
294        // No transcript located — a benign steady state (fresh box, or
295        // no agent has written a transcript for this cwd).
296        report.elapsed_ms_resolve_path = timer.phase_lap();
297        report.elapsed_ms = timer.overall_ms();
298        return Ok(report);
299    };
300    report.transcript_path = Some(path.clone());
301
302    let mtime = std::fs::metadata(&path)
303        .ok()
304        .and_then(|m| m.modified().ok());
305    report.elapsed_ms_resolve_path = timer.phase_lap();
306
307    // Step 2 — fast-path short-circuit. The watermark is the most
308    // recent `created_at` across ALL of this agent's memories (normal
309    // L1 stores included). When the transcript has not been modified
310    // since the agent last wrote a memory there is nothing to recover,
311    // so we skip the parse + write phases entirely.
312    // v0.7.0 R5.F5.3 (#1419) — route through the indexed VIRTUAL
313    // `agent_id_idx` column added in the v14 migration
314    // (`src/storage/migrations.rs:1174-1209`). The previous query
315    // used `json_extract(metadata, '$.agent_id') = ?1` which SQLite
316    // cannot index — degenerating to a full table scan on populated
317    // DBs (100k+ rows → blows the L2 fast-path <100 ms budget pinned
318    // by #1394). The VIRTUAL column has identical extraction
319    // semantics (CASE WHEN json_valid(metadata) THEN
320    // json_extract(metadata, '$.agent_id') ELSE NULL END) so this
321    // is a pure plan rewrite — same rows return, only the access
322    // path changes from SCAN to SEARCH via `idx_memories_agent_id`.
323    let watermark: Option<String> = conn
324        .query_row(
325            "SELECT MAX(created_at) FROM memories WHERE agent_id_idx = ?1",
326            rusqlite::params![&opts.agent_id],
327            |row| row.get::<_, Option<String>>(0),
328        )
329        .unwrap_or(None);
330    report.elapsed_ms_dedup_query = timer.phase_lap();
331
332    if let (Some(mtime), Some(watermark_iso)) = (mtime, watermark.as_deref()) {
333        if let Ok(watermark_dt) = chrono::DateTime::parse_from_rfc3339(watermark_iso) {
334            let mtime_dt: chrono::DateTime<chrono::Utc> = mtime.into();
335            if mtime_dt <= watermark_dt.with_timezone(&chrono::Utc) {
336                report.fast_path_hit = true;
337                report.elapsed_ms = timer.overall_ms();
338                return Ok(report);
339            }
340        }
341    }
342
343    // Step 3 — gap-path. The `transcript_line_dedup` table is the sole
344    // idempotency mechanism (each already-recovered line is skipped by
345    // its sha256 in step 4), so the parse is NOT pre-filtered by the
346    // memory watermark. Deriving `since` from the watermark would be
347    // unsound: recovered memories are stamped with their conversational
348    // turn timestamp (not wall-clock), and any normal L1 `memory_store`
349    // advances `MAX(created_at)` to wall-clock-now — either could push
350    // the watermark past an un-recovered gap turn and silently drop it.
351    // Only an explicit operator `--since` narrows the parse window.
352    let since = opts.since_iso.clone();
353    let turns = match ClaudeCodeJsonlParser.parse(&path, since.as_deref()) {
354        Ok(t) => t,
355        Err(e) => {
356            report.errors.push(format!("parse failed: {e}"));
357            report.elapsed_ms = timer.overall_ms();
358            return Ok(report);
359        }
360    };
361    report.lines_total = u32::try_from(turns.len()).unwrap_or(u32::MAX);
362    report.elapsed_ms_parse = timer.phase_lap();
363
364    // Step 4 — per-turn dedup + write. Each new turn becomes one
365    // observation memory + one `transcript_line_dedup` row, written
366    // atomically under BEGIN IMMEDIATE (mirroring the L4 storage tx in
367    // `src/mcp/tools/capture_turn.rs::handle_capture_turn`).
368    let namespace = opts
369        .namespace
370        .clone()
371        // v0.7.0 F-E4 fix (#1436): route through DEFAULT_NAMESPACE
372        // SSOT at src/lib.rs:266 instead of the bare literal.
373        .unwrap_or_else(|| crate::DEFAULT_NAMESPACE.to_string());
374    let host_kind = opts.host.as_str().to_string();
375
376    for turn in turns {
377        if usize::try_from(report.lines_atomised).unwrap_or(usize::MAX) >= opts.limit {
378            report.lines_skipped_limit += 1;
379            continue;
380        }
381
382        let Ok(raw_sha_bytes) = hex::decode(&turn.line_sha256_hex) else {
383            report.errors.push(format!(
384                "skipping turn with malformed sha256: {}",
385                turn.line_sha256_hex
386            ));
387            continue;
388        };
389
390        // #1573 — dedup key. The canonical key is
391        // `(host_session_id, host_turn_index)` when the host format
392        // provides both (parity with the L4 `memory_capture_turn`
393        // idempotency contract); otherwise the NORMALIZED-content
394        // hash, which is invariant under host re-serialization
395        // (whitespace, JSON key order). Pre-#1573 the key was the
396        // raw-line sha256 only, so the same turn re-serialized by a
397        // host upgrade re-atomised into a duplicate memory. The
398        // raw-line hash is still probed so rows written by pre-#1573
399        // recoveries keep deduping across the upgrade.
400        let sha_bytes =
401            hex::decode(turn.normalized_sha256_hex()).unwrap_or_else(|_| raw_sha_bytes.clone());
402        let already = find_existing_dedup(&conn, &turn, &raw_sha_bytes, &sha_bytes);
403        if already.is_some() {
404            report.lines_skipped_dedup += 1;
405            continue;
406        }
407
408        if opts.dry_run {
409            // Inspection mode: count the would-be write, persist nothing.
410            report.lines_atomised += 1;
411            continue;
412        }
413
414        match write_recovered_turn(
415            &conn, &turn, &sha_bytes, &namespace, &host_kind, &path, opts,
416        ) {
417            Ok(memory_id) => {
418                report.lines_atomised += 1;
419                report.memories_created.push(memory_id);
420            }
421            Err(e) => report.errors.push(e),
422        }
423    }
424
425    // Bound the JSON payload for SessionStart-hook log capture.
426    if opts.quiet && report.memories_created.len() > QUIET_MEMORY_ID_PREVIEW_CAP {
427        report
428            .memories_created
429            .truncate(QUIET_MEMORY_ID_PREVIEW_CAP);
430    }
431
432    report.elapsed_ms_writes = timer.phase_lap();
433    report.elapsed_ms = timer.overall_ms();
434    Ok(report)
435}
436
437/// Stable wire string for a parsed turn's role. Delegates to the
438/// [`parsers::TurnRole::as_str`] SSOT shared with the #1573
439/// normalized dedup hash.
440fn role_label(role: parsers::TurnRole) -> &'static str {
441    role.as_str()
442}
443
444/// #1573 — probe `transcript_line_dedup` for an existing capture of
445/// this turn. Checks, in order:
446///
447/// 1. `(host_session_id, host_turn_index)` — the canonical key when
448///    the host format provides both; also bridges to rows the L4
449///    `memory_capture_turn` path wrote for the same turn.
450/// 2. `sha256 IN (normalized, raw-line)` — the normalized-content
451///    hash this version writes, plus the raw-line hash pre-#1573
452///    recoveries wrote, so an upgrade never re-atomises
453///    already-recovered turns.
454fn find_existing_dedup(
455    conn: &rusqlite::Connection,
456    turn: &parsers::ParsedTurn,
457    raw_sha: &[u8],
458    norm_sha: &[u8],
459) -> Option<String> {
460    if let (Some(sid), Some(tix)) = (turn.host_session_id.as_deref(), turn.host_turn_index) {
461        let hit: Option<String> = conn
462            .query_row(
463                "SELECT memory_id FROM transcript_line_dedup \
464                 WHERE host_session_id = ?1 AND host_turn_index = ?2",
465                rusqlite::params![sid, tix],
466                |row| row.get(0),
467            )
468            .optional()
469            .unwrap_or(None);
470        if hit.is_some() {
471            return hit;
472        }
473    }
474    conn.query_row(
475        "SELECT memory_id FROM transcript_line_dedup WHERE sha256 IN (?1, ?2)",
476        rusqlite::params![norm_sha, raw_sha],
477        |row| row.get(0),
478    )
479    .optional()
480    .unwrap_or(None)
481}
482
483/// Write one recovered transcript turn as an `observation` memory plus
484/// its `transcript_line_dedup` row under a single BEGIN IMMEDIATE
485/// transaction. Mirrors the L4 storage transaction in
486/// `src/mcp/tools/capture_turn.rs::handle_capture_turn`: on any failure
487/// the transaction rolls back so an orphaned memory can never exist
488/// without its dedup row.
489fn write_recovered_turn(
490    conn: &rusqlite::Connection,
491    turn: &parsers::ParsedTurn,
492    sha_bytes: &[u8],
493    namespace: &str,
494    host_kind: &str,
495    transcript_path: &Path,
496    opts: &RecoverOpts,
497) -> Result<String, String> {
498    use crate::models::{Memory, MemoryKind, Tier};
499
500    let role = role_label(turn.role);
501    let now_iso = chrono::Utc::now().to_rfc3339();
502
503    // Prefer the turn's text; fall back to a tool-call summary so a
504    // tool_use-only turn still produces a non-empty observation.
505    let content = if turn.content_text.trim().is_empty() {
506        let briefs: Vec<String> = turn
507            .tool_calls
508            .iter()
509            .map(|tc| format!("{}: {}", tc.tool, tc.brief))
510            .collect();
511        format!("[tool calls] {}", briefs.join("; "))
512    } else {
513        turn.content_text.clone()
514    };
515
516    // The line sha256 makes the title unique per transcript line.
517    // `storage::insert` upserts on `(title, namespace)` and the dedup
518    // table guarantees one memory per line, so the only "same-title"
519    // case is a true re-recovery of the same line.
520    let title = format!(
521        "L2 recovered {role} turn {} @ {}",
522        turn.line_sha256_hex, turn.timestamp_iso
523    );
524
525    let mut tags = vec![
526        "captured-via-l2".to_string(),
527        "recovered-from-transcript".to_string(),
528        format!("host:{host_kind}"),
529        format!("role:{role}"),
530    ];
531    tags.push(match turn.role {
532        parsers::TurnRole::User => "operator-directive".to_string(),
533        parsers::TurnRole::Assistant => "agent-response".to_string(),
534        _ => "transcript-line".to_string(),
535    });
536
537    // User-role turns (operator directives) are the highest-value
538    // recovery target per the #1388 failure mode; bias their priority.
539    let priority = if turn.role == parsers::TurnRole::User {
540        6
541    } else {
542        5
543    };
544
545    let metadata = serde_json::json!({
546        "agent_id": opts.agent_id,
547        "host_kind": host_kind,
548        "transcript_path": transcript_path.display().to_string(),
549        "line_sha256": turn.line_sha256_hex,
550        "role": role,
551        "capture_layer": "L2",
552        "tool_calls": turn.tool_calls,
553    });
554
555    let mem = Memory {
556        id: uuid::Uuid::new_v4().to_string(),
557        tier: Tier::Long,
558        namespace: namespace.to_string(),
559        title,
560        content,
561        tags,
562        priority,
563        confidence: 1.0,
564        source: "recover".to_string(),
565        metadata,
566        // Use the turn's own timestamp so the recovered memory's
567        // timeline matches the original conversation.
568        created_at: turn.timestamp_iso.clone(),
569        updated_at: now_iso.clone(),
570        last_accessed_at: Some(now_iso),
571        memory_kind: MemoryKind::Observation,
572        ..Memory::default()
573    };
574
575    conn.execute_batch(crate::storage::connection::SQL_BEGIN_IMMEDIATE)
576        .map_err(|e| format!("TX_BEGIN_FAILED: {e}"))?;
577
578    let tx_result = (|| -> Result<String, String> {
579        let inserted_id =
580            crate::storage::insert(conn, &mem).map_err(|e| format!("MEMORY_INSERT_FAILED: {e}"))?;
581        let recovered_at_ms = chrono::Utc::now().timestamp_millis();
582        // #1573 — `sha_bytes` is the normalized-content hash and the
583        // session/turn identity columns are populated when the host
584        // format provided them, so future recoveries (and the L4
585        // capture path) can dedup on the canonical composite key.
586        conn.execute(
587            "INSERT INTO transcript_line_dedup \
588             (sha256, memory_id, host_kind, transcript_path, \
589              host_session_id, host_turn_index, recovered_at) \
590             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
591            rusqlite::params![
592                sha_bytes,
593                inserted_id,
594                host_kind,
595                transcript_path.display().to_string(),
596                turn.host_session_id,
597                turn.host_turn_index,
598                recovered_at_ms,
599            ],
600        )
601        .map_err(|e| format!("DEDUP_INSERT_FAILED: {e}"))?;
602        Ok(inserted_id)
603    })();
604
605    match tx_result {
606        Ok(memory_id) => {
607            conn.execute_batch(crate::storage::connection::SQL_COMMIT)
608                .map_err(|e| format!("TX_COMMIT_FAILED: {e}"))?;
609            Ok(memory_id)
610        }
611        Err(e) => {
612            let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
613            Err(e)
614        }
615    }
616}
617
/// Failures that [`recover_from_transcript`] returns instead of
/// recording. Nearly every problem is reported through
/// `RecoverReport.errors` (see that function's "Failure semantics");
/// this enum is reserved for cases that cannot be made graceful,
/// such as the DB failing to open before any report could be built.
#[derive(Debug)]
pub enum RecoverError {
    /// DB connection could not be established.
    DbOpen(String),
    /// Invalid `RecoverOpts` (e.g., conflicting `--since` + explicit
    /// timestamps in `--transcript`).
    InvalidOpts(String),
}

impl std::fmt::Display for RecoverError {
    /// Renders as `recover: <what failed>: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DbOpen(msg) => f.write_fmt(format_args!("recover: db open failed: {msg}")),
            Self::InvalidOpts(msg) => f.write_fmt(format_args!("recover: invalid opts: {msg}")),
        }
    }
}

impl std::error::Error for RecoverError {}
642
643#[cfg(test)]
644mod tests {
645    use super::*;
646    use std::io::Write;
647
648    const USER_LINE_1: &str = r#"{"timestamp":"2026-05-28T12:00:00Z","type":"user","message":{"content":[{"type":"text","text":"operator directive one"}]}}"#;
649    const USER_LINE_2: &str = r#"{"timestamp":"2026-05-28T12:01:00Z","type":"user","message":{"content":[{"type":"text","text":"operator directive two"}]}}"#;
650    const USER_LINE_3: &str = r#"{"timestamp":"2026-05-28T12:02:00Z","type":"user","message":{"content":[{"type":"text","text":"operator directive three"}]}}"#;
651
652    /// In-tree scratch root honoring the project no-`/tmp` HARD RULE.
653    /// Tempdirs land under the repo's gitignored `.local-runs/`, never
654    /// on a tmpfs path.
655    fn fresh_dir() -> tempfile::TempDir {
656        let root = std::env::current_dir()
657            .unwrap_or_else(|_| PathBuf::from("."))
658            .join(".local-runs")
659            .join("issue-1389-recover-unit-test");
660        std::fs::create_dir_all(&root).ok();
661        tempfile::tempdir_in(&root).expect("tempdir under .local-runs")
662    }
663
664    fn write_transcript(dir: &Path, lines: &[&str]) -> PathBuf {
665        let p = dir.join("session.jsonl");
666        let mut f = std::fs::File::create(&p).unwrap();
667        for l in lines {
668            writeln!(f, "{l}").unwrap();
669        }
670        f.flush().unwrap();
671        p
672    }
673
674    fn base_opts(transcript: PathBuf, agent_id: &str) -> RecoverOpts {
675        RecoverOpts {
676            host: HostKind::ClaudeCode,
677            transcript_override: Some(transcript),
678            since_iso: None,
679            namespace: Some("test-recover".to_string()),
680            limit: DEFAULT_RECOVER_LIMIT,
681            dry_run: false,
682            quiet: false,
683            agent_id: agent_id.to_string(),
684        }
685    }
686
687    #[test]
688    fn gap_path_writes_one_memory_per_turn() {
689        let dir = fresh_dir();
690        let db = dir.path().join("mem.db");
691        let transcript = write_transcript(dir.path(), &[USER_LINE_1, USER_LINE_2]);
692        let report = recover_from_transcript(&db, &base_opts(transcript, "ai:test:gap")).unwrap();
693        assert!(!report.fast_path_hit);
694        assert_eq!(report.lines_total, 2);
695        assert_eq!(report.lines_atomised, 2);
696        assert_eq!(report.memories_created.len(), 2);
697        assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
698    }
699
700    #[test]
701    fn rerun_dedups_already_recovered_turns() {
702        let dir = fresh_dir();
703        let db = dir.path().join("mem.db");
704        let transcript = write_transcript(dir.path(), &[USER_LINE_1, USER_LINE_2]);
705        let opts = base_opts(transcript, "ai:test:dedup");
706        let first = recover_from_transcript(&db, &opts).unwrap();
707        assert_eq!(first.lines_atomised, 2);
708        let second = recover_from_transcript(&db, &opts).unwrap();
709        // Same transcript content -> every line dedup-skipped, nothing new.
710        assert_eq!(second.lines_atomised, 0);
711        assert_eq!(second.lines_skipped_dedup, 2);
712        assert!(second.memories_created.is_empty());
713    }
714
715    #[test]
716    fn limit_caps_atomised_lines() {
717        let dir = fresh_dir();
718        let db = dir.path().join("mem.db");
719        let transcript = write_transcript(dir.path(), &[USER_LINE_1, USER_LINE_2, USER_LINE_3]);
720        let mut opts = base_opts(transcript, "ai:test:limit");
721        opts.limit = 2;
722        let report = recover_from_transcript(&db, &opts).unwrap();
723        assert_eq!(report.lines_atomised, 2);
724        assert_eq!(report.lines_skipped_limit, 1);
725    }
726
727    #[test]
728    fn dry_run_persists_nothing() {
729        let dir = fresh_dir();
730        let db = dir.path().join("mem.db");
731        let transcript = write_transcript(dir.path(), &[USER_LINE_1, USER_LINE_2]);
732        let mut opts = base_opts(transcript, "ai:test:dry");
733        opts.dry_run = true;
734        let report = recover_from_transcript(&db, &opts).unwrap();
735        assert_eq!(report.lines_atomised, 2, "would-be writes are counted");
736        assert!(report.memories_created.is_empty());
737        let conn = crate::storage::open(&db).unwrap();
738        let n: i64 = conn
739            .query_row("SELECT COUNT(*) FROM transcript_line_dedup", [], |r| {
740                r.get(0)
741            })
742            .unwrap();
743        assert_eq!(n, 0, "dry-run must not write dedup rows");
744    }
745
/// When the agent already owns a memory whose `created_at` lies after the
/// transcript's mtime, recovery reports a fast-path hit and atomises
/// nothing.
#[test]
fn fast_path_short_circuits_when_watermark_newer_than_mtime() {
    use crate::models::{Memory, MemoryKind, Tier};

    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");
    let transcript_path = write_transcript(workspace.path(), &[USER_LINE_1]);

    // A far-future timestamp guarantees the seeded memory's watermark
    // exceeds the mtime of the transcript written just above.
    let far_future = "2999-01-01T00:00:00Z".to_string();
    let seed = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Long,
        namespace: "test-recover".to_string(),
        title: "watermark seed".to_string(),
        content: "seed".to_string(),
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        metadata: serde_json::json!({"agent_id": "ai:test:fast"}),
        created_at: far_future.clone(),
        updated_at: far_future,
        memory_kind: MemoryKind::Observation,
        ..Memory::default()
    };

    // Release the seeding connection before recovery opens its own.
    let seeding_conn = crate::storage::open(&db_path).unwrap();
    crate::storage::insert(&seeding_conn, &seed).unwrap();
    drop(seeding_conn);

    let opts = base_opts(transcript_path, "ai:test:fast");
    let outcome = recover_from_transcript(&db_path, &opts).unwrap();
    assert!(outcome.fast_path_hit, "expected fast-path short-circuit");
    assert_eq!(outcome.lines_atomised, 0);
}
777
/// #1573 — a turn the host writes out again with different whitespace and
/// JSON key order is still the same turn and MUST be deduplicated.
/// Before the fix the dedup key was the sha256 of the raw line, so any
/// byte-level re-serialization produced a duplicate memory.
#[test]
fn reserialized_turn_different_whitespace_key_order_dedups_1573() {
    // Same timestamp, role and content as USER_LINE_1 (and likewise no
    // sessionId), but with keys reordered and extra whitespace added:
    // the raw bytes differ while the turn itself does not.
    const USER_LINE_1_RESERIALIZED: &str = r#"{ "type": "user",  "message": {"content": [ {"text": "operator directive one", "type": "text"} ]}, "timestamp": "2026-05-28T12:00:00Z" }"#;

    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");

    // First pass: the original serialization is atomised once.
    let original_path = write_transcript(workspace.path(), &[USER_LINE_1]);
    let original_opts = base_opts(original_path, "ai:test:reser");
    let first = recover_from_transcript(&db_path, &original_opts).unwrap();
    assert_eq!(first.lines_atomised, 1);

    // Second pass: a separate file holding the re-serialized turn must be
    // skipped as a duplicate rather than stored again.
    let reserialized_path = workspace.path().join("session-reserialized.jsonl");
    std::fs::write(
        &reserialized_path,
        format!("{USER_LINE_1_RESERIALIZED}\n"),
    )
    .unwrap();
    let reserialized_opts = base_opts(reserialized_path, "ai:test:reser");
    let second = recover_from_transcript(&db_path, &reserialized_opts).unwrap();

    assert_eq!(
        second.lines_atomised, 0,
        "re-serialized turn must not re-atomise: {second:?}"
    );
    assert_eq!(second.lines_skipped_dedup, 1);
    assert!(second.memories_created.is_empty());
}
806
/// #1573 — when the host supplies both `host_session_id` and
/// `host_turn_index`, that pair is the canonical dedup key (parity with
/// the L4 `memory_capture_turn` idempotency contract). Rows written by
/// pre-#1573 recoveries, keyed on the raw-line sha256, must keep
/// deduplicating as well.
#[test]
fn session_turn_composite_key_and_raw_sha_backcompat_dedup_1573() {
    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");
    let conn = crate::storage::open(&db_path).unwrap();

    let turn = parsers::ParsedTurn {
        host_session_id: Some("sess-1573".to_string()),
        host_turn_index: Some(3),
        role: parsers::TurnRole::User,
        timestamp_iso: "2026-05-28T12:00:00Z".to_string(),
        content_text: "operator directive one".to_string(),
        tool_calls: vec![],
        line_sha256_hex: "aa".repeat(32),
    };
    let raw_sha = hex::decode(&turn.line_sha256_hex).unwrap();
    let norm_sha = hex::decode(turn.normalized_sha256_hex()).unwrap();

    // Empty table: none of the candidate keys can match yet.
    let before_any_row = find_existing_dedup(&conn, &turn, &raw_sha, &norm_sha);
    assert!(before_any_row.is_none());

    // Insert an L4-style row for the same (session, turn) whose sha is
    // DIFFERENT (standing in for the L4 payload hash). Only the composite
    // key can produce the hit asserted below.
    let l4_payload_sha = vec![0x01_u8; 32];
    conn.execute(
        "INSERT INTO transcript_line_dedup \
         (sha256, memory_id, host_kind, transcript_path, \
          host_session_id, host_turn_index, recovered_at) \
         VALUES (?1, 'mem-l4', 'claude-code', NULL, 'sess-1573', 3, 0)",
        rusqlite::params![l4_payload_sha],
    )
    .unwrap();
    let composite_hit = find_existing_dedup(&conn, &turn, &raw_sha, &norm_sha);
    assert_eq!(
        composite_hit.as_deref(),
        Some("mem-l4"),
        "composite (host_session_id, host_turn_index) key must dedup"
    );

    // Same session, different turn index: must NOT match.
    let other = parsers::ParsedTurn {
        content_text: "operator directive two".to_string(),
        host_turn_index: Some(4),
        ..turn.clone()
    };
    let other_norm = hex::decode(other.normalized_sha256_hex()).unwrap();
    let other_hit = find_existing_dedup(&conn, &other, &raw_sha, &other_norm);
    assert!(other_hit.is_none());

    // Back-compat: a pre-#1573 row keyed on the RAW line sha, with NULL
    // session columns, must still deduplicate the same turn.
    let legacy = parsers::ParsedTurn {
        host_turn_index: None,
        host_session_id: None,
        ..turn.clone()
    };
    conn.execute(
        "INSERT INTO transcript_line_dedup \
         (sha256, memory_id, host_kind, transcript_path, \
          host_session_id, host_turn_index, recovered_at) \
         VALUES (?1, 'mem-legacy', 'claude-code', NULL, NULL, NULL, 0)",
        rusqlite::params![&raw_sha],
    )
    .unwrap();
    let legacy_norm = hex::decode(legacy.normalized_sha256_hex()).unwrap();
    let legacy_hit = find_existing_dedup(&conn, &legacy, &raw_sha, &legacy_norm);
    assert_eq!(
        legacy_hit.as_deref(),
        Some("mem-legacy"),
        "pre-#1573 raw-line sha rows must keep deduping"
    );
}
879
/// Pointing recovery at a transcript path that does not exist must still
/// return `Ok`: nothing is atomised and the problem is recorded in the
/// report's `errors`.
#[test]
fn missing_transcript_is_graceful() {
    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");
    let absent_path = workspace.path().join("does-not-exist.jsonl");

    let opts = base_opts(absent_path, "ai:test:missing");
    let outcome = recover_from_transcript(&db_path, &opts).unwrap();

    // The failure surfaces as a recorded error, not a panic or a write.
    assert_eq!(outcome.lines_atomised, 0);
    assert!(!outcome.errors.is_empty());
}
890
891    // Coverage uplift (2026-06-12): timer Default, opts constructor,
892    // error Display arms, db-open failure, cwd-resolve (no override),
893    // and tool-call-only content in write_recovered_turn.
894
/// Smoke test: a default-constructed `RecoverTimer` can take a phase lap
/// and report its overall milliseconds without panicking. The returned
/// values are deliberately ignored.
#[test]
fn recover_timer_default_and_phase_lap() {
    let mut timer = RecoverTimer::default();
    let _ = timer.phase_lap();
    let _ = timer.overall_ms();
}
901
/// `RecoverReport::new` stores the given host kind and schema version and
/// starts with `fast_path_hit` unset.
#[test]
fn recover_report_new_initializes_host_and_schema() {
    let fresh_report = RecoverReport::new(HostKind::Codex, 57);

    assert_eq!(fresh_report.host_kind, HostKind::Codex);
    assert_eq!(fresh_report.schema_version_at_run, 57);
    assert!(!fresh_report.fast_path_hit);
}
909
/// `RecoverOpts::for_session_start_hook` keeps the supplied host and agent
/// id, uses the default limit, runs quietly, is not a dry run, and leaves
/// every optional filter unset.
#[test]
fn for_session_start_hook_defaults() {
    let agent = "ai:hook".to_string();
    let hook_opts = RecoverOpts::for_session_start_hook(HostKind::ClaudeCode, agent);

    // Values passed straight through by the constructor.
    assert_eq!(hook_opts.host, HostKind::ClaudeCode);
    assert_eq!(hook_opts.agent_id, "ai:hook");

    // Hook-specific defaults.
    assert_eq!(hook_opts.limit, DEFAULT_RECOVER_LIMIT);
    assert!(hook_opts.quiet);
    assert!(!hook_opts.dry_run);

    // Optional knobs start out unset.
    assert!(hook_opts.transcript_override.is_none());
    assert!(hook_opts.since_iso.is_none());
    assert!(hook_opts.namespace.is_none());
}
922
/// Pins the `Display` text of the `DbOpen` and `InvalidOpts` variants,
/// and checks that `RecoverError` coerces to `dyn std::error::Error` and
/// that its `Debug` output names the variant.
#[test]
fn recover_error_display_arms() {
    let display_cases = [
        (
            RecoverError::DbOpen("boom".to_string()),
            "recover: db open failed: boom",
        ),
        (
            RecoverError::InvalidOpts("bad".to_string()),
            "recover: invalid opts: bad",
        ),
    ];
    for (error, rendered) in &display_cases {
        assert_eq!(error.to_string(), *rendered);
    }

    let err = RecoverError::DbOpen("x".to_string());
    // Compile-time proof that the type implements `std::error::Error`.
    let _: &dyn std::error::Error = &err;
    let debug_text = format!("{err:?}");
    assert!(debug_text.contains("DbOpen"));
}
937
/// A database path whose parent is a regular file cannot be opened, and
/// recovery must surface that as `RecoverError::DbOpen`.
#[test]
fn db_open_failure_returns_db_open_error() {
    let workspace = fresh_dir();

    // Create a plain file, then ask for a database "inside" it.
    let blocker = workspace.path().join("not-a-dir");
    std::fs::write(&blocker, b"x").unwrap();
    let unopenable_db = blocker.join("child.db");

    let transcript_path = write_transcript(workspace.path(), &[USER_LINE_1]);
    let opts = base_opts(transcript_path, "ai:test:dberr");

    let err = recover_from_transcript(&unopenable_db, &opts).unwrap_err();
    assert!(matches!(err, RecoverError::DbOpen(_)), "got {err:?}");
}
949
/// With no `transcript_override`, recovery must still return `Ok`. The
/// outcome depends on the environment, so the test only requires that the
/// report is either error-free or carries a transcript path.
#[test]
fn no_transcript_override_resolves_via_cwd_and_returns_none_gracefully() {
    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");

    let opts = RecoverOpts {
        agent_id: "ai:test:cwd".to_string(),
        host: HostKind::ClaudeCode,
        namespace: Some("test-recover".to_string()),
        limit: DEFAULT_RECOVER_LIMIT,
        transcript_override: None,
        since_iso: None,
        dry_run: false,
        quiet: false,
    };

    let outcome = recover_from_transcript(&db_path, &opts).unwrap();
    let error_free = outcome.errors.is_empty();
    let has_transcript = outcome.transcript_path.is_some();
    assert!(error_free || has_transcript);
}
967
/// An assistant turn holding only a `tool_use` item is still atomised.
/// The stored memory content starts with `[tool calls]` and mentions the
/// tool name.
#[test]
fn tool_call_only_turn_produces_tool_calls_content() {
    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");

    let tool_only = r#"{"timestamp":"2026-05-28T13:00:00Z","type":"assistant","message":{"content":[{"type":"tool_use","name":"Bash","input":{"command":"ls"}}]}}"#;
    let transcript_path = write_transcript(workspace.path(), &[tool_only]);

    let opts = base_opts(transcript_path, "ai:test:toolonly");
    let report = recover_from_transcript(&db_path, &opts).unwrap();
    assert_eq!(report.lines_atomised, 1, "errors: {:?}", report.errors);

    // Read back the single created memory to inspect its stored content.
    let created_id = &report.memories_created[0];
    let conn = crate::storage::open(&db_path).unwrap();
    let content: String = conn
        .query_row(
            "SELECT content FROM memories WHERE id = ?1",
            rusqlite::params![created_id],
            |row| row.get(0),
        )
        .unwrap();

    assert!(content.starts_with("[tool calls]"), "got: {content}");
    assert!(content.contains("Bash"));
}
988
/// In quiet mode the report echoes at most `QUIET_MEMORY_ID_PREVIEW_CAP`
/// memory ids, even when more lines than that were atomised.
#[test]
fn quiet_mode_truncates_memory_id_preview() {
    let workspace = fresh_dir();
    let db_path = workspace.path().join("mem.db");

    // Generate five more distinct user turns than the preview cap allows.
    let turn_count = QUIET_MEMORY_ID_PREVIEW_CAP + 5;
    let mut lines: Vec<String> = Vec::with_capacity(turn_count);
    for i in 0..turn_count {
        lines.push(format!(
            r#"{{"timestamp":"2026-05-28T12:{:02}:00Z","type":"user","message":{{"content":[{{"type":"text","text":"directive {i}"}}]}}}}"#,
            i % 60
        ));
    }
    let borrowed: Vec<&str> = lines.iter().map(|line| line.as_str()).collect();
    let transcript_path = write_transcript(workspace.path(), &borrowed);

    let mut quiet_opts = base_opts(transcript_path, "ai:test:quiet");
    quiet_opts.quiet = true;
    let outcome = recover_from_transcript(&db_path, &quiet_opts).unwrap();

    let atomised = usize::try_from(outcome.lines_atomised).unwrap();
    assert!(atomised > QUIET_MEMORY_ID_PREVIEW_CAP);
    assert_eq!(
        outcome.memories_created.len(),
        QUIET_MEMORY_ID_PREVIEW_CAP,
        "quiet mode must cap the echoed id list"
    );
}
1013}