// pond/adapter/mod.rs

//! Source-adapter seam.
//!
//! Pond ingests sessions from many runtimes. The seam splits in two:
//!
//! - [`AdapterFactory`] is the stateless face every format publishes once,
//!   collected by [`registry`]. It knows how to construct configured adapters
//!   from an opaque JSON config blob ([`AdapterFactory::open`]) and how to
//!   probe the user's environment for a default config
//!   ([`AdapterFactory::probe_default`]).
//! - [`Adapter`] is the live, configured instance. Its only job is
//!   [`Adapter::events`]: stream canonical [`IngestEvent`]s in append-only
//!   order per session. The "source" is opaque to the seam - a directory
//!   tree, an HTTP endpoint, a database, an archive file.
//!
//! Concrete implementations live in `adapter/<format>.rs` and are tied
//! together by [`registry`]. A new adapter is one file plus one line in the
//! registry; no central dispatch table to edit.

19use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26    sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27    wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35mod opencode;
36mod pi_coding_agent;
37
38pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
39pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
40pub use discovery::{
41    Candidate, PromptOutcome, discover, persist_accept, persist_decline, probe_unconfigured,
42    prompt_and_persist, prompt_each,
43};
44pub use extract::{
45    Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
46    extract_str, extract_value,
47};
48pub use opencode::{OpencodeAdapter, OpencodeFactory};
49pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
50
51/// Stateless face of an adapter type: how the registry knows about it without
52/// instantiating it. One implementation per known format, registered in
53/// [`registry`].
54pub trait AdapterFactory: Send + Sync {
55    /// Stable short name. Used as the `[sources.<name>]` config key, the
56    /// `pond sync <name>` positional arg, and the `Session.source_agent`
57    /// value emitted by the corresponding adapter.
58    fn name(&self) -> &'static str;
59
60    /// Open a configured adapter from a JSON-shaped config blob. The shape is
61    /// owned by each factory: filesystem adapters expect `{ "path": "..." }`,
62    /// API-backed adapters expect `{ "endpoint": "...", "auth_token": "..." }`,
63    /// etc. The seam doesn't know or care. A factory rejects a bad blob with
64    /// [`AdapterErrorKind::Config`].
65    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
66
67    /// Probe the user's environment for a default config. Returns the JSON
68    /// blob that would go into `[sources.<name>]` if the picker writes it
69    /// back. Filesystem adapters check their canonical install path under
70    /// `env.home`; adapters with no auto-discovery rule (e.g. API adapters
71    /// that need explicit creds) return `None`.
72    fn probe_default(&self, env: &Env) -> Option<Value>;
73
74    /// Restore one canonical session into this adapter's native file layout.
75    fn serialize(
76        &self,
77        session: &SessionWithMessages,
78        fidelity: RestoreFidelity,
79    ) -> Result<Vec<RestoredFile>, AdapterError>;
80}
81
/// How faithfully [`AdapterFactory::serialize`] reproduces the source files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Restore from the session's stored `raw_record`. May be impossible on
    /// older logs that never captured one, in which case the adapter
    /// downgrades to [`RestoreFidelity::Foreign`].
    Native,
    /// The fallback served when no stored `raw_record` is available.
    // NOTE(review): presumably re-synthesized from canonical fields - confirm
    // against the per-adapter `serialize` implementations.
    Foreign,
}

88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct RestoredFile {
90    pub relative_path: PathBuf,
91    pub bytes: Vec<u8>,
92    /// Fidelity actually served when this file was produced. Equal to the
93    /// requested fidelity unless the adapter had to downgrade (e.g. caller
94    /// asked `Native` but the session lacks a stored `raw_record`, so the
95    /// adapter served `Foreign`). spec.md#adapter-native-restore-lossless:
96    /// native may be impossible on older logs; the signal lets the CLI warn
97    /// rather than silently degrade.
98    pub actual_fidelity: RestoreFidelity,
99}
100
101impl RestoredFile {
102    pub(crate) fn new(
103        relative_path: impl Into<PathBuf>,
104        bytes: Vec<u8>,
105        actual_fidelity: RestoreFidelity,
106    ) -> Self {
107        Self {
108            relative_path: relative_path.into(),
109            bytes,
110            actual_fidelity,
111        }
112    }
113}
114
115/// Live, configured adapter instance. Holds whatever handle the source needs
116/// (an open directory root, an HTTP client + auth, a database connection)
117/// for the lifetime of its event stream.
118pub trait Adapter: Send + Sync {
119    /// Stream every canonical event for every session this adapter knows
120    /// about, in append-only order per session. The stream borrows `self`
121    /// so callers can pass `&adapter` or hold a `Box<dyn Adapter>` and
122    /// invoke this through `as_ref()`.
123    fn events(&self) -> EventStream<'_> {
124        let stream = self.events_with(&NoopOracle);
125        Box::pin(stream.filter_map(|res| match res {
126            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
127            Ok(AdapterYield::Skipped { .. }) => None,
128            Err(error) => Some(Err(error)),
129        }))
130    }
131
132    /// Count how many sessions [`Self::events`] will produce, used by the
133    /// CLI bar to set its length up front. A filesystem adapter walks its
134    /// root and counts `.jsonl` files; an API adapter calls its list
135    /// endpoint. Cheap and best-effort: errors here only mean we run with
136    /// an unknown total (the bar still ticks per session), so callers
137    /// fall back to a rolling counter rather than failing the sync.
138    fn discover(&self) -> DiscoverFuture<'_>;
139
140    /// Stream events with a [`SkipOracle`] the adapter MAY consult to
141    /// short-circuit per-session re-decoding (spec.md#adapter-integrity-event-ordering). Default impl
142    /// ignores the oracle.
143    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
144}
145
146/// Per-session watermark lookup: when did pond last write this session?
147/// Backed by Lance's `_row_last_updated_at_version` joined to the manifest
148/// commit timestamp (spec.md#adapter-integrity-event-ordering). Adapter compares this to the source
149/// file's mtime to decide whether to re-decode.
150pub trait SkipOracle: Send + Sync {
151    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
152
153    /// Fast-path hint: the oracle has no watermarks at all (first ingest or
154    /// `NoopOracle`). Lets adapters skip the per-file work needed to ask
155    /// `last_ingested_at` - typically the JSONL header peek + driver parse.
156    /// Defaults to `false` so existing oracles stay correct without changes.
157    fn is_empty(&self) -> bool {
158        false
159    }
160}
161
162/// `SkipOracle` that always returns `None`. Used by tests and benches that
163/// don't want skip behavior interfering with their assertions.
164#[derive(Debug, Default, Clone, Copy)]
165pub struct NoopOracle;
166
167impl SkipOracle for NoopOracle {
168    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
169        None
170    }
171
172    fn is_empty(&self) -> bool {
173        true
174    }
175}
176
177#[derive(Debug, Clone)]
178pub enum AdapterYield {
179    Event(IngestEvent),
180    Skipped {
181        /// `None` for files that never yield a session id (empty `.jsonl`).
182        session_id: Option<String>,
183        project: Option<String>,
184        reason: SkipReason,
185    },
186}
187
/// Why an adapter skipped a source file instead of emitting events for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipReason {
    // NOTE(review): presumably "already ingested and unchanged since the
    // oracle's watermark" - confirm against the adapters that emit it.
    Fresh,
    /// File produced no importable session (empty `.jsonl`, sidecar-only rows,
    /// or an unextractable header). Benign: counted, never an error or a
    /// per-event drop. The underlying cause is logged at `-vv` (debug) verbosity.
    Empty,
    /// File is structurally a known sidecar whose specific shape this adapter
    /// version can't ingest. Surfaced as a visible, counted failure - NOT a
    /// benign skip - so the gap is actionable and the file is never folded into
    /// another session under a borrowed id. The payload is the user-facing
    /// reason naming the file and the fix.
    Unsupported(String),
}

203pub type AdapterYieldStream<'a> =
204    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
205
206/// Boxed future returning the number of sessions an adapter will emit. The
207/// shape mirrors [`EventStream`] - one alias per async trait method so the
208/// trait stays `dyn`-compatible without per-adapter associated types.
209pub type DiscoverFuture<'a> =
210    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
211
/// Environment slice handed to [`AdapterFactory::probe_default`]. Kept
/// deliberately small - just `home`, because env-var lookups for API creds
/// are unreliable and most adapters with API backends should require
/// explicit config rather than opportunistic env reads.
#[derive(Debug, Clone)]
pub struct Env {
    /// The user's home directory; probes resolve install paths under it.
    pub home: PathBuf,
}

impl Env {
    /// Read `home` from the `HOME` env var. Returns `None` when `HOME` is
    /// unset (CI, post-install hooks, sandboxed runs) or set to the empty
    /// string - an empty home would make every probe resolve relative to the
    /// current working directory instead of a real home.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME")
            .filter(|home| !home.is_empty())
            .map(|home| Self {
                home: PathBuf::from(home),
            })
    }

    /// Construct an `Env` with an explicit home. Tests use this to inject a
    /// `TempDir`-backed home without touching the process env.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}

237/// Boxed, `Send`-only stream of [`IngestEvent`]s with one shared error type.
238/// The lifetime parameter lets future adapters borrow from their config; for
239/// `self: Box<Self>` impls the lifetime collapses to `'static`.
240pub type EventStream<'a> =
241    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
242
243/// One error type for every adapter. Each call site tags the error with the
244/// adapter's name (so multi-adapter syncs can attribute failures) and a
245/// `location` string the operator can act on (file path, URL, line number,
246/// config key, ...). The `kind` carries the underlying class.
247#[derive(Debug)]
248pub struct AdapterError {
249    pub adapter: &'static str,
250    pub location: String,
251    pub kind: AdapterErrorKind,
252}
253
254#[derive(Debug)]
255pub enum AdapterErrorKind {
256    /// Filesystem / network IO at `location`.
257    Io(std::io::Error),
258    /// JSON parse error at line `line` inside `location`.
259    Parse {
260        line: usize,
261        source: serde_json::Error,
262    },
263    /// Format-specific shape error: missing required field, unknown role,
264    /// unsupported record type. The `String` is operator-facing.
265    Schema(String),
266    /// `AdapterFactory::open` rejected its config blob.
267    Config(String),
268    /// HTTP / RPC / timeout error from an API-backed adapter.
269    Transport(String),
270    /// Auth failure from an API-backed adapter (bad token, expired creds).
271    Auth(String),
272}
273
274impl AdapterError {
275    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
276        Self {
277            adapter,
278            location: location.into(),
279            kind: AdapterErrorKind::Io(source),
280        }
281    }
282
283    pub fn parse(
284        adapter: &'static str,
285        location: impl Into<String>,
286        line: usize,
287        source: serde_json::Error,
288    ) -> Self {
289        Self {
290            adapter,
291            location: location.into(),
292            kind: AdapterErrorKind::Parse { line, source },
293        }
294    }
295
296    pub fn schema(
297        adapter: &'static str,
298        location: impl Into<String>,
299        message: impl Into<String>,
300    ) -> Self {
301        Self {
302            adapter,
303            location: location.into(),
304            kind: AdapterErrorKind::Schema(message.into()),
305        }
306    }
307
308    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
309        Self {
310            adapter,
311            location: "config".to_owned(),
312            kind: AdapterErrorKind::Config(message.into()),
313        }
314    }
315}
316
317impl std::fmt::Display for AdapterError {
318    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        match &self.kind {
320            AdapterErrorKind::Io(source) => {
321                write!(
322                    formatter,
323                    "{} io error at {}: {source}",
324                    self.adapter, self.location
325                )
326            }
327            AdapterErrorKind::Parse { line, source } => write!(
328                formatter,
329                "{} json parse error at {}:{line}: {source}",
330                self.adapter, self.location,
331            ),
332            AdapterErrorKind::Schema(message) => {
333                write!(
334                    formatter,
335                    "{} schema error at {}: {message}",
336                    self.adapter, self.location
337                )
338            }
339            AdapterErrorKind::Config(message) => {
340                write!(formatter, "{} config error: {message}", self.adapter)
341            }
342            AdapterErrorKind::Transport(message) => write!(
343                formatter,
344                "{} transport error at {}: {message}",
345                self.adapter, self.location,
346            ),
347            AdapterErrorKind::Auth(message) => {
348                write!(formatter, "{} auth error: {message}", self.adapter)
349            }
350        }
351    }
352}
353
354impl std::error::Error for AdapterError {
355    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
356        match &self.kind {
357            AdapterErrorKind::Io(source) => Some(source),
358            AdapterErrorKind::Parse { source, .. } => Some(source),
359            _ => None,
360        }
361    }
362}
363
364/// The static, ordered registry of every adapter pond knows. A new adapter
365/// adds one `&Factory` here plus one file under `src/adapter/`. Order is the
366/// order discovery presents to the operator.
367pub fn registry() -> &'static [&'static dyn AdapterFactory] {
368    &[
369        &ClaudeCodeFactory,
370        &CodexCliFactory,
371        &OpencodeFactory,
372        &PiCodingAgentFactory,
373    ]
374}
375
376/// Look up a factory by name. Returns `None` for unknown names; callers
377/// usually wrap that in a clear error using [`known_names`].
378pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
379    registry().iter().copied().find(|f| f.name() == name)
380}
381
382/// The names of every registered adapter. Drives error messages
383/// ("unknown adapter X; known: ...") and the discovery picker labels.
384pub fn known_names() -> Vec<&'static str> {
385    registry().iter().map(|f| f.name()).collect()
386}
387
388/// Probe every adapter for a default config under `env.home`. Returns
389/// `(name, default_config)` pairs in registry order, skipping adapters whose
390/// `probe_default` returned `None`. The picker shows these to the operator.
391pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
392    registry()
393        .iter()
394        .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
395        .collect()
396}
397
/// Stable Part-row id: `"{message_id}:{ordinal:04}"`. Both JSONL adapters use
/// this shape so the cross-adapter id space stays predictable. The ordinal is
/// zero-padded to at least four digits; larger ordinals are written in full.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{message_id}:{ordinal:04}")
}

404/// Compact (no-whitespace) JSON serialization used as a fallback Part body
405/// when a row carries something we don't have a richer canonical shape for.
406pub(crate) fn compact_json(value: &Value) -> String {
407    serde_json::to_string(value).unwrap_or_default()
408}
409
410pub(crate) fn jsonl_bytes(
411    adapter: &'static str,
412    records: &[Value],
413) -> Result<Vec<u8>, AdapterError> {
414    let mut bytes = Vec::new();
415    for record in records {
416        let line = serde_json::to_vec(record).map_err(|err| {
417            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
418        })?;
419        bytes.extend(line);
420        bytes.push(b'\n');
421    }
422    Ok(bytes)
423}
424
425/// Shared `AdapterFactory::open` plumbing: parse the config blob's `path` and
426/// expand a leading `~` against `$HOME` once, not per path adapter.
427pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
428    use serde::Deserialize;
429    #[derive(Deserialize)]
430    struct Cfg {
431        path: PathBuf,
432    }
433    let cfg: Cfg = serde_json::from_value(config)
434        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
435    Ok(match std::env::var_os("HOME") {
436        Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
437        None => cfg.path,
438    })
439}
440
441pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
442    options
443        .get("source")
444        .and_then(|source| source.get("raw_record"))
445        .cloned()
446}
447
448/// Standard `options.source = {adapter, raw_record}` shape used by every
449/// adapter that captures its source record for native restore. Centralized so
450/// the writer side of the raw-record convention lives next to the reader
451/// ([`raw_record`]); per-adapter side-fields (e.g. claude-code's `cwd`,
452/// codex-cli's `git`) extend this map after construction.
453pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
454    let mut options = ProviderOptions::new();
455    options.insert(
456        "source".to_owned(),
457        serde_json::json!({
458            "adapter": adapter,
459            "raw_record": extract_raw_record(raw),
460        }),
461    );
462    options
463}
464
/// `Part.ordinal` is stored as `i32`; ingest counts as `usize`. A session
/// could in principle exceed `i32::MAX` parts, in which case we clamp rather
/// than drop the record.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    i32::try_from(ordinal).unwrap_or(i32::MAX)
}

473/// Reject `/`, `\`, `..`, and absolute paths in any segment that will become
474/// part of a filesystem path during restore. Centralizing it here keeps every
475/// adapter's restore-write path on the same allowlist; the writer
476/// ([`write_restored_files`]) re-applies it as a defense-in-depth check on
477/// every segment regardless of which adapter built the `RestoredFile`.
478pub(crate) fn validate_path_id(
479    adapter: &'static str,
480    kind: &str,
481    id: &str,
482    location: impl Into<String>,
483) -> Result<(), AdapterError> {
484    if id.is_empty()
485        || id.contains('/')
486        || id.contains('\\')
487        || id.contains("..")
488        || std::path::Path::new(id).is_absolute()
489    {
490        return Err(AdapterError::schema(
491            adapter,
492            location,
493            format!("{kind} contains a path separator or traversal marker: {id}"),
494        ));
495    }
496    Ok(())
497}
498
499/// Atomically write a batch of `RestoredFile`s under `root`. Every path
500/// segment is re-validated and the joined path is required to stay inside
501/// `root` (spec.md#adapter-native-restore-lossless: restore writes are
502/// adapter-supplied, but the gate lives at the writer so a single audit
503/// covers every adapter today and tomorrow). On partial failure the
504/// half-written tree is discarded before the error is returned.
505///
506/// Currently exercised only by adapter tests; the production restore CLI
507/// will route through this same helper when it lands.
508#[allow(dead_code)]
509pub(crate) fn write_restored_files(
510    root: &Path,
511    files: Vec<RestoredFile>,
512) -> Result<(), AdapterError> {
513    // Stage under a sibling temp dir and atomically rename so a partial
514    // failure cannot leave a half-populated restore in place.
515    let parent = root.parent().unwrap_or_else(|| Path::new("."));
516    let stem = root
517        .file_name()
518        .and_then(|n| n.to_str())
519        .unwrap_or("restore");
520    let staging = parent.join(format!(".{stem}.tmp"));
521    let io =
522        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
523    let _ = std::fs::remove_dir_all(&staging);
524    std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
525
526    let result = (|| -> Result<(), AdapterError> {
527        for file in files {
528            write_one_into_staging(&staging, &file)?;
529        }
530        Ok(())
531    })();
532
533    if let Err(error) = result {
534        let _ = std::fs::remove_dir_all(&staging);
535        return Err(error);
536    }
537
538    // Replace any existing restore root; the staging dir becomes the new root.
539    let _ = std::fs::remove_dir_all(root);
540    if let Some(parent) = root.parent()
541        && !parent.as_os_str().is_empty()
542    {
543        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
544    }
545    std::fs::rename(&staging, root).map_err(|e| {
546        let _ = std::fs::remove_dir_all(&staging);
547        io(root.display().to_string(), e)
548    })?;
549    Ok(())
550}
551
552#[allow(dead_code)]
553fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
554    // Validate every segment of the supplied relative path.
555    for component in file.relative_path.components() {
556        use std::path::Component;
557        let segment = match component {
558            Component::Normal(s) => s,
559            Component::CurDir => continue,
560            // Absolute prefixes, root, and `..` are categorically rejected -
561            // a restored file's relative_path is by contract relative + safe.
562            _ => {
563                return Err(AdapterError::schema(
564                    "restore",
565                    file.relative_path.display().to_string(),
566                    "relative_path component is not a normal name",
567                ));
568            }
569        };
570        let Some(text) = segment.to_str() else {
571            return Err(AdapterError::schema(
572                "restore",
573                file.relative_path.display().to_string(),
574                "relative_path segment is not UTF-8",
575            ));
576        };
577        validate_path_id(
578            "restore",
579            "relative_path segment",
580            text,
581            file.relative_path.display().to_string(),
582        )?;
583    }
584
585    let dest = staging.join(&file.relative_path);
586    // Defense-in-depth: confirm the joined path is still inside the staging
587    // dir even if every individual segment passed the syntactic check.
588    if !dest.starts_with(staging) {
589        return Err(AdapterError::schema(
590            "restore",
591            file.relative_path.display().to_string(),
592            "relative_path escaped the restore root after join",
593        ));
594    }
595    let io =
596        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
597    if let Some(parent) = dest.parent() {
598        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
599    }
600    std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
601    Ok(())
602}
603
604pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
605    value.as_deref().map(String::as_str).unwrap_or("")
606}
607
608/// Deterministic message ordering for restore: timestamp, then id as a
609/// tiebreaker so equal-timestamp messages always serialize in a stable order.
610pub(crate) fn by_timestamp_then_id(
611    left: &MessageWithParts,
612    right: &MessageWithParts,
613) -> std::cmp::Ordering {
614    left.message
615        .timestamp()
616        .cmp(&right.message.timestamp())
617        .then_with(|| left.message.id().cmp(right.message.id()))
618}
619
620/// `ProviderOptions::new()` shortcut; both adapters reach for an empty
621/// options map often enough that naming the no-op clarifies the call sites.
622#[inline]
623pub(crate) fn empty_options() -> ProviderOptions {
624    ProviderOptions::new()
625}
626
/// Contract tests shared by every adapter's own test module.
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Shared probe_default contract: when the adapter's expected install
    /// subpath exists under an injected `HOME`, `probe_default` returns it;
    /// when the path is removed, it returns `None`. Each adapter owns its
    /// `probe_default_*` test (per the seam-boundaries rule) but the shape
    /// is the same, so the helper takes the factory + its expected subpath.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let mut expected = temp.path().to_path_buf();
        for segment in expected_subpath {
            expected.push(segment);
        }
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(temp.path());

        let probe = factory.probe_default(&env);
        let got = probe
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        anyhow::ensure!(
            factory.probe_default(&env).is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Shared native-restore contract: ingest everything `adapter` yields
    /// into a throwaway store, serialize every stored session back at
    /// `RestoreFidelity::Native`, and require that the restored files are
    /// JSON-equal to the originals under `source_root` and cover exactly the
    /// `.json` / `.jsonl` files found there.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }
        assert_eq!(
            restored_paths,
            source_json_files(source_root)?,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Every `.json` / `.jsonl` file under `root`, as paths relative to
    /// `root`.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut out = BTreeSet::new();
        collect_source_json_files(root, root, &mut out)?;
        Ok(out)
    }

    /// Recursive walk behind [`source_json_files`]: descend into
    /// subdirectories of `dir` and record matching files relative to `root`.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            if let Some("json" | "jsonl") = path.extension().and_then(|ext| ext.to_str()) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compare two files as parsed JSON rather than raw bytes, so whitespace
    /// and key order don't fail the test. `.jsonl` files are compared line by
    /// line; anything else is parsed as a single JSON document.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parse a JSONL buffer into one `Value` per non-blank line.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}