Skip to main content

coding_agent_search/sources/
mod.rs

1//! Remote sources management for cass.
2//!
3//! This module provides functionality for configuring and syncing agent session
4//! data from remote machines via SSH. It enables cass to search across conversation
5//! history from multiple machines.
6//!
7//! # Architecture
8//!
9//! - **config**: Configuration types for defining remote sources
10//! - **provenance**: Types for tracking conversation origins
11//! - **sync**: Sync engine for pulling sessions from remotes via rsync/SSH
12//! - **status** (future): Sync status tracking
13//!
14//! # Configuration
15//!
16//! Sources are configured in `~/.config/cass/sources.toml`:
17//!
18//! ```toml
19//! [[sources]]
20//! name = "laptop"
21//! type = "ssh"
22//! host = "user@laptop.local"
23//! paths = ["~/.claude/projects", "~/.cursor"]
24//! ```
25//!
26//! # Provenance
27//!
28//! Each conversation tracks where it came from via [`provenance::Origin`]:
29//!
30//! ```rust,ignore
31//! use coding_agent_search::sources::provenance::{Origin, SourceKind};
32//!
33//! // Local conversation
34//! let local = Origin::local();
35//!
36//! // Remote conversation
37//! let remote = Origin::remote("work-laptop");
38//! ```
39//!
40//! # Syncing
41//!
42//! The sync engine uses rsync over SSH for efficient delta transfers:
43//!
44//! ```rust,ignore
45//! use coding_agent_search::sources::sync::SyncEngine;
46//! use coding_agent_search::sources::config::SourcesConfig;
47//!
48//! let config = SourcesConfig::load()?;
49//! let engine = SyncEngine::new(&data_dir);
50//!
51//! for source in config.remote_sources() {
52//!     let report = engine.sync_source(source)?;
53//!     println!("Synced {}: {} files", source.name, report.total_files());
54//! }
55//! ```
56//!
57//! # Usage
58//!
59//! ```rust,ignore
60//! use coding_agent_search::sources::config::SourcesConfig;
61//!
62//! // Load configuration
63//! let config = SourcesConfig::load()?;
64//!
65//! // Iterate remote sources
66//! for source in config.remote_sources() {
67//!     println!("Source: {} ({})", source.name, source.host.as_deref().unwrap_or("-"));
68//! }
69//! ```
70
71pub mod config;
72pub mod index;
73pub mod install;
74pub mod interactive;
75pub mod probe;
76pub mod provenance;
77pub mod setup;
78pub mod sync;
79
80use std::io::Read as IoRead;
81use std::process::{Child, Command, Output};
82use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
83use std::thread::JoinHandle;
84use std::time::{Duration, Instant};
85
86use wait_timeout::ChildExt;
87
/// Text that SSH writes to stderr when it rejects a remote host key.
///
/// Host-key failure detection in this module is a case-sensitive substring
/// match against this exact marker.
pub(crate) const HOST_KEY_VERIFICATION_FAILED: &str = "Host key verification failed";
90
91/// Build strict SSH CLI tokens with consistent trust policy.
92///
93/// The returned vector contains full `ssh` argument tokens:
94/// `-o BatchMode=yes -o ConnectTimeout=<secs> -o StrictHostKeyChecking=yes`.
95pub(crate) fn strict_ssh_cli_tokens(connect_timeout_secs: u64) -> Vec<String> {
96    let mut tokens = Vec::new();
97    if let Some(config_path) = ssh_config_override() {
98        tokens.push("-F".to_string());
99        tokens.push(config_path);
100    }
101    tokens.extend([
102        "-o".to_string(),
103        "BatchMode=yes".to_string(),
104        "-o".to_string(),
105        format!("ConnectTimeout={connect_timeout_secs}"),
106        "-o".to_string(),
107        "ServerAliveInterval=15".to_string(),
108        "-o".to_string(),
109        "ServerAliveCountMax=3".to_string(),
110        "-o".to_string(),
111        "StrictHostKeyChecking=yes".to_string(),
112    ]);
113    tokens
114}
115
116/// Build strict SSH command string for tools that require a single shell fragment.
117pub(crate) fn strict_ssh_command_for_rsync(connect_timeout_secs: u64) -> String {
118    let config_arg = ssh_config_override()
119        .map(|path| format!(" -F {}", shell_quote_ssh_arg(&path)))
120        .unwrap_or_default();
121    format!(
122        "ssh{config_arg} -o BatchMode=yes -o ConnectTimeout={connect_timeout_secs} -o ServerAliveInterval=15 -o ServerAliveCountMax=3 -o StrictHostKeyChecking=yes"
123    )
124}
125
126fn ssh_config_override() -> Option<String> {
127    dotenvy::var("CASS_SSH_CONFIG")
128        .ok()
129        .map(|value| value.trim().to_string())
130        .filter(|value| !value.is_empty())
131}
132
/// Quote `value` so it survives as a single word inside a shell fragment.
///
/// A non-empty value made only of ASCII alphanumerics and `/ . _ - : @` is
/// returned unchanged. Anything else (including the empty string) is wrapped
/// in single quotes, with each embedded `'` rewritten as `'\''`.
fn shell_quote_ssh_arg(value: &str) -> String {
    let is_safe_char =
        |c: char| c.is_ascii_alphanumeric() || matches!(c, '/' | '.' | '_' | '-' | ':' | '@');

    let needs_quoting = value.is_empty() || !value.chars().all(is_safe_char);
    if needs_quoting {
        // Close the quote, emit an escaped quote, reopen the quote.
        format!("'{}'", value.replace('\'', "'\\''"))
    } else {
        value.to_string()
    }
}
143
/// Handle to a background thread that reads one child pipe until EOF.
struct ChildPipeReader {
    /// Delivers the single message the reader thread sends: the collected
    /// bytes on success, or the read error.
    receiver: Receiver<Result<Vec<u8>, std::io::Error>>,
    /// Join handle of the reader thread.
    handle: JoinHandle<()>,
}
148
149fn drain_child_pipe<R>(mut pipe: R) -> ChildPipeReader
150where
151    R: IoRead + Send + 'static,
152{
153    let (sender, receiver) = mpsc::channel();
154    let handle = std::thread::spawn(move || {
155        let mut output = Vec::new();
156        let result = pipe.read_to_end(&mut output).map(|_| output);
157        let _ = sender.send(result);
158    });
159    ChildPipeReader { receiver, handle }
160}
161
162fn finish_child_pipe(
163    pipe_reader: Option<ChildPipeReader>,
164    deadline: Instant,
165) -> std::io::Result<Option<Vec<u8>>> {
166    match pipe_reader {
167        Some(reader) => {
168            let remaining = deadline
169                .checked_duration_since(Instant::now())
170                .unwrap_or(Duration::ZERO);
171            match reader.receiver.recv_timeout(remaining) {
172                Ok(result) => {
173                    reader
174                        .handle
175                        .join()
176                        .map_err(|_| std::io::Error::other("child pipe reader panicked"))?;
177                    result.map(Some)
178                }
179                Err(RecvTimeoutError::Timeout) => Ok(None),
180                Err(RecvTimeoutError::Disconnected) => {
181                    let reader_panicked = reader.handle.join().is_err();
182                    let message = if reader_panicked {
183                        "child pipe reader panicked before sending output"
184                    } else {
185                        "child pipe reader disconnected before sending output"
186                    };
187                    Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, message))
188                }
189            }
190        }
191        None => Ok(Some(Vec::new())),
192    }
193}
194
/// Arrange for the command's child to be spawned in a process group of its own.
///
/// Passing `0` as the process group id makes the child the leader of a new
/// group, so the whole group can later be signalled through the child's PID.
/// Must be called before `spawn()`.
#[cfg(unix)]
pub(crate) fn configure_child_process_group(cmd: &mut Command) {
    std::os::unix::process::CommandExt::process_group(cmd, 0);
}

/// No-op on non-Unix targets; the command is left untouched.
#[cfg(not(unix))]
pub(crate) fn configure_child_process_group(_cmd: &mut Command) {}
204
/// Send `SIGKILL` to the process group identified by `pid`.
///
/// Runs `/bin/kill -KILL -<pid>`; the negative target addresses the process
/// group rather than a single process. All standard streams of the helper are
/// discarded, and any failure to run it is ignored (best effort).
#[cfg(unix)]
fn kill_child_process_group(pid: u32) {
    use std::process::Stdio;

    let mut kill = Command::new("/bin/kill");
    kill.arg("-KILL")
        .arg(format!("-{pid}"))
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let _ = kill.status();
}

/// No-op on non-Unix targets.
#[cfg(not(unix))]
fn kill_child_process_group(_pid: u32) {}
218
219/// Wait for a child process while draining stdout/stderr without letting either
220/// process execution or pipe collection outlive the same wall-clock deadline.
221///
222/// Callers should pass children configured with
223/// [`configure_child_process_group`] before `spawn()`. Without that, the direct
224/// child can be killed but shell grandchildren may keep inherited pipe FDs open
225/// until they exit naturally.
226pub(crate) fn wait_for_child_output_with_timeout(
227    mut child: Child,
228    timeout: Duration,
229) -> std::io::Result<Option<Output>> {
230    let timeout = if timeout.is_zero() {
231        Duration::from_secs(1)
232    } else {
233        timeout
234    };
235    let start = Instant::now();
236    let deadline = start.checked_add(timeout).unwrap_or(start);
237    let child_pid = child.id();
238    let stdout_reader = child.stdout.take().map(drain_child_pipe);
239    let stderr_reader = child.stderr.take().map(drain_child_pipe);
240
241    match child.wait_timeout(timeout)? {
242        Some(status) => {
243            let Some(stdout) = finish_child_pipe(stdout_reader, deadline)? else {
244                kill_child_process_group(child_pid);
245                return Ok(None);
246            };
247            let Some(stderr) = finish_child_pipe(stderr_reader, deadline)? else {
248                kill_child_process_group(child_pid);
249                return Ok(None);
250            };
251            Ok(Some(Output {
252                status,
253                stdout,
254                stderr,
255            }))
256        }
257        None => {
258            kill_child_process_group(child_pid);
259            let _ = child.kill();
260            let _ = child.wait();
261            Ok(None)
262        }
263    }
264}
265
266/// Whether stderr indicates SSH host-key verification failure.
267pub(crate) fn is_host_key_verification_failure(stderr: &str) -> bool {
268    stderr.contains(HOST_KEY_VERIFICATION_FAILED)
269}
270
/// Build the standard user-facing message for a host-key verification failure.
///
/// `host` is interpolated verbatim; the message also tells the user to add or
/// verify the key in `~/.ssh/known_hosts`.
pub(crate) fn host_key_verification_error(host: &str) -> String {
    format!(
        "Host key verification failed for {} (add/verify host key in ~/.ssh/known_hosts first)",
        host
    )
}
277
278// Re-export commonly used config types
279pub use config::{
280    BackupInfo, ConfigError, ConfigPreview, DiscoveredHost, MergeResult, PathMapping, Platform,
281    SkipReason, SourceConfigGenerator, SourceDefinition, SourcesConfig, SyncSchedule,
282    discover_ssh_hosts, get_preset_paths,
283};
284
285// Re-export commonly used provenance types
286pub use provenance::{LOCAL_SOURCE_ID, Origin, Source, SourceFilter, SourceKind};
287
288// Re-export commonly used sync types
289pub use sync::{
290    PathSyncResult, SourceHealthKind, SourceSyncAction, SourceSyncDecision, SourceSyncInfo,
291    SyncEngine, SyncError, SyncMethod, SyncReport, SyncResult, SyncStatus,
292};
293
294// Re-export commonly used probe types
295pub use probe::{
296    CassStatus, DetectedAgent, HostProbeResult, ProbeCache, ResourceInfo, SystemInfo, probe_host,
297    probe_hosts_parallel,
298};
299
300// Re-export commonly used install types
301pub use install::{
302    InstallError, InstallMethod, InstallProgress, InstallResult, InstallStage, RemoteInstaller,
303};
304
305// Re-export commonly used index types
306pub use index::{IndexError, IndexProgress, IndexResult, IndexStage, RemoteIndexer};
307
308// Re-export commonly used interactive types
309pub use interactive::{
310    CassStatusDisplay, HostDisplayInfo, HostSelectionResult, HostSelector, HostState,
311    InteractiveError, confirm_action, confirm_with_details, probe_to_display_info,
312    run_host_selection,
313};
314
315// Re-export commonly used setup types
316pub use setup::{SetupError, SetupOptions, SetupResult, SetupState, run_setup};
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Overrides one environment variable for the duration of a test and
    /// restores the previous state (value or absence) when dropped.
    struct EnvGuard {
        key: &'static str,
        previous: Option<String>,
    }

    impl EnvGuard {
        /// Remember the current value of `key`, then overwrite it with `value`.
        fn set(key: &'static str, value: &str) -> Self {
            let previous = dotenvy::var(key).ok();
            // SAFETY: test helper toggles a process-local env var for isolation.
            unsafe { std::env::set_var(key, value) };
            Self { key, previous }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            match self.previous.as_deref() {
                Some(value) => {
                    // SAFETY: test helper restores prior process env for isolation.
                    unsafe { std::env::set_var(self.key, value) }
                }
                None => {
                    // SAFETY: test helper restores prior process env for isolation.
                    unsafe { std::env::remove_var(self.key) }
                }
            }
        }
    }

    /// The override path is passed as a raw argv token right after `-F`.
    #[test]
    #[serial_test::serial]
    fn strict_ssh_cli_tokens_include_config_override() {
        let _env = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass ssh/config");

        let tokens = strict_ssh_cli_tokens(5);

        assert_eq!(tokens.first().map(String::as_str), Some("-F"));
        assert_eq!(
            tokens.get(1).map(String::as_str),
            Some("/tmp/cass ssh/config")
        );
        assert!(
            tokens
                .iter()
                .any(|token| token == "StrictHostKeyChecking=yes")
        );
    }

    /// In the shell-fragment form the override path is single-quoted and
    /// embedded quotes are escaped.
    #[test]
    #[serial_test::serial]
    fn strict_ssh_command_for_rsync_quotes_config_override() {
        let _env = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass'ssh/config");

        let command = strict_ssh_command_for_rsync(5);

        let expected_prefix = "ssh -F '/tmp/cass'\\''ssh/config' ";
        assert!(command.starts_with(expected_prefix));
        assert!(command.contains("StrictHostKeyChecking=yes"));
    }

    /// A whitespace-only override is treated as if no override were set.
    #[test]
    #[serial_test::serial]
    fn strict_ssh_helpers_ignore_empty_config_override() {
        let _env = EnvGuard::set("CASS_SSH_CONFIG", "   ");

        let tokens = strict_ssh_cli_tokens(5);
        let command = strict_ssh_command_for_rsync(5);

        assert!(tokens.iter().all(|token| token != "-F"));
        assert!(!command.contains(" -F "));
    }
}