1use std::path::{Path, PathBuf};
20
21use chrono::{DateTime, Utc};
22use serde_json::Value;
23use tokio_stream::{Stream, StreamExt};
24
25use crate::{
26 sessions::{IngestEvent, MessageWithParts, SessionWithMessages},
27 wire::ProviderOptions,
28};
29
30mod claude_code;
31mod codex_cli;
32mod discovery;
33pub mod extract;
34mod jsonl;
35mod opencode;
36mod pi_coding_agent;
37
38pub use claude_code::{ClaudeCodeAdapter, ClaudeCodeFactory};
39pub use codex_cli::{CodexCliAdapter, CodexCliFactory};
40pub use discovery::{
41 Candidate, PromptOutcome, discover, persist_accept, persist_decline, probe_unconfigured,
42 prompt_and_persist, prompt_each,
43};
44pub use extract::{
45 Extracted, Source, extract_bool, extract_compact_repr, extract_raw_record, extract_self_str,
46 extract_str, extract_value,
47};
48pub use opencode::{OpencodeAdapter, OpencodeFactory};
49pub use pi_coding_agent::{PiCodingAgentAdapter, PiCodingAgentFactory};
50
/// Factory for one kind of adapter.
///
/// `registry()` returns the built-in factories as `&'static dyn AdapterFactory`
/// values, and `by_name` selects one by comparing against [`AdapterFactory::name`].
pub trait AdapterFactory: Send + Sync {
    /// Static name identifying this adapter kind.
    fn name(&self) -> &'static str;

    /// Builds an adapter from a JSON configuration value.
    ///
    /// # Errors
    /// Returns an [`AdapterError`] when the adapter cannot be constructed.
    // NOTE(review): the expected shape of `config` is adapter-specific and not
    // visible in this module — confirm against each implementation.
    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError>;

    /// Returns a default configuration value derived from `env`, or `None`.
    ///
    /// The crate's `assert_probe_default` test helper expects a `"path"` string
    /// pointing at an existing directory under `env.home`, and `None` once that
    /// directory is gone.
    fn probe_default(&self, env: &Env) -> Option<Value>;

    /// Serializes `session` into zero or more [`RestoredFile`]s at the requested
    /// `fidelity`.
    ///
    /// # Errors
    /// Returns an [`AdapterError`] when the session cannot be serialized.
    fn serialize(
        &self,
        session: &SessionWithMessages,
        fidelity: RestoreFidelity,
    ) -> Result<Vec<RestoredFile>, AdapterError>;
}
81
/// Fidelity level of a restore: requested through [`AdapterFactory::serialize`]
/// and recorded per file in [`RestoredFile::actual_fidelity`].
// NOTE(review): presumably `Native` means the session originated from the same
// adapter that is serializing it and `Foreign` means it came from another
// adapter — confirm against the adapter implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreFidelity {
    Native,
    Foreign,
}
87
/// One file produced by [`AdapterFactory::serialize`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoredFile {
    /// Destination path relative to the restore root; `write_one_into_staging`
    /// validates each component and joins it under the staging directory.
    pub relative_path: PathBuf,
    /// Exact file contents to write.
    pub bytes: Vec<u8>,
    /// Fidelity recorded for this particular file.
    // NOTE(review): presumably this may differ from the fidelity that was
    // requested — confirm against the adapters.
    pub actual_fidelity: RestoreFidelity,
}
100
101impl RestoredFile {
102 pub(crate) fn new(
103 relative_path: impl Into<PathBuf>,
104 bytes: Vec<u8>,
105 actual_fidelity: RestoreFidelity,
106 ) -> Self {
107 Self {
108 relative_path: relative_path.into(),
109 bytes,
110 actual_fidelity,
111 }
112 }
113}
114
115pub trait Adapter: Send + Sync {
119 fn events(&self) -> EventStream<'_> {
124 let stream = self.events_with(&NoopOracle);
125 Box::pin(stream.filter_map(|res| match res {
126 Ok(AdapterYield::Event(event)) => Some(Ok(event)),
127 Ok(AdapterYield::Skipped { .. }) => None,
128 Err(error) => Some(Err(error)),
129 }))
130 }
131
132 fn discover(&self) -> DiscoverFuture<'_>;
139
140 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a>;
144}
145
/// Lookup handed to [`Adapter::events_with`] that reports when a session was
/// last ingested.
pub trait SkipOracle: Send + Sync {
    /// Returns the last-ingested timestamp recorded for `session_id`, or `None`
    /// when the oracle has none.
    fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>>;

    /// Whether the oracle is empty. The default answer is `false`.
    // NOTE(review): presumably adapters use `true` as a hint to skip per-session
    // lookups entirely — confirm against the adapters.
    fn is_empty(&self) -> bool {
        false
    }
}
161
/// Stateless [`SkipOracle`] used by the default [`Adapter::events`]
/// implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopOracle;
166
impl SkipOracle for NoopOracle {
    /// Always `None`: this oracle has no ingest record for any session.
    fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
        None
    }

    /// Always `true`, overriding the trait default of `false`.
    fn is_empty(&self) -> bool {
        true
    }
}
176
/// Item produced by [`Adapter::events_with`]: either an ingest event or a
/// record that something was skipped.
#[derive(Debug, Clone)]
pub enum AdapterYield {
    /// An event to ingest; the default [`Adapter::events`] forwards these.
    Event(IngestEvent),
    /// Something was skipped; the default [`Adapter::events`] drops these.
    Skipped {
        /// Identifier of the skipped session, when one is available.
        session_id: Option<String>,
        /// Project associated with the skipped item, when one is available.
        project: Option<String>,
        /// Why the item was skipped.
        reason: SkipReason,
    },
}
187
/// Reason attached to [`AdapterYield::Skipped`].
// NOTE(review): variant meanings below are inferred from names only — confirm
// against the adapters that emit them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipReason {
    // Presumably: already ingested and unchanged according to the skip oracle.
    Fresh,
    // Presumably: the source contained nothing to ingest.
    Empty,
    // Presumably: the source could not be handled; the payload is free-form text.
    Unsupported(String),
}
202
/// Pinned, boxed, `Send` stream of `Result<AdapterYield, AdapterError>` items,
/// as returned by [`Adapter::events_with`].
pub type AdapterYieldStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<AdapterYield, AdapterError>> + Send + 'a>>;
205
/// Pinned, boxed, `Send` future resolving to `Result<usize, AdapterError>`, as
/// returned by [`Adapter::discover`].
pub type DiscoverFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, AdapterError>> + Send + 'a>>;
211
/// Environment inputs handed to `AdapterFactory::probe_default`.
pub struct Env {
    /// Home directory: taken from `HOME` by [`Env::from_env`], or supplied by
    /// the caller through [`Env::with_home`].
    pub home: PathBuf,
}

impl Env {
    /// Builds an [`Env`] from the process environment.
    ///
    /// Returns `None` when the `HOME` variable is not set.
    pub fn from_env() -> Option<Self> {
        std::env::var_os("HOME").map(|value| Self {
            home: PathBuf::from(value),
        })
    }

    /// Builds an [`Env`] with an explicit home directory.
    pub fn with_home(home: impl Into<PathBuf>) -> Self {
        let home = home.into();
        Self { home }
    }
}
236
/// Pinned, boxed, `Send` stream of `Result<IngestEvent, AdapterError>` items,
/// as returned by [`Adapter::events`].
pub type EventStream<'a> =
    std::pin::Pin<Box<dyn Stream<Item = Result<IngestEvent, AdapterError>> + Send + 'a>>;
242
/// Error type shared by every adapter operation in this module.
#[derive(Debug)]
pub struct AdapterError {
    /// Name of the adapter (or subsystem, e.g. `"restore"`) that raised the error.
    pub adapter: &'static str,
    /// Where the error occurred; the `config` constructor sets this to `"config"`,
    /// and the restore helpers pass a displayed filesystem path.
    pub location: String,
    /// What went wrong.
    pub kind: AdapterErrorKind,
}
253
/// Failure categories carried by [`AdapterError`].
#[derive(Debug)]
pub enum AdapterErrorKind {
    /// Underlying I/O failure; exposed through `Error::source`.
    Io(std::io::Error),
    /// JSON parse failure; exposed through `Error::source`.
    Parse {
        /// Line number, rendered after the location as `location:line`.
        line: usize,
        /// The `serde_json` error that was raised.
        source: serde_json::Error,
    },
    /// Free-form message rendered as a "schema error".
    Schema(String),
    /// Free-form message rendered as a "config error" (location is not printed).
    Config(String),
    /// Free-form message rendered as a "transport error".
    Transport(String),
    /// Free-form message rendered as an "auth error" (location is not printed).
    Auth(String),
}
273
274impl AdapterError {
275 pub fn io(adapter: &'static str, location: impl Into<String>, source: std::io::Error) -> Self {
276 Self {
277 adapter,
278 location: location.into(),
279 kind: AdapterErrorKind::Io(source),
280 }
281 }
282
283 pub fn parse(
284 adapter: &'static str,
285 location: impl Into<String>,
286 line: usize,
287 source: serde_json::Error,
288 ) -> Self {
289 Self {
290 adapter,
291 location: location.into(),
292 kind: AdapterErrorKind::Parse { line, source },
293 }
294 }
295
296 pub fn schema(
297 adapter: &'static str,
298 location: impl Into<String>,
299 message: impl Into<String>,
300 ) -> Self {
301 Self {
302 adapter,
303 location: location.into(),
304 kind: AdapterErrorKind::Schema(message.into()),
305 }
306 }
307
308 pub fn config(adapter: &'static str, message: impl Into<String>) -> Self {
309 Self {
310 adapter,
311 location: "config".to_owned(),
312 kind: AdapterErrorKind::Config(message.into()),
313 }
314 }
315}
316
317impl std::fmt::Display for AdapterError {
318 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 match &self.kind {
320 AdapterErrorKind::Io(source) => {
321 write!(
322 formatter,
323 "{} io error at {}: {source}",
324 self.adapter, self.location
325 )
326 }
327 AdapterErrorKind::Parse { line, source } => write!(
328 formatter,
329 "{} json parse error at {}:{line}: {source}",
330 self.adapter, self.location,
331 ),
332 AdapterErrorKind::Schema(message) => {
333 write!(
334 formatter,
335 "{} schema error at {}: {message}",
336 self.adapter, self.location
337 )
338 }
339 AdapterErrorKind::Config(message) => {
340 write!(formatter, "{} config error: {message}", self.adapter)
341 }
342 AdapterErrorKind::Transport(message) => write!(
343 formatter,
344 "{} transport error at {}: {message}",
345 self.adapter, self.location,
346 ),
347 AdapterErrorKind::Auth(message) => {
348 write!(formatter, "{} auth error: {message}", self.adapter)
349 }
350 }
351 }
352}
353
354impl std::error::Error for AdapterError {
355 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
356 match &self.kind {
357 AdapterErrorKind::Io(source) => Some(source),
358 AdapterErrorKind::Parse { source, .. } => Some(source),
359 _ => None,
360 }
361 }
362}
363
364pub fn registry() -> &'static [&'static dyn AdapterFactory] {
368 &[
369 &ClaudeCodeFactory,
370 &CodexCliFactory,
371 &OpencodeFactory,
372 &PiCodingAgentFactory,
373 ]
374}
375
376pub fn by_name(name: &str) -> Option<&'static dyn AdapterFactory> {
379 registry().iter().copied().find(|f| f.name() == name)
380}
381
382pub fn known_names() -> Vec<&'static str> {
385 registry().iter().map(|f| f.name()).collect()
386}
387
388pub fn probe_all(env: &Env) -> Vec<(&'static str, Value)> {
392 registry()
393 .iter()
394 .filter_map(|factory| factory.probe_default(env).map(|cfg| (factory.name(), cfg)))
395 .collect()
396}
397
/// Builds a part identifier of the form `<message_id>:<ordinal>`, with the
/// ordinal zero-padded to at least four digits.
pub(crate) fn part_id(message_id: &str, ordinal: usize) -> String {
    format!("{}:{:04}", message_id, ordinal)
}
403
404pub(crate) fn compact_json(value: &Value) -> String {
407 serde_json::to_string(value).unwrap_or_default()
408}
409
410pub(crate) fn jsonl_bytes(
411 adapter: &'static str,
412 records: &[Value],
413) -> Result<Vec<u8>, AdapterError> {
414 let mut bytes = Vec::new();
415 for record in records {
416 let line = serde_json::to_vec(record).map_err(|err| {
417 AdapterError::schema(adapter, "serialize", format!("json encode failed: {err}"))
418 })?;
419 bytes.extend(line);
420 bytes.push(b'\n');
421 }
422 Ok(bytes)
423}
424
425pub(crate) fn config_path(adapter: &'static str, config: Value) -> Result<PathBuf, AdapterError> {
428 use serde::Deserialize;
429 #[derive(Deserialize)]
430 struct Cfg {
431 path: PathBuf,
432 }
433 let cfg: Cfg = serde_json::from_value(config)
434 .map_err(|err| AdapterError::config(adapter, format!("bad config blob: {err}")))?;
435 Ok(match std::env::var_os("HOME") {
436 Some(home) => crate::config::expand_home_under(&cfg.path, Path::new(&home)),
437 None => cfg.path,
438 })
439}
440
441pub(crate) fn raw_record(options: &ProviderOptions) -> Option<Value> {
442 options
443 .get("source")
444 .and_then(|source| source.get("raw_record"))
445 .cloned()
446}
447
448pub(crate) fn source_options(adapter: &'static str, raw: &Value) -> ProviderOptions {
454 let mut options = ProviderOptions::new();
455 options.insert(
456 "source".to_owned(),
457 serde_json::json!({
458 "adapter": adapter,
459 "raw_record": extract_raw_record(raw),
460 }),
461 );
462 options
463}
464
/// Converts a zero-based ordinal to `i32`, saturating at `i32::MAX` when the
/// value does not fit.
#[inline]
pub(crate) fn part_ordinal(ordinal: usize) -> i32 {
    match i32::try_from(ordinal) {
        Ok(fits) => fits,
        Err(_) => i32::MAX,
    }
}
472
473pub(crate) fn validate_path_id(
479 adapter: &'static str,
480 kind: &str,
481 id: &str,
482 location: impl Into<String>,
483) -> Result<(), AdapterError> {
484 if id.is_empty()
485 || id.contains('/')
486 || id.contains('\\')
487 || id.contains("..")
488 || std::path::Path::new(id).is_absolute()
489 {
490 return Err(AdapterError::schema(
491 adapter,
492 location,
493 format!("{kind} contains a path separator or traversal marker: {id}"),
494 ));
495 }
496 Ok(())
497}
498
499#[allow(dead_code)]
509pub(crate) fn write_restored_files(
510 root: &Path,
511 files: Vec<RestoredFile>,
512) -> Result<(), AdapterError> {
513 let parent = root.parent().unwrap_or_else(|| Path::new("."));
516 let stem = root
517 .file_name()
518 .and_then(|n| n.to_str())
519 .unwrap_or("restore");
520 let staging = parent.join(format!(".{stem}.tmp"));
521 let io =
522 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
523 let _ = std::fs::remove_dir_all(&staging);
524 std::fs::create_dir_all(&staging).map_err(|e| io(staging.display().to_string(), e))?;
525
526 let result = (|| -> Result<(), AdapterError> {
527 for file in files {
528 write_one_into_staging(&staging, &file)?;
529 }
530 Ok(())
531 })();
532
533 if let Err(error) = result {
534 let _ = std::fs::remove_dir_all(&staging);
535 return Err(error);
536 }
537
538 let _ = std::fs::remove_dir_all(root);
540 if let Some(parent) = root.parent()
541 && !parent.as_os_str().is_empty()
542 {
543 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
544 }
545 std::fs::rename(&staging, root).map_err(|e| {
546 let _ = std::fs::remove_dir_all(&staging);
547 io(root.display().to_string(), e)
548 })?;
549 Ok(())
550}
551
552#[allow(dead_code)]
553fn write_one_into_staging(staging: &Path, file: &RestoredFile) -> Result<(), AdapterError> {
554 for component in file.relative_path.components() {
556 use std::path::Component;
557 let segment = match component {
558 Component::Normal(s) => s,
559 Component::CurDir => continue,
560 _ => {
563 return Err(AdapterError::schema(
564 "restore",
565 file.relative_path.display().to_string(),
566 "relative_path component is not a normal name",
567 ));
568 }
569 };
570 let Some(text) = segment.to_str() else {
571 return Err(AdapterError::schema(
572 "restore",
573 file.relative_path.display().to_string(),
574 "relative_path segment is not UTF-8",
575 ));
576 };
577 validate_path_id(
578 "restore",
579 "relative_path segment",
580 text,
581 file.relative_path.display().to_string(),
582 )?;
583 }
584
585 let dest = staging.join(&file.relative_path);
586 if !dest.starts_with(staging) {
589 return Err(AdapterError::schema(
590 "restore",
591 file.relative_path.display().to_string(),
592 "relative_path escaped the restore root after join",
593 ));
594 }
595 let io =
596 |location: String, source: std::io::Error| AdapterError::io("restore", location, source);
597 if let Some(parent) = dest.parent() {
598 std::fs::create_dir_all(parent).map_err(|e| io(parent.display().to_string(), e))?;
599 }
600 std::fs::write(&dest, &file.bytes).map_err(|e| io(dest.display().to_string(), e))?;
601 Ok(())
602}
603
604pub(crate) fn extracted_text(value: &Option<Extracted<String>>) -> &str {
605 value.as_deref().map(String::as_str).unwrap_or("")
606}
607
608pub(crate) fn by_timestamp_then_id(
611 left: &MessageWithParts,
612 right: &MessageWithParts,
613) -> std::cmp::Ordering {
614 left.message
615 .timestamp()
616 .cmp(&right.message.timestamp())
617 .then_with(|| left.message.id().cmp(right.message.id()))
618}
619
/// Returns a fresh [`ProviderOptions`] created with `ProviderOptions::new()`.
#[inline]
pub(crate) fn empty_options() -> ProviderOptions {
    ProviderOptions::new()
}
626
/// Shared assertions for adapter tests.
#[cfg(test)]
pub(crate) mod test_support {
    use std::{
        collections::BTreeSet,
        path::{Path, PathBuf},
    };

    use serde_json::Value;
    use tempfile::TempDir;

    use super::{Adapter, AdapterFactory, Env, NoopOracle, RestoreFidelity};
    use crate::{handlers::ingest_adapter, sessions::Store};

    /// Checks `probe_default` against a temporary home directory.
    ///
    /// Creates `expected_subpath` under the temporary home and requires the
    /// probe's `"path"` to equal it; then removes the directory and requires the
    /// probe to return `None`.
    pub(crate) fn assert_probe_default(
        factory: &dyn AdapterFactory,
        expected_subpath: &[&str],
    ) -> anyhow::Result<()> {
        let home = TempDir::new()?;
        let mut expected = home.path().to_path_buf();
        expected.extend(expected_subpath);
        std::fs::create_dir_all(&expected)?;
        let env = Env::with_home(home.path());

        let probed = factory.probe_default(&env);
        let got = probed
            .as_ref()
            .and_then(|value| value.get("path"))
            .and_then(Value::as_str);
        anyhow::ensure!(
            got == expected.to_str(),
            "factory must probe its install path: got {got:?}, expected {expected:?}",
        );

        std::fs::remove_dir_all(&expected)?;
        let after_removal = factory.probe_default(&env);
        anyhow::ensure!(
            after_removal.is_none(),
            "probe_default must be None once the install path disappears",
        );
        Ok(())
    }

    /// Ingests everything `adapter` yields into a temporary store, serializes
    /// each stored session at `RestoreFidelity::Native`, and requires the result
    /// to match the JSON/JSONL files under `source_root`, both in content
    /// (compared as parsed JSON) and in the exact set of relative paths.
    pub(crate) async fn assert_native_restore(
        factory: &dyn AdapterFactory,
        adapter: &dyn Adapter,
        source_root: &Path,
    ) -> anyhow::Result<()> {
        let store_dir = TempDir::new()?;
        let store = Store::open_local(store_dir.path()).await?;
        ingest_adapter(&store, adapter, &NoopOracle, |_| {}).await?;

        let session_ids = store.session_ids().await?;
        assert!(
            !session_ids.is_empty(),
            "native restore fixture must ingest at least one session",
        );

        let mut restored_paths = BTreeSet::new();
        for session_id in session_ids {
            let session = store.get_session(&session_id).await?.ok_or_else(|| {
                anyhow::anyhow!("session id listed by store was not readable: {session_id}")
            })?;
            for file in factory.serialize(&session, RestoreFidelity::Native)? {
                let expected = source_root.join(&file.relative_path);
                let expected_bytes = std::fs::read(&expected)
                    .map_err(|err| anyhow::anyhow!("read {}: {err}", expected.display()))?;
                assert_json_file_equal(&expected, &expected_bytes, &file.bytes)?;
                restored_paths.insert(file.relative_path);
            }
        }

        let source_paths = source_json_files(source_root)?;
        assert_eq!(
            restored_paths, source_paths,
            "native restore must emit exactly the source JSON/JSONL file set",
        );
        Ok(())
    }

    /// Collects the root-relative paths of every `.json` / `.jsonl` file under
    /// `root`, recursively.
    fn source_json_files(root: &Path) -> anyhow::Result<BTreeSet<PathBuf>> {
        let mut found = BTreeSet::new();
        collect_source_json_files(root, root, &mut found)?;
        Ok(found)
    }

    /// Recursive worker for `source_json_files`: walks `dir` and inserts matching
    /// files into `out` with `root` stripped from the front.
    fn collect_source_json_files(
        root: &Path,
        dir: &Path,
        out: &mut BTreeSet<PathBuf>,
    ) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                collect_source_json_files(root, &path, out)?;
                continue;
            }
            let is_json = matches!(
                path.extension().and_then(|ext| ext.to_str()),
                Some("json" | "jsonl")
            );
            if is_json {
                out.insert(path.strip_prefix(root)?.to_path_buf());
            }
        }
        Ok(())
    }

    /// Compares two file bodies as parsed JSON: line by line for `.jsonl` paths,
    /// as one document otherwise. Panics with the path on mismatch.
    fn assert_json_file_equal(path: &Path, expected: &[u8], actual: &[u8]) -> anyhow::Result<()> {
        let is_jsonl = path.extension().and_then(|ext| ext.to_str()) == Some("jsonl");
        if is_jsonl {
            let expected_lines = json_lines(expected)?;
            let actual_lines = json_lines(actual)?;
            assert_eq!(
                actual_lines,
                expected_lines,
                "jsonl mismatch at {}",
                path.display()
            );
            return Ok(());
        }

        let expected_value: serde_json::Value = serde_json::from_slice(expected)?;
        let actual_value: serde_json::Value = serde_json::from_slice(actual)?;
        assert_eq!(
            actual_value,
            expected_value,
            "json mismatch at {}",
            path.display()
        );
        Ok(())
    }

    /// Parses UTF-8 `bytes` as JSON Lines, skipping blank lines.
    fn json_lines(bytes: &[u8]) -> anyhow::Result<Vec<serde_json::Value>> {
        let text = std::str::from_utf8(bytes)?;
        let mut values = Vec::new();
        for line in text.lines() {
            if line.trim().is_empty() {
                continue;
            }
            values.push(serde_json::from_str(line)?);
        }
        Ok(values)
    }
}