Skip to main content

pond/adapter/
mod.rs

1//! Source-adapter seam.
2//!
3//! Pond ingests sessions from many runtimes. The seam splits in two:
4//!
5//! - [`AdapterFactory`] is the stateless face every format publishes once,
6//!   collected by [`registry`]. It knows how to construct configured adapters
7//!   from an opaque JSON config blob ([`AdapterFactory::open`]) and how to
8//!   probe the user's environment for a default config
9//!   ([`AdapterFactory::probe_default`]).
10//! - [`Adapter`] is the live, configured instance. Its only job is
11//!   [`Adapter::events`]: stream canonical [`IngestEvent`]s in append-only
12//!   order per session. The "source" is opaque to the seam - a directory
13//!   tree, an HTTP endpoint, a database, an archive file.
14//!
//! Concrete implementations live in `adapter/<format>.rs` and are tied
//! together by [`registry`]. A new adapter is one file, its `mod` / `pub use`
//! lines in this module, and one entry in [`registry`]; there is no central
//! dispatch `match` to edit.
18
19use std::path::{Path, PathBuf};
20
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
26    wire::ProviderOptions,
27};
28
29mod claude_ai_export;
30mod claude_code;
31mod claude_desktop_app;
32mod codex_cli;
33mod discovery;
34pub mod extract;
35mod jsonl;
36mod opencode;
37mod pi_coding_agent;
38
39pub use claude_ai_export::{ClaudeAiExportAdapter, ClaudeAiExportFactory};
40pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
41pub use claude_desktop_app::{ClaudeDesktopAppAdapter, ClaudeDesktopAppFactory};
42pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
43pub use discovery::{
44    Candidate, apply_to_doc, discover, persist_accept, probe_unconfigured, prompt_and_persist,
45    set_adapter_enabled,
46};
47pub use extract::{
48    Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
49    extract_str, extract_value,
50};
51pub use opencode::{OpencodeAdapter, OpencodeFactory};
52pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
53
/// Stateless face of an adapter type: how the registry knows about it without
/// instantiating it. One implementation per known format, registered in
/// [`registry`].
pub trait AdapterFactory: Send + Sync {
    /// Stable short name. Used as the `[adapters.<name>]` config key, the
    /// `pond sync <name>` positional arg, and the `Session.source_agent`
    /// value emitted by the corresponding adapter.
    fn name(&self) -> &'static str;

    /// Open a configured adapter from a JSON-shaped config blob. The shape is
    /// owned by each factory: filesystem adapters expect `{ "path": "..." }`,
    /// API-backed adapters expect `{ "endpoint": "...", "auth_token": "..." }`,
    /// etc. The seam doesn't know or care. A factory rejects a bad blob with
    /// [`AdapterErrorKind::Config`].
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;

    /// Probe the user's environment for a default config. Returns the JSON
    /// blob that would go into `[adapters.<name>]` if the picker writes it
    /// back. Filesystem adapters check their canonical install path under
    /// `env.home`; adapters with no auto-discovery rule (e.g. API adapters
    /// that need explicit creds) return `None`.
    fn probe_default(&self, env: &Env) -> Option<Value>;

    /// Restore one canonical session into this adapter's native file layout.
    ///
    /// Returns every file the session maps to. Each file's `relative_path` is
    /// relative to the adapter's source root. `fidelity` is the requested
    /// [`RestoreFidelity`]. Each file's `actual_fidelity` reports what was
    /// actually served. That may be a downgrade from `Native` to `Foreign`
    /// when the session lacks a stored `raw_record`.
    ///
    /// # Errors
    /// An [`AdapterError`] when the session cannot be laid out on disk, e.g.
    /// an id that is unsafe as a path segment (presumably checked with
    /// `validate_path_id`; confirm per adapter).
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
84
/// Fidelity a restore is asked for, or actually delivers.
///
/// `Native` re-emits the source's own records. It depends on the session
/// carrying a stored `raw_record`. `Foreign` is what an adapter serves when it
/// cannot restore natively, or is not asked to. Presumably it is rebuilt from
/// the canonical session; confirm per adapter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    Native,
    Foreign,
}

/// One output file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Where the file goes, relative to the restore root.
    pub relative_path: PathBuf,
    /// Complete file contents.
    pub bytes: Vec<u8>,
    /// Fidelity the adapter really delivered for this file. It differs from
    /// the requested fidelity only on a downgrade, e.g. `Native` was asked for
    /// but the session has no stored `raw_record`, so `Foreign` was served.
    /// spec.md#adapter-native-restore-lossless: native may be impossible on
    /// older logs. Carrying the signal lets the CLI warn instead of degrading
    /// silently.
    pub actual_fidelity: RestoreFidelity,
}

impl RestoredFile {
    /// Bundle one restored file. `relative_path` accepts anything
    /// convertible into a `PathBuf` (`&str`, `String`, `PathBuf`, ...).
    pub(crate) fn new(
        relative_path: impl Into<PathBuf>,
        bytes: Vec<u8>,
        actual_fidelity: RestoreFidelity,
    ) -> Self {
        let relative_path: PathBuf = relative_path.into();
        Self {
            actual_fidelity,
            bytes,
            relative_path,
        }
    }
}
117
118/// Live, configured adapter instance. Holds whatever handle the source needs
119/// (an open directory root, an HTTP client + auth, a database connection)
120/// for the lifetime of its event stream.
121pub trait Adapter: Send + Sync {
122    /// Stream every canonical event for every session this adapter knows
123    /// about, in append-only order per session. The stream borrows `self`
124    /// so callers can pass `&adapter` or hold a `Box<dyn Adapter>` and
125    /// invoke this through `as_ref()`.
126    fn events(&self) -> EventStream<'_> {
127        let stream = self.events_with(&NoopOracle);
128        Box::pin(stream.filter_map(|res| match res {
129            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
130            Ok(AdapterYield::Skipped { .. } | AdapterYield::SkippedBatch { .. }) => None,
131            Err(error) => Some(Err(error)),
132        }))
133    }
134
135    /// Count how many sessions [`Self::events`] will produce, used by the
136    /// CLI bar to set its length up front. A filesystem adapter walks its
137    /// root and counts `.jsonl` files; an API adapter calls its list
138    /// endpoint. Cheap and best-effort: errors here only mean we run with
139    /// an unknown total (the bar still ticks per session), so callers
140    /// fall back to a rolling counter rather than failing the sync.
141    fn discover(&self) -> DiscoverFuture<'_>;
142
143    /// Stream events with a [`SkipOracle`] the adapter MAY consult to
144    /// short-circuit per-session re-decoding (spec.md#adapter-integrity-event-ordering). Default impl
145    /// ignores the oracle.
146    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
147}
148
/// Store-side freshness watermark: the newest message timestamp (micros) pond
/// already holds for a session.
///
/// It is backed by the resident row-meta map (zero S3 - see
/// [`crate::rowmap`]). That map is itself rebuilt from the store, so the
/// check is deterministic and there is no local cursor to drift out of sync.
/// `None` means pond has never seen the session, or the resident map lags the
/// store. Either way the caller re-reads.
///
/// Skipping is sound because pond and every source are append-only: a
/// session's max message timestamp only moves forward as messages arrive.
/// The one gap is two messages with the exact same micros straddling a sync
/// boundary. That is negligible at sub-second precision and self-heals once
/// any newer message lands. `pond sync --verify` (which passes
/// [`NoopOracle`]) is the full re-read backstop.
pub trait SkipOracle: Send + Sync {
    /// Stored watermark for `session_id`, or `None` when pond has none.
    fn session_max_ts(&self, session_id: &str) -> Option<i64>;

    /// Fast-path hint that the oracle holds no entries at all (first ingest,
    /// or [`NoopOracle`]). It lets adapters skip the per-session work of
    /// reading the source's latest message timestamp. `false` by default.
    fn is_empty(&self) -> bool {
        false
    }
}

/// The seam's single freshness rule. A session is fresh (its re-decode can
/// be skipped) only when both sides report a timestamp and the source's
/// latest one is no newer than pond's stored watermark. A missing signal on
/// either side is never fresh.
pub fn is_session_fresh(
    oracle: &dyn SkipOracle,
    session_id: &str,
    source_last_ts_micros: Option<i64>,
) -> bool {
    // The oracle is consulted unconditionally, even without a source timestamp.
    let stored = oracle.session_max_ts(session_id);
    stored
        .zip(source_last_ts_micros)
        .is_some_and(|(stored, source)| source <= stored)
}

/// [`SkipOracle`] that knows nothing, so every session is re-read. Used by
/// `--verify`, tests, and benches.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopOracle;

impl SkipOracle for NoopOracle {
    fn is_empty(&self) -> bool {
        true
    }

    fn session_max_ts(&self, _: &str) -> Option<i64> {
        None
    }
}
201
/// One item of an [`AdapterYieldStream`]: either a canonical event, or a
/// marker that the adapter skipped something instead of decoding it.
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// A canonical event to ingest.
    Event(IngestEvent),
    /// A single skipped file/session, with whatever identity the adapter
    /// could recover.
    Skipped {
        /// `None` for files that never yield a session id (empty `.jsonl`).
        session_id: Option<String>,
        /// Project the session belongs to, when the adapter knows it
        /// (meaning is adapter-defined - confirm per adapter).
        project: Option<String>,
        /// Why the item was skipped.
        reason: SkipReason,
    },
    /// Aggregate skip; one yield per N files (typically `Fresh` recurring
    /// sync) instead of N. Avoids O(N) per-session orchestrator overhead.
    SkippedBatch {
        /// Reason shared by every item in the batch.
        reason: SkipReason,
        /// Number of items this batch stands for.
        count: usize,
    },
}

/// Why an adapter yielded a skip instead of events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipReason {
    /// The source is no newer than pond's stored watermark, so the re-decode
    /// was skipped (the rule lives in [`is_session_fresh`]).
    Fresh,
    /// File produced no importable session (empty `.jsonl`, sidecar-only rows,
    /// or an unextractable header). Benign: counted, never an error or a
    /// per-event drop. The underlying cause is logged at `-vv` (debug) verbosity.
    Empty,
    /// File is structurally a known sidecar whose specific shape this adapter
    /// version can't ingest. Surfaced as a visible, counted failure - NOT a
    /// benign skip - so the gap is actionable and the file is never folded into
    /// another session under a borrowed id. The payload is the user-facing
    /// reason naming the file and the fix.
    Unsupported(String),
}
233
/// Boxed, `Send` stream of [`AdapterYield`]s (or [`AdapterError`]s) returned
/// by [`Adapter::events_with`]. It borrows the adapter and the oracle for `'a`.
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;

/// Boxed future returning the number of sessions an adapter will emit. The
/// shape mirrors [`EventStream`] - one alias per async trait method so the
/// trait stays `dyn`-compatible without per-adapter associated types.
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
242
/// Slice of the process environment handed to
/// [`AdapterFactory::probe_default`].
///
/// Only `home` is carried on purpose. Env-var lookups for API creds are
/// unreliable, so adapters with API backends should demand explicit config
/// rather than read the environment opportunistically.
pub struct Env {
    /// Home directory that default install paths are resolved under.
    pub home: PathBuf,
}

impl Env {
    /// Build from the `HOME` env var. Yields `None` when `HOME` is unset
    /// (CI, post-install hooks, sandboxed runs).
    pub fn from_env() -> Option<Self> {
        let home = std::env::var_os("HOME").map(PathBuf::from);
        home.map(|home| Self { home })
    }

    /// Build with an explicit home (tests inject a `TempDir` this way),
    /// leaving the process environment alone.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        let home: PathBuf = home.into();
        Self { home }
    }
}
267
268/// Boxed, `Send`-only stream of [`IngestEvent`]s with one shared error type.
269/// The lifetime parameter lets future adapters borrow from their config; for
270/// `self: Box<Self>` impls the lifetime collapses to `'static`.
271pub type EventStream<'a> =
272    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
273
/// One error type for every adapter. Each call site tags the error with the
/// adapter's name (so multi-adapter syncs can attribute failures) and a
/// `location` string the operator can act on (file path, URL, line number,
/// config key, ...). The `kind` carries the underlying class.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the component that raised the error. Adapters use their
    /// factory name; the restore writer in this module uses `"restore"`.
    pub adapter: &'static str,
    /// Operator-actionable location. [`AdapterError::config`] fixes it to
    /// `"config"`.
    pub location: String,
    /// Underlying error class and its detail.
    pub kind: AdapterErrorKind,
}

/// Class of an [`AdapterError`]. `Io` and `Parse` wrap an underlying error,
/// which is exposed through `std::error::Error::source`. The other kinds
/// carry an operator-facing message.
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// Filesystem / network IO at `location`.
    Io(std::io::Error),
    /// JSON parse error at line `line` inside `location`.
    Parse {
        line: usize,
        source: serde_json::Error,
    },
    /// Format-specific shape error: missing required field, unknown role,
    /// unsupported record type. The `String` is operator-facing.
    Schema(String),
    /// `AdapterFactory::open` rejected its config blob.
    Config(String),
    /// HTTP / RPC / timeout error from an API-backed adapter.
    Transport(String),
    /// Auth failure from an API-backed adapter (bad token, expired creds).
    Auth(String),
}
304
305impl AdapterError {
306    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
307        Self {
308            adapter,
309            location: location.into(),
310            kind: AdapterErrorKind::Io(source),
311        }
312    }
313
314    pub fn parse(
315        adapter: &'static str,
316        location: impl Into<String>,
317        line: usize,
318        source: serde_json::Error,
319    ) -> Self {
320        Self {
321            adapter,
322            location: location.into(),
323            kind: AdapterErrorKind::Parse { line, source },
324        }
325    }
326
327    pub fn schema(
328        adapter: &'static str,
329        location: impl Into<String>,
330        message: impl Into<String>,
331    ) -> Self {
332        Self {
333            adapter,
334            location: location.into(),
335            kind: AdapterErrorKind::Schema(message.into()),
336        }
337    }
338
339    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
340        Self {
341            adapter,
342            location: "config".to_owned(),
343            kind: AdapterErrorKind::Config(message.into()),
344        }
345    }
346}
347
348impl std::fmt::Display for AdapterError {
349    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        match &self.kind {
351            AdapterErrorKind::Io(source) => {
352                write!(
353                    formatter,
354                    "{} io error at {}: {source}",
355                    self.adapter, self.location
356                )
357            }
358            AdapterErrorKind::Parse { line, source } => write!(
359                formatter,
360                "{} json parse error at {}:{line}: {source}",
361                self.adapter, self.location,
362            ),
363            AdapterErrorKind::Schema(message) => {
364                write!(
365                    formatter,
366                    "{} schema error at {}: {message}",
367                    self.adapter, self.location
368                )
369            }
370            AdapterErrorKind::Config(message) => {
371                write!(formatter, "{} config error: {message}", self.adapter)
372            }
373            AdapterErrorKind::Transport(message) => write!(
374                formatter,
375                "{} transport error at {}: {message}",
376                self.adapter, self.location,
377            ),
378            AdapterErrorKind::Auth(message) => {
379                write!(formatter, "{} auth error: {message}", self.adapter)
380            }
381        }
382    }
383}
384
385impl std::error::Error for AdapterError {
386    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
387        match &self.kind {
388            AdapterErrorKind::Io(source) => Some(source),
389            AdapterErrorKind::Parse { source, .. } => Some(source),
390            _ => None,
391        }
392    }
393}
394
395/// The static, ordered registry of every adapter pond knows. A new adapter
396/// adds one `&Factory` here plus one file under `src/adapter/`. Order is the
397/// order discovery presents to the operator.
398pub fn registry() -> &'static [&'static dyn AdapterFactory] {
399    &[
400        &ClaudeCodeFactory,
401        &ClaudeDesktopAppFactory,
402        &ClaudeAiExportFactory,
403        &CodexCliFactory,
404        &OpencodeFactory,
405        &PiCodingAgentFactory,
406    ]
407}
408
409/// Look up a factory by name. Returns `None` for unknown names; callers
410/// usually wrap that in a clear error using [`known_names`].
411pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
412    registry().iter().copied().find(|f| f.name() == name)
413}
414
415/// The names of every registered adapter. Drives error messages
416/// ("unknown adapter X; known: ...") and the discovery picker labels.
417pub fn known_names() -> Vec<&'static str> {
418    registry().iter().map(|f| f.name()).collect()
419}
420
421/// Probe every adapter for a default config under `env.home`. Returns
422/// `(name, default_config)` pairs in registry order, skipping adapters whose
423/// `probe_default` returned `None`. The picker shows these to the operator.
424pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
425    registry()
426        .iter()
427        .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
428        .collect()
429}
430
/// Stable Part-row id: `"{message_id}:{ordinal:04}"`, e.g. `("m1", 7)` ->
/// `"m1:0007"`. JSONL adapters share this shape so the cross-adapter id space
/// stays predictable.
///
/// The ordinal is zero-padded to at least four digits. Ordinals of 10_000
/// and above render wider, so from there on ids no longer sort
/// lexicographically by ordinal.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{message_id}:{ordinal:04}")
}
436
437/// Compact (no-whitespace) JSON serialization used as a fallback Part body
438/// when a row carries something we don't have a richer canonical shape for.
439pub(crate) fn compact_json(value: &Value) -> String {
440    serde_json::to_string(value).unwrap_or_default()
441}
442
443pub(crate) fn jsonl_bytes(
444    adapter: &'static str,
445    records: &[Value],
446) -> Result<Vec<u8>, AdapterError> {
447    let mut bytes = Vec::new();
448    for record in records {
449        let line = serde_json::to_vec(record).map_err(|err| {
450            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
451        })?;
452        bytes.extend(line);
453        bytes.push(b'\n');
454    }
455    Ok(bytes)
456}
457
458/// Shared `AdapterFactory::open` plumbing: parse the config blob's `path` and
459/// expand a leading `~` against `$HOME` once, not per path adapter.
460pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
461    use serde::Deserialize;
462    #[derive(Deserialize)]
463    struct Cfg {
464        path: PathBuf,
465    }
466    let cfg: Cfg = serde_json::from_value(config)
467        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
468    Ok(match std::env::var_os("HOME") {
469        Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
470        None => cfg.path,
471    })
472}
473
474pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
475    options
476        .get("source")
477        .and_then(|source| source.get("raw_record"))
478        .cloned()
479}
480
481/// Standard `options.source = {adapter, raw_record}` shape used by every
482/// adapter that captures its source record for native restore. Centralized so
483/// the writer side of the raw-record convention lives next to the reader
484/// ([`raw_record`]); per-adapter side-fields (e.g. claude-code's `cwd`,
485/// codex-cli's `git`) extend this map after construction.
486pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
487    let mut options = ProviderOptions::new();
488    options.insert(
489        "source".to_owned(),
490        serde_json::json!({
491            "adapter": adapter,
492            "raw_record": extract_raw_record(raw),
493        }),
494    );
495    options
496}
497
/// Narrow an ingest-side `usize` part ordinal into the `i32` that
/// `Part.ordinal` is stored as.
///
/// Ordinals beyond `i32::MAX` saturate at `i32::MAX` instead of dropping the
/// record. Every part past that point therefore shares one stored ordinal.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    if let Ok(narrowed) = i32::try_from(ordinal) {
        narrowed
    } else {
        i32::MAX
    }
}
505
506/// Reject `/`, `\`, `..`, and absolute paths in any segment that will become
507/// part of a filesystem path during restore. Centralizing it here keeps every
508/// adapter's restore-write path on the same allowlist; the writer
509/// ([`write_restored_files`]) re-applies it as a defense-in-depth check on
510/// every segment regardless of which adapter built the `RestoredFile`.
511pub(crate) fn validate_path_id(
512    adapter: &'static str,
513    kind: &str,
514    id: &str,
515    location: impl Into<String>,
516) -> Result<(), AdapterError> {
517    if id.is_empty()
518        || id.contains('/')
519        || id.contains('\\')
520        || id.contains("..")
521        || std::path::Path::new(id).is_absolute()
522    {
523        return Err(AdapterError::schema(
524            adapter,
525            location,
526            format!("{kind} contains a path separator or traversal marker: {id}"),
527        ));
528    }
529    Ok(())
530}
531
532/// Atomically write a batch of `RestoredFile`s under `root`. Every path
533/// segment is re-validated and the joined path is required to stay inside
534/// `root` (spec.md#adapter-native-restore-lossless: restore writes are
535/// adapter-supplied, but the gate lives at the writer so a single audit
536/// covers every adapter today and tomorrow). On partial failure the
537/// half-written tree is discarded before the error is returned.
538///
539/// Currently exercised only by adapter tests; the production restore CLI
540/// will route through this same helper when it lands.
541#[allow(dead_code)]
542pub(crate) fn write_restored_files(
543    root: &Path,
544    files: Vec<RestoredFile>,
545) -> Result<(), AdapterError> {
546    // Stage under a sibling temp dir and atomically rename so a partial
547    // failure cannot leave a half-populated restore in place.
548    let parent = root.parent().unwrap_or_else(|| Path::new("."));
549    let stem = root
550        .file_name()
551        .and_then(|n| n.to_str())
552        .unwrap_or("restore");
553    let staging = parent.join(format!(".{stem}.tmp"));
554    let io =
555        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
556    let _ = std::fs::remove_dir_all(&staging);
557    std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
558
559    let result = (|| -> Result<(), AdapterError> {
560        for file in files {
561            write_one_into_staging(&staging, &file)?;
562        }
563        Ok(())
564    })();
565
566    if let Err(error) = result {
567        let _ = std::fs::remove_dir_all(&staging);
568        return Err(error);
569    }
570
571    // Replace any existing restore root; the staging dir becomes the new root.
572    let _ = std::fs::remove_dir_all(root);
573    if let Some(parent) = root.parent()
574        && !parent.as_os_str().is_empty()
575    {
576        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
577    }
578    std::fs::rename(&staging, root).map_err(|e| {
579        let _ = std::fs::remove_dir_all(&staging);
580        io(root.display().to_string(), e)
581    })?;
582    Ok(())
583}
584
585#[allow(dead_code)]
586fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
587    // Validate every segment of the supplied relative path.
588    for component in file.relative_path.components() {
589        use std::path::Component;
590        let segment = match component {
591            Component::Normal(s) => s,
592            Component::CurDir => continue,
593            // Absolute prefixes, root, and `..` are categorically rejected -
594            // a restored file's relative_path is by contract relative + safe.
595            _ => {
596                return Err(AdapterError::schema(
597                    "restore",
598                    file.relative_path.display().to_string(),
599                    "relative_path component is not a normal name",
600                ));
601            }
602        };
603        let Some(text) = segment.to_str() else {
604            return Err(AdapterError::schema(
605                "restore",
606                file.relative_path.display().to_string(),
607                "relative_path segment is not UTF-8",
608            ));
609        };
610        validate_path_id(
611            "restore",
612            "relative_path segment",
613            text,
614            file.relative_path.display().to_string(),
615        )?;
616    }
617
618    let dest = staging.join(&file.relative_path);
619    // Defense-in-depth: confirm the joined path is still inside the staging
620    // dir even if every individual segment passed the syntactic check.
621    if !dest.starts_with(staging) {
622        return Err(AdapterError::schema(
623            "restore",
624            file.relative_path.display().to_string(),
625            "relative_path escaped the restore root after join",
626        ));
627    }
628    let io =
629        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
630    if let Some(parent) = dest.parent() {
631        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
632    }
633    std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
634    Ok(())
635}
636
637pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
638    value.as_deref().map(String::as_str).unwrap_or("")
639}
640
641/// Deterministic message ordering for restore: timestamp, then id as a
642/// tiebreaker so equal-timestamp messages always serialize in a stable order.
643pub(crate) fn by_timestamp_then_id(
644    left: &MessageWithParts,
645    right: &MessageWithParts,
646) -> std::cmp::Ordering {
647    left.message
648        .timestamp()
649        .cmp(&right.message.timestamp())
650        .then_with(|| left.message.id().cmp(right.message.id()))
651}
652
/// `ProviderOptions::new()` shortcut. Adapters reach for an empty options map
/// often enough that naming the no-op clarifies the call sites.
#[inline]
pub(crate) fn empty_options() -> ProviderOptions {
    ProviderOptions::new()
}
659
/// Shared assertions for per-adapter contract tests (probe_default and
/// native-restore round-trips). Compiled only under `cfg(test)`.
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Shared probe_default contract: when the adapter's expected install
    /// subpath exists under an injected `HOME`, `probe_default` returns it;
    /// when the path is removed, it returns `None`. Each adapter owns its
    /// `probe_default_*` test (per the seam-boundaries rule) but the shape
    /// is the same, so the helper takes the factory + its expected subpath.
    ///
    /// The probe's config must carry the install dir as a string `path` field.
    ///
    /// # Errors
    /// Fails if either expectation is violated or any filesystem setup step
    /// fails.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let mut expected = temp.path().to_path_buf();
        for segment in expected_subpath {
            expected.push(segment);
        }
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(temp.path());

        let probe = factory.probe_default(&env);
        let got = probe
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        // Only the leaf dir is removed; its parent segments stay, so the
        // probe has to key on the full subpath to report `None` here.
        std::fs::remove_dir_all(&expected)?;
        anyhow::ensure!(
            factory.probe_default(&env).is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Round-trip a fixture through ingest and native restore.
    ///
    /// Steps:
    /// 1. Ingest `adapter` into a fresh local store with [`NoopOracle`].
    /// 2. Serialize every stored session with `RestoreFidelity::Native`.
    /// 3. Require each restored file to be JSON-equal to the file at the same
    ///    relative path under `source_root`.
    /// 4. Require the set of restored paths to equal the set of
    ///    `.json`/`.jsonl` files under `source_root`.
    ///
    /// # Panics
    /// On an empty ingest or a content/path-set mismatch (via `assert!`).
    ///
    /// # Errors
    /// Store, ingest, serialize, or file-read failures.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }
        assert_eq!(
            restored_paths,
            source_json_files(source_root)?,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Every `.json` / `.jsonl` file under `root`, recursively, as paths
    /// relative to `root`.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut out = BTreeSet::new();
        collect_source_json_files(root, root, &mut out)?;
        Ok(out)
    }

    /// Recursive worker for [`source_json_files`]: walks `dir` and inserts
    /// matching files into `out`, relative to `root`. `DirEntry::file_type`
    /// does not follow symlinks, so symlinked directories are not descended.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            if let Some("json" | "jsonl") = path.extension().and_then(|ext| ext.to_str()) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compare two renderings of the file at `path` as JSON, ignoring
    /// formatting and key order. `.jsonl` files are compared as ordered
    /// sequences of non-blank lines; anything else as one JSON document.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parse UTF-8 JSON Lines, skipping blank / whitespace-only lines.
    /// Errors on invalid UTF-8 or any line that isn't valid JSON.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}