1use std::path::{Path, PathBuf};
20
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
26 wire::ProviderOptions,
27};
28
29mod claude_ai_export;
30mod claude_code;
31mod claude_desktop_app;
32mod codex_cli;
33mod discovery;
34pub mod extract;
35mod jsonl;
36mod opencode;
37mod pi_coding_agent;
38
39pub use claude_ai_export::{ClaudeAiExportAdapter, ClaudeAiExportFactory};
40pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
41pub use claude_desktop_app::{ClaudeDesktopAppAdapter, ClaudeDesktopAppFactory};
42pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
43pub use discovery::{
44 Candidate, apply_to_doc, discover, persist_accept, probe_unconfigured, prompt_and_persist,
45 set_adapter_enabled,
46};
47pub use extract::{
48 Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
49 extract_str, extract_value,
50};
51pub use opencode::{OpencodeAdapter, OpencodeFactory};
52pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
53
/// Constructor and metadata for one adapter kind; every entry returned by [`registry`]
/// implements this trait.
pub trait AdapterFactory: Send + Sync {
    /// Identifier for this adapter kind; [`by_name`] and [`known_names`] look factories up by it.
    fn name(&self) -> &'static str;

    /// Builds an adapter instance from its JSON configuration blob.
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;

    /// Looks for this adapter's default install location under `env`, returning a config blob
    /// when one is found. The shared test helper expects the blob to carry a `"path"` string
    /// while that path exists, and expects `None` once it has been removed.
    fn probe_default(&self, env: &Env) -> Option<Value>;

    /// Renders a stored session back into files in this adapter's on-disk format.
    ///
    /// `fidelity` is the requested restore mode. Each returned [`RestoredFile`] carries an
    /// `actual_fidelity`, presumably the mode actually achieved (which may differ from the
    /// request) — confirm with implementors. For [`RestoreFidelity::Native`], the test helper
    /// expects output that is JSON-equivalent to the originally ingested source files.
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
84
/// Restore mode requested from, and reported by, [`AdapterFactory::serialize`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// Reproduce the adapter's own source files; the native-restore test helper compares
    /// this output against the original JSON/JSONL files.
    Native,
    /// NOTE(review): presumably a best-effort rendering, e.g. of a session that was ingested
    /// by a different adapter — confirm.
    Foreign,
}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct RestoredFile {
93 pub relative_path: PathBuf,
94 pub bytes: Vec<u8>,
95 pub actual_fidelity: RestoreFidelity,
102}
103
104impl RestoredFile {
105 pub(crate) fn new(
106 relative_path: impl Into<PathBuf>,
107 bytes: Vec<u8>,
108 actual_fidelity: RestoreFidelity,
109 ) -> Self {
110 Self {
111 relative_path: relative_path.into(),
112 bytes,
113 actual_fidelity,
114 }
115 }
116}
117
118pub trait Adapter: Send + Sync {
122 fn events(&self) -> EventStream<'_> {
127 let stream = self.events_with(&NoopOracle);
128 Box::pin(stream.filter_map(|res| match res {
129 Ok(AdapterYield::Event(event)) => Some(Ok(event)),
130 Ok(AdapterYield::Skipped { .. } | AdapterYield::SkippedBatch { .. }) => None,
131 Err(error) => Some(Err(error)),
132 }))
133 }
134
135 fn discover(&self) -> DiscoverFuture<'_>;
142
143 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
147}
148
/// Answers questions about what the store already holds, so adapters can skip sessions.
pub trait SkipOracle: Send + Sync {
    /// Latest stored timestamp for `session_id`, or `None` when nothing is known about it.
    ///
    /// [`is_session_fresh`] compares this value against a source timestamp named
    /// `*_micros`, so the unit is presumably microseconds — confirm with implementors.
    fn session_max_ts(&self, session_id: &str) -> Option<i64>;

    /// Whether the oracle holds no sessions at all. Defaults to `false`; [`NoopOracle`]
    /// overrides it to return `true`.
    fn is_empty(&self) -> bool {
        false
    }
}
171
172pub fn is_session_fresh(
177 oracle: &dyn SkipOracle,
178 session_id: &str,
179 source_last_ts_micros: Option<i64>,
180) -> bool {
181 matches!(
182 (oracle.session_max_ts(session_id), source_last_ts_micros),
183 (Some(stored), Some(source)) if source <= stored
184 )
185}
186
/// A [`SkipOracle`] that knows nothing: no session has a stored timestamp, so nothing is
/// considered fresh. [`Adapter::events`] uses it to stream without skipping.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopOracle;

impl SkipOracle for NoopOracle {
    /// Always `None`: there is no stored state to consult.
    fn session_max_ts(&self, _session_id: &str) -> Option<i64> {
        None
    }

    /// Always `true`, overriding the trait default of `false`.
    fn is_empty(&self) -> bool {
        true
    }
}
201
/// One item of an [`AdapterYieldStream`]: an ingest event, or a report of skipped work.
/// The default [`Adapter::events`] discards both skip variants.
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// An event to ingest.
    Event(IngestEvent),
    /// A single session the adapter did not emit.
    Skipped {
        /// Identifier of the skipped session, when the adapter could determine it.
        session_id: Option<String>,
        // NOTE(review): presumably the project/workspace the session belongs to — confirm.
        project: Option<String>,
        /// Why the session was skipped.
        reason: SkipReason,
    },
    /// `count` skipped sessions sharing a single `reason`, reported as one item.
    SkippedBatch {
        reason: SkipReason,
        count: usize,
    },
}
218
/// Why an adapter reported a session as skipped (see [`AdapterYield`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipReason {
    /// NOTE(review): presumably the store already holds the source's newest data
    /// (compare [`is_session_fresh`]) — confirm with the producing adapters.
    Fresh,
    /// NOTE(review): presumably the source session had nothing to ingest — confirm.
    Empty,
    /// The source uses something the adapter does not handle; the string presumably
    /// describes what — confirm.
    Unsupported(String),
}
233
/// Boxed, pinned, `Send` stream of adapter yields (events or skip reports) or errors,
/// borrowing from the adapter for `'a`.
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
236
/// Boxed, pinned, `Send` future returned by [`Adapter::discover`], resolving to a count or
/// an error.
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
242
/// Environment facts that factories consult when probing for default install locations.
pub struct Env {
    /// The user's home directory.
    pub home: PathBuf,
}

impl Env {
    /// Captures `$HOME` from the process environment; `None` when the variable is unset.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME").map(|raw_home| Self {
            home: PathBuf::from(raw_home),
        })
    }

    /// Builds an environment with an explicit home directory (tests point this at a temp dir).
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        let home: PathBuf = home.into();
        Self { home }
    }
}
267
/// Boxed, pinned, `Send` stream of ingest events or errors, as returned by [`Adapter::events`].
pub type EventStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
273
/// Error raised by an adapter or by the restore helpers in this module.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the adapter that failed; the restore helpers in this module use `"restore"`.
    pub adapter: &'static str,
    /// Where the failure happened, e.g. a file path, `"serialize"`, or `"config"`.
    pub location: String,
    /// What went wrong.
    pub kind: AdapterErrorKind,
}
284
/// Category of an [`AdapterError`]. `Io` and `Parse` expose their cause through
/// `std::error::Error::source`.
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// An I/O failure.
    Io(std::io::Error),
    /// A JSON decode failure; `line` is rendered after the location as `location:line`.
    Parse {
        line: usize,
        source: serde_json::Error,
    },
    /// Input or output with unexpected structure. In this module it is also used for JSON
    /// encode failures and rejected path identifiers.
    Schema(String),
    /// Invalid adapter configuration (built by [`AdapterError::config`]).
    Config(String),
    /// NOTE(review): presumably network/remote-fetch failures; no constructor in this module.
    Transport(String),
    /// NOTE(review): presumably authentication failures; no constructor in this module.
    Auth(String),
}
304
305impl AdapterError {
306 pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
307 Self {
308 adapter,
309 location: location.into(),
310 kind: AdapterErrorKind::Io(source),
311 }
312 }
313
314 pub fn parse(
315 adapter: &'static str,
316 location: impl Into<String>,
317 line: usize,
318 source: serde_json::Error,
319 ) -> Self {
320 Self {
321 adapter,
322 location: location.into(),
323 kind: AdapterErrorKind::Parse { line, source },
324 }
325 }
326
327 pub fn schema(
328 adapter: &'static str,
329 location: impl Into<String>,
330 message: impl Into<String>,
331 ) -> Self {
332 Self {
333 adapter,
334 location: location.into(),
335 kind: AdapterErrorKind::Schema(message.into()),
336 }
337 }
338
339 pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
340 Self {
341 adapter,
342 location: "config".to_owned(),
343 kind: AdapterErrorKind::Config(message.into()),
344 }
345 }
346}
347
348impl std::fmt::Display for AdapterError {
349 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 match &self.kind {
351 AdapterErrorKind::Io(source) => {
352 write!(
353 formatter,
354 "{} io error at {}: {source}",
355 self.adapter, self.location
356 )
357 }
358 AdapterErrorKind::Parse { line, source } => write!(
359 formatter,
360 "{} json parse error at {}:{line}: {source}",
361 self.adapter, self.location,
362 ),
363 AdapterErrorKind::Schema(message) => {
364 write!(
365 formatter,
366 "{} schema error at {}: {message}",
367 self.adapter, self.location
368 )
369 }
370 AdapterErrorKind::Config(message) => {
371 write!(formatter, "{} config error: {message}", self.adapter)
372 }
373 AdapterErrorKind::Transport(message) => write!(
374 formatter,
375 "{} transport error at {}: {message}",
376 self.adapter, self.location,
377 ),
378 AdapterErrorKind::Auth(message) => {
379 write!(formatter, "{} auth error: {message}", self.adapter)
380 }
381 }
382 }
383}
384
385impl std::error::Error for AdapterError {
386 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
387 match &self.kind {
388 AdapterErrorKind::Io(source) => Some(source),
389 AdapterErrorKind::Parse { source, .. } => Some(source),
390 _ => None,
391 }
392 }
393}
394
395pub fn registry() -> &'static [&'static dyn AdapterFactory] {
399 &[
400 &ClaudeCodeFactory,
401 &ClaudeDesktopAppFactory,
402 &ClaudeAiExportFactory,
403 &CodexCliFactory,
404 &OpencodeFactory,
405 &PiCodingAgentFactory,
406 ]
407}
408
409pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
412 registry().iter().copied().find(|f| f.name() == name)
413}
414
415pub fn known_names() -> Vec<&'static str> {
418 registry().iter().map(|f| f.name()).collect()
419}
420
421pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
425 registry()
426 .iter()
427 .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
428 .collect()
429}
430
/// Derives a part identifier: `<message_id>:<ordinal>`, with the ordinal zero-padded to at
/// least four digits.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{message_id}:{ordinal:0>4}")
}
436
437pub(crate) fn compact_json(value: &Value) -> String {
440 serde_json::to_string(value).unwrap_or_default()
441}
442
443pub(crate) fn jsonl_bytes(
444 adapter: &'static str,
445 records: &[Value],
446) -> Result<Vec<u8>, AdapterError> {
447 let mut bytes = Vec::new();
448 for record in records {
449 let line = serde_json::to_vec(record).map_err(|err| {
450 AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
451 })?;
452 bytes.extend(line);
453 bytes.push(b'\n');
454 }
455 Ok(bytes)
456}
457
458pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
461 use serde::Deserialize;
462 #[derive(Deserialize)]
463 struct Cfg {
464 path: PathBuf,
465 }
466 let cfg: Cfg = serde_json::from_value(config)
467 .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
468 Ok(match std::env::var_os("HOME") {
469 Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
470 None => cfg.path,
471 })
472}
473
474pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
475 options
476 .get("source")
477 .and_then(|source| source.get("raw_record"))
478 .cloned()
479}
480
481pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
487 let mut options = ProviderOptions::new();
488 options.insert(
489 "source".to_owned(),
490 serde_json::json!({
491 "adapter": adapter,
492 "raw_record": extract_raw_record(raw),
493 }),
494 );
495 options
496}
497
/// Narrows a part ordinal to `i32`, saturating at `i32::MAX` instead of wrapping.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    match i32::try_from(ordinal) {
        Ok(narrowed) => narrowed,
        Err(_) => i32::MAX,
    }
}
505
506pub(crate) fn validate_path_id(
512 adapter: &'static str,
513 kind: &str,
514 id: &str,
515 location: impl Into<String>,
516) -> Result<(), AdapterError> {
517 if id.is_empty()
518 || id.contains('/')
519 || id.contains('\\')
520 || id.contains("..")
521 || std::path::Path::new(id).is_absolute()
522 {
523 return Err(AdapterError::schema(
524 adapter,
525 location,
526 format!("{kind} contains a path separator or traversal marker: {id}"),
527 ));
528 }
529 Ok(())
530}
531
532#[allow(dead_code)]
542pub(crate) fn write_restored_files(
543 root: &Path,
544 files: Vec<RestoredFile>,
545) -> Result<(), AdapterError> {
546 let parent = root.parent().unwrap_or_else(|| Path::new("."));
549 let stem = root
550 .file_name()
551 .and_then(|n| n.to_str())
552 .unwrap_or("restore");
553 let staging = parent.join(format!(".{stem}.tmp"));
554 let io =
555 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
556 let _ = std::fs::remove_dir_all(&staging);
557 std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
558
559 let result = (|| -> Result<(), AdapterError> {
560 for file in files {
561 write_one_into_staging(&staging, &file)?;
562 }
563 Ok(())
564 })();
565
566 if let Err(error) = result {
567 let _ = std::fs::remove_dir_all(&staging);
568 return Err(error);
569 }
570
571 let _ = std::fs::remove_dir_all(root);
573 if let Some(parent) = root.parent()
574 && !parent.as_os_str().is_empty()
575 {
576 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
577 }
578 std::fs::rename(&staging, root).map_err(|e| {
579 let _ = std::fs::remove_dir_all(&staging);
580 io(root.display().to_string(), e)
581 })?;
582 Ok(())
583}
584
585#[allow(dead_code)]
586fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
587 for component in file.relative_path.components() {
589 use std::path::Component;
590 let segment = match component {
591 Component::Normal(s) => s,
592 Component::CurDir => continue,
593 _ => {
596 return Err(AdapterError::schema(
597 "restore",
598 file.relative_path.display().to_string(),
599 "relative_path component is not a normal name",
600 ));
601 }
602 };
603 let Some(text) = segment.to_str() else {
604 return Err(AdapterError::schema(
605 "restore",
606 file.relative_path.display().to_string(),
607 "relative_path segment is not UTF-8",
608 ));
609 };
610 validate_path_id(
611 "restore",
612 "relative_path segment",
613 text,
614 file.relative_path.display().to_string(),
615 )?;
616 }
617
618 let dest = staging.join(&file.relative_path);
619 if !dest.starts_with(staging) {
622 return Err(AdapterError::schema(
623 "restore",
624 file.relative_path.display().to_string(),
625 "relative_path escaped the restore root after join",
626 ));
627 }
628 let io =
629 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
630 if let Some(parent) = dest.parent() {
631 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
632 }
633 std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
634 Ok(())
635}
636
637pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
638 value.as_deref().map(String::as_str).unwrap_or("")
639}
640
641pub(crate) fn by_timestamp_then_id(
644 left: &MessageWithParts,
645 right: &MessageWithParts,
646) -> std::cmp::Ordering {
647 left.message
648 .timestamp()
649 .cmp(&right.message.timestamp())
650 .then_with(|| left.message.id().cmp(right.message.id()))
651}
652
/// Returns a freshly constructed `ProviderOptions` (`ProviderOptions::new()`), for messages or
/// parts that carry no provider-specific metadata.
#[inline]
pub(crate) fn empty_options() -> ProviderOptions {
    ProviderOptions::new()
}
659
#[cfg(test)]
pub(crate) mod test_support {
    //! Shared assertions used by the per-adapter unit tests.

    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Checks that `factory.probe_default` reports `<home>/<expected_subpath...>` as its
    /// `"path"` while that directory exists, and returns `None` once it is removed.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let expected = expected_subpath
            .iter()
            .fold(temp.path().to_path_buf(), |dir, segment| dir.join(segment));
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(temp.path());

        let probed = factory.probe_default(&env);
        let got = probed
            .as_ref()
            .and_then(|config| config.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        let after_removal = factory.probe_default(&env);
        anyhow::ensure!(
            after_removal.is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Ingests `adapter` into a fresh local store, then serializes every stored session with
    /// [`RestoreFidelity::Native`].
    ///
    /// Asserts that each emitted file is JSON-equivalent to its counterpart under
    /// `source_root`, and that the emitted paths are exactly the source's `.json`/`.jsonl` set.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;

        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut emitted: BTreeSet<PathBuf> = BTreeSet::new();
        for session_id in session_ids {
            let session = match store.get_session(&session_id).await? {
                Some(session) => session,
                None => anyhow::bail!("session id listed by store was not readable: {session_id}"),
            };
            for restored in factory.serialize(&session, RestoreFidelity::Native)? {
                let source_file = source_root.join(&restored.relative_path);
                let source_bytes = std::fs::read(&source_file)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", source_file.display()))?;
                compare_json_file(&source_file, &source_bytes, &restored.bytes)?;
                emitted.insert(restored.relative_path);
            }
        }

        let on_disk = list_json_files(source_root)?;
        assert_eq!(
            emitted, on_disk,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Every `.json`/`.jsonl` file below `root`, as paths relative to `root`.
    fn list_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut found = BTreeSet::new();
        walk_json_files(root, root, &mut found)?;
        Ok(found)
    }

    /// Recursive worker for [`list_json_files`]; `dir` is the directory currently visited.
    fn walk_json_files(
        root: &Path,
        dir: &Path,
        found: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                walk_json_files(root, &path, found)?;
            } else if matches!(
                path.extension().and_then(|ext| ext.to_str()),
                Some("json" | "jsonl")
            ) {
                found.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Asserts that `actual` encodes the same JSON as `expected`.
    ///
    /// `.jsonl` files are compared record by record, ignoring blank lines. Everything else is
    /// compared as a single document.
    fn compare_json_file(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        let is_jsonl = matches!(path.extension().and_then(|ext| ext.to_str()), Some("jsonl"));
        if is_jsonl {
            let expected_records = parse_json_lines(expected)?;
            let actual_records = parse_json_lines(actual)?;
            assert_eq!(
                actual_records,
                expected_records,
                "jsonl mismatch at {}",
                path.display()
            );
            return Ok(());
        }
        let expected_value: Value = serde_json::from_slice(expected)?;
        let actual_value: Value = serde_json::from_slice(actual)?;
        assert_eq!(
            actual_value,
            expected_value,
            "json mismatch at {}",
            path.display()
        );
        Ok(())
    }

    /// Parses UTF-8 JSON Lines into values, skipping whitespace-only lines.
    fn parse_json_lines(bytes: &[u8]) -> anyhow::Result<Vec<Value>> {
        let text = std::str::from_utf8(bytes)?;
        let mut records: Vec<Value> = Vec::new();
        for line in text.lines() {
            if line.trim().is_empty() {
                continue;
            }
            records.push(serde_json::from_str(line)?);
        }
        Ok(records)
    }
}
799}