Skip to main content

coding_agent_search/sources/
mod.rs

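//! Remote sources management for cass.
//!
//! This module provides functionality for configuring and syncing agent session
//! data from remote machines via SSH. It enables cass to search across conversation
//! history from multiple machines.
//!
//! # Architecture
//!
//! - **config**: Configuration types for defining remote sources
//! - **provenance**: Types for tracking conversation origins
//! - **sync**: Sync engine for pulling sessions from remotes via rsync/SSH
//! - **probe**: Remote host probing (see `probe_host`, `probe_hosts_parallel`)
//! - **install**: Remote installation support (see `RemoteInstaller`)
//! - **index**: Remote indexing support (see `RemoteIndexer`)
//! - **interactive**: Interactive host selection and confirmation prompts
//!   (see `run_host_selection`, `confirm_action`)
//! - **setup**: Setup workflow (see `run_setup`, `SetupOptions`)
//! - **status** (future): Sync status tracking
//!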
//! # Configuration
//!
//! Sources are configured in `~/.config/cass/sources.toml`:
//!
//! ```toml
//! [[sources]]
//! name = "laptop"
//! type = "ssh"
//! host = "user@laptop.local"
//! paths = ["~/.claude/projects", "~/.cursor"]
//! ```
//!
//! # Provenance
//!
//! Each conversation tracks where it came from via [`provenance::Origin`]:
//!
//! ```rust,ignore
//! use coding_agent_search::sources::provenance::{Origin, SourceKind};
//!
//! // Local conversation
//! let local = Origin::local();
//!
//! // Remote conversation
//! let remote = Origin::remote("work-laptop");
//! ```
//!
//! # Syncing
//!
//! The sync engine uses rsync over SSH for efficient delta transfers:
//!
//! ```rust,ignore
//! use coding_agent_search::sources::sync::SyncEngine;
//! use coding_agent_search::sources::config::SourcesConfig;
//!
//! let config = SourcesConfig::load()?;
//! let engine = SyncEngine::new(&data_dir);
//!
//! for source in config.remote_sources() {
//!     let report = engine.sync_source(source)?;
//!     println!("Synced {}: {} files", source.name, report.total_files());
//! }
//! ```
//!
//! # Usage
//!
//! ```rust,ignore
//! use coding_agent_search::sources::config::SourcesConfig;
//!
//! // Load configuration
//! let config = SourcesConfig::load()?;
//!
//! // Iterate remote sources
//! for source in config.remote_sources() {
//!     println!("Source: {} ({})", source.name, source.host.as_deref().unwrap_or("-"));
//! }
//! ```
70
71pub mod config;
72pub mod index;
73pub mod install;
74pub mod interactive;
75pub mod probe;
76pub mod provenance;
77pub mod setup;
78pub mod sync;
79
80use std::io::Read as IoRead;
81use std::process::{Child, Output};
82use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
83use std::time::{Duration, Instant};
84
85use wait_timeout::ChildExt;
86
87/// Canonical SSH stderr marker for host-key verification failures.
88pub(crate) const HOST_KEY_VERIFICATION_FAILED: &str = "Host key verification failed";
89
90/// Build strict SSH CLI tokens with consistent trust policy.
91///
92/// The returned vector contains full `ssh` argument tokens:
93/// `-o BatchMode=yes -o ConnectTimeout=<secs> -o StrictHostKeyChecking=yes`.
94pub(crate) fn strict_ssh_cli_tokens(connect_timeout_secs: u64) -> Vec<String> {
95    let mut tokens = Vec::new();
96    if let Some(config_path) = ssh_config_override() {
97        tokens.push("-F".to_string());
98        tokens.push(config_path);
99    }
100    tokens.extend([
101        "-o".to_string(),
102        "BatchMode=yes".to_string(),
103        "-o".to_string(),
104        format!("ConnectTimeout={connect_timeout_secs}"),
105        "-o".to_string(),
106        "ServerAliveInterval=15".to_string(),
107        "-o".to_string(),
108        "ServerAliveCountMax=3".to_string(),
109        "-o".to_string(),
110        "StrictHostKeyChecking=yes".to_string(),
111    ]);
112    tokens
113}
114
115/// Build strict SSH command string for tools that require a single shell fragment.
116pub(crate) fn strict_ssh_command_for_rsync(connect_timeout_secs: u64) -> String {
117    let config_arg = ssh_config_override()
118        .map(|path| format!(" -F {}", shell_quote_ssh_arg(&path)))
119        .unwrap_or_default();
120    format!(
121        "ssh{config_arg} -o BatchMode=yes -o ConnectTimeout={connect_timeout_secs} -o ServerAliveInterval=15 -o ServerAliveCountMax=3 -o StrictHostKeyChecking=yes"
122    )
123}
124
125fn ssh_config_override() -> Option<String> {
126    dotenvy::var("CASS_SSH_CONFIG")
127        .ok()
128        .map(|value| value.trim().to_string())
129        .filter(|value| !value.is_empty())
130}
131
/// Quote `value` as a single word for a POSIX-shell-style command string.
///
/// Non-empty values made only of ASCII alphanumerics and `/ . _ - : @` are
/// returned unchanged. Everything else, including the empty string, is wrapped
/// in single quotes, with each embedded `'` rewritten as `'\''`.
fn shell_quote_ssh_arg(value: &str) -> String {
    let is_bare_safe =
        |c: char| c.is_ascii_alphanumeric() || matches!(c, '/' | '.' | '_' | '-' | ':' | '@');
    let needs_quoting = value.is_empty() || value.chars().any(|c| !is_bare_safe(c));
    if needs_quoting {
        format!("'{}'", value.replace('\'', "'\\''"))
    } else {
        value.to_string()
    }
}
142
/// Read `pipe` to EOF on a detached background thread.
///
/// The returned receiver yields exactly one message: either the collected bytes
/// or the I/O error that stopped the read. If the receiver has already been
/// dropped when the read finishes, the send failure is ignored.
fn drain_child_pipe<R>(mut pipe: R) -> Receiver<std::io::Result<Vec<u8>>>
where
    R: IoRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>();
    std::thread::spawn(move || {
        let mut collected = Vec::new();
        let outcome = match pipe.read_to_end(&mut collected) {
            Ok(_) => Ok(collected),
            Err(err) => Err(err),
        };
        // The caller may have stopped listening (e.g. it hit its deadline).
        let _ = tx.send(outcome);
    });
    rx
}
155
/// Collect the output of a pipe reader started by `drain_child_pipe`, waiting
/// no later than `deadline`.
///
/// Returns:
/// - `Ok(Some(Vec::new()))` when there was no pipe to read (`pipe_reader` is `None`);
/// - `Ok(Some(bytes))` when the reader delivered its output in time;
/// - `Ok(None)` when the deadline passed first;
/// - `Err(_)` when the read itself failed, or when the reader went away without
///   sending anything (reported as `BrokenPipe`).
fn finish_child_pipe(
    pipe_reader: Option<Receiver<std::io::Result<Vec<u8>>>>,
    deadline: Instant,
) -> std::io::Result<Option<Vec<u8>>> {
    // No captured pipe means "nothing to read", not "timed out".
    let Some(reader) = pipe_reader else {
        return Ok(Some(Vec::new()));
    };

    // Remaining budget before the shared deadline; zero once it has passed.
    let budget = deadline.saturating_duration_since(Instant::now());
    match reader.recv_timeout(budget) {
        Ok(read_result) => read_result.map(Some),
        Err(RecvTimeoutError::Timeout) => Ok(None),
        Err(RecvTimeoutError::Disconnected) => Err(std::io::Error::new(
            std::io::ErrorKind::BrokenPipe,
            "child pipe reader disconnected before sending output",
        )),
    }
}
177
178/// Wait for a child process while draining stdout/stderr without letting either
179/// process execution or pipe collection outlive the same wall-clock deadline.
180pub(crate) fn wait_for_child_output_with_timeout(
181    mut child: Child,
182    timeout: Duration,
183) -> std::io::Result<Option<Output>> {
184    let timeout = if timeout.is_zero() {
185        Duration::from_secs(1)
186    } else {
187        timeout
188    };
189    let start = Instant::now();
190    let deadline = start.checked_add(timeout).unwrap_or(start);
191    let stdout_reader = child.stdout.take().map(drain_child_pipe);
192    let stderr_reader = child.stderr.take().map(drain_child_pipe);
193
194    match child.wait_timeout(timeout)? {
195        Some(status) => {
196            let Some(stdout) = finish_child_pipe(stdout_reader, deadline)? else {
197                return Ok(None);
198            };
199            let Some(stderr) = finish_child_pipe(stderr_reader, deadline)? else {
200                return Ok(None);
201            };
202            Ok(Some(Output {
203                status,
204                stdout,
205                stderr,
206            }))
207        }
208        None => {
209            let _ = child.kill();
210            let _ = child.wait();
211            Ok(None)
212        }
213    }
214}
215
216/// Whether stderr indicates SSH host-key verification failure.
217pub(crate) fn is_host_key_verification_failure(stderr: &str) -> bool {
218    stderr.contains(HOST_KEY_VERIFICATION_FAILED)
219}
220
221/// Standard user-facing error for host-key verification failures.
222pub(crate) fn host_key_verification_error(host: &str) -> String {
223    format!(
224        "Host key verification failed for {host} (add/verify host key in ~/.ssh/known_hosts first)"
225    )
226}
227
228// Re-export commonly used config types
229pub use config::{
230    BackupInfo, ConfigError, ConfigPreview, DiscoveredHost, MergeResult, PathMapping, Platform,
231    SkipReason, SourceConfigGenerator, SourceDefinition, SourcesConfig, SyncSchedule,
232    discover_ssh_hosts, get_preset_paths,
233};
234
235// Re-export commonly used provenance types
236pub use provenance::{LOCAL_SOURCE_ID, Origin, Source, SourceFilter, SourceKind};
237
238// Re-export commonly used sync types
239pub use sync::{
240    PathSyncResult, SourceHealthKind, SourceSyncAction, SourceSyncDecision, SourceSyncInfo,
241    SyncEngine, SyncError, SyncMethod, SyncReport, SyncResult, SyncStatus,
242};
243
244// Re-export commonly used probe types
245pub use probe::{
246    CassStatus, DetectedAgent, HostProbeResult, ProbeCache, ResourceInfo, SystemInfo, probe_host,
247    probe_hosts_parallel,
248};
249
250// Re-export commonly used install types
251pub use install::{
252    InstallError, InstallMethod, InstallProgress, InstallResult, InstallStage, RemoteInstaller,
253};
254
255// Re-export commonly used index types
256pub use index::{IndexError, IndexProgress, IndexResult, IndexStage, RemoteIndexer};
257
258// Re-export commonly used interactive types
259pub use interactive::{
260    CassStatusDisplay, HostDisplayInfo, HostSelectionResult, HostSelector, HostState,
261    InteractiveError, confirm_action, confirm_with_details, probe_to_display_info,
262    run_host_selection,
263};
264
265// Re-export commonly used setup types
266pub use setup::{SetupError, SetupOptions, SetupResult, SetupState, run_setup};
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    struct EnvGuard {
273        key: &'static str,
274        previous: Option<String>,
275    }
276
277    impl EnvGuard {
278        fn set(key: &'static str, value: &str) -> Self {
279            let previous = dotenvy::var(key).ok();
280            // SAFETY: test helper toggles a process-local env var for isolation.
281            unsafe {
282                std::env::set_var(key, value);
283            }
284            Self { key, previous }
285        }
286    }
287
288    impl Drop for EnvGuard {
289        fn drop(&mut self) {
290            if let Some(value) = &self.previous {
291                // SAFETY: test helper restores prior process env for isolation.
292                unsafe {
293                    std::env::set_var(self.key, value);
294                }
295            } else {
296                // SAFETY: test helper restores prior process env for isolation.
297                unsafe {
298                    std::env::remove_var(self.key);
299                }
300            }
301        }
302    }
303
304    #[test]
305    #[serial_test::serial]
306    fn strict_ssh_cli_tokens_include_config_override() {
307        let _guard = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass ssh/config");
308
309        let tokens = strict_ssh_cli_tokens(5);
310
311        assert_eq!(tokens[0], "-F");
312        assert_eq!(tokens[1], "/tmp/cass ssh/config");
313        assert!(tokens.contains(&"StrictHostKeyChecking=yes".to_string()));
314    }
315
316    #[test]
317    #[serial_test::serial]
318    fn strict_ssh_command_for_rsync_quotes_config_override() {
319        let _guard = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass'ssh/config");
320
321        let command = strict_ssh_command_for_rsync(5);
322
323        assert!(command.starts_with("ssh -F '/tmp/cass'\\''ssh/config' "));
324        assert!(command.contains("StrictHostKeyChecking=yes"));
325    }
326
327    #[test]
328    #[serial_test::serial]
329    fn strict_ssh_helpers_ignore_empty_config_override() {
330        let _guard = EnvGuard::set("CASS_SSH_CONFIG", "   ");
331
332        let tokens = strict_ssh_cli_tokens(5);
333        let command = strict_ssh_command_for_rsync(5);
334
335        assert!(!tokens.contains(&"-F".to_string()));
336        assert!(!command.contains(" -F "));
337    }
338}