use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tokio_stream::{Stream, StreamExt};
use crate::{
sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
wire::ProviderOptions,
};
mod claude_code;
mod codex_cli;
mod discovery;
pub mod extract;
mod jsonl;
mod opencode;
mod pi_coding_agent;
pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
pub use discovery::{
Candidate, PromptOutcome, discover, persist_accept, persist_decline, probe_unconfigured,
prompt_and_persist, prompt_each,
};
pub use extract::{
Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
extract_str, extract_value,
};
pub use opencode::{OpencodeAdapter, OpencodeFactory};
pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
/// Per-agent entry point: probes for a default install, opens an [`Adapter`] from a
/// config blob, and serializes stored sessions back to files.
///
/// All built-in factories are listed by [`registry`].
pub trait AdapterFactory: Send + Sync {
    /// Name used to look this factory up via [`by_name`] and reported by [`known_names`].
    fn name(&self) -> &'static str;
    /// Builds an adapter from a JSON config blob.
    ///
    /// NOTE(review): presumably the blob has the shape `probe_default` returns (a `path`
    /// key, as read by `config_path`) — confirm per adapter.
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
    /// Returns a config blob when this adapter's default install location exists under
    /// `env.home`, otherwise `None`. `test_support::assert_probe_default` checks that the
    /// blob's `path` key points at that location.
    fn probe_default(&self, env: &Env) -> Option<Value>;
    /// Renders a stored session as files relative to a restore root, requesting
    /// `fidelity`; each returned [`RestoredFile`] records the fidelity it actually reached.
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
/// How faithfully a restore reproduces an adapter's on-disk format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// The adapter's own layout; `test_support::assert_native_restore` requires such output
    /// to be JSON-equal to the originally ingested files.
    Native,
    /// NOTE(review): presumably a best-effort rendering of a session that came from a
    /// different adapter — confirm against the `serialize` implementations.
    Foreign,
}

/// One file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Location of the file relative to the restore root.
    pub relative_path: PathBuf,
    /// Complete file contents.
    pub bytes: Vec<u8>,
    /// Fidelity the serializer reports for this file (may differ from the requested one).
    pub actual_fidelity: RestoreFidelity,
}

impl RestoredFile {
    /// Bundles a relative path, its contents, and the achieved fidelity.
    pub(crate) fn new(
        relative_path: impl Into<PathBuf>,
        bytes: Vec<u8>,
        actual_fidelity: RestoreFidelity,
    ) -> Self {
        let relative_path: PathBuf = relative_path.into();
        Self {
            actual_fidelity,
            bytes,
            relative_path,
        }
    }
}
/// A configured source of sessions for one agent.
pub trait Adapter: Send + Sync {
    /// Streams ingest events with no ingest history: [`NoopOracle`] is passed to
    /// [`Adapter::events_with`], any [`AdapterYield::Skipped`] items are dropped, and
    /// errors are forwarded unchanged.
    fn events(&self) -> EventStream<'_> {
        let yields = self.events_with(&NoopOracle);
        let events = yields.filter_map(|item| match item {
            Err(error) => Some(Err(error)),
            Ok(AdapterYield::Skipped { .. }) => None,
            Ok(AdapterYield::Event(event)) => Some(Ok(event)),
        });
        Box::pin(events)
    }
    /// Scans the configured source.
    ///
    /// NOTE(review): the resolved `usize` is presumably the number of sessions found —
    /// confirm in the implementations.
    fn discover(&self) -> DiscoverFuture<'_>;
    /// Streams events or skip notices, consulting `oracle` for ingest history.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
}
/// Ingest history that [`Adapter::events_with`] may consult while streaming.
///
/// NOTE(review): presumably used to report already-ingested sessions as
/// [`SkipReason::Fresh`] instead of re-emitting them — confirm in the adapters.
pub trait SkipOracle: Send + Sync {
    /// When `session_id` was last ingested, or `None` if there is no record.
    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
    /// Whether the oracle holds no history at all. Defaults to `false`;
    /// [`NoopOracle`] overrides it to `true`.
    fn is_empty(&self) -> bool {
        false
    }
}
/// A [`SkipOracle`] with no history: no session has a recorded ingest time.
/// Used by the default [`Adapter::events`].
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopOracle;
impl SkipOracle for NoopOracle {
    /// Always `None`.
    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
        None
    }
    /// Always `true`.
    fn is_empty(&self) -> bool {
        true
    }
}
/// One item produced by [`Adapter::events_with`].
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// An event to ingest.
    Event(IngestEvent),
    /// Something the adapter chose not to emit. The optional fields carry whatever
    /// identifying information was available; [`Adapter::events`] drops these items.
    Skipped {
        session_id: Option<String>,
        project: Option<String>,
        reason: SkipReason,
    },
}
/// Why an [`AdapterYield::Skipped`] was produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipReason {
    /// NOTE(review): presumably already ingested and unchanged per the [`SkipOracle`] — confirm.
    Fresh,
    /// NOTE(review): presumably the source held nothing ingestible — confirm.
    Empty,
    /// The adapter does not handle this input; the string is a free-form description.
    Unsupported(String),
}
/// Boxed, pinned, `Send` stream returned by [`Adapter::events_with`].
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
/// Boxed, pinned, `Send` future returned by [`Adapter::discover`].
///
/// NOTE(review): the `usize` is presumably a count of discovered sessions — confirm.
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
/// Process-environment inputs that factories consult in
/// [`AdapterFactory::probe_default`].
pub struct Env {
    /// The user's home directory; probes look for default install paths beneath it.
    pub home: PathBuf,
}
impl Env {
    /// Reads the home directory from `$HOME`.
    ///
    /// Returns `None` when `HOME` is unset **or empty**. An empty value would make
    /// `home` the empty path, so every probe would silently resolve relative to the
    /// current working directory instead of failing.
    pub fn from_env() -> Option<Self> {
        let home = std::env::var_os("HOME").filter(|home| !home.is_empty())?;
        Some(Self::with_home(home))
    }
    /// Builds an `Env` rooted at an explicit home directory (used by tests to isolate
    /// probes from the real `$HOME`).
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
/// Boxed, pinned, `Send` stream of ingest events returned by [`Adapter::events`].
pub type EventStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
/// Error raised by an adapter, tagged with who failed and where.
#[derive(Debug)]
pub struct AdapterError {
    /// Adapter name, or a subsystem label such as `"restore"`.
    pub adapter: &'static str,
    /// File path or pseudo-location (e.g. `"config"`, `"serialize"`) the error refers to.
    pub location: String,
    /// The failure category and any underlying cause.
    pub kind: AdapterErrorKind,
}
/// Category of an [`AdapterError`].
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// I/O failure; exposed through `Error::source`.
    Io(std::io::Error),
    /// JSON decode failure at `line` of `location` (whether `line` is 0- or 1-based is not
    /// established here — confirm with callers); `source` is exposed through `Error::source`.
    Parse {
        line: usize,
        source: serde_json::Error,
    },
    /// Input or output that does not fit the expected structure; also used for unsafe
    /// path ids and JSON encode failures.
    Schema(String),
    /// Invalid adapter configuration.
    Config(String),
    /// NOTE(review): no constructor in this file; presumably network/RPC failures — confirm.
    Transport(String),
    /// NOTE(review): no constructor in this file; presumably credential failures — confirm.
    Auth(String),
}
impl AdapterError {
    /// Wraps an I/O failure that happened at `location`.
    pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
        let location: String = location.into();
        let kind = AdapterErrorKind::Io(source);
        Self {
            adapter,
            location,
            kind,
        }
    }

    /// Wraps a JSON decode failure on `line` of the input at `location`.
    pub fn parse(
        adapter: &'static str,
        location: impl Into<String>,
        line: usize,
        source: serde_json::Error,
    ) -> Self {
        let location: String = location.into();
        let kind = AdapterErrorKind::Parse { line, source };
        Self {
            adapter,
            location,
            kind,
        }
    }

    /// Reports data at `location` that does not have the expected structure.
    pub fn schema(
        adapter: &'static str,
        location: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        let location: String = location.into();
        let kind = AdapterErrorKind::Schema(message.into());
        Self {
            adapter,
            location,
            kind,
        }
    }

    /// Reports an invalid configuration; the location is always the literal `"config"`.
    pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
        let location = String::from("config");
        let kind = AdapterErrorKind::Config(message.into());
        Self {
            adapter,
            location,
            kind,
        }
    }
}
/// Renders `"<adapter> <category> error[ at <location>]: <detail>"`; config and auth
/// errors omit the location.
impl std::fmt::Display for AdapterError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            adapter,
            location,
            kind,
        } = self;
        match kind {
            AdapterErrorKind::Io(source) => {
                write!(formatter, "{adapter} io error at {location}: {source}")
            }
            AdapterErrorKind::Parse { line, source } => write!(
                formatter,
                "{adapter} json parse error at {location}:{line}: {source}"
            ),
            AdapterErrorKind::Schema(message) => {
                write!(formatter, "{adapter} schema error at {location}: {message}")
            }
            AdapterErrorKind::Config(message) => {
                write!(formatter, "{adapter} config error: {message}")
            }
            AdapterErrorKind::Transport(message) => {
                write!(formatter, "{adapter} transport error at {location}: {message}")
            }
            AdapterErrorKind::Auth(message) => {
                write!(formatter, "{adapter} auth error: {message}")
            }
        }
    }
}
/// Exposes the wrapped I/O or JSON error as the source; message-only kinds have none.
impl std::error::Error for AdapterError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Exhaustive on purpose: adding a variant that wraps another error must force a
        // decision here instead of silently falling into `None`.
        match &self.kind {
            AdapterErrorKind::Io(source) => Some(source),
            AdapterErrorKind::Parse { source, .. } => Some(source),
            AdapterErrorKind::Schema(_)
            | AdapterErrorKind::Config(_)
            | AdapterErrorKind::Transport(_)
            | AdapterErrorKind::Auth(_) => None,
        }
    }
}
/// Every built-in adapter factory, in a fixed order.
pub fn registry() -> &'static [&'static dyn AdapterFactory] {
    static FACTORIES: &[&dyn AdapterFactory] = &[
        &ClaudeCodeFactory,
        &CodexCliFactory,
        &OpencodeFactory,
        &PiCodingAgentFactory,
    ];
    FACTORIES
}
/// Finds the first registered factory whose [`AdapterFactory::name`] equals `name`.
pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
    for factory in registry() {
        if factory.name() == name {
            return Some(*factory);
        }
    }
    None
}
/// Names of all registered factories, in registry order.
pub fn known_names() -> Vec<&'static str> {
    let factories = registry();
    let mut names = Vec::with_capacity(factories.len());
    for factory in factories {
        names.push(factory.name());
    }
    names
}
/// Runs every factory's default probe against `env`, returning `(name, config)` for each
/// factory that found an install, in registry order.
pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
    let mut found = Vec::new();
    for factory in registry() {
        if let Some(config) = factory.probe_default(env) {
            found.push((factory.name(), config));
        }
    }
    found
}
/// Builds a part id as `"<message_id>:<ordinal>"`, with the ordinal zero-padded to at
/// least four digits (wider ordinals are printed in full).
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{}:{:04}", message_id, ordinal)
}
/// Serializes `value` as compact JSON, yielding an empty string if encoding fails.
pub(crate) fn compact_json(value: &Value) -> String {
    match serde_json::to_string(value) {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}
/// Encodes `records` as JSON Lines: each record as compact JSON followed by `\n`.
///
/// # Errors
/// Returns a schema error at location `"serialize"` if a record fails to encode.
pub(crate) fn jsonl_bytes(
    adapter: &'static str,
    records: &[Value],
) -> Result<Vec<u8>, AdapterError> {
    let mut bytes = Vec::new();
    for record in records {
        // Encode straight into the output buffer rather than via a per-record Vec.
        serde_json::to_writer(&mut bytes, record).map_err(|err| {
            AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
        })?;
        bytes.push(b'\n');
    }
    Ok(bytes)
}
/// Reads the required `path` field from an adapter config blob. When `$HOME` is set,
/// the path is passed through `crate::config::expand_home_under` with that home.
///
/// # Errors
/// Returns a config error if the blob does not deserialize into `{ "path": ... }`.
pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
    use serde::Deserialize;
    #[derive(Deserialize)]
    struct Cfg {
        path: PathBuf,
    }
    let Cfg { path } = serde_json::from_value::<Cfg>(config)
        .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
    let resolved = match std::env::var_os("HOME") {
        None => path,
        Some(home) => crate::config::expand_home_under(&path, Path::new(&home)),
    };
    Ok(resolved)
}
/// Returns a copy of `options["source"]["raw_record"]`, or `None` if either level is missing.
pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
    let source = options.get("source")?;
    let record = source.get("raw_record")?;
    Some(record.clone())
}
/// Builds provider options holding a single `"source"` entry with the adapter name and
/// the output of `extract_raw_record(raw)`.
pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
    let source = serde_json::json!({
        "adapter": adapter,
        "raw_record": extract_raw_record(raw),
    });
    let mut options = ProviderOptions::new();
    options.insert(String::from("source"), source);
    options
}
/// Converts a part ordinal to `i32`, saturating at `i32::MAX` for values that do not fit.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    match i32::try_from(ordinal) {
        Ok(value) => value,
        Err(_) => i32::MAX,
    }
}
/// Checks that `id` is safe to use as a single path component.
///
/// `kind` names the id in the error message (e.g. `"session id"`), and `location` is
/// attached to the error.
///
/// # Errors
/// Returns a schema error if `id` is empty, is `.`, contains `/`, `\` or `..`, is
/// absolute, or contains a NUL byte.
pub(crate) fn validate_path_id(
    adapter: &'static str,
    kind: &str,
    id: &str,
    location: impl Into<String>,
) -> Result<(), AdapterError> {
    // `.` is rejected alongside `..`: joined onto a directory it names that directory
    // itself, so writes would target the parent instead of a child entry.
    let escapes = id.is_empty()
        || id == "."
        || id.contains('/')
        || id.contains('\\')
        || id.contains("..")
        || std::path::Path::new(id).is_absolute();
    if escapes {
        return Err(AdapterError::schema(
            adapter,
            location,
            format!("{kind} contains a path separator or traversal marker: {id}"),
        ));
    }
    // The OS would reject this later anyway; fail early with a clearer message.
    if id.contains('\0') {
        return Err(AdapterError::schema(
            adapter,
            location,
            format!("{kind} contains a NUL byte: {id:?}"),
        ));
    }
    Ok(())
}
/// Writes `files` under `root`, replacing whatever was there as a unit.
///
/// Everything is first written to a sibling staging directory (`.<name>.tmp`). Any
/// existing `root` is then moved aside to `.<name>.bak`, staging is renamed into place,
/// and the backup is deleted. If the final rename fails, the backup is renamed back, so
/// a failed restore never destroys the previous tree.
///
/// # Errors
/// Returns the first staging error (schema or I/O), or an I/O error if the existing tree
/// cannot be moved aside or the new tree cannot be renamed into place.
// NOTE(review): presumably no non-test caller yet; the allow keeps the build warning-free.
#[allow(dead_code)]
pub(crate) fn write_restored_files(
    root: &Path,
    files: Vec<RestoredFile>,
) -> Result<(), AdapterError> {
    let io =
        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
    let parent = root.parent().unwrap_or_else(|| Path::new("."));
    let stem = root
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("restore");
    let staging = parent.join(format!(".{stem}.tmp"));
    let backup = parent.join(format!(".{stem}.bak"));

    // Leftovers from an interrupted run; absence is the normal case.
    remove_best_effort(&staging);
    // `staging` sits next to `root`, so this also creates `root`'s parent directory.
    std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
    if let Err(error) = files
        .iter()
        .try_for_each(|file| write_one_into_staging(&staging, file))
    {
        remove_best_effort(&staging);
        return Err(error);
    }

    // Move the previous tree aside instead of deleting it, so a failed swap can roll back.
    remove_best_effort(&backup);
    let had_previous = match std::fs::rename(root, &backup) {
        Ok(()) => true,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => false,
        Err(e) => {
            remove_best_effort(&staging);
            return Err(io(root.display().to_string(), e));
        }
    };
    if let Err(e) = std::fs::rename(&staging, root) {
        if had_previous {
            let _ = std::fs::rename(&backup, root);
        }
        remove_best_effort(&staging);
        return Err(io(root.display().to_string(), e));
    }
    if had_previous {
        remove_best_effort(&backup);
    }
    Ok(())
}

/// Deletes `path` whether it is a directory tree or a single file, ignoring failures.
// Only reachable from `write_restored_files`, which is itself marked dead.
#[allow(dead_code)]
fn remove_best_effort(path: &Path) {
    if std::fs::remove_dir_all(path).is_err() {
        let _ = std::fs::remove_file(path);
    }
}
/// Writes one restored file beneath `staging`, creating intermediate directories.
///
/// Every component of `file.relative_path` must be a plain UTF-8 name that passes
/// `validate_path_id` (`.` components are skipped). At least one name is required.
///
/// # Errors
/// Returns a schema error for an empty path, a non-normal or non-UTF-8 component, or a
/// path that would escape `staging`; returns an I/O error if the write fails.
// Only reachable from `write_restored_files`, which is itself marked dead.
#[allow(dead_code)]
fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
    use std::path::Component;
    let shown = || file.relative_path.display().to_string();
    let mut has_name = false;
    for component in file.relative_path.components() {
        let segment = match component {
            Component::Normal(s) => s,
            Component::CurDir => continue,
            _ => {
                return Err(AdapterError::schema(
                    "restore",
                    shown(),
                    "relative_path component is not a normal name",
                ));
            }
        };
        let Some(text) = segment.to_str() else {
            return Err(AdapterError::schema(
                "restore",
                shown(),
                "relative_path segment is not UTF-8",
            ));
        };
        validate_path_id("restore", "relative_path segment", text, shown())?;
        has_name = true;
    }
    // An empty or `.`-only path would resolve to `staging` itself, and writing file
    // bytes onto a directory fails with an unhelpful I/O error.
    if !has_name {
        return Err(AdapterError::schema(
            "restore",
            shown(),
            "relative_path is empty",
        ));
    }
    let dest = staging.join(&file.relative_path);
    // Defense in depth: component validation above should already guarantee this.
    if !dest.starts_with(staging) {
        return Err(AdapterError::schema(
            "restore",
            shown(),
            "relative_path escaped the restore root after join",
        ));
    }
    let io =
        |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
    if let Some(parent) = dest.parent() {
        std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
    }
    std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
    Ok(())
}
/// Borrows the extracted string, or `""` when nothing was extracted.
pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
    value.as_deref().map_or("", String::as_str)
}
/// Orders messages by timestamp; equal timestamps are tie-broken by message id so the
/// result is deterministic.
pub(crate) fn by_timestamp_then_id(
    left: &MessageWithParts,
    right: &MessageWithParts,
) -> std::cmp::Ordering {
    let (lhs, rhs) = (&left.message, &right.message);
    match lhs.timestamp().cmp(&rhs.timestamp()) {
        std::cmp::Ordering::Equal => lhs.id().cmp(rhs.id()),
        decided => decided,
    }
}
/// Returns `ProviderOptions::new()`, for call sites that attach no provider options.
#[inline]
pub(crate) fn empty_options() -> ProviderOptions {
    ProviderOptions::new()
}
/// Shared assertions used by the per-adapter test suites.
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };
    use serde_json::Value;
    use tempfile::TempDir;
    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Asserts that `factory.probe_default` reports `{ "path": <home>/<expected_subpath..> }`
    /// while that directory exists, and `None` after it has been removed.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        // A throwaway home directory isolates the probe from the real `$HOME`.
        let temp = TempDir::new()?;
        let mut expected = temp.path().to_path_buf();
        for segment in expected_subpath {
            expected.push(segment);
        }
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(temp.path());
        let probe = factory.probe_default(&env);
        let got = probe
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );
        // Negative case: same env, but the leaf install directory is gone.
        std::fs::remove_dir_all(&expected)?;
        anyhow::ensure!(
            factory.probe_default(&env).is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Round-trip check for native restores.
    ///
    /// Ingests everything `adapter` yields into a fresh local store, re-serializes each
    /// session with [`RestoreFidelity::Native`], and requires each emitted file to be
    /// JSON-equal to the file at the same relative path under `source_root`. The set of
    /// emitted paths must equal the set of `.json`/`.jsonl` files under `source_root`.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        // NoopOracle: no session is reported as previously ingested.
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );
        // A set, so a file emitted for several sessions is counted once.
        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }
        assert_eq!(
            restored_paths,
            source_json_files(source_root)?,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Paths, relative to `root`, of every `.json`/`.jsonl` file beneath it.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut out = BTreeSet::new();
        collect_source_json_files(root, root, &mut out)?;
        Ok(out)
    }

    /// Recursive worker for `source_json_files`: walks `dir` and inserts matching files
    /// into `out` as paths relative to `root`.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            if let Some("json" | "jsonl") = path.extension().and_then(|ext| ext.to_str()) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compares file contents after parsing, so formatting differences are ignored.
    /// `.jsonl` files are compared as sequences of per-line values (blank lines dropped);
    /// everything else is compared as a single JSON document.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parses UTF-8 JSON Lines into values, skipping whitespace-only lines.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}