coding_agent_search/sources/
mod.rs1pub mod config;
72pub mod index;
73pub mod install;
74pub mod interactive;
75pub mod probe;
76pub mod provenance;
77pub mod setup;
78pub mod sync;
79
80use std::io::Read as IoRead;
81use std::process::{Child, Command, Output};
82use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
83use std::thread::JoinHandle;
84use std::time::{Duration, Instant};
85
86use wait_timeout::ChildExt;
87
88pub(crate) const HOST_KEY_VERIFICATION_FAILED: &str = "Host key verification failed";
90
91pub(crate) fn strict_ssh_cli_tokens(connect_timeout_secs: u64) -> Vec<String> {
96 let mut tokens = Vec::new();
97 if let Some(config_path) = ssh_config_override() {
98 tokens.push("-F".to_string());
99 tokens.push(config_path);
100 }
101 tokens.extend([
102 "-o".to_string(),
103 "BatchMode=yes".to_string(),
104 "-o".to_string(),
105 format!("ConnectTimeout={connect_timeout_secs}"),
106 "-o".to_string(),
107 "ServerAliveInterval=15".to_string(),
108 "-o".to_string(),
109 "ServerAliveCountMax=3".to_string(),
110 "-o".to_string(),
111 "StrictHostKeyChecking=yes".to_string(),
112 ]);
113 tokens
114}
115
116pub(crate) fn strict_ssh_command_for_rsync(connect_timeout_secs: u64) -> String {
118 let config_arg = ssh_config_override()
119 .map(|path| format!(" -F {}", shell_quote_ssh_arg(&path)))
120 .unwrap_or_default();
121 format!(
122 "ssh{config_arg} -o BatchMode=yes -o ConnectTimeout={connect_timeout_secs} -o ServerAliveInterval=15 -o ServerAliveCountMax=3 -o StrictHostKeyChecking=yes"
123 )
124}
125
126fn ssh_config_override() -> Option<String> {
127 dotenvy::var("CASS_SSH_CONFIG")
128 .ok()
129 .map(|value| value.trim().to_string())
130 .filter(|value| !value.is_empty())
131}
132
133fn shell_quote_ssh_arg(value: &str) -> String {
134 if !value.is_empty()
135 && value
136 .chars()
137 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '/' | '.' | '_' | '-' | ':' | '@'))
138 {
139 return value.to_string();
140 }
141 format!("'{}'", value.replace('\'', "'\\''"))
142}
143
144struct ChildPipeReader {
145 receiver: Receiver<std::io::Result<Vec<u8>>>,
146 handle: JoinHandle<()>,
147}
148
149fn drain_child_pipe<R>(mut pipe: R) -> ChildPipeReader
150where
151 R: IoRead + Send + 'static,
152{
153 let (sender, receiver) = mpsc::channel();
154 let handle = std::thread::spawn(move || {
155 let mut output = Vec::new();
156 let result = pipe.read_to_end(&mut output).map(|_| output);
157 let _ = sender.send(result);
158 });
159 ChildPipeReader { receiver, handle }
160}
161
162fn finish_child_pipe(
163 pipe_reader: Option<ChildPipeReader>,
164 deadline: Instant,
165) -> std::io::Result<Option<Vec<u8>>> {
166 match pipe_reader {
167 Some(reader) => {
168 let remaining = deadline
169 .checked_duration_since(Instant::now())
170 .unwrap_or(Duration::ZERO);
171 match reader.receiver.recv_timeout(remaining) {
172 Ok(result) => {
173 reader
174 .handle
175 .join()
176 .map_err(|_| std::io::Error::other("child pipe reader panicked"))?;
177 result.map(Some)
178 }
179 Err(RecvTimeoutError::Timeout) => Ok(None),
180 Err(RecvTimeoutError::Disconnected) => {
181 let reader_panicked = reader.handle.join().is_err();
182 let message = if reader_panicked {
183 "child pipe reader panicked before sending output"
184 } else {
185 "child pipe reader disconnected before sending output"
186 };
187 Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, message))
188 }
189 }
190 }
191 None => Ok(Some(Vec::new())),
192 }
193}
194
195#[cfg(unix)]
196pub(crate) fn configure_child_process_group(cmd: &mut Command) {
197 use std::os::unix::process::CommandExt;
198
199 cmd.process_group(0);
200}
201
202#[cfg(not(unix))]
203pub(crate) fn configure_child_process_group(_cmd: &mut Command) {}
204
205#[cfg(unix)]
206fn kill_child_process_group(pid: u32) {
207 let process_group = format!("-{pid}");
208 let _ = Command::new("/bin/kill")
209 .args(["-KILL", &process_group])
210 .stdin(std::process::Stdio::null())
211 .stdout(std::process::Stdio::null())
212 .stderr(std::process::Stdio::null())
213 .status();
214}
215
216#[cfg(not(unix))]
217fn kill_child_process_group(_pid: u32) {}
218
219pub(crate) fn wait_for_child_output_with_timeout(
227 mut child: Child,
228 timeout: Duration,
229) -> std::io::Result<Option<Output>> {
230 let timeout = if timeout.is_zero() {
231 Duration::from_secs(1)
232 } else {
233 timeout
234 };
235 let start = Instant::now();
236 let deadline = start.checked_add(timeout).unwrap_or(start);
237 let child_pid = child.id();
238 let stdout_reader = child.stdout.take().map(drain_child_pipe);
239 let stderr_reader = child.stderr.take().map(drain_child_pipe);
240
241 match child.wait_timeout(timeout)? {
242 Some(status) => {
243 let Some(stdout) = finish_child_pipe(stdout_reader, deadline)? else {
244 kill_child_process_group(child_pid);
245 return Ok(None);
246 };
247 let Some(stderr) = finish_child_pipe(stderr_reader, deadline)? else {
248 kill_child_process_group(child_pid);
249 return Ok(None);
250 };
251 Ok(Some(Output {
252 status,
253 stdout,
254 stderr,
255 }))
256 }
257 None => {
258 kill_child_process_group(child_pid);
259 let _ = child.kill();
260 let _ = child.wait();
261 Ok(None)
262 }
263 }
264}
265
266pub(crate) fn is_host_key_verification_failure(stderr: &str) -> bool {
268 stderr.contains(HOST_KEY_VERIFICATION_FAILED)
269}
270
271pub(crate) fn host_key_verification_error(host: &str) -> String {
273 format!(
274 "Host key verification failed for {host} (add/verify host key in ~/.ssh/known_hosts first)"
275 )
276}
277
278pub use config::{
280 BackupInfo, ConfigError, ConfigPreview, DiscoveredHost, MergeResult, PathMapping, Platform,
281 SkipReason, SourceConfigGenerator, SourceDefinition, SourcesConfig, SyncSchedule,
282 discover_ssh_hosts, get_preset_paths,
283};
284
285pub use provenance::{LOCAL_SOURCE_ID, Origin, Source, SourceFilter, SourceKind};
287
288pub use sync::{
290 PathSyncResult, SourceHealthKind, SourceSyncAction, SourceSyncDecision, SourceSyncInfo,
291 SyncEngine, SyncError, SyncMethod, SyncReport, SyncResult, SyncStatus,
292};
293
294pub use probe::{
296 CassStatus, DetectedAgent, HostProbeResult, ProbeCache, ResourceInfo, SystemInfo, probe_host,
297 probe_hosts_parallel,
298};
299
300pub use install::{
302 InstallError, InstallMethod, InstallProgress, InstallResult, InstallStage, RemoteInstaller,
303};
304
305pub use index::{IndexError, IndexProgress, IndexResult, IndexStage, RemoteIndexer};
307
308pub use interactive::{
310 CassStatusDisplay, HostDisplayInfo, HostSelectionResult, HostSelector, HostState,
311 InteractiveError, confirm_action, confirm_with_details, probe_to_display_info,
312 run_host_selection,
313};
314
315pub use setup::{SetupError, SetupOptions, SetupResult, SetupState, run_setup};
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321
322 struct EnvGuard {
323 key: &'static str,
324 previous: Option<String>,
325 }
326
327 impl EnvGuard {
328 fn set(key: &'static str, value: &str) -> Self {
329 let previous = dotenvy::var(key).ok();
330 unsafe {
332 std::env::set_var(key, value);
333 }
334 Self { key, previous }
335 }
336 }
337
338 impl Drop for EnvGuard {
339 fn drop(&mut self) {
340 if let Some(value) = &self.previous {
341 unsafe {
343 std::env::set_var(self.key, value);
344 }
345 } else {
346 unsafe {
348 std::env::remove_var(self.key);
349 }
350 }
351 }
352 }
353
354 #[test]
355 #[serial_test::serial]
356 fn strict_ssh_cli_tokens_include_config_override() {
357 let _guard = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass ssh/config");
358
359 let tokens = strict_ssh_cli_tokens(5);
360
361 assert_eq!(tokens[0], "-F");
362 assert_eq!(tokens[1], "/tmp/cass ssh/config");
363 assert!(tokens.contains(&"StrictHostKeyChecking=yes".to_string()));
364 }
365
366 #[test]
367 #[serial_test::serial]
368 fn strict_ssh_command_for_rsync_quotes_config_override() {
369 let _guard = EnvGuard::set("CASS_SSH_CONFIG", "/tmp/cass'ssh/config");
370
371 let command = strict_ssh_command_for_rsync(5);
372
373 assert!(command.starts_with("ssh -F '/tmp/cass'\\''ssh/config' "));
374 assert!(command.contains("StrictHostKeyChecking=yes"));
375 }
376
377 #[test]
378 #[serial_test::serial]
379 fn strict_ssh_helpers_ignore_empty_config_override() {
380 let _guard = EnvGuard::set("CASS_SSH_CONFIG", " ");
381
382 let tokens = strict_ssh_cli_tokens(5);
383 let command = strict_ssh_command_for_rsync(5);
384
385 assert!(!tokens.contains(&"-F".to_string()));
386 assert!(!command.contains(" -F "));
387 }
388}