Skip to main content

pond/adapter/
mod.rs

1//! Source-adapter seam.
2//!
3//! Pond ingests sessions from many runtimes. The seam splits in two:
4//!
5//! - [`AdapterFactory`] is the stateless face every format publishes once,
6//!   collected by [`registry`]. It knows how to construct configured adapters
7//!   from an opaque JSON config blob ([`AdapterFactory::open`]) and how to
8//!   probe the user's environment for a default config
9//!   ([`AdapterFactory::probe_default`]).
10//! - [`Adapter`] is the live, configured instance. Its only job is
11//!   [`Adapter::events`]: stream canonical [`IngestEvent`]s in append-only
12//!   order per session. The "source" is opaque to the seam - a directory
13//!   tree, an HTTP endpoint, a database, an archive file.
14//!
15//! Concrete implementations live in `adapter/<format>.rs` and are tied
16//! together by [`registry`]. A new adapter is one file plus one line in the
17//! registry; no central dispatch table to edit.
18
19use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26    sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27    wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35mod opencode;
36mod pi_coding_agent;
37
38pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
39pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
40pub use discovery::{
41    Candidate, PromptOutcome, discover, persist_accept, persist_decline, probe_unconfigured,
42    prompt_and_persist, prompt_each,
43};
44pub use extract::{
45    Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
46    extract_str, extract_value,
47};
48pub use opencode::{OpencodeAdapter, OpencodeFactory};
49pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
50
51/// Stateless face of an adapter type: how the registry knows about it without
52/// instantiating it. One implementation per known format, registered in
53/// [`registry`].
54pub trait AdapterFactory: Send + Sync {
55    /// Stable short name. Used as the `[sources.<name>]` config key, the
56    /// `pond sync <name>` positional arg, and the `Session.source_agent`
57    /// value emitted by the corresponding adapter.
58    fn name(&self) -> &'static str;
59
60    /// Open a configured adapter from a JSON-shaped config blob. The shape is
61    /// owned by each factory: filesystem adapters expect `{ "path": "..." }`,
62    /// API-backed adapters expect `{ "endpoint": "...", "auth_token": "..." }`,
63    /// etc. The seam doesn't know or care. A factory rejects a bad blob with
64    /// [`AdapterErrorKind::Config`].
65    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
66
67    /// Probe the user's environment for a default config. Returns the JSON
68    /// blob that would go into `[sources.<name>]` if the picker writes it
69    /// back. Filesystem adapters check their canonical install path under
70    /// `env.home`; adapters with no auto-discovery rule (e.g. API adapters
71    /// that need explicit creds) return `None`.
72    fn probe_default(&self, env: &Env) -> Option<Value>;
73
74    /// Restore one canonical session into this adapter's native file layout.
75    fn serialize(
76        &self,
77        session: &SessionWithMessages,
78        fidelity: RestoreFidelity,
79    ) -> Result<Vec<RestoredFile>, AdapterError>;
80}
81
/// Restore level requested from, or served by, [`AdapterFactory::serialize`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Reproduce the source format from the session's stored `raw_record`
    /// (spec.md#adapter-native-restore-lossless).
    Native,
    /// Restore served without the stored source record - e.g. the downgrade
    /// taken when a session has no `raw_record` to replay.
    Foreign,
}

/// One file produced by a restore, addressed relative to the restore root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Where the file lives beneath the restore root.
    pub relative_path: PathBuf,
    /// Complete file contents.
    pub bytes: Vec<u8>,
    /// The fidelity this file actually has. It matches the requested fidelity
    /// unless the adapter had to fall back - for instance `Native` was asked
    /// for but the session carries no stored `raw_record`, so `Foreign` was
    /// produced instead. spec.md#adapter-native-restore-lossless allows native
    /// restore to be impossible for older logs; surfacing the downgrade lets
    /// the CLI warn instead of degrading silently.
    pub actual_fidelity: RestoreFidelity,
}

impl RestoredFile {
    /// Assemble a [`RestoredFile`]. The relative path accepts anything
    /// convertible into a [`PathBuf`] (`&str`, `String`, `PathBuf`, ...).
    pub(crate) fn new(
        relative_path: impl Into<PathBuf>,
        bytes: Vec<u8>,
        actual_fidelity: RestoreFidelity,
    ) -> Self {
        let relative_path: PathBuf = relative_path.into();
        Self {
            actual_fidelity,
            bytes,
            relative_path,
        }
    }
}
114
115/// Live, configured adapter instance. Holds whatever handle the source needs
116/// (an open directory root, an HTTP client + auth, a database connection)
117/// for the lifetime of its event stream.
118pub trait Adapter: Send + Sync {
119    /// Stream every canonical event for every session this adapter knows
120    /// about, in append-only order per session. The stream borrows `self`
121    /// so callers can pass `&adapter` or hold a `Box<dyn Adapter>` and
122    /// invoke this through `as_ref()`.
123    fn events(&self) -> EventStream<'_> {
124        let stream = self.events_with(&NoopOracle);
125        Box::pin(stream.filter_map(|res| match res {
126            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
127            Ok(AdapterYield::Skipped { .. }) => None,
128            Err(error) => Some(Err(error)),
129        }))
130    }
131
132    /// Count how many sessions [`Self::events`] will produce, used by the
133    /// CLI bar to set its length up front. A filesystem adapter walks its
134    /// root and counts `.jsonl` files; an API adapter calls its list
135    /// endpoint. Cheap and best-effort: errors here only mean we run with
136    /// an unknown total (the bar still ticks per session), so callers
137    /// fall back to a rolling counter rather than failing the sync.
138    fn discover(&self) -> DiscoverFuture<'_>;
139
140    /// Stream events with a [`SkipOracle`] the adapter MAY consult to
141    /// short-circuit per-session re-decoding (spec.md#adapter-integrity-event-ordering). Default impl
142    /// ignores the oracle.
143    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
144}
145
146/// Per-session watermark lookup: when did pond last write this session?
147/// Backed by Lance's `_row_last_updated_at_version` joined to the manifest
148/// commit timestamp (spec.md#adapter-integrity-event-ordering). Adapter compares this to the source
149/// file's mtime to decide whether to re-decode.
150pub trait SkipOracle: Send + Sync {
151    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
152
153    /// Fast-path hint: the oracle has no watermarks at all (first ingest or
154    /// `NoopOracle`). Lets adapters skip the per-file work needed to ask
155    /// `last_ingested_at` - typically the JSONL header peek + driver parse.
156    /// Defaults to `false` so existing oracles stay correct without changes.
157    fn is_empty(&self) -> bool {
158        false
159    }
160}
161
162/// `SkipOracle` that always returns `None`. Used by tests and benches that
163/// don't want skip behavior interfering with their assertions.
164#[derive(Debug, Default, Clone, Copy)]
165pub struct NoopOracle;
166
167impl SkipOracle for NoopOracle {
168    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
169        None
170    }
171
172    fn is_empty(&self) -> bool {
173        true
174    }
175}
176
177#[derive(Debug, Clone)]
178pub enum AdapterYield {
179    Event(IngestEvent),
180    Skipped {
181        /// `None` for files that never yield a session id (empty `.jsonl`).
182        session_id: Option<String>,
183        project: Option<String>,
184        reason: SkipReason,
185    },
186}
187
188#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub enum SkipReason {
190    Fresh,
191    /// File produced no importable session (empty `.jsonl`, sidecar-only rows,
192    /// or an unextractable header). Benign: counted, never an error or a
193    /// per-event drop. The underlying cause is logged at `-vv` (debug) verbosity.
194    Empty,
195}
196
197pub type AdapterYieldStream<'a> =
198    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
199
200/// Boxed future returning the number of sessions an adapter will emit. The
201/// shape mirrors [`EventStream`] - one alias per async trait method so the
202/// trait stays `dyn`-compatible without per-adapter associated types.
203pub type DiscoverFuture<'a> =
204    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
205
/// Environment slice handed to [`AdapterFactory::probe_default`]. Kept
/// deliberately small - just `home`, because env-var lookups for API creds
/// are unreliable and most adapters with API backends should require
/// explicit config rather than opportunistic env reads.
#[derive(Debug, Clone)]
pub struct Env {
    /// Base directory that probes resolve install paths against.
    pub home: PathBuf,
}

impl Env {
    /// Read `home` from the `HOME` env var.
    ///
    /// Returns `None` when `HOME` is unset (CI, post-install hooks, sandboxed
    /// runs) and also when it is set to the empty string. An empty home would
    /// turn every probe into a lookup relative to the current directory
    /// (`Path::new("").join(".x")` is just `.x`), which is never what the user
    /// meant.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME")
            .filter(|home| !home.is_empty())
            .map(|home| Self {
                home: PathBuf::from(home),
            })
    }

    /// Construct an `Env` with an explicit home. Tests use this to inject a
    /// `TempDir`-backed home without touching the process env.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
230
231/// Boxed, `Send`-only stream of [`IngestEvent`]s with one shared error type.
232/// The lifetime parameter lets future adapters borrow from their config; for
233/// `self: Box<Self>` impls the lifetime collapses to `'static`.
234pub type EventStream<'a> =
235    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
236
237/// One error type for every adapter. Each call site tags the error with the
238/// adapter's name (so multi-adapter syncs can attribute failures) and a
239/// `location` string the operator can act on (file path, URL, line number,
240/// config key, ...). The `kind` carries the underlying class.
241#[derive(Debug)]
242pub struct AdapterError {
243    pub adapter: &'static str,
244    pub location: String,
245    pub kind: AdapterErrorKind,
246}
247
248#[derive(Debug)]
249pub enum AdapterErrorKind {
250    /// Filesystem / network IO at `location`.
251    Io(std::io::Error),
252    /// JSON parse error at line `line` inside `location`.
253    Parse {
254        line: usize,
255        source: serde_json::Error,
256    },
257    /// Format-specific shape error: missing required field, unknown role,
258    /// unsupported record type. The `String` is operator-facing.
259    Schema(String),
260    /// `AdapterFactory::open` rejected its config blob.
261    Config(String),
262    /// HTTP / RPC / timeout error from an API-backed adapter.
263    Transport(String),
264    /// Auth failure from an API-backed adapter (bad token, expired creds).
265    Auth(String),
266}
267
268impl AdapterError {
269    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
270        Self {
271            adapter,
272            location: location.into(),
273            kind: AdapterErrorKind::Io(source),
274        }
275    }
276
277    pub fn parse(
278        adapter: &'static str,
279        location: impl Into<String>,
280        line: usize,
281        source: serde_json::Error,
282    ) -> Self {
283        Self {
284            adapter,
285            location: location.into(),
286            kind: AdapterErrorKind::Parse { line, source },
287        }
288    }
289
290    pub fn schema(
291        adapter: &'static str,
292        location: impl Into<String>,
293        message: impl Into<String>,
294    ) -> Self {
295        Self {
296            adapter,
297            location: location.into(),
298            kind: AdapterErrorKind::Schema(message.into()),
299        }
300    }
301
302    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
303        Self {
304            adapter,
305            location: "config".to_owned(),
306            kind: AdapterErrorKind::Config(message.into()),
307        }
308    }
309}
310
311impl std::fmt::Display for AdapterError {
312    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        match &self.kind {
314            AdapterErrorKind::Io(source) => {
315                write!(
316                    formatter,
317                    "{} io error at {}: {source}",
318                    self.adapter, self.location
319                )
320            }
321            AdapterErrorKind::Parse { line, source } => write!(
322                formatter,
323                "{} json parse error at {}:{line}: {source}",
324                self.adapter, self.location,
325            ),
326            AdapterErrorKind::Schema(message) => {
327                write!(
328                    formatter,
329                    "{} schema error at {}: {message}",
330                    self.adapter, self.location
331                )
332            }
333            AdapterErrorKind::Config(message) => {
334                write!(formatter, "{} config error: {message}", self.adapter)
335            }
336            AdapterErrorKind::Transport(message) => write!(
337                formatter,
338                "{} transport error at {}: {message}",
339                self.adapter, self.location,
340            ),
341            AdapterErrorKind::Auth(message) => {
342                write!(formatter, "{} auth error: {message}", self.adapter)
343            }
344        }
345    }
346}
347
348impl std::error::Error for AdapterError {
349    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
350        match &self.kind {
351            AdapterErrorKind::Io(source) => Some(source),
352            AdapterErrorKind::Parse { source, .. } => Some(source),
353            _ => None,
354        }
355    }
356}
357
358/// The static, ordered registry of every adapter pond knows. A new adapter
359/// adds one `&Factory` here plus one file under `src/adapter/`. Order is the
360/// order discovery presents to the operator.
361pub fn registry() -> &'static [&'static dyn AdapterFactory] {
362    &[
363        &ClaudeCodeFactory,
364        &CodexCliFactory,
365        &OpencodeFactory,
366        &PiCodingAgentFactory,
367    ]
368}
369
370/// Look up a factory by name. Returns `None` for unknown names; callers
371/// usually wrap that in a clear error using [`known_names`].
372pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
373    registry().iter().copied().find(|f| f.name() == name)
374}
375
376/// The names of every registered adapter. Drives error messages
377/// ("unknown adapter X; known: ...") and the discovery picker labels.
378pub fn known_names() -> Vec<&'static str> {
379    registry().iter().map(|f| f.name()).collect()
380}
381
382/// Probe every adapter for a default config under `env.home`. Returns
383/// `(name, default_config)` pairs in registry order, skipping adapters whose
384/// `probe_default` returned `None`. The picker shows these to the operator.
385pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
386    registry()
387        .iter()
388        .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
389        .collect()
390}
391
/// Deterministic Part-row id: the message id, a colon, then the ordinal
/// zero-padded to at least four digits (`"msg:0007"`). Both JSONL adapters
/// share this shape so ids stay predictable across adapters.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{}:{:04}", message_id, ordinal)
}
397
398/// Compact (no-whitespace) JSON serialization used as a fallback Part body
399/// when a row carries something we don't have a richer canonical shape for.
400pub(crate) fn compact_json(value: &Value) -> String {
401    serde_json::to_string(value).unwrap_or_default()
402}
403
404pub(crate) fn jsonl_bytes(
405    adapter: &'static str,
406    records: &[Value],
407) -> Result<Vec<u8>, AdapterError> {
408    let mut bytes = Vec::new();
409    for record in records {
410        let line = serde_json::to_vec(record).map_err(|err| {
411            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
412        })?;
413        bytes.extend(line);
414        bytes.push(b'\n');
415    }
416    Ok(bytes)
417}
418
419/// Shared `AdapterFactory::open` plumbing: parse the config blob's `path` and
420/// expand a leading `~` against `$HOME` once, not per path adapter.
421pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
422    use serde::Deserialize;
423    #[derive(Deserialize)]
424    struct Cfg {
425        path: PathBuf,
426    }
427    let cfg: Cfg = serde_json::from_value(config)
428        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
429    Ok(match std::env::var_os("HOME") {
430        Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
431        None => cfg.path,
432    })
433}
434
435pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
436    options
437        .get("source")
438        .and_then(|source| source.get("raw_record"))
439        .cloned()
440}
441
442/// Standard `options.source = {adapter, raw_record}` shape used by every
443/// adapter that captures its source record for native restore. Centralized so
444/// the writer side of the raw-record convention lives next to the reader
445/// ([`raw_record`]); per-adapter side-fields (e.g. claude-code's `cwd`,
446/// codex-cli's `git`) extend this map after construction.
447pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
448    let mut options = ProviderOptions::new();
449    options.insert(
450        "source".to_owned(),
451        serde_json::json!({
452            "adapter": adapter,
453            "raw_record": extract_raw_record(raw),
454        }),
455    );
456    options
457}
458
/// Convert an ingest-side `usize` ordinal into the `i32` stored in
/// `Part.ordinal`. Values above `i32::MAX` saturate at `i32::MAX`, so an
/// oversized session is clamped rather than having records dropped.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    match i32::try_from(ordinal) {
        Ok(fits) => fits,
        Err(_) => i32::MAX,
    }
}
466
467/// Reject `/`, `\`, `..`, and absolute paths in any segment that will become
468/// part of a filesystem path during restore. Centralizing it here keeps every
469/// adapter's restore-write path on the same allowlist; the writer
470/// ([`write_restored_files`]) re-applies it as a defense-in-depth check on
471/// every segment regardless of which adapter built the `RestoredFile`.
472pub(crate) fn validate_path_id(
473    adapter: &'static str,
474    kind: &str,
475    id: &str,
476    location: impl Into<String>,
477) -> Result<(), AdapterError> {
478    if id.is_empty()
479        || id.contains('/')
480        || id.contains('\\')
481        || id.contains("..")
482        || std::path::Path::new(id).is_absolute()
483    {
484        return Err(AdapterError::schema(
485            adapter,
486            location,
487            format!("{kind} contains a path separator or traversal marker: {id}"),
488        ));
489    }
490    Ok(())
491}
492
493/// Atomically write a batch of `RestoredFile`s under `root`. Every path
494/// segment is re-validated and the joined path is required to stay inside
495/// `root` (spec.md#adapter-native-restore-lossless: restore writes are
496/// adapter-supplied, but the gate lives at the writer so a single audit
497/// covers every adapter today and tomorrow). On partial failure the
498/// half-written tree is discarded before the error is returned.
499///
500/// Currently exercised only by adapter tests; the production restore CLI
501/// will route through this same helper when it lands.
502#[allow(dead_code)]
503pub(crate) fn write_restored_files(
504    root: &Path,
505    files: Vec<RestoredFile>,
506) -> Result<(), AdapterError> {
507    // Stage under a sibling temp dir and atomically rename so a partial
508    // failure cannot leave a half-populated restore in place.
509    let parent = root.parent().unwrap_or_else(|| Path::new("."));
510    let stem = root
511        .file_name()
512        .and_then(|n| n.to_str())
513        .unwrap_or("restore");
514    let staging = parent.join(format!(".{stem}.tmp"));
515    let io =
516        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
517    let _ = std::fs::remove_dir_all(&staging);
518    std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
519
520    let result = (|| -> Result<(), AdapterError> {
521        for file in files {
522            write_one_into_staging(&staging, &file)?;
523        }
524        Ok(())
525    })();
526
527    if let Err(error) = result {
528        let _ = std::fs::remove_dir_all(&staging);
529        return Err(error);
530    }
531
532    // Replace any existing restore root; the staging dir becomes the new root.
533    let _ = std::fs::remove_dir_all(root);
534    if let Some(parent) = root.parent()
535        && !parent.as_os_str().is_empty()
536    {
537        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
538    }
539    std::fs::rename(&staging, root).map_err(|e| {
540        let _ = std::fs::remove_dir_all(&staging);
541        io(root.display().to_string(), e)
542    })?;
543    Ok(())
544}
545
546#[allow(dead_code)]
547fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
548    // Validate every segment of the supplied relative path.
549    for component in file.relative_path.components() {
550        use std::path::Component;
551        let segment = match component {
552            Component::Normal(s) => s,
553            Component::CurDir => continue,
554            // Absolute prefixes, root, and `..` are categorically rejected -
555            // a restored file's relative_path is by contract relative + safe.
556            _ => {
557                return Err(AdapterError::schema(
558                    "restore",
559                    file.relative_path.display().to_string(),
560                    "relative_path component is not a normal name",
561                ));
562            }
563        };
564        let Some(text) = segment.to_str() else {
565            return Err(AdapterError::schema(
566                "restore",
567                file.relative_path.display().to_string(),
568                "relative_path segment is not UTF-8",
569            ));
570        };
571        validate_path_id(
572            "restore",
573            "relative_path segment",
574            text,
575            file.relative_path.display().to_string(),
576        )?;
577    }
578
579    let dest = staging.join(&file.relative_path);
580    // Defense-in-depth: confirm the joined path is still inside the staging
581    // dir even if every individual segment passed the syntactic check.
582    if !dest.starts_with(staging) {
583        return Err(AdapterError::schema(
584            "restore",
585            file.relative_path.display().to_string(),
586            "relative_path escaped the restore root after join",
587        ));
588    }
589    let io =
590        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
591    if let Some(parent) = dest.parent() {
592        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
593    }
594    std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
595    Ok(())
596}
597
598pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
599    value.as_deref().map(String::as_str).unwrap_or("")
600}
601
602/// Deterministic message ordering for restore: timestamp, then id as a
603/// tiebreaker so equal-timestamp messages always serialize in a stable order.
604pub(crate) fn by_timestamp_then_id(
605    left: &MessageWithParts,
606    right: &MessageWithParts,
607) -> std::cmp::Ordering {
608    left.message
609        .timestamp()
610        .cmp(&right.message.timestamp())
611        .then_with(|| left.message.id().cmp(right.message.id()))
612}
613
614/// `ProviderOptions::new()` shortcut; both adapters reach for an empty
615/// options map often enough that naming the no-op clarifies the call sites.
616#[inline]
617pub(crate) fn empty_options() -> ProviderOptions {
618    ProviderOptions::new()
619}
620
#[cfg(test)]
pub(crate) mod test_support {
    //! Shared assertions for per-adapter tests. Every helper reports failure
    //! as an `Err` (via `anyhow::ensure!`/`bail!`) rather than panicking, so
    //! callers decide how to surface it.

    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Shared probe_default contract: when the adapter's expected install
    /// subpath exists under an injected `HOME`, `probe_default` returns it;
    /// when the path is removed, it returns `None`. Each adapter owns its
    /// `probe_default_*` test (per the seam-boundaries rule) but the shape
    /// is the same, so the helper takes the factory + its expected subpath.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let mut expected = temp.path().to_path_buf();
        for segment in expected_subpath {
            expected.push(segment);
        }
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(temp.path());

        let probe = factory.probe_default(&env);
        let got = probe
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        anyhow::ensure!(
            factory.probe_default(&env).is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Native-restore round trip, in three steps:
    /// 1. Ingest everything `adapter` yields into a scratch store.
    /// 2. Restore every stored session through `factory` at
    ///    [`RestoreFidelity::Native`].
    /// 3. Require each restored file to equal its counterpart under
    ///    `source_root` JSON-value-for-value, and the restored file set to be
    ///    exactly the fixture's `.json`/`.jsonl` files.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        anyhow::ensure!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }
        let source_paths = source_json_files(source_root)?;
        anyhow::ensure!(
            restored_paths == source_paths,
            "native restore must emit exactly the source JSON/JSONL file set: \
             restored {restored_paths:?}, source {source_paths:?}",
        );
        Ok(())
    }

    /// Every `.json`/`.jsonl` file under `root`, as paths relative to `root`.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut out = BTreeSet::new();
        collect_source_json_files(root, root, &mut out)?;
        Ok(out)
    }

    /// Recursive worker for [`source_json_files`]: walk `dir` and record
    /// matching files relative to `root`. Symlinked directories are not
    /// followed, because `file_type` does not traverse links.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            if let Some("json" | "jsonl") = path.extension().and_then(|ext| ext.to_str()) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compare two file bodies as JSON. `.jsonl` files are compared line by
    /// line (blank lines ignored); anything else is compared as a single JSON
    /// document. Key order and whitespace do not matter.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        let shown = path.display();
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            anyhow::ensure!(
                actual_lines == expected_lines,
                "jsonl mismatch at {shown}\n  actual: {actual_lines:?}\nexpected: {expected_lines:?}",
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            anyhow::ensure!(
                actual_value == expected_value,
                "json mismatch at {shown}\n  actual: {actual_value}\nexpected: {expected_value}",
            );
        }
        Ok(())
    }

    /// Parse each non-blank line of UTF-8 `bytes` as one JSON value.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}