1use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26 sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27 wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35
36pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
37pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
38pub use discovery::{Candidate, discover, prompt_and_persist};
39pub use extract::{
40 Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
41 extract_str, extract_value,
42};
43
/// Entry point for one adapter kind: it names the kind, builds configured
/// [`Adapter`] instances, probes for a default configuration, and serializes
/// stored sessions back into files.
pub trait AdapterFactory: Send + Sync {
    /// Stable identifier for this adapter kind. [`by_name`] looks factories up
    /// by comparing against this value, and [`probe_all`] reports it.
    fn name(&self) -> &'static str;

    /// Builds an adapter from a JSON `config` blob.
    ///
    /// NOTE(review): the config shape is adapter-specific. The shared
    /// `config_path` helper in this module reads a `{"path": ...}` object;
    /// confirm per implementation.
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;

    /// Returns a config blob pointing at this adapter's default location under
    /// `env.home`, or `None` when that location is not present.
    fn probe_default(&self, env: &Env) -> Option<Value>;

    /// Renders `session` as the files this adapter would write, at the
    /// requested `fidelity`. The native-restore test helper resolves each
    /// [`RestoredFile::relative_path`] against the adapter's source root.
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
74
/// How faithfully [`AdapterFactory::serialize`] is asked to reproduce a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Output is expected to reproduce the adapter's original files. The
    /// native-restore test helper compares it, as parsed JSON, against them.
    Native,
    /// NOTE(review): presumably a best-effort rendering for sessions that did
    /// not originate from this adapter. Confirm against implementations.
    Foreign,
}
80
/// One file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Location of the file, relative to the directory it is restored into.
    pub relative_path: PathBuf,
    /// Complete contents of the file.
    pub bytes: Vec<u8>,
}
86
87pub trait Adapter: Send + Sync {
91 fn events(&self) -> EventStream<'_> {
96 let stream = self.events_with(&NoopOracle);
97 Box::pin(stream.filter_map(|res| match res {
98 Ok(AdapterYield::Event(event)) => Some(Ok(event)),
99 Ok(AdapterYield::Skipped { .. }) => None,
100 Err(error) => Some(Err(error)),
101 }))
102 }
103
104 fn discover(&self) -> DiscoverFuture<'_>;
111
112 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
116}
117
/// Source of previously recorded ingest times that [`Adapter::events_with`]
/// consults.
///
/// NOTE(review): presumably adapters compare this time against a session's own
/// timestamps to report [`SkipReason::Fresh`]. Confirm against implementations.
pub trait SkipOracle: Send + Sync {
    /// When `session_id` was last ingested, or `None` if no time is known.
    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
}
125
126#[derive(Debug, Default, Clone, Copy)]
129pub struct NoopOracle;
130
131impl SkipOracle for NoopOracle {
132 fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
133 None
134 }
135}
136
/// One item produced by [`Adapter::events_with`].
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// An event to ingest.
    Event(IngestEvent),
    /// Something the adapter chose not to emit, plus the reason.
    Skipped {
        /// ID of the skipped session, if known.
        session_id: Option<String>,
        /// Project the skipped session belongs to, if known.
        project: Option<String>,
        /// Why it was skipped.
        reason: SkipReason,
    },
}
147
/// Why an adapter reported [`AdapterYield::Skipped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipReason {
    /// NOTE(review): presumably "already ingested and unchanged" according to
    /// the [`SkipOracle`]. Confirm.
    Fresh,
    /// NOTE(review): presumably "nothing to ingest". Confirm.
    Empty,
}
156
/// Boxed, pinned, `Send` stream of [`AdapterYield`]s (or errors) returned by
/// [`Adapter::events_with`]. It may borrow data for `'a`.
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
159
/// Boxed, pinned, `Send` future returned by [`Adapter::discover`].
///
/// NOTE(review): the meaning of the `usize` output is defined by
/// implementations (presumably a count). Confirm.
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
165
/// Environment inputs that adapter factories consult when probing for their
/// default data locations.
pub struct Env {
    /// Home directory under which default locations are resolved.
    pub home: PathBuf,
}

impl Env {
    /// Reads the home directory from the `HOME` environment variable.
    ///
    /// Returns `None` when `HOME` is unset **or empty**. An empty `HOME` would
    /// make every default joined onto it (e.g. `home.join(".claude")`) resolve
    /// relative to the current working directory, so it is treated as absent.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME")
            .filter(|home| !home.is_empty())
            .map(|home| Self {
                home: PathBuf::from(home),
            })
    }

    /// Builds an `Env` with an explicit home directory, e.g. a temp dir in tests.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }
}
190
/// Boxed, pinned, `Send` stream of ingest events (or errors) returned by
/// [`Adapter::events`]. It may borrow data for `'a`.
pub type EventStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
196
/// Error raised by an adapter, tagged with the adapter's name and where the
/// failure occurred.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the adapter that failed.
    pub adapter: &'static str,
    /// Free-form description of where the failure happened. Callers in this
    /// module pass e.g. `"serialize"`; config errors always use `"config"`.
    pub location: String,
    /// Category and underlying cause.
    pub kind: AdapterErrorKind,
}
207
/// Category of an [`AdapterError`], carrying the underlying cause where there
/// is one.
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// I/O failure. Exposed through `Error::source`.
    Io(std::io::Error),
    /// JSON parse failure. Exposed through `Error::source`.
    Parse {
        /// Line within `location` (displayed as `location:line`). Whether it
        /// is 0- or 1-based is up to the caller.
        line: usize,
        /// The parser's error.
        source: serde_json::Error,
    },
    /// Schema-level problem with the data. Also used for JSON encode failures
    /// by `jsonl_bytes`.
    Schema(String),
    /// Invalid adapter configuration.
    Config(String),
    /// NOTE(review): presumably a failure talking to a remote service. No
    /// constructor for it is visible here.
    Transport(String),
    /// NOTE(review): presumably an authentication failure. No constructor for
    /// it is visible here.
    Auth(String),
}
227
228impl AdapterError {
229 pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
230 Self {
231 adapter,
232 location: location.into(),
233 kind: AdapterErrorKind::Io(source),
234 }
235 }
236
237 pub fn parse(
238 adapter: &'static str,
239 location: impl Into<String>,
240 line: usize,
241 source: serde_json::Error,
242 ) -> Self {
243 Self {
244 adapter,
245 location: location.into(),
246 kind: AdapterErrorKind::Parse { line, source },
247 }
248 }
249
250 pub fn schema(
251 adapter: &'static str,
252 location: impl Into<String>,
253 message: impl Into<String>,
254 ) -> Self {
255 Self {
256 adapter,
257 location: location.into(),
258 kind: AdapterErrorKind::Schema(message.into()),
259 }
260 }
261
262 pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
263 Self {
264 adapter,
265 location: "config".to_owned(),
266 kind: AdapterErrorKind::Config(message.into()),
267 }
268 }
269}
270
271impl std::fmt::Display for AdapterError {
272 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 match &self.kind {
274 AdapterErrorKind::Io(source) => {
275 write!(
276 formatter,
277 "{} io error at {}: {source}",
278 self.adapter, self.location
279 )
280 }
281 AdapterErrorKind::Parse { line, source } => write!(
282 formatter,
283 "{} json parse error at {}:{line}: {source}",
284 self.adapter, self.location,
285 ),
286 AdapterErrorKind::Schema(message) => {
287 write!(
288 formatter,
289 "{} schema error at {}: {message}",
290 self.adapter, self.location
291 )
292 }
293 AdapterErrorKind::Config(message) => {
294 write!(formatter, "{} config error: {message}", self.adapter)
295 }
296 AdapterErrorKind::Transport(message) => write!(
297 formatter,
298 "{} transport error at {}: {message}",
299 self.adapter, self.location,
300 ),
301 AdapterErrorKind::Auth(message) => {
302 write!(formatter, "{} auth error: {message}", self.adapter)
303 }
304 }
305 }
306}
307
308impl std::error::Error for AdapterError {
309 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
310 match &self.kind {
311 AdapterErrorKind::Io(source) => Some(source),
312 AdapterErrorKind::Parse { source, .. } => Some(source),
313 _ => None,
314 }
315 }
316}
317
318pub fn registry() -> &'static [&'static dyn AdapterFactory] {
322 &[&ClaudeCodeFactory, &CodexCliFactory]
323}
324
325pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
328 registry().iter().copied().find(|f| f.name() == name)
329}
330
331pub fn known_names() -> Vec<&'static str> {
334 registry().iter().map(|f| f.name()).collect()
335}
336
337pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
341 registry()
342 .iter()
343 .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
344 .collect()
345}
346
/// Builds a part ID as `<message_id>:<ordinal>`, with the ordinal zero-padded to
/// at least four digits. Wider ordinals are written in full.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    let padded = format!("{ordinal:04}");
    [message_id, padded.as_str()].join(":")
}
352
353pub(crate) fn compact_json(value: &Value) -> String {
356 serde_json::to_string(value).unwrap_or_default()
357}
358
359pub(crate) fn jsonl_bytes(
360 adapter: &'static str,
361 records: &[Value],
362) -> Result<Vec<u8>, AdapterError> {
363 let mut bytes = Vec::new();
364 for record in records {
365 let line = serde_json::to_vec(record).map_err(|err| {
366 AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
367 })?;
368 bytes.extend(line);
369 bytes.push(b'\n');
370 }
371 Ok(bytes)
372}
373
374pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
377 use serde::Deserialize;
378 #[derive(Deserialize)]
379 struct Cfg {
380 path: PathBuf,
381 }
382 let cfg: Cfg = serde_json::from_value(config)
383 .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
384 Ok(match std::env::var_os("HOME") {
385 Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
386 None => cfg.path,
387 })
388}
389
390pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
391 options
392 .get("source")
393 .and_then(|source| source.get("raw_record"))
394 .cloned()
395}
396
397pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
398 value.as_deref().map(String::as_str).unwrap_or("")
399}
400
401pub(crate) fn by_timestamp_then_id(
404 left: &MessageWithParts,
405 right: &MessageWithParts,
406) -> std::cmp::Ordering {
407 left.message
408 .timestamp()
409 .cmp(&right.message.timestamp())
410 .then_with(|| left.message.id().cmp(right.message.id()))
411}
412
413#[inline]
416pub(crate) fn empty_options() -> ProviderOptions {
417 ProviderOptions::new()
418}
419
/// Assertions shared by the per-adapter test suites.
#[cfg(test)]
pub(crate) mod test_support {
    use std::path::Path;

    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Round-trip check for native restore.
    ///
    /// 1. Ingests everything `adapter` yields into a fresh store inside a temp dir.
    /// 2. Re-serializes every stored session through `factory` with
    ///    [`RestoreFidelity::Native`].
    /// 3. Compares each restored file with the file at the same relative path
    ///    under `source_root`.
    ///
    /// `.jsonl` files are compared line by line as parsed JSON; other files as
    /// one parsed JSON value.
    ///
    /// # Errors
    /// Fails if the store cannot be opened, ingestion or serialization fails, a
    /// listed session cannot be read back, or an expected source file cannot be
    /// read or parsed. Content mismatches panic via `assert_eq!`.
    ///
    /// NOTE(review): this passes vacuously when no sessions are ingested or
    /// `serialize` returns no files. It also does not check that every source
    /// file was restored.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        // `temp` must stay alive until the end so the store's directory persists.
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        // The trailing no-op closure is a callback; presumably progress
        // reporting. Confirm against `ingest_adapter`.
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;
        for session_id in store.session_ids().await? {
            let Some(session) = store.get_session(&session_id).await? else {
                anyhow::bail!("session id listed by store was not readable: {session_id}");
            };
            let restored = factory.serialize(&session, RestoreFidelity::Native)?;
            for file in restored {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
            }
        }
        Ok(())
    }

    /// Asserts that `actual` and `expected` hold the same JSON content.
    ///
    /// Comparison is on parsed values, not bytes: per line (blank lines ignored)
    /// when `path` ends in `.jsonl`, otherwise as a single document. `path` is
    /// used only to choose the mode and label the failure.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
        } else {
            let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
            let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
        }
        Ok(())
    }

    /// Parses UTF-8 `bytes` as JSON Lines, skipping blank or whitespace-only
    /// lines.
    ///
    /// # Errors
    /// Fails on invalid UTF-8 or on the first line that is not valid JSON.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        text.lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).map_err(Into::into))
            .collect()
    }
}
483
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// Each factory must resolve its default directory under the injected `Env`
    /// home, not the real `$HOME`. A probe must report `None` once its
    /// directory is removed.
    #[test]
    fn each_factory_probes_its_default_under_an_injected_home() {
        // A throwaway home containing both adapters' default directories.
        let temp = TempDir::new().unwrap();
        let home = temp.path();
        let claude_dir = home.join(".claude").join("projects");
        let codex_dir = home.join(".codex").join("sessions");
        std::fs::create_dir_all(&claude_dir).unwrap();
        std::fs::create_dir_all(&codex_dir).unwrap();

        let env = Env::with_home(home);

        // Each probe's config blob carries the discovered directory under "path".
        let claude_probe = ClaudeCodeFactory.probe_default(&env);
        assert_eq!(
            claude_probe
                .as_ref()
                .and_then(|v| v.get("path"))
                .and_then(Value::as_str),
            Some(claude_dir.to_str().unwrap()),
        );

        let codex_probe = CodexCliFactory.probe_default(&env);
        assert_eq!(
            codex_probe
                .as_ref()
                .and_then(|v| v.get("path"))
                .and_then(Value::as_str),
            Some(codex_dir.to_str().unwrap()),
        );

        // Removing only the Codex directory must flip only the Codex probe to None.
        std::fs::remove_dir_all(&codex_dir).unwrap();
        assert!(CodexCliFactory.probe_default(&env).is_none());
        assert!(ClaudeCodeFactory.probe_default(&env).is_some());
    }
}