use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tokio_stream::{Stream, StreamExt};
use crate::{
sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
wire::ProviderOptions,
};
mod claude_code;
mod codex_cli;
mod discovery;
pub mod extract;
mod jsonl;
pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
pub use discovery::{Candidate, discover, prompt_and_persist};
pub use extract::{
Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
extract_str, extract_value,
};
/// Creates [`Adapter`]s for one session source format and renders stored sessions back
/// into that format.
///
/// The built-in implementations are listed by [`registry`] and looked up by
/// [`AdapterFactory::name`] through [`by_name`].
pub trait AdapterFactory: Send + Sync {
    /// Identifier for this adapter kind. [`by_name`] matches on it and [`probe_all`]
    /// pairs it with every config it probes.
    fn name(&self) -> &'static str;
    /// Opens an adapter from a JSON config blob.
    ///
    /// # Errors
    /// Returns an [`AdapterError`] when the config cannot be used. NOTE(review): the config
    /// shape is implementation-specific; the crate-private `config_path` helper expects a
    /// `{"path": ...}` object — confirm each implementation goes through it.
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
    /// Looks for this adapter's default data location under `env.home`.
    ///
    /// Returns `None` when nothing is found. The built-in factories return an object with a
    /// `path` key when their default directory exists (see this module's tests);
    /// presumably that blob is meant to be passed to [`AdapterFactory::open`] — confirm.
    fn probe_default(&self, env: &Env) -> Option<Value>;
    /// Renders a stored session back into this adapter's on-disk format.
    ///
    /// Each [`RestoredFile::relative_path`] is relative to the adapter's source root: the
    /// native-restore test helper joins it onto the fixture directory it ingested from.
    ///
    /// # Errors
    /// Returns an [`AdapterError`] when the session cannot be rendered.
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
/// How faithfully [`AdapterFactory::serialize`] is asked to reproduce a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Reproduce the adapter's own source files. The crate's test helper compares
    /// `Native` output against the original files, JSON value by JSON value.
    Native,
    /// NOTE(review): presumably rendering a session that was not originally produced by
    /// this adapter, where exact reproduction is not expected — confirm in implementations.
    Foreign,
}
/// One file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Path of the file relative to the adapter's source root.
    pub relative_path: PathBuf,
    /// Complete file contents.
    pub bytes: Vec<u8>,
}
/// An opened session source that can be scanned for ingestable events.
pub trait Adapter: Send + Sync {
    /// Streams every event with no skipping oracle.
    ///
    /// This drives [`Adapter::events_with`] using [`NoopOracle`]. It drops
    /// [`AdapterYield::Skipped`] markers, so only events and errors reach the caller.
    fn events(&self) -> EventStream<'_> {
        let yields = self.events_with(&NoopOracle);
        // `Ok(Skipped)` -> Ok(None) -> filtered out; `Ok(Event)` and `Err` pass through.
        let events = yields.filter_map(|item| {
            item.map(|yielded| match yielded {
                AdapterYield::Event(event) => Some(event),
                AdapterYield::Skipped { .. } => None,
            })
            .transpose()
        });
        Box::pin(events)
    }
    /// Scans the source. NOTE(review): the `usize` result presumably counts what was
    /// discovered — confirm in implementations.
    fn discover(&self) -> DiscoverFuture<'_>;
    /// Streams events and skip markers. `oracle` reports when each session was last
    /// ingested, so implementations can decide what to skip.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
}
/// Answers "when was this session last ingested?" for [`Adapter::events_with`].
pub trait SkipOracle: Send + Sync {
    /// Returns the last ingestion time for `session_id`, or `None` if unknown.
    /// NOTE(review): presumably adapters compare this with session activity to emit
    /// [`SkipReason::Fresh`] — confirm in adapter implementations.
    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
}
/// A [`SkipOracle`] that never knows an ingestion time. [`Adapter::events`] uses it.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopOracle;
impl SkipOracle for NoopOracle {
    /// Always `None`, for every session id.
    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
        None
    }
}
/// One item from [`Adapter::events_with`]: either an event or a record of a skip.
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// An event to ingest.
    Event(IngestEvent),
    /// Something the adapter chose not to emit. [`Adapter::events`] discards these.
    Skipped {
        /// Identifier of the skipped session, when the adapter knows it.
        session_id: Option<String>,
        /// Project the skipped session belongs to, when the adapter knows it.
        project: Option<String>,
        /// Why it was skipped.
        reason: SkipReason,
    },
}
/// Why an adapter reported [`AdapterYield::Skipped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipReason {
    /// NOTE(review): presumably "already ingested and unchanged" per the
    /// [`SkipOracle`] — confirm in adapter implementations.
    Fresh,
    /// NOTE(review): presumably "nothing to ingest" — confirm in adapter implementations.
    Empty,
}
/// Boxed, pinned, `Send` stream returned by [`Adapter::events_with`], borrowing for `'a`.
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
/// Boxed, pinned, `Send` future returned by [`Adapter::discover`], borrowing for `'a`.
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
/// Process environment facts needed to probe for default adapter locations.
///
/// Derives `Debug` and `Clone` like the other public types in this module, so callers can
/// log it and keep copies.
#[derive(Debug, Clone)]
pub struct Env {
    /// The user's home directory; default adapter locations are resolved beneath it.
    pub home: PathBuf,
}
impl Env {
    /// Builds an environment from the process's `HOME` variable.
    ///
    /// Returns `None` when `HOME` is unset **or empty**. An empty value would otherwise
    /// become an empty `home` path. Every default location joined onto it would then
    /// silently resolve relative to the current working directory.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME")
            .filter(|home| !home.is_empty())
            .map(|home| Self {
                home: PathBuf::from(home),
            })
    }

    /// Builds an environment rooted at an explicit home directory. For example, this
    /// module's tests inject a temporary directory.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
/// Boxed, pinned, `Send` stream of ingest events returned by [`Adapter::events`].
pub type EventStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
/// An error raised by an adapter, tagged with which adapter failed and where.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the adapter that reported the error; printed first by `Display`.
    pub adapter: &'static str,
    /// Free-form location string. `AdapterError::config` sets it to `"config"` and
    /// `jsonl_bytes` uses `"serialize"`. NOTE(review): for I/O and parse errors this is
    /// presumably a source path — confirm at call sites.
    pub location: String,
    /// What went wrong.
    pub kind: AdapterErrorKind,
}
/// The category of an [`AdapterError`], with any underlying cause.
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// An I/O failure; the wrapped error is exposed via `Error::source`.
    Io(std::io::Error),
    /// A JSON decode failure at a 1-based or 0-based line (NOTE(review): base not
    /// established here — confirm at call sites); the wrapped error is exposed via
    /// `Error::source`.
    Parse {
        line: usize,
        source: serde_json::Error,
    },
    /// Data that did not have the expected structure, or failed to encode.
    Schema(String),
    /// An unusable adapter configuration; `Display` omits the location.
    Config(String),
    /// NOTE(review): no constructor in this file; presumably for network-backed
    /// adapters — confirm where it is built.
    Transport(String),
    /// NOTE(review): no constructor in this file; `Display` omits the location.
    Auth(String),
}
impl AdapterError {
    /// Builds an [`AdapterErrorKind::Io`] error for `adapter` at `location`.
    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
        let kind = AdapterErrorKind::Io(source);
        Self {
            adapter,
            kind,
            location: location.into(),
        }
    }

    /// Builds an [`AdapterErrorKind::Parse`] error for `line` of the input at `location`.
    pub fn parse(
        adapter: &'static str,
        location: impl Into<String>,
        line: usize,
        source: serde_json::Error,
    ) -> Self {
        let kind = AdapterErrorKind::Parse { line, source };
        Self {
            adapter,
            kind,
            location: location.into(),
        }
    }

    /// Builds an [`AdapterErrorKind::Schema`] error carrying a free-form `message`.
    pub fn schema(
        adapter: &'static str,
        location: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        let kind = AdapterErrorKind::Schema(message.into());
        Self {
            adapter,
            kind,
            location: location.into(),
        }
    }

    /// Builds an [`AdapterErrorKind::Config`] error; the location is always `"config"`.
    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
        let kind = AdapterErrorKind::Config(message.into());
        Self {
            adapter,
            kind,
            location: String::from("config"),
        }
    }
}
impl std::fmt::Display for AdapterError {
    // Output shape: `<adapter> <category> error at <location>: <detail>`. Config and auth
    // errors leave out the location; parse errors append `:<line>` to it.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            adapter,
            location,
            kind,
        } = self;
        match kind {
            AdapterErrorKind::Io(source) => {
                write!(formatter, "{} io error at {}: {source}", adapter, location)
            }
            AdapterErrorKind::Parse { line, source } => write!(
                formatter,
                "{} json parse error at {}:{line}: {source}",
                adapter, location
            ),
            AdapterErrorKind::Schema(message) => {
                write!(formatter, "{} schema error at {}: {message}", adapter, location)
            }
            AdapterErrorKind::Config(message) => {
                write!(formatter, "{} config error: {message}", adapter)
            }
            AdapterErrorKind::Transport(message) => write!(
                formatter,
                "{} transport error at {}: {message}",
                adapter, location
            ),
            AdapterErrorKind::Auth(message) => {
                write!(formatter, "{} auth error: {message}", adapter)
            }
        }
    }
}
impl std::error::Error for AdapterError {
    /// Exposes the wrapped cause for I/O and JSON parse errors; the string-carrying
    /// kinds have no underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Every variant is listed explicitly (no `_` arm), so adding a kind that wraps an
        // error forces a decision here instead of silently returning `None`.
        match &self.kind {
            AdapterErrorKind::Io(source) => Some(source),
            AdapterErrorKind::Parse { source, .. } => Some(source),
            AdapterErrorKind::Schema(_)
            | AdapterErrorKind::Config(_)
            | AdapterErrorKind::Transport(_)
            | AdapterErrorKind::Auth(_) => None,
        }
    }
}
/// Every built-in adapter factory: Claude Code first, then Codex CLI.
pub fn registry() -> &'static [&'static dyn AdapterFactory] {
    const FACTORIES: &[&dyn AdapterFactory] = &[&ClaudeCodeFactory, &CodexCliFactory];
    FACTORIES
}
/// Returns the first registered factory whose [`AdapterFactory::name`] equals `name`.
pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
    for &factory in registry() {
        if factory.name() == name {
            return Some(factory);
        }
    }
    None
}
/// Names of all registered factories, in registry order.
pub fn known_names() -> Vec<&'static str> {
    let factories = registry();
    let mut names = Vec::with_capacity(factories.len());
    names.extend(factories.iter().map(|factory| factory.name()));
    names
}
/// Probes every registered factory under `env`. Returns `(name, config)` for each
/// factory whose probe found something, in registry order.
pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
    let mut found = Vec::new();
    for factory in registry() {
        if let Some(config) = factory.probe_default(env) {
            found.push((factory.name(), config));
        }
    }
    found
}
/// Builds a part id as `<message_id>:<ordinal>`, with the ordinal zero-padded to at least
/// four digits (larger ordinals are printed in full).
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{id}:{seq:04}", id = message_id, seq = ordinal)
}
pub(crate) fn compact_json(value: &Value) -> String {
serde_json::to_string(value).unwrap_or_default()
}
/// Encodes `records` as JSON Lines: one compact JSON value per line, each followed by `\n`.
///
/// # Errors
/// Returns a schema error located at `"serialize"` if a record fails to encode.
pub(crate) fn jsonl_bytes(
    adapter: &'static str,
    records: &[Value],
) -> Result<Vec<u8>, AdapterError> {
    records
        .iter()
        .try_fold(Vec::new(), |mut out: Vec<u8>, record| -> Result<Vec<u8>, AdapterError> {
            // Serialize straight into the output buffer instead of via a temporary Vec.
            serde_json::to_writer(&mut out, record).map_err(|err| {
                AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
            })?;
            out.push(b'\n');
            Ok(out)
        })
}
pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
use serde::Deserialize;
#[derive(Deserialize)]
struct Cfg {
path: PathBuf,
}
let cfg: Cfg = serde_json::from_value(config)
.map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
Ok(match std::env::var_os("HOME") {
Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
None => cfg.path,
})
}
/// Returns a copy of `options["source"]["raw_record"]`, or `None` if either key is absent.
pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
    let source = options.get("source")?;
    source.get("raw_record").cloned()
}
/// Borrows the extracted string, or `""` when nothing was extracted.
pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
    match value.as_deref() {
        Some(text) => text.as_str(),
        None => "",
    }
}
/// Orders messages by timestamp, breaking ties by message id, for use with `sort_by`.
pub(crate) fn by_timestamp_then_id(
    left: &MessageWithParts,
    right: &MessageWithParts,
) -> std::cmp::Ordering {
    let (lhs, rhs) = (&left.message, &right.message);
    match lhs.timestamp().cmp(&rhs.timestamp()) {
        std::cmp::Ordering::Equal => lhs.id().cmp(rhs.id()),
        decided => decided,
    }
}
/// A fresh, empty provider-options map.
///
/// No `#[inline]`: this is crate-private, so the compiler already sees the body at
/// every call site.
pub(crate) fn empty_options() -> ProviderOptions {
    ProviderOptions::new()
}
#[cfg(test)]
pub(crate) mod test_support {
    use std::path::Path;
    use tempfile::TempDir;
    use super::{Adapter, AdapterFactory, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Round-trip check for an adapter.
    ///
    /// Ingests everything `adapter` yields into a fresh local store, then restores every
    /// stored session with `factory` at [`RestoreFidelity::Native`]. Each restored file
    /// must equal the file at the same relative path under `source_root`, compared as
    /// JSON values.
    ///
    /// # Errors
    /// Fails when a listed session cannot be read, a restore or file read fails, or when
    /// no restored file was compared at all. Without that last check, an adapter that
    /// yields nothing, or a serializer that writes nothing, would pass vacuously.
    ///
    /// # Panics
    /// Panics (via `assert_eq!`) when a restored file's contents differ from the original.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let mut compared = 0usize;
        for session_id in store.session_ids().await? {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                compared += 1;
            }
        }
        anyhow::ensure!(
            compared > 0,
            "native restore compared no files under {}",
            source_root.display()
        );
        Ok(())
    }

    /// Compares two file bodies as JSON. `.jsonl` files are compared line by line (blank
    /// lines ignored); anything else is parsed as a single JSON document.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parses each non-blank line of UTF-8 `bytes` as a JSON value.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// The `path` string inside a probe result, if the probe found anything.
    fn probed_path(probe: Option<&Value>) -> Option<&str> {
        probe.and_then(|cfg| cfg.get("path")).and_then(Value::as_str)
    }

    #[test]
    fn each_factory_probes_its_default_under_an_injected_home() {
        let temp = TempDir::new().unwrap();
        let home = temp.path();
        let claude_dir = home.join(".claude").join("projects");
        let codex_dir = home.join(".codex").join("sessions");
        for dir in [&claude_dir, &codex_dir] {
            std::fs::create_dir_all(dir).unwrap();
        }
        let env = Env::with_home(home);

        // With both default directories present, each factory reports its own.
        let claude_probe = ClaudeCodeFactory.probe_default(&env);
        assert_eq!(
            probed_path(claude_probe.as_ref()),
            Some(claude_dir.to_str().unwrap()),
        );
        let codex_probe = CodexCliFactory.probe_default(&env);
        assert_eq!(
            probed_path(codex_probe.as_ref()),
            Some(codex_dir.to_str().unwrap()),
        );

        // Removing one directory affects only that factory's probe.
        std::fs::remove_dir_all(&codex_dir).unwrap();
        assert!(CodexCliFactory.probe_default(&env).is_none());
        assert!(ClaudeCodeFactory.probe_default(&env).is_some());
    }
}