1use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26 sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27 wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35mod opencode;
36mod pi_coding_agent;
37
38pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
39pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
40pub use discovery::{
41 Candidate, PromptOutcome, discover, persist_accept, persist_decline, probe_unconfigured,
42 prompt_and_persist, prompt_each,
43};
44pub use extract::{
45 Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
46 extract_str, extract_value,
47};
48pub use opencode::{OpencodeAdapter, OpencodeFactory};
49pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
50
51pub trait AdapterFactory: Send + Sync {
55 fn name(&self) -> &'static str;
59
60 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;
66
67 fn probe_default(&self, env: &Env) -> Option<Value>;
73
74 fn serialize(
76 &self,
77 session: &SessionWithMessages,
78 fidelity: RestoreFidelity,
79 ) -> Result<Vec<RestoredFile>, AdapterError>;
80}
81
/// How closely a restore reproduces a tool's own on-disk format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    /// The tool's native layout. Native restores are expected to reproduce the source
    /// JSON/JSONL files exactly.
    Native,
    /// Presumably a restore of data that originated elsewhere — confirm with the adapters.
    Foreign,
}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct RestoredFile {
90 pub relative_path: PathBuf,
91 pub bytes: Vec<u8>,
92 pub actual_fidelity: RestoreFidelity,
99}
100
101impl RestoredFile {
102 pub(crate) fn new(
103 relative_path: impl Into<PathBuf>,
104 bytes: Vec<u8>,
105 actual_fidelity: RestoreFidelity,
106 ) -> Self {
107 Self {
108 relative_path: relative_path.into(),
109 bytes,
110 actual_fidelity,
111 }
112 }
113}
114
115pub trait Adapter: Send + Sync {
119 fn events(&self) -> EventStream<'_> {
124 let stream = self.events_with(&NoopOracle);
125 Box::pin(stream.filter_map(|res| match res {
126 Ok(AdapterYield::Event(event)) => Some(Ok(event)),
127 Ok(AdapterYield::Skipped { .. }) => None,
128 Err(error) => Some(Err(error)),
129 }))
130 }
131
132 fn discover(&self) -> DiscoverFuture<'_>;
139
140 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
144}
145
146pub trait SkipOracle: Send + Sync {
151 fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;
152
153 fn is_empty(&self) -> bool {
158 false
159 }
160}
161
162#[derive(Debug, Default, Clone, Copy)]
165pub struct NoopOracle;
166
167impl SkipOracle for NoopOracle {
168 fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
169 None
170 }
171
172 fn is_empty(&self) -> bool {
173 true
174 }
175}
176
177#[derive(Debug, Clone)]
178pub enum AdapterYield {
179 Event(IngestEvent),
180 Skipped {
181 session_id: Option<String>,
183 project: Option<String>,
184 reason: SkipReason,
185 },
186}
187
/// Why an adapter reported [`AdapterYield::Skipped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkipReason {
    /// Presumably already up to date according to the [`SkipOracle`] — confirm per adapter.
    Fresh,
    /// Presumably nothing to ingest — confirm per adapter.
    Empty,
}
196
197pub type AdapterYieldStream<'a> =
198 std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
199
200pub type DiscoverFuture<'a> =
204 std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
205
/// Environment facts that adapter factories use when probing for installed tools.
pub struct Env {
    /// The user's home directory; factories probe for tool data beneath it.
    pub home: PathBuf,
}

impl Env {
    /// Reads the home directory from `$HOME`.
    ///
    /// Returns `None` when `HOME` is unset **or empty**. An empty value would otherwise
    /// become `PathBuf::from("")`, making every probe resolve relative to the current
    /// working directory instead of a home directory.
    pub fn from_env() -> Option<Self> {
        Self::from_home_var(std::env::var_os("HOME"))
    }

    /// Builds an [`Env`] for an explicit home directory (for example, a temp dir in tests).
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        Self { home: home.into() }
    }

    /// Interprets a raw `HOME` value. It is split out of [`Env::from_env`] so the rule can
    /// be tested without mutating the process environment.
    fn from_home_var(home: Option<std::ffi::OsString>) -> Option<Self> {
        home.filter(|value| !value.is_empty())
            .map(|value| Self {
                home: PathBuf::from(value),
            })
    }
}
230
231pub type EventStream<'a> =
235 std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
236
237#[derive(Debug)]
242pub struct AdapterError {
243 pub adapter: &'static str,
244 pub location: String,
245 pub kind: AdapterErrorKind,
246}
247
248#[derive(Debug)]
249pub enum AdapterErrorKind {
250 Io(std::io::Error),
252 Parse {
254 line: usize,
255 source: serde_json::Error,
256 },
257 Schema(String),
260 Config(String),
262 Transport(String),
264 Auth(String),
266}
267
268impl AdapterError {
269 pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
270 Self {
271 adapter,
272 location: location.into(),
273 kind: AdapterErrorKind::Io(source),
274 }
275 }
276
277 pub fn parse(
278 adapter: &'static str,
279 location: impl Into<String>,
280 line: usize,
281 source: serde_json::Error,
282 ) -> Self {
283 Self {
284 adapter,
285 location: location.into(),
286 kind: AdapterErrorKind::Parse { line, source },
287 }
288 }
289
290 pub fn schema(
291 adapter: &'static str,
292 location: impl Into<String>,
293 message: impl Into<String>,
294 ) -> Self {
295 Self {
296 adapter,
297 location: location.into(),
298 kind: AdapterErrorKind::Schema(message.into()),
299 }
300 }
301
302 pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
303 Self {
304 adapter,
305 location: "config".to_owned(),
306 kind: AdapterErrorKind::Config(message.into()),
307 }
308 }
309}
310
311impl std::fmt::Display for AdapterError {
312 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 match &self.kind {
314 AdapterErrorKind::Io(source) => {
315 write!(
316 formatter,
317 "{} io error at {}: {source}",
318 self.adapter, self.location
319 )
320 }
321 AdapterErrorKind::Parse { line, source } => write!(
322 formatter,
323 "{} json parse error at {}:{line}: {source}",
324 self.adapter, self.location,
325 ),
326 AdapterErrorKind::Schema(message) => {
327 write!(
328 formatter,
329 "{} schema error at {}: {message}",
330 self.adapter, self.location
331 )
332 }
333 AdapterErrorKind::Config(message) => {
334 write!(formatter, "{} config error: {message}", self.adapter)
335 }
336 AdapterErrorKind::Transport(message) => write!(
337 formatter,
338 "{} transport error at {}: {message}",
339 self.adapter, self.location,
340 ),
341 AdapterErrorKind::Auth(message) => {
342 write!(formatter, "{} auth error: {message}", self.adapter)
343 }
344 }
345 }
346}
347
348impl std::error::Error for AdapterError {
349 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
350 match &self.kind {
351 AdapterErrorKind::Io(source) => Some(source),
352 AdapterErrorKind::Parse { source, .. } => Some(source),
353 _ => None,
354 }
355 }
356}
357
358pub fn registry() -> &'static [&'static dyn AdapterFactory] {
362 &[
363 &ClaudeCodeFactory,
364 &CodexCliFactory,
365 &OpencodeFactory,
366 &PiCodingAgentFactory,
367 ]
368}
369
370pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
373 registry().iter().copied().find(|f| f.name() == name)
374}
375
376pub fn known_names() -> Vec<&'static str> {
379 registry().iter().map(|f| f.name()).collect()
380}
381
382pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
386 registry()
387 .iter()
388 .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
389 .collect()
390}
391
/// Builds a part identifier: `"<message_id>:<ordinal>"`, with the ordinal zero-padded
/// to at least four digits (for example, `"m:0003"`).
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{message_id}:{ordinal:0>4}")
}
396}
397
398pub(crate) fn compact_json(value: &Value) -> String {
401 serde_json::to_string(value).unwrap_or_default()
402}
403
404pub(crate) fn jsonl_bytes(
405 adapter: &'static str,
406 records: &[Value],
407) -> Result<Vec<u8>, AdapterError> {
408 let mut bytes = Vec::new();
409 for record in records {
410 let line = serde_json::to_vec(record).map_err(|err| {
411 AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
412 })?;
413 bytes.extend(line);
414 bytes.push(b'\n');
415 }
416 Ok(bytes)
417}
418
419pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
422 use serde::Deserialize;
423 #[derive(Deserialize)]
424 struct Cfg {
425 path: PathBuf,
426 }
427 let cfg: Cfg = serde_json::from_value(config)
428 .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
429 Ok(match std::env::var_os("HOME") {
430 Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
431 None => cfg.path,
432 })
433}
434
435pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
436 options
437 .get("source")
438 .and_then(|source| source.get("raw_record"))
439 .cloned()
440}
441
442pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
448 let mut options = ProviderOptions::new();
449 options.insert(
450 "source".to_owned(),
451 serde_json::json!({
452 "adapter": adapter,
453 "raw_record": extract_raw_record(raw),
454 }),
455 );
456 options
457}
458
/// Converts a part ordinal to `i32`, saturating at `i32::MAX` instead of wrapping.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    match i32::try_from(ordinal) {
        Ok(narrowed) => narrowed,
        Err(_) => i32::MAX,
    }
}
465}
466
467pub(crate) fn validate_path_id(
473 adapter: &'static str,
474 kind: &str,
475 id: &str,
476 location: impl Into<String>,
477) -> Result<(), AdapterError> {
478 if id.is_empty()
479 || id.contains('/')
480 || id.contains('\\')
481 || id.contains("..")
482 || std::path::Path::new(id).is_absolute()
483 {
484 return Err(AdapterError::schema(
485 adapter,
486 location,
487 format!("{kind} contains a path separator or traversal marker: {id}"),
488 ));
489 }
490 Ok(())
491}
492
493#[allow(dead_code)]
503pub(crate) fn write_restored_files(
504 root: &Path,
505 files: Vec<RestoredFile>,
506) -> Result<(), AdapterError> {
507 let parent = root.parent().unwrap_or_else(|| Path::new("."));
510 let stem = root
511 .file_name()
512 .and_then(|n| n.to_str())
513 .unwrap_or("restore");
514 let staging = parent.join(format!(".{stem}.tmp"));
515 let io =
516 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
517 let _ = std::fs::remove_dir_all(&staging);
518 std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
519
520 let result = (|| -> Result<(), AdapterError> {
521 for file in files {
522 write_one_into_staging(&staging, &file)?;
523 }
524 Ok(())
525 })();
526
527 if let Err(error) = result {
528 let _ = std::fs::remove_dir_all(&staging);
529 return Err(error);
530 }
531
532 let _ = std::fs::remove_dir_all(root);
534 if let Some(parent) = root.parent()
535 && !parent.as_os_str().is_empty()
536 {
537 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
538 }
539 std::fs::rename(&staging, root).map_err(|e| {
540 let _ = std::fs::remove_dir_all(&staging);
541 io(root.display().to_string(), e)
542 })?;
543 Ok(())
544}
545
546#[allow(dead_code)]
547fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
548 for component in file.relative_path.components() {
550 use std::path::Component;
551 let segment = match component {
552 Component::Normal(s) => s,
553 Component::CurDir => continue,
554 _ => {
557 return Err(AdapterError::schema(
558 "restore",
559 file.relative_path.display().to_string(),
560 "relative_path component is not a normal name",
561 ));
562 }
563 };
564 let Some(text) = segment.to_str() else {
565 return Err(AdapterError::schema(
566 "restore",
567 file.relative_path.display().to_string(),
568 "relative_path segment is not UTF-8",
569 ));
570 };
571 validate_path_id(
572 "restore",
573 "relative_path segment",
574 text,
575 file.relative_path.display().to_string(),
576 )?;
577 }
578
579 let dest = staging.join(&file.relative_path);
580 if !dest.starts_with(staging) {
583 return Err(AdapterError::schema(
584 "restore",
585 file.relative_path.display().to_string(),
586 "relative_path escaped the restore root after join",
587 ));
588 }
589 let io =
590 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
591 if let Some(parent) = dest.parent() {
592 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
593 }
594 std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
595 Ok(())
596}
597
598pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
599 value.as_deref().map(String::as_str).unwrap_or("")
600}
601
602pub(crate) fn by_timestamp_then_id(
605 left: &MessageWithParts,
606 right: &MessageWithParts,
607) -> std::cmp::Ordering {
608 left.message
609 .timestamp()
610 .cmp(&right.message.timestamp())
611 .then_with(|| left.message.id().cmp(right.message.id()))
612}
613
614#[inline]
617pub(crate) fn empty_options() -> ProviderOptions {
618 ProviderOptions::new()
619}
620
/// Shared assertions reused by the per-adapter test suites.
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Checks `factory.probe_default` against a temporary home directory.
    ///
    /// While `<home>/<expected_subpath...>` exists, the probed config's `path` must name
    /// it. Once the directory is removed, the probe must return `None`.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let home = TempDir::new()?;
        let mut expected = home.path().to_path_buf();
        expected.extend(expected_subpath);
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(home.path());

        let probed = factory.probe_default(&env);
        let got = probed
            .as_ref()
            .and_then(|config| config.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        let after_removal = factory.probe_default(&env);
        anyhow::ensure!(
            after_removal.is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Ingests everything from `adapter` into a scratch store, then checks the native
    /// restore.
    ///
    /// Native serialization of every stored session must reproduce each JSON/JSONL file
    /// under `source_root` (compared as parsed JSON). It must also emit exactly that set
    /// of files.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;

        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let session = match store.get_session(&session_id).await? {
                Some(session) => session,
                None => anyhow::bail!("session id listed by store was not readable: {session_id}"),
            };
            for file in factory.serialize(&session, RestoreFidelity::Native)? {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }

        let source_files = source_json_files(source_root)?;
        assert_eq!(
            restored_paths, source_files,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// All `.json` / `.jsonl` files under `root`, as paths relative to `root`.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut found = BTreeSet::new();
        collect_source_json_files(root, root, &mut found)?;
        Ok(found)
    }

    /// Recursive worker for [`source_json_files`]. It walks `dir` without following
    /// symlinked directories, because `DirEntry::file_type` does not traverse links.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
            } else if matches!(
                path.extension().and_then(|ext| ext.to_str()),
                Some("json" | "jsonl")
            ) {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compares two files as JSON values: line by line for `.jsonl`, or as one document
    /// otherwise. Panics on mismatch.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        let is_jsonl = path.extension().and_then(|ext| ext.to_str()) == Some("jsonl");
        if !is_jsonl {
            let expected_value: Value = serde_json::from_slice(expected)?;
            let actual_value: Value = serde_json::from_slice(actual)?;
            assert_eq!(
                actual_value,
                expected_value,
                "json mismatch at {}",
                path.display()
            );
            return Ok(());
        }

        let expected_lines = json_lines(expected)?;
        let actual_lines = json_lines(actual)?;
        assert_eq!(
            actual_lines,
            expected_lines,
            "jsonl mismatch at {}",
            path.display()
        );
        Ok(())
    }

    /// Parses each non-blank line of UTF-8 `bytes` as a JSON value.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<Value>> {
        let text = std::str::from_utf8(bytes)?;
        let mut values = Vec::new();
        for line in text.lines() {
            if line.trim().is_empty() {
                continue;
            }
            values.push(serde_json::from_str(line)?);
        }
        Ok(values)
    }
}