Skip to main content

pond/adapter/
mod.rs

1//! Source-adapter seam.
2//!
3//! Pond ingests sessions from many runtimes. The seam splits in two:
4//!
//! - [`AdapterFactory`] is the stateless face every format publishes once,
//!   collected by [`registry`]. It knows how to construct configured adapters
//!   from an opaque JSON config blob ([`AdapterFactory::open`]), how to
//!   probe the user's environment for a default config
//!   ([`AdapterFactory::probe_default`]), and how to restore a canonical
//!   session into the format's native files ([`AdapterFactory::serialize`]).
//! - [`Adapter`] is the live, configured instance. Its main job is
//!   [`Adapter::events_with`] (and the [`Adapter::events`] wrapper): stream
//!   canonical [`IngestEvent`]s in append-only order per session;
//!   [`Adapter::discover`] counts sessions up front for progress reporting.
//!   The "source" is opaque to the seam - a directory tree, an HTTP
//!   endpoint, a database, an archive file.
14//!
15//! Concrete implementations live in `adapter/<format>.rs` and are tied
16//! together by [`registry`]. A new adapter is one file plus one line in the
17//! registry; no central dispatch table to edit.
18
19use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26    sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27    wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35
36pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
37pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
38pub use discovery::{Candidate, discover, prompt_and_persist};
39pub use extract::{
40    Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
41    extract_str, extract_value,
42};
43
44/// Stateless face of an adapter type: how the registry knows about it without
45/// instantiating it. One implementation per known format, registered in
46/// [`registry`].
47pub trait AdapterFactory: Send + Sync {
48    /// Stable short name. Used as the `[sources.<name>]` config key, the
49    /// `pond sync <name>` positional arg, and the `Session.source_agent`
50    /// value emitted by the corresponding adapter.
51    fn name(&self) -> &'static str;
52
53    /// Open a configured adapter from a JSON-shaped config blob. The shape is
54    /// owned by each factory: filesystem adapters expect `{ "path": "..." }`,
55    /// API-backed adapters expect `{ "endpoint": "...", "auth_token": "..." }`,
56    /// etc. The seam doesn't know or care. A factory rejects a bad blob with
57    /// [`AdapterErrorKind::Config`].
58    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
59
60    /// Probe the user's environment for a default config. Returns the JSON
61    /// blob that would go into `[sources.<name>]` if the picker writes it
62    /// back. Filesystem adapters check their canonical install path under
63    /// `env.home`; adapters with no auto-discovery rule (e.g. API adapters
64    /// that need explicit creds) return `None`.
65    fn probe_default(&self, env: &Env) -> Option<Value>;
66
67    /// Restore one canonical session into this adapter's native file layout.
68    fn serialize(
69        &self,
70        session: &SessionWithMessages,
71        fidelity: RestoreFidelity,
72    ) -> Result<Vec<RestoredFile>, AdapterError>;
73}
74
/// How faithfully [`AdapterFactory::serialize`] must reproduce a session.
///
/// NOTE(review): the variant meanings below are inferred from their names and
/// from the native round-trip test helper; confirm against the adapters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Restore into the same format the session was ingested from. The
    /// round-trip test helper expects the output to match the original
    /// source files JSON-for-JSON.
    Native,
    /// Restore a session that presumably originated in a different format,
    /// so only canonical fields are available - TODO confirm.
    Foreign,
}
80
/// One file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Path relative to the adapter's source root; callers join it onto
    /// that root to get the on-disk location.
    pub relative_path: PathBuf,
    /// Complete contents of the file.
    pub bytes: Vec<u8>,
}
86
87/// Live, configured adapter instance. Holds whatever handle the source needs
88/// (an open directory root, an HTTP client + auth, a database connection)
89/// for the lifetime of its event stream.
90pub trait Adapter: Send + Sync {
91    /// Stream every canonical event for every session this adapter knows
92    /// about, in append-only order per session. The stream borrows `self`
93    /// so callers can pass `&adapter` or hold a `Box<dyn Adapter>` and
94    /// invoke this through `as_ref()`.
95    fn events(&self) -> EventStream<'_> {
96        let stream = self.events_with(&NoopOracle);
97        Box::pin(stream.filter_map(|res| match res {
98            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
99            Ok(AdapterYield::Skipped { .. }) => None,
100            Err(error) => Some(Err(error)),
101        }))
102    }
103
104    /// Count how many sessions [`Self::events`] will produce, used by the
105    /// CLI bar to set its length up front. A filesystem adapter walks its
106    /// root and counts `.jsonl` files; an API adapter calls its list
107    /// endpoint. Cheap and best-effort: errors here only mean we run with
108    /// an unknown total (the bar still ticks per session), so callers
109    /// fall back to a rolling counter rather than failing the sync.
110    fn discover(&self) -> DiscoverFuture<'_>;
111
112    /// Stream events with a [`SkipOracle`] the adapter MAY consult to
113    /// short-circuit per-session re-decoding (spec.md#adapter-integrity-event-ordering). Default impl
114    /// ignores the oracle.
115    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
116}
117
118/// Per-session watermark lookup: when did pond last write this session?
119/// Backed by Lance's `_row_last_updated_at_version` joined to the manifest
120/// commit timestamp (spec.md#adapter-integrity-event-ordering). Adapter compares this to the source
121/// file's mtime to decide whether to re-decode.
122pub trait SkipOracle: Send + Sync {
123    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
124}
125
126/// `SkipOracle` that always returns `None`. Used by tests and benches that
127/// don't want skip behavior interfering with their assertions.
128#[derive(Debug, Default, Clone, Copy)]
129pub struct NoopOracle;
130
131impl SkipOracle for NoopOracle {
132    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
133        None
134    }
135}
136
137#[derive(Debug, Clone)]
138pub enum AdapterYield {
139    Event(IngestEvent),
140    Skipped {
141        /// `None` for files that never yield a session id (empty `.jsonl`).
142        session_id: Option<String>,
143        project: Option<String>,
144        reason: SkipReason,
145    },
146}
147
/// Why an adapter emitted [`AdapterYield::Skipped`] instead of events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipReason {
    /// The [`SkipOracle`] watermark showed pond already holds an up-to-date
    /// copy of the session, so re-decoding was short-circuited.
    Fresh,
    /// File produced no importable session (empty `.jsonl`, sidecar-only rows,
    /// or an unextractable header). Benign: counted, never an error or a
    /// per-event drop. The underlying cause is logged at `POND_LOG=debug`.
    Empty,
}
156
157pub type AdapterYieldStream<'a> =
158    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
159
160/// Boxed future returning the number of sessions an adapter will emit. The
161/// shape mirrors [`EventStream`] - one alias per async trait method so the
162/// trait stays `dyn`-compatible without per-adapter associated types.
163pub type DiscoverFuture<'a> =
164    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
165
/// Environment slice handed to [`AdapterFactory::probe_default`]. Kept
/// deliberately small - just `home`, because env-var lookups for API creds
/// are unreliable and most adapters with API backends should require
/// explicit config rather than opportunistic env reads.
#[derive(Debug, Clone)]
pub struct Env {
    /// The user's home directory; filesystem adapters probe their canonical
    /// install paths beneath it.
    pub home: PathBuf,
}

impl Env {
    /// Read `home` from the `HOME` env var. Returns `None` when `HOME` is
    /// unset (CI, post-install hooks, sandboxed runs) or empty. An empty
    /// home would make every probe path (`home.join(...)`) relative to the
    /// current directory.
    pub fn from_env() -> Option<Self> {
        Self::from_home_var(std::env::var_os("HOME"))
    }

    /// Build an `Env` from a raw `HOME` value, treating an empty value the
    /// same as an unset one.
    fn from_home_var(home: Option<std::ffi::OsString>) -> Option<Self> {
        home.filter(|value| !value.is_empty()).map(Self::with_home)
    }

    /// Construct an `Env` with an explicit home. Tests use this to inject a
    /// `TempDir`-backed home without touching the process env.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
190
191/// Boxed, `Send`-only stream of [`IngestEvent`]s with one shared error type.
192/// The lifetime parameter lets future adapters borrow from their config; for
193/// `self: Box<Self>` impls the lifetime collapses to `'static`.
194pub type EventStream<'a> =
195    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
196
/// One error type for every adapter. Each call site tags the error with the
/// adapter's name (so multi-adapter syncs can attribute failures) and a
/// `location` string the operator can act on (file path, URL, line number,
/// config key, ...). The `kind` carries the underlying class.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the adapter that raised the error, used to attribute failures
    /// in multi-adapter syncs.
    pub adapter: &'static str,
    /// Operator-actionable place the error occurred (file path, URL, line,
    /// config key, ...). [`AdapterError::config`] always sets it to `"config"`.
    pub location: String,
    /// The error class, plus the wrapped lower-level error where there is one.
    pub kind: AdapterErrorKind,
}
207
/// Class of an [`AdapterError`]. `Io` and `Parse` wrap a lower-level error,
/// which is exposed through `std::error::Error::source`; the other kinds
/// carry only an operator-facing message.
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// Filesystem / network IO at `location`.
    Io(std::io::Error),
    /// JSON parse error at line `line` inside `location`.
    Parse {
        line: usize,
        source: serde_json::Error,
    },
    /// Format-specific shape error: missing required field, unknown role,
    /// unsupported record type. The `String` is operator-facing.
    Schema(String),
    /// `AdapterFactory::open` rejected its config blob.
    Config(String),
    /// HTTP / RPC / timeout error from an API-backed adapter.
    Transport(String),
    /// Auth failure from an API-backed adapter (bad token, expired creds).
    Auth(String),
}
227
228impl AdapterError {
229    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
230        Self {
231            adapter,
232            location: location.into(),
233            kind: AdapterErrorKind::Io(source),
234        }
235    }
236
237    pub fn parse(
238        adapter: &'static str,
239        location: impl Into<String>,
240        line: usize,
241        source: serde_json::Error,
242    ) -> Self {
243        Self {
244            adapter,
245            location: location.into(),
246            kind: AdapterErrorKind::Parse { line, source },
247        }
248    }
249
250    pub fn schema(
251        adapter: &'static str,
252        location: impl Into<String>,
253        message: impl Into<String>,
254    ) -> Self {
255        Self {
256            adapter,
257            location: location.into(),
258            kind: AdapterErrorKind::Schema(message.into()),
259        }
260    }
261
262    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
263        Self {
264            adapter,
265            location: "config".to_owned(),
266            kind: AdapterErrorKind::Config(message.into()),
267        }
268    }
269}
270
271impl std::fmt::Display for AdapterError {
272    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        match &self.kind {
274            AdapterErrorKind::Io(source) => {
275                write!(
276                    formatter,
277                    "{} io error at {}: {source}",
278                    self.adapter, self.location
279                )
280            }
281            AdapterErrorKind::Parse { line, source } => write!(
282                formatter,
283                "{} json parse error at {}:{line}: {source}",
284                self.adapter, self.location,
285            ),
286            AdapterErrorKind::Schema(message) => {
287                write!(
288                    formatter,
289                    "{} schema error at {}: {message}",
290                    self.adapter, self.location
291                )
292            }
293            AdapterErrorKind::Config(message) => {
294                write!(formatter, "{} config error: {message}", self.adapter)
295            }
296            AdapterErrorKind::Transport(message) => write!(
297                formatter,
298                "{} transport error at {}: {message}",
299                self.adapter, self.location,
300            ),
301            AdapterErrorKind::Auth(message) => {
302                write!(formatter, "{} auth error: {message}", self.adapter)
303            }
304        }
305    }
306}
307
308impl std::error::Error for AdapterError {
309    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
310        match &self.kind {
311            AdapterErrorKind::Io(source) => Some(source),
312            AdapterErrorKind::Parse { source, .. } => Some(source),
313            _ => None,
314        }
315    }
316}
317
318/// The static, ordered registry of every adapter pond knows. A new adapter
319/// adds one `&Factory` here plus one file under `src/adapter/`. Order is the
320/// order discovery presents to the operator.
321pub fn registry() -> &'static [&'static dyn AdapterFactory] {
322    &[&ClaudeCodeFactory, &CodexCliFactory]
323}
324
325/// Look up a factory by name. Returns `None` for unknown names; callers
326/// usually wrap that in a clear error using [`known_names`].
327pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
328    registry().iter().copied().find(|f| f.name() == name)
329}
330
331/// The names of every registered adapter. Drives error messages
332/// ("unknown adapter X; known: ...") and the discovery picker labels.
333pub fn known_names() -> Vec<&'static str> {
334    registry().iter().map(|f| f.name()).collect()
335}
336
337/// Probe every adapter for a default config under `env.home`. Returns
338/// `(name, default_config)` pairs in registry order, skipping adapters whose
339/// `probe_default` returned `None`. The picker shows these to the operator.
340pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
341    registry()
342        .iter()
343        .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
344        .collect()
345}
346
/// Build the stable Part-row id `"{message_id}:{ordinal:04}"`: the message id,
/// a colon, then the ordinal zero-padded to at least four digits. Both JSONL
/// adapters share this shape so the cross-adapter id space stays predictable.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    let mut id = String::from(message_id);
    id.push(':');
    id.push_str(&format!("{ordinal:04}"));
    id
}
352
353/// Compact (no-whitespace) JSON serialization used as a fallback Part body
354/// when a row carries something we don't have a richer canonical shape for.
355pub(crate) fn compact_json(value: &Value) -> String {
356    serde_json::to_string(value).unwrap_or_default()
357}
358
359pub(crate) fn jsonl_bytes(
360    adapter: &'static str,
361    records: &[Value],
362) -> Result<Vec<u8>, AdapterError> {
363    let mut bytes = Vec::new();
364    for record in records {
365        let line = serde_json::to_vec(record).map_err(|err| {
366            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
367        })?;
368        bytes.extend(line);
369        bytes.push(b'\n');
370    }
371    Ok(bytes)
372}
373
374/// Shared `AdapterFactory::open` plumbing: parse the config blob's `path` and
375/// expand a leading `~` against `$HOME` once, not per path adapter.
376pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
377    use serde::Deserialize;
378    #[derive(Deserialize)]
379    struct Cfg {
380        path: PathBuf,
381    }
382    let cfg: Cfg = serde_json::from_value(config)
383        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
384    Ok(match std::env::var_os("HOME") {
385        Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
386        None => cfg.path,
387    })
388}
389
390pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
391    options
392        .get("source")
393        .and_then(|source| source.get("raw_record"))
394        .cloned()
395}
396
397pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
398    value.as_deref().map(String::as_str).unwrap_or("")
399}
400
401/// Deterministic message ordering for restore: timestamp, then id as a
402/// tiebreaker so equal-timestamp messages always serialize in a stable order.
403pub(crate) fn by_timestamp_then_id(
404    left: &MessageWithParts,
405    right: &MessageWithParts,
406) -> std::cmp::Ordering {
407    left.message
408        .timestamp()
409        .cmp(&right.message.timestamp())
410        .then_with(|| left.message.id().cmp(right.message.id()))
411}
412
413/// `ProviderOptions::new()` shortcut; both adapters reach for an empty
414/// options map often enough that naming the no-op clarifies the call sites.
415#[inline]
416pub(crate) fn empty_options() -> ProviderOptions {
417    ProviderOptions::new()
418}
419
#[cfg(test)]
pub(crate) mod test_support {
    use std::path::Path;

    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Native round-trip check for an adapter fixture.
    ///
    /// Ingests everything `adapter` yields into a scratch [`Store`], restores
    /// each stored session through `factory` at [`RestoreFidelity::Native`],
    /// and asserts that each restored file equals the file at the same
    /// relative path under `source_root`. Files are compared as parsed JSON
    /// values rather than raw bytes.
    ///
    /// # Errors
    ///
    /// Fails if nothing was ingested, since the check would otherwise pass
    /// vacuously on a broken fixture. Also fails if a listed session can't be
    /// read back, or if an expected file is unreadable or not valid JSON.
    ///
    /// # Panics
    ///
    /// On any content mismatch, via `assert_eq!`.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        anyhow::ensure!(
            !session_ids.is_empty(),
            "no sessions ingested from {}; nothing to round-trip",
            source_root.display()
        );
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
            }
        }
        Ok(())
    }

    /// Assert `actual` holds the same JSON as `expected`.
    ///
    /// `.jsonl` files are compared record by record (blank lines ignored), so
    /// a failure names the first diverging record instead of dumping both
    /// files. Any other file is parsed as a single JSON document.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            for (index, (actual_line, expected_line)) in
                actual_lines.iter().zip(&expected_lines).enumerate()
            {
                assert_eq!(
                    actual_line,
                    expected_line,
                    "jsonl mismatch at {} (record {})",
                    path.display(),
                    index + 1
                );
            }
            // Only reached when the shared prefix matches: catches missing or
            // extra trailing records.
            assert_eq!(
                actual_lines.len(),
                expected_lines.len(),
                "jsonl record count mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parse each non-blank line of UTF-8 `bytes` as one JSON value.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}
483
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// The `path` string from a factory's probe result, if it produced one.
    fn probed_path(probe: Option<Value>) -> Option<String> {
        let config = probe?;
        config.get("path").and_then(Value::as_str).map(str::to_owned)
    }

    #[test]
    fn each_factory_probes_its_default_under_an_injected_home() {
        // Discovery rules live on each factory's `probe_default`, not in a
        // central name->path table; driving every factory against an injected
        // `home` shows each rule sits next to its format.
        let temp = TempDir::new().unwrap();
        let home = temp.path();
        let claude_dir = home.join(".claude").join("projects");
        let codex_dir = home.join(".codex").join("sessions");
        for marker in [&claude_dir, &codex_dir] {
            std::fs::create_dir_all(marker).unwrap();
        }
        let env = Env::with_home(home);

        let expected_claude = claude_dir.to_str().unwrap();
        let expected_codex = codex_dir.to_str().unwrap();
        assert_eq!(
            probed_path(ClaudeCodeFactory.probe_default(&env)).as_deref(),
            Some(expected_claude),
        );
        assert_eq!(
            probed_path(CodexCliFactory.probe_default(&env)).as_deref(),
            Some(expected_codex),
        );

        // Deleting only the codex marker must knock out only the codex probe.
        std::fs::remove_dir_all(&codex_dir).unwrap();
        assert!(CodexCliFactory.probe_default(&env).is_none());
        assert!(ClaudeCodeFactory.probe_default(&env).is_some());
    }
}