Skip to main content

pond/adapter/
mod.rs

1//! Source-adapter seam.
2//!
3//! Pond ingests sessions from many runtimes. The seam splits in two:
4//!
5//! - [`AdapterFactory`] is the stateless face every format publishes once,
6//!   collected by [`registry`]. It knows how to construct configured adapters
7//!   from an opaque JSON config blob ([`AdapterFactory::open`]) and how to
8//!   probe the user's environment for a default config
9//!   ([`AdapterFactory::probe_default`]).
10//! - [`Adapter`] is the live, configured instance. Its only job is
11//!   [`Adapter::events`]: stream canonical [`IngestEvent`]s in append-only
12//!   order per session. The "source" is opaque to the seam - a directory
13//!   tree, an HTTP endpoint, a database, an archive file.
14//!
//! Concrete implementations live in `adapter/<format>.rs` and are tied
//! together by [`registry`]. Adding an adapter takes one new file, its `mod`
//! declaration in this module, and one entry in the registry; there is no
//! central dispatch table to edit.
18
19use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26    sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27    wire::ProviderOptions,
28};
29
30mod claude_ai_export;
31mod claude_code;
32mod claude_desktop_app;
33mod codex_cli;
34mod discovery;
35pub mod extract;
36mod jsonl;
37mod opencode;
38mod pi_coding_agent;
39
40pub use claude_ai_export::{ClaudeAiExportAdapter, ClaudeAiExportFactory};
41pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
42pub use claude_desktop_app::{ClaudeDesktopAppAdapter, ClaudeDesktopAppFactory};
43pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
44pub use discovery::{
45    Candidate, PromptOutcome, apply_to_doc, discover, persist_accept, persist_decline,
46    probe_unconfigured, prompt_and_persist, prompt_each,
47};
48pub use extract::{
49    Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
50    extract_str, extract_value,
51};
52pub use opencode::{OpencodeAdapter, OpencodeFactory};
53pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
54
55/// Stateless face of an adapter type: how the registry knows about it without
56/// instantiating it. One implementation per known format, registered in
57/// [`registry`].
58pub trait AdapterFactory: Send + Sync {
59    /// Stable short name. Used as the `[sources.<name>]` config key, the
60    /// `pond sync <name>` positional arg, and the `Session.source_agent`
61    /// value emitted by the corresponding adapter.
62    fn name(&self) -> &'static str;
63
64    /// Open a configured adapter from a JSON-shaped config blob. The shape is
65    /// owned by each factory: filesystem adapters expect `{ "path": "..." }`,
66    /// API-backed adapters expect `{ "endpoint": "...", "auth_token": "..." }`,
67    /// etc. The seam doesn't know or care. A factory rejects a bad blob with
68    /// [`AdapterErrorKind::Config`].
69    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
70
71    /// Probe the user's environment for a default config. Returns the JSON
72    /// blob that would go into `[sources.<name>]` if the picker writes it
73    /// back. Filesystem adapters check their canonical install path under
74    /// `env.home`; adapters with no auto-discovery rule (e.g. API adapters
75    /// that need explicit creds) return `None`.
76    fn probe_default(&self, env: &Env) -> Option<Value>;
77
78    /// Restore one canonical session into this adapter's native file layout.
79    fn serialize(
80        &self,
81        session: &SessionWithMessages,
82        fidelity: RestoreFidelity,
83    ) -> Result<Vec<RestoredFile>, AdapterError>;
84}
85
/// How faithfully a restore reproduces the original source files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Reproduce the source's own records. This relies on the session's
    /// stored `raw_record`.
    Native,
    /// Fallback rendering, served when native data is unavailable (see
    /// [`RestoredFile::actual_fidelity`]).
    Foreign,
}

/// One file produced by [`AdapterFactory::serialize`], ready to be written
/// beneath a restore root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Location of the file relative to the restore root.
    pub relative_path: PathBuf,
    /// Exact file contents.
    pub bytes: Vec<u8>,
    /// Fidelity the adapter actually delivered for this file.
    ///
    /// Matches the requested fidelity unless the adapter had to fall back.
    /// For example, `Native` was requested but the session carries no stored
    /// `raw_record`, so `Foreign` was served instead.
    /// spec.md#adapter-native-restore-lossless: a native restore may be
    /// impossible for older logs, and surfacing the downgrade lets the CLI
    /// warn instead of degrading silently.
    pub actual_fidelity: RestoreFidelity,
}

impl RestoredFile {
    /// Bundle a relative path, its contents, and the fidelity actually served.
    pub(crate) fn new(
        relative_path: impl Into<PathBuf>,
        bytes: Vec<u8>,
        actual_fidelity: RestoreFidelity,
    ) -> Self {
        let relative_path: PathBuf = relative_path.into();
        Self {
            actual_fidelity,
            bytes,
            relative_path,
        }
    }
}
118
119/// Live, configured adapter instance. Holds whatever handle the source needs
120/// (an open directory root, an HTTP client + auth, a database connection)
121/// for the lifetime of its event stream.
122pub trait Adapter: Send + Sync {
123    /// Stream every canonical event for every session this adapter knows
124    /// about, in append-only order per session. The stream borrows `self`
125    /// so callers can pass `&adapter` or hold a `Box<dyn Adapter>` and
126    /// invoke this through `as_ref()`.
127    fn events(&self) -> EventStream<'_> {
128        let stream = self.events_with(&NoopOracle);
129        Box::pin(stream.filter_map(|res| match res {
130            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
131            Ok(AdapterYield::Skipped { .. }) => None,
132            Err(error) => Some(Err(error)),
133        }))
134    }
135
136    /// Count how many sessions [`Self::events`] will produce, used by the
137    /// CLI bar to set its length up front. A filesystem adapter walks its
138    /// root and counts `.jsonl` files; an API adapter calls its list
139    /// endpoint. Cheap and best-effort: errors here only mean we run with
140    /// an unknown total (the bar still ticks per session), so callers
141    /// fall back to a rolling counter rather than failing the sync.
142    fn discover(&self) -> DiscoverFuture<'_>;
143
144    /// Stream events with a [`SkipOracle`] the adapter MAY consult to
145    /// short-circuit per-session re-decoding (spec.md#adapter-integrity-event-ordering). Default impl
146    /// ignores the oracle.
147    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
148}
149
150/// Per-session watermark lookup: when did pond last write this session?
151/// Backed by Lance's `_row_last_updated_at_version` joined to the manifest
152/// commit timestamp (spec.md#adapter-integrity-event-ordering). Adapter compares this to the source
153/// file's mtime to decide whether to re-decode.
154pub trait SkipOracle: Send + Sync {
155    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
156
157    /// Fast-path hint: the oracle has no watermarks at all (first ingest or
158    /// `NoopOracle`). Lets adapters skip the per-file work needed to ask
159    /// `last_ingested_at` - typically the JSONL header peek + driver parse.
160    /// Defaults to `false` so existing oracles stay correct without changes.
161    fn is_empty(&self) -> bool {
162        false
163    }
164}
165
166/// `SkipOracle` that always returns `None`. Used by tests and benches that
167/// don't want skip behavior interfering with their assertions.
168#[derive(Debug, Default, Clone, Copy)]
169pub struct NoopOracle;
170
171impl SkipOracle for NoopOracle {
172    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
173        None
174    }
175
176    fn is_empty(&self) -> bool {
177        true
178    }
179}
180
181#[derive(Debug, Clone)]
182pub enum AdapterYield {
183    Event(IngestEvent),
184    Skipped {
185        /// `None` for files that never yield a session id (empty `.jsonl`).
186        session_id: Option<String>,
187        project: Option<String>,
188        reason: SkipReason,
189    },
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub enum SkipReason {
194    Fresh,
195    /// File produced no importable session (empty `.jsonl`, sidecar-only rows,
196    /// or an unextractable header). Benign: counted, never an error or a
197    /// per-event drop. The underlying cause is logged at `-vv` (debug) verbosity.
198    Empty,
199    /// File is structurally a known sidecar whose specific shape this adapter
200    /// version can't ingest. Surfaced as a visible, counted failure - NOT a
201    /// benign skip - so the gap is actionable and the file is never folded into
202    /// another session under a borrowed id. The payload is the user-facing
203    /// reason naming the file and the fix.
204    Unsupported(String),
205}
206
207pub type AdapterYieldStream<'a> =
208    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
209
210/// Boxed future returning the number of sessions an adapter will emit. The
211/// shape mirrors [`EventStream`] - one alias per async trait method so the
212/// trait stays `dyn`-compatible without per-adapter associated types.
213pub type DiscoverFuture<'a> =
214    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
215
/// Environment slice handed to [`AdapterFactory::probe_default`].
///
/// Kept deliberately small (just `home`) because env-var lookups for API
/// credentials are unreliable. Adapters with API backends should require
/// explicit config rather than opportunistic env reads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Env {
    /// The user's home directory. Filesystem adapters probe for their
    /// canonical install paths beneath it.
    pub home: PathBuf,
}

impl Env {
    /// Read `home` from the `HOME` env var.
    ///
    /// Returns `None` when `HOME` is unset (CI, post-install hooks,
    /// sandboxed runs).
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME").map(Self::with_home)
    }

    /// Construct an `Env` with an explicit home.
    ///
    /// Tests use this to inject a `TempDir`-backed home without touching the
    /// process env.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
240
241/// Boxed, `Send`-only stream of [`IngestEvent`]s with one shared error type.
242/// The lifetime parameter lets future adapters borrow from their config; for
243/// `self: Box<Self>` impls the lifetime collapses to `'static`.
244pub type EventStream<'a> =
245    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
246
247/// One error type for every adapter. Each call site tags the error with the
248/// adapter's name (so multi-adapter syncs can attribute failures) and a
249/// `location` string the operator can act on (file path, URL, line number,
250/// config key, ...). The `kind` carries the underlying class.
251#[derive(Debug)]
252pub struct AdapterError {
253    pub adapter: &'static str,
254    pub location: String,
255    pub kind: AdapterErrorKind,
256}
257
258#[derive(Debug)]
259pub enum AdapterErrorKind {
260    /// Filesystem / network IO at `location`.
261    Io(std::io::Error),
262    /// JSON parse error at line `line` inside `location`.
263    Parse {
264        line: usize,
265        source: serde_json::Error,
266    },
267    /// Format-specific shape error: missing required field, unknown role,
268    /// unsupported record type. The `String` is operator-facing.
269    Schema(String),
270    /// `AdapterFactory::open` rejected its config blob.
271    Config(String),
272    /// HTTP / RPC / timeout error from an API-backed adapter.
273    Transport(String),
274    /// Auth failure from an API-backed adapter (bad token, expired creds).
275    Auth(String),
276}
277
278impl AdapterError {
279    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
280        Self {
281            adapter,
282            location: location.into(),
283            kind: AdapterErrorKind::Io(source),
284        }
285    }
286
287    pub fn parse(
288        adapter: &'static str,
289        location: impl Into<String>,
290        line: usize,
291        source: serde_json::Error,
292    ) -> Self {
293        Self {
294            adapter,
295            location: location.into(),
296            kind: AdapterErrorKind::Parse { line, source },
297        }
298    }
299
300    pub fn schema(
301        adapter: &'static str,
302        location: impl Into<String>,
303        message: impl Into<String>,
304    ) -> Self {
305        Self {
306            adapter,
307            location: location.into(),
308            kind: AdapterErrorKind::Schema(message.into()),
309        }
310    }
311
312    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
313        Self {
314            adapter,
315            location: "config".to_owned(),
316            kind: AdapterErrorKind::Config(message.into()),
317        }
318    }
319}
320
321impl std::fmt::Display for AdapterError {
322    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        match &self.kind {
324            AdapterErrorKind::Io(source) => {
325                write!(
326                    formatter,
327                    "{} io error at {}: {source}",
328                    self.adapter, self.location
329                )
330            }
331            AdapterErrorKind::Parse { line, source } => write!(
332                formatter,
333                "{} json parse error at {}:{line}: {source}",
334                self.adapter, self.location,
335            ),
336            AdapterErrorKind::Schema(message) => {
337                write!(
338                    formatter,
339                    "{} schema error at {}: {message}",
340                    self.adapter, self.location
341                )
342            }
343            AdapterErrorKind::Config(message) => {
344                write!(formatter, "{} config error: {message}", self.adapter)
345            }
346            AdapterErrorKind::Transport(message) => write!(
347                formatter,
348                "{} transport error at {}: {message}",
349                self.adapter, self.location,
350            ),
351            AdapterErrorKind::Auth(message) => {
352                write!(formatter, "{} auth error: {message}", self.adapter)
353            }
354        }
355    }
356}
357
358impl std::error::Error for AdapterError {
359    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
360        match &self.kind {
361            AdapterErrorKind::Io(source) => Some(source),
362            AdapterErrorKind::Parse { source, .. } => Some(source),
363            _ => None,
364        }
365    }
366}
367
368/// The static, ordered registry of every adapter pond knows. A new adapter
369/// adds one `&Factory` here plus one file under `src/adapter/`. Order is the
370/// order discovery presents to the operator.
371pub fn registry() -> &'static [&'static dyn AdapterFactory] {
372    &[
373        &ClaudeCodeFactory,
374        &ClaudeDesktopAppFactory,
375        &ClaudeAiExportFactory,
376        &CodexCliFactory,
377        &OpencodeFactory,
378        &PiCodingAgentFactory,
379    ]
380}
381
382/// Look up a factory by name. Returns `None` for unknown names; callers
383/// usually wrap that in a clear error using [`known_names`].
384pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
385    registry().iter().copied().find(|f| f.name() == name)
386}
387
388/// The names of every registered adapter. Drives error messages
389/// ("unknown adapter X; known: ...") and the discovery picker labels.
390pub fn known_names() -> Vec<&'static str> {
391    registry().iter().map(|f| f.name()).collect()
392}
393
394/// Probe every adapter for a default config under `env.home`. Returns
395/// `(name, default_config)` pairs in registry order, skipping adapters whose
396/// `probe_default` returned `None`. The picker shows these to the operator.
397pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
398    registry()
399        .iter()
400        .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
401        .collect()
402}
403
/// Stable Part-row id of the form `"{message_id}:{ordinal:04}"`, with the
/// ordinal zero-padded to at least four digits.
///
/// Both JSONL adapters share this shape so the cross-adapter id space stays
/// predictable.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    let padded_ordinal = format!("{ordinal:04}");
    format!("{message_id}:{padded_ordinal}")
}
409
410/// Compact (no-whitespace) JSON serialization used as a fallback Part body
411/// when a row carries something we don't have a richer canonical shape for.
412pub(crate) fn compact_json(value: &Value) -> String {
413    serde_json::to_string(value).unwrap_or_default()
414}
415
416pub(crate) fn jsonl_bytes(
417    adapter: &'static str,
418    records: &[Value],
419) -> Result<Vec<u8>, AdapterError> {
420    let mut bytes = Vec::new();
421    for record in records {
422        let line = serde_json::to_vec(record).map_err(|err| {
423            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
424        })?;
425        bytes.extend(line);
426        bytes.push(b'\n');
427    }
428    Ok(bytes)
429}
430
431/// Shared `AdapterFactory::open` plumbing: parse the config blob's `path` and
432/// expand a leading `~` against `$HOME` once, not per path adapter.
433pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
434    use serde::Deserialize;
435    #[derive(Deserialize)]
436    struct Cfg {
437        path: PathBuf,
438    }
439    let cfg: Cfg = serde_json::from_value(config)
440        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
441    Ok(match std::env::var_os("HOME") {
442        Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
443        None => cfg.path,
444    })
445}
446
447pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
448    options
449        .get("source")
450        .and_then(|source| source.get("raw_record"))
451        .cloned()
452}
453
454/// Standard `options.source = {adapter, raw_record}` shape used by every
455/// adapter that captures its source record for native restore. Centralized so
456/// the writer side of the raw-record convention lives next to the reader
457/// ([`raw_record`]); per-adapter side-fields (e.g. claude-code's `cwd`,
458/// codex-cli's `git`) extend this map after construction.
459pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
460    let mut options = ProviderOptions::new();
461    options.insert(
462        "source".to_owned(),
463        serde_json::json!({
464            "adapter": adapter,
465            "raw_record": extract_raw_record(raw),
466        }),
467    );
468    options
469}
470
/// Convert an ingest-side `usize` ordinal to the `i32` stored in
/// `Part.ordinal`.
///
/// Saturates at `i32::MAX` rather than dropping a record from a
/// (theoretical) session with more parts than an `i32` can count.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    // `i32::MAX as usize` is lossless on 32- and 64-bit targets. Below the
    // limit, the cast back to `i32` is exact because the value fits.
    if ordinal > i32::MAX as usize {
        i32::MAX
    } else {
        ordinal as i32
    }
}
478
479/// Reject `/`, `\`, `..`, and absolute paths in any segment that will become
480/// part of a filesystem path during restore. Centralizing it here keeps every
481/// adapter's restore-write path on the same allowlist; the writer
482/// ([`write_restored_files`]) re-applies it as a defense-in-depth check on
483/// every segment regardless of which adapter built the `RestoredFile`.
484pub(crate) fn validate_path_id(
485    adapter: &'static str,
486    kind: &str,
487    id: &str,
488    location: impl Into<String>,
489) -> Result<(), AdapterError> {
490    if id.is_empty()
491        || id.contains('/')
492        || id.contains('\\')
493        || id.contains("..")
494        || std::path::Path::new(id).is_absolute()
495    {
496        return Err(AdapterError::schema(
497            adapter,
498            location,
499            format!("{kind} contains a path separator or traversal marker: {id}"),
500        ));
501    }
502    Ok(())
503}
504
505/// Atomically write a batch of `RestoredFile`s under `root`. Every path
506/// segment is re-validated and the joined path is required to stay inside
507/// `root` (spec.md#adapter-native-restore-lossless: restore writes are
508/// adapter-supplied, but the gate lives at the writer so a single audit
509/// covers every adapter today and tomorrow). On partial failure the
510/// half-written tree is discarded before the error is returned.
511///
512/// Currently exercised only by adapter tests; the production restore CLI
513/// will route through this same helper when it lands.
514#[allow(dead_code)]
515pub(crate) fn write_restored_files(
516    root: &Path,
517    files: Vec<RestoredFile>,
518) -> Result<(), AdapterError> {
519    // Stage under a sibling temp dir and atomically rename so a partial
520    // failure cannot leave a half-populated restore in place.
521    let parent = root.parent().unwrap_or_else(|| Path::new("."));
522    let stem = root
523        .file_name()
524        .and_then(|n| n.to_str())
525        .unwrap_or("restore");
526    let staging = parent.join(format!(".{stem}.tmp"));
527    let io =
528        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
529    let _ = std::fs::remove_dir_all(&staging);
530    std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
531
532    let result = (|| -> Result<(), AdapterError> {
533        for file in files {
534            write_one_into_staging(&staging, &file)?;
535        }
536        Ok(())
537    })();
538
539    if let Err(error) = result {
540        let _ = std::fs::remove_dir_all(&staging);
541        return Err(error);
542    }
543
544    // Replace any existing restore root; the staging dir becomes the new root.
545    let _ = std::fs::remove_dir_all(root);
546    if let Some(parent) = root.parent()
547        && !parent.as_os_str().is_empty()
548    {
549        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
550    }
551    std::fs::rename(&staging, root).map_err(|e| {
552        let _ = std::fs::remove_dir_all(&staging);
553        io(root.display().to_string(), e)
554    })?;
555    Ok(())
556}
557
558#[allow(dead_code)]
559fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
560    // Validate every segment of the supplied relative path.
561    for component in file.relative_path.components() {
562        use std::path::Component;
563        let segment = match component {
564            Component::Normal(s) => s,
565            Component::CurDir => continue,
566            // Absolute prefixes, root, and `..` are categorically rejected -
567            // a restored file's relative_path is by contract relative + safe.
568            _ => {
569                return Err(AdapterError::schema(
570                    "restore",
571                    file.relative_path.display().to_string(),
572                    "relative_path component is not a normal name",
573                ));
574            }
575        };
576        let Some(text) = segment.to_str() else {
577            return Err(AdapterError::schema(
578                "restore",
579                file.relative_path.display().to_string(),
580                "relative_path segment is not UTF-8",
581            ));
582        };
583        validate_path_id(
584            "restore",
585            "relative_path segment",
586            text,
587            file.relative_path.display().to_string(),
588        )?;
589    }
590
591    let dest = staging.join(&file.relative_path);
592    // Defense-in-depth: confirm the joined path is still inside the staging
593    // dir even if every individual segment passed the syntactic check.
594    if !dest.starts_with(staging) {
595        return Err(AdapterError::schema(
596            "restore",
597            file.relative_path.display().to_string(),
598            "relative_path escaped the restore root after join",
599        ));
600    }
601    let io =
602        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
603    if let Some(parent) = dest.parent() {
604        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
605    }
606    std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
607    Ok(())
608}
609
610pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
611    value.as_deref().map(String::as_str).unwrap_or("")
612}
613
614/// Deterministic message ordering for restore: timestamp, then id as a
615/// tiebreaker so equal-timestamp messages always serialize in a stable order.
616pub(crate) fn by_timestamp_then_id(
617    left: &MessageWithParts,
618    right: &MessageWithParts,
619) -> std::cmp::Ordering {
620    left.message
621        .timestamp()
622        .cmp(&right.message.timestamp())
623        .then_with(|| left.message.id().cmp(right.message.id()))
624}
625
626/// `ProviderOptions::new()` shortcut; both adapters reach for an empty
627/// options map often enough that naming the no-op clarifies the call sites.
628#[inline]
629pub(crate) fn empty_options() -> ProviderOptions {
630    ProviderOptions::new()
631}
632
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Shared probe_default contract. When the adapter's expected install
    /// subpath exists under an injected `HOME`, `probe_default` returns it;
    /// once the path is removed, it returns `None`.
    ///
    /// Each adapter owns its `probe_default_*` test (per the seam-boundaries
    /// rule), but the shape is the same, so this helper takes the factory
    /// plus its expected subpath.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        // An empty subpath would make the "install path" the temp root itself,
        // which the removal step below would then delete from under TempDir.
        anyhow::ensure!(
            !expected_subpath.is_empty(),
            "expected_subpath must name at least one segment under HOME",
        );
        let temp = TempDir::new()?;
        let mut expected = temp.path().to_path_buf();
        for segment in expected_subpath {
            expected.push(segment);
        }
        std::fs::create_dir_all(&expected)?;
        // Compare against a concrete string. Comparing with the
        // `Option<&str>` from `to_str()` would let a non-UTF-8 temp path and
        // a `None` probe pass vacuously as `None == None`.
        let expected_str = expected
            .to_str()
            .ok_or_else(|| anyhow::anyhow!("temp install path is not UTF-8: {expected:?}"))?;
        let env = Env::with_home(temp.path());

        let probe = factory.probe_default(&env);
        let got = probe
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == Some(expected_str),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        anyhow::ensure!(
            factory.probe_default(&env).is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Check that a native restore reproduces the source files.
    ///
    /// Ingests `adapter` into a fresh local store, re-serializes every stored
    /// session with [`RestoreFidelity::Native`], and checks two things:
    /// - each emitted file matches the file at the same relative path under
    ///   `source_root` (compared as JSON values, line by line for `.jsonl`);
    /// - the set of emitted paths equals the set of `.json`/`.jsonl` files
    ///   under `source_root`.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }
        assert_eq!(
            restored_paths,
            source_json_files(source_root)?,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Relative paths of every `.json` / `.jsonl` file under `root`.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut out = BTreeSet::new();
        collect_source_json_files(root, root, &mut out)?;
        Ok(out)
    }

    /// Recursive worker for [`source_json_files`]. Paths are recorded
    /// relative to `root`.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            if let Some("json" | "jsonl") = path.extension().and_then(|ext| ext.to_str()) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compare two files' contents as JSON.
    ///
    /// `.jsonl` files are compared per non-blank line. Any other file is
    /// compared as a single document. Byte-level formatting differences are
    /// ignored.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parse each non-blank line of UTF-8 `bytes` as a JSON value.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}