1use std::io::Write as IoWrite;
29use std::process::{Child, Command, Output, Stdio};
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use super::{
36 configure_child_process_group, host_key_verification_error, is_host_key_verification_failure,
37 probe::{CassStatus, HostProbeResult},
38 strict_ssh_cli_tokens, wait_for_child_output_with_timeout,
39};
40
/// Default timeout, in seconds, that [`RemoteIndexer::with_defaults`] passes as the SSH timeout.
pub const DEFAULT_INDEX_TIMEOUT_SECS: u64 = 600;
/// Seconds the polling loop sleeps between successive reads of the remote index log.
pub const INDEX_POLL_INTERVAL_SECS: u64 = 5;

/// Upper bound, in seconds, on how long the polling loop waits before returning a timeout error.
pub const MAX_INDEX_WAIT_SECS: u64 = 1800;

/// Ceiling for the reported load value divided by the CPU count; above it indexing is deferred.
const REMOTE_INDEX_MAX_LOAD_PER_CPU: f64 = 1.50;

/// Floor for available memory in KiB (512 * 1024 KiB); below it indexing is deferred.
const REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB: u64 = 512 * 1024;
59
/// Errors produced while driving an index run on a remote host over SSH.
#[derive(Error, Debug)]
pub enum IndexError {
    /// The SSH invocation failed or the remote script exited unsuccessfully;
    /// the payload carries the diagnostic text.
    #[error("SSH connection failed: {0}")]
    SshFailed(String),

    /// A command or the overall polling loop exceeded its deadline (seconds).
    #[error("Index operation timed out after {0} seconds")]
    Timeout(u64),

    /// The availability probe reported that `cass` is not on the remote PATH.
    #[error("cass not found on remote host")]
    CassNotFound,

    /// The index command itself failed; carries its captured output and exit code.
    #[error("Indexing failed: {stdout}\n{stderr}")]
    IndexFailed {
        stdout: String,
        stderr: String,
        exit_code: i32,
    },

    /// The remote log indicated the disk ran out of space.
    #[error("Disk full on remote host")]
    DiskFull,

    /// The remote log indicated a permission problem.
    #[error("Permission denied accessing agent data directories")]
    PermissionDenied,

    /// The host pressure guard decided the remote is too loaded; payload is the reason.
    #[error("Remote host pressure guard deferred indexing: {0}")]
    HostPressure(String),

    /// The index run was cancelled.
    #[error("Indexing cancelled")]
    Cancelled,

    /// A local I/O failure (for example spawning `ssh` or writing to its stdin).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
98
99impl IndexError {
100 pub fn help_message(&self) -> &'static str {
102 match self {
103 IndexError::DiskFull => "Free disk space on remote and retry.",
104 IndexError::Timeout(_) => {
105 "Index timed out. Try running manually: ssh host 'cass index'"
106 }
107 IndexError::PermissionDenied => "Check file permissions in agent data directories.",
108 IndexError::CassNotFound => "cass is not installed. Run installation first.",
109 IndexError::SshFailed(_) => "Check SSH connection and credentials.",
110 IndexError::HostPressure(_) => {
111 "Remote host is currently busy. Retry later or run indexing manually when idle."
112 }
113 _ => "See error details above.",
114 }
115 }
116}
117
/// Load and memory figures parsed from the host pressure probe output.
///
/// Each field is `None` when the probe did not report a usable value.
#[derive(Debug, Clone, PartialEq)]
struct RemoteHostPressureSnapshot {
    /// Value of the `CPUS=` line; only kept when greater than zero.
    cpus: Option<u64>,
    /// Value of the `LOAD1=` line; only kept when finite.
    load1: Option<f64>,
    /// Value of the `MEM_AVAILABLE_KIB=` line.
    mem_available_kib: Option<u64>,
}
124
/// Outcome of evaluating a [`RemoteHostPressureSnapshot`] against the indexing thresholds.
#[derive(Debug, Clone, PartialEq)]
struct RemoteHostPressureDecision {
    /// `true` when at least one threshold was violated and indexing should be postponed.
    defer_index: bool,
    /// Human-readable explanation of the decision.
    reason: String,
    /// The snapshot the decision was derived from.
    snapshot: RemoteHostPressureSnapshot,
}
131
132impl RemoteHostPressureSnapshot {
133 fn from_command_output(output: &str) -> Self {
134 let mut snapshot = Self {
135 cpus: None,
136 load1: None,
137 mem_available_kib: None,
138 };
139
140 for line in output.lines() {
141 let Some((key, value)) = line.split_once('=') else {
142 continue;
143 };
144 match key.trim() {
145 "CPUS" => snapshot.cpus = value.trim().parse::<u64>().ok().filter(|v| *v > 0),
146 "LOAD1" => {
147 snapshot.load1 = value.trim().parse::<f64>().ok().filter(|v| v.is_finite())
148 }
149 "MEM_AVAILABLE_KIB" => {
150 snapshot.mem_available_kib = value.trim().parse::<u64>().ok()
151 }
152 _ => {}
153 }
154 }
155
156 snapshot
157 }
158
159 fn decide(self) -> RemoteHostPressureDecision {
160 let mut reasons = Vec::new();
161
162 if let (Some(load1), Some(cpus)) = (self.load1, self.cpus) {
163 let load_per_cpu = load1 / cpus as f64;
164 if load_per_cpu > REMOTE_INDEX_MAX_LOAD_PER_CPU {
165 reasons.push(format!(
166 "load_per_cpu={load_per_cpu:.2} exceeds ceiling {REMOTE_INDEX_MAX_LOAD_PER_CPU:.2}"
167 ));
168 }
169 }
170
171 if let Some(mem_available_kib) = self.mem_available_kib
172 && mem_available_kib < REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB
173 {
174 reasons.push(format!(
175 "mem_available_kib={mem_available_kib} below floor {REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB}"
176 ));
177 }
178
179 let defer_index = !reasons.is_empty();
180 let reason = if defer_index {
181 reasons.join("; ")
182 } else if self.cpus.is_none() || self.load1.is_none() || self.mem_available_kib.is_none() {
183 "remote pressure metrics incomplete; allowing conservative fallback path".to_string()
184 } else {
185 "remote host pressure is within indexing budget".to_string()
186 };
187
188 RemoteHostPressureDecision {
189 defer_index,
190 reason,
191 snapshot: self,
192 }
193 }
194}
195
/// Phase of a remote index run, reported through [`IndexProgress`].
///
/// Serialized with an internal `stage` tag in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "stage", rename_all = "snake_case")]
pub enum IndexStage {
    /// The run has just been requested.
    Starting,
    /// An agent's data is being scanned.
    Scanning { agent: String },
    /// The search index is being built.
    Building,
    /// The run finished successfully.
    Complete,
    /// The run failed; `error` carries the message.
    Failed { error: String },
}
215
216impl std::fmt::Display for IndexStage {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 match self {
219 IndexStage::Starting => write!(f, "Starting"),
220 IndexStage::Scanning { agent } => write!(f, "Scanning {}", agent),
221 IndexStage::Building => write!(f, "Building index"),
222 IndexStage::Complete => write!(f, "Complete"),
223 IndexStage::Failed { error } => write!(f, "Failed: {}", error),
224 }
225 }
226}
227
/// A single progress notification delivered to the caller's callback.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexProgress {
    /// Phase the run is currently in.
    pub stage: IndexStage,
    /// Human-readable description of the current activity.
    pub message: String,
    /// Most recent session count parsed from the remote log.
    pub sessions_found: u64,
    /// Sessions indexed so far (only non-zero in the completion notification).
    pub sessions_indexed: u64,
    /// Approximate completion percentage; `None` when the run failed.
    pub percent: Option<u8>,
    /// Time elapsed since the run started.
    pub elapsed: Duration,
}
244
/// Final outcome of a remote index run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexResult {
    /// Whether the remote index command reported completion.
    pub success: bool,
    /// Session count reported by the remote after completion (0 on failure).
    pub sessions_indexed: u64,
    /// Wall-clock duration of the run.
    pub duration: Duration,
    /// Error text collected from the remote log when the run failed.
    pub error: Option<String>,
    /// Result of writing the remote artifact manifest; only set after a successful run.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub artifact_manifest: Option<RemoteArtifactManifestResult>,
}
260
/// Outcome of asking the remote host to write its artifact manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RemoteArtifactManifestResult {
    /// `true` only when the reported verification status is `"complete"`.
    pub success: bool,
    /// Manifest path reported by the remote command, if any.
    pub manifest_path: Option<String>,
    /// Bundle identifier reported by the remote command, if any.
    pub bundle_id: Option<String>,
    /// Chunk count reported by the remote command, if any.
    pub chunk_count: Option<usize>,
    /// Expected byte total reported by the remote command, if any.
    pub expected_bytes: Option<u64>,
    /// Verification status string reported by the remote command, if any.
    pub verification_status: Option<String>,
    /// Explanation when `success` is `false`.
    pub error: Option<String>,
}
279
/// Subset of the JSON emitted by the remote artifact-manifest command that this module reads.
///
/// Every field is optional so that partial or unrelated JSON objects still deserialize.
#[derive(Debug, Deserialize)]
struct RemoteArtifactManifestCommandOutput {
    manifest_path: Option<String>,
    bundle_id: Option<String>,
    chunk_count: Option<usize>,
    expected_bytes: Option<u64>,
    verification_status: Option<String>,
}
288
289impl RemoteArtifactManifestCommandOutput {
290 fn has_manifest_identity(&self) -> bool {
291 self.manifest_path.is_some() || self.bundle_id.is_some()
292 }
293
294 fn has_complete_manifest_shape(&self) -> bool {
295 self.manifest_path.is_some()
296 && self.bundle_id.is_some()
297 && self.verification_status.is_some()
298 }
299}
300
301impl RemoteArtifactManifestResult {
302 fn from_command_output(output: &str) -> Self {
303 match parse_remote_artifact_manifest_output(output) {
304 Ok(parsed) => {
305 let complete = parsed.verification_status.as_deref() == Some("complete");
306 Self {
307 success: complete,
308 manifest_path: parsed.manifest_path,
309 bundle_id: parsed.bundle_id,
310 chunk_count: parsed.chunk_count,
311 expected_bytes: parsed.expected_bytes,
312 verification_status: parsed.verification_status,
313 error: if complete {
314 None
315 } else {
316 Some("remote artifact manifest verification was not complete".to_string())
317 },
318 }
319 }
320 Err(err) => Self {
321 success: false,
322 manifest_path: None,
323 bundle_id: None,
324 chunk_count: None,
325 expected_bytes: None,
326 verification_status: None,
327 error: Some(format!(
328 "failed to parse remote artifact manifest output: {err}"
329 )),
330 },
331 }
332 }
333
334 fn from_error(error: impl Into<String>) -> Self {
335 Self {
336 success: false,
337 manifest_path: None,
338 bundle_id: None,
339 chunk_count: None,
340 expected_bytes: None,
341 verification_status: None,
342 error: Some(error.into()),
343 }
344 }
345}
346
347fn parse_remote_artifact_manifest_output(
348 output: &str,
349) -> serde_json::Result<RemoteArtifactManifestCommandOutput> {
350 let direct = serde_json::from_str::<RemoteArtifactManifestCommandOutput>(output);
351 if direct.is_ok() {
352 return direct;
353 }
354
355 let mut fallback = None;
356 for (idx, _) in output.char_indices().filter(|(_, ch)| *ch == '{') {
357 let mut deserializer = serde_json::Deserializer::from_str(&output[idx..]);
358 if let Ok(parsed) = RemoteArtifactManifestCommandOutput::deserialize(&mut deserializer) {
359 if parsed.has_complete_manifest_shape() {
360 return Ok(parsed);
361 }
362 if fallback.is_none() && parsed.has_manifest_identity() {
363 fallback = Some(parsed);
364 }
365 }
366 }
367
368 fallback.map_or(direct, Ok)
369}
370
/// Picks the deadline for one SSH command.
///
/// The result is the smaller of `requested` and the configured timeout, where
/// a configured value of `0` means "no configured limit". A zero result is
/// raised to one second so the command always gets some time to run.
fn effective_ssh_command_timeout(requested: Duration, configured_secs: u64) -> Duration {
    let ceiling = match configured_secs {
        0 => requested,
        secs => Duration::from_secs(secs),
    };

    match requested.min(ceiling) {
        deadline if deadline.is_zero() => Duration::from_secs(1),
        deadline => deadline,
    }
}
388
389fn wait_for_command_output_with_timeout(
390 child: Child,
391 timeout: Duration,
392) -> Result<Output, IndexError> {
393 let timeout_secs = timeout.as_secs().max(1);
394 wait_for_child_output_with_timeout(child, timeout)?.ok_or(IndexError::Timeout(timeout_secs))
395}
396
/// Result of probing the remote host for the `cass` binary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteCassPresence {
    /// Only the `CASS_FOUND` marker was seen.
    Found,
    /// Only the `CASS_NOT_FOUND` marker was seen.
    NotFound,
    /// Neither marker, or both markers, were seen.
    Unknown,
}
403
404fn parse_remote_cass_presence(output: &str) -> RemoteCassPresence {
405 let mut found = false;
406 let mut not_found = false;
407
408 for line in output.lines().map(str::trim) {
409 match line {
410 "CASS_FOUND" => found = true,
411 "CASS_NOT_FOUND" => not_found = true,
412 _ => {}
413 }
414 }
415
416 match (found, not_found) {
417 (true, false) => RemoteCassPresence::Found,
418 (false, true) => RemoteCassPresence::NotFound,
419 _ => RemoteCassPresence::Unknown,
420 }
421}
422
/// Shortens remote output for use inside an error message.
///
/// At most the first 512 characters are kept and then trimmed; `...` is
/// appended when anything was cut off.
fn summarize_remote_output(output: &str) -> String {
    const MAX_REMOTE_OUTPUT_ERROR_CHARS: usize = 512;

    // The byte offset of the first character past the limit doubles as the
    // "was it truncated?" signal.
    match output.char_indices().nth(MAX_REMOTE_OUTPUT_ERROR_CHARS) {
        Some((cut, _)) => {
            let summary = output[..cut].trim();
            format!("{summary}...")
        }
        None => output.trim().to_string(),
    }
}
437
/// Returns the value of the last `STATUS=` line in the poll output, if any.
///
/// Leading and trailing whitespace on a line is ignored; `STATUS=` must then
/// be at the very start of the line, so mentions in the middle of other text
/// do not count.
fn poll_status(output: &str) -> Option<&str> {
    output
        .lines()
        .rev()
        .find_map(|line| line.trim().strip_prefix("STATUS="))
}
444
/// Drives `cass` indexing on a remote host through the `ssh` command-line client.
pub struct RemoteIndexer {
    /// SSH destination passed to `ssh`.
    host: String,
    /// Configured per-command timeout in seconds; `0` leaves each command's own timeout in force.
    ssh_timeout: u64,
}
452
453impl RemoteIndexer {
    /// Creates an indexer for `host` with the given SSH timeout in seconds.
    pub fn new(host: impl Into<String>, ssh_timeout: u64) -> Self {
        Self {
            host: host.into(),
            ssh_timeout,
        }
    }
461
    /// Creates an indexer for `host` using [`DEFAULT_INDEX_TIMEOUT_SECS`] as the SSH timeout.
    pub fn with_defaults(host: impl Into<String>) -> Self {
        Self::new(host, DEFAULT_INDEX_TIMEOUT_SECS)
    }
466
    /// Returns the SSH destination this indexer targets.
    pub fn host(&self) -> &str {
        &self.host
    }
471
472 pub fn needs_indexing(probe: &HostProbeResult) -> bool {
482 match &probe.cass_status {
483 CassStatus::NotFound => false,
485 CassStatus::InstalledNotIndexed { .. } => true,
487 CassStatus::Indexed { session_count, .. } => *session_count == 0,
489 CassStatus::Unknown => true,
491 }
492 }
493
494 pub fn run_index<F>(&self, on_progress: F) -> Result<IndexResult, IndexError>
500 where
501 F: Fn(IndexProgress) + Send + Sync,
502 {
503 let start = Instant::now();
504
505 on_progress(IndexProgress {
506 stage: IndexStage::Starting,
507 message: format!("Starting index on {}...", self.host),
508 sessions_found: 0,
509 sessions_indexed: 0,
510 percent: Some(0),
511 elapsed: start.elapsed(),
512 });
513
514 self.verify_cass_installed()?;
516 self.verify_remote_host_pressure()?;
517
518 let mut result = self.run_index_with_polling(&on_progress, start)?;
520 if result.success {
521 result.artifact_manifest = Some(self.write_remote_artifact_manifest());
522 }
523
524 if result.success {
526 on_progress(IndexProgress {
527 stage: IndexStage::Complete,
528 message: format!(
529 "Indexed {} sessions on {} ({:.1}s)",
530 result.sessions_indexed,
531 self.host,
532 result.duration.as_secs_f64()
533 ),
534 sessions_found: result.sessions_indexed,
535 sessions_indexed: result.sessions_indexed,
536 percent: Some(100),
537 elapsed: start.elapsed(),
538 });
539 } else {
540 on_progress(IndexProgress {
541 stage: IndexStage::Failed {
542 error: result.error.clone().unwrap_or_default(),
543 },
544 message: result
545 .error
546 .clone()
547 .unwrap_or_else(|| "Unknown error".into()),
548 sessions_found: 0,
549 sessions_indexed: 0,
550 percent: None,
551 elapsed: start.elapsed(),
552 });
553 }
554
555 Ok(result)
556 }
557
558 fn verify_cass_installed(&self) -> Result<(), IndexError> {
560 let script = r#"
561source ~/.cargo/env 2>/dev/null || true
562export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
563command -v cass >/dev/null 2>&1 && echo "CASS_FOUND" || echo "CASS_NOT_FOUND"
564"#;
565
566 let output = self.run_ssh_command(script, Duration::from_secs(30))?;
567
568 match parse_remote_cass_presence(&output) {
569 RemoteCassPresence::Found => Ok(()),
570 RemoteCassPresence::NotFound => Err(IndexError::CassNotFound),
571 RemoteCassPresence::Unknown => Err(IndexError::SshFailed(format!(
572 "Unexpected cass availability probe output: {}",
573 summarize_remote_output(&output)
574 ))),
575 }
576 }
577
    /// Shell script that prints `CPUS=`, `LOAD1=` and `MEM_AVAILABLE_KIB=` lines.
    ///
    /// Values come from `getconf`/`nproc`, `/proc/loadavg` and `/proc/meminfo`;
    /// a value that cannot be read is printed as an empty string.
    fn host_pressure_script() -> &'static str {
        r#"
CPUS=$(getconf _NPROCESSORS_ONLN 2>/dev/null || nproc 2>/dev/null || echo "")
LOAD1=$(awk '{print $1}' /proc/loadavg 2>/dev/null || echo "")
MEM_AVAILABLE_KIB=$(awk '/MemAvailable:/ {print $2}' /proc/meminfo 2>/dev/null || echo "")
printf 'CPUS=%s\n' "$CPUS"
printf 'LOAD1=%s\n' "$LOAD1"
printf 'MEM_AVAILABLE_KIB=%s\n' "$MEM_AVAILABLE_KIB"
"#
    }
588
589 fn verify_remote_host_pressure(&self) -> Result<(), IndexError> {
590 let output = self.run_ssh_command(Self::host_pressure_script(), Duration::from_secs(15))?;
591 let decision = RemoteHostPressureSnapshot::from_command_output(&output).decide();
592 if decision.defer_index {
593 Err(IndexError::HostPressure(decision.reason))
594 } else {
595 Ok(())
596 }
597 }
598
    /// Shell script that sets up PATH and asks `cass` to write its artifact
    /// manifest, printing the result as JSON.
    fn artifact_manifest_script() -> &'static str {
        r#"
source ~/.cargo/env 2>/dev/null || true
export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
cass sources artifact-manifest --write --json
"#
    }
606
607 fn write_remote_artifact_manifest(&self) -> RemoteArtifactManifestResult {
608 match self.run_ssh_command(Self::artifact_manifest_script(), Duration::from_secs(60)) {
609 Ok(output) => RemoteArtifactManifestResult::from_command_output(&output),
610 Err(err) => RemoteArtifactManifestResult::from_error(err.to_string()),
611 }
612 }
613
614 fn run_index_with_polling<F>(
620 &self,
621 on_progress: &F,
622 start: Instant,
623 ) -> Result<IndexResult, IndexError>
624 where
625 F: Fn(IndexProgress),
626 {
627 let start_script = r#"
629source ~/.cargo/env 2>/dev/null || true
630export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
631
632LOG_FILE=~/.cass_index.log
633rm -f "$LOG_FILE"
634
635nohup bash -c '
636set -o pipefail
637source "$HOME/.cargo/env" 2>/dev/null || true
638export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
639cass index --progress 2>&1 | tee "$HOME/.cass_index.log"
640STATUS=${PIPESTATUS[0]}
641if [ "$STATUS" -eq 0 ]; then
642 echo "===INDEX_COMPLETE===" >> "$HOME/.cass_index.log"
643else
644 echo "===INDEX_FAILED:${STATUS}===" >> "$HOME/.cass_index.log"
645fi
646' > /dev/null 2>&1 &
647
648echo "INDEX_PID=$!"
649"#;
650
651 let output = self.run_ssh_command(start_script, Duration::from_secs(30))?;
652
653 let _pid = output
655 .lines()
656 .find(|l| l.starts_with("INDEX_PID="))
657 .and_then(|l| l.strip_prefix("INDEX_PID="))
658 .and_then(|p| p.trim().parse::<u32>().ok());
659
660 self.poll_index_progress(on_progress, start)
662 }
663
664 fn poll_index_progress<F>(
666 &self,
667 on_progress: &F,
668 start: Instant,
669 ) -> Result<IndexResult, IndexError>
670 where
671 F: Fn(IndexProgress),
672 {
673 let poll_script = r#"
674LOG_FILE=~/.cass_index.log
675if [ -f "$LOG_FILE" ]; then
676 if grep -q "===INDEX_FAILED:" "$LOG_FILE"; then
677 echo "STATUS=ERROR"
678 tail -30 "$LOG_FILE"
679 elif grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
680 echo "STATUS=COMPLETE"
681 # Get session count from health
682 source ~/.cargo/env 2>/dev/null || true
683 export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
684 STATS=$(cass stats --json 2>/dev/null || echo '{}')
685 SESSIONS=$(echo "$STATS" | tr -d '\n' | sed -n 's/.*"conversations"[[:space:]]*:[[:space:]]*\\([0-9][0-9]*\\).*/\\1/p')
686 echo "SESSIONS=${SESSIONS:-0}"
687 elif grep -qi "error" "$LOG_FILE" && ! grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
688 # Check if it's a real error or just log noise
689 if grep -qE "(FATAL|panicked|No such file|Permission denied|disk full)" "$LOG_FILE"; then
690 echo "STATUS=ERROR"
691 tail -30 "$LOG_FILE"
692 else
693 echo "STATUS=RUNNING"
694 tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
695 fi
696 else
697 echo "STATUS=RUNNING"
698 tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
699 fi
700else
701 echo "STATUS=NOT_STARTED"
702fi
703"#;
704
705 let max_wait = Duration::from_secs(MAX_INDEX_WAIT_SECS);
706 let poll_interval = Duration::from_secs(INDEX_POLL_INTERVAL_SECS);
707 let mut sessions_found: u64 = 0;
708 let mut last_agent = String::new();
709 let mut progress_pct: u8 = 5;
710
711 loop {
712 if start.elapsed() > max_wait {
713 return Err(IndexError::Timeout(max_wait.as_secs()));
714 }
715
716 std::thread::sleep(poll_interval);
717
718 let output = self.run_ssh_command(poll_script, Duration::from_secs(30))?;
719 let mut saw_building_this_poll = false;
721
722 if poll_status(&output) == Some("COMPLETE") {
723 let sessions = output
725 .lines()
726 .find(|l| l.starts_with("SESSIONS="))
727 .and_then(|l| l.strip_prefix("SESSIONS="))
728 .and_then(|s| s.trim().parse::<u64>().ok())
729 .unwrap_or(0);
730
731 return Ok(IndexResult {
732 success: true,
733 sessions_indexed: sessions,
734 duration: start.elapsed(),
735 error: None,
736 artifact_manifest: None,
737 });
738 }
739
740 if poll_status(&output) == Some("ERROR") {
741 let error_lines: Vec<&str> = output
742 .lines()
743 .filter(|l| !l.trim_start().starts_with("STATUS="))
744 .collect();
745 let error_msg = error_lines.join("\n");
746
747 if error_msg.contains("disk full") || error_msg.contains("No space left") {
749 return Err(IndexError::DiskFull);
750 }
751 if error_msg.contains("Permission denied") {
752 return Err(IndexError::PermissionDenied);
753 }
754
755 return Ok(IndexResult {
756 success: false,
757 sessions_indexed: 0,
758 duration: start.elapsed(),
759 error: Some(error_msg),
760 artifact_manifest: None,
761 });
762 }
763
764 for line in output.lines() {
766 if line.contains("Scanning")
768 && let Some(agent) = extract_agent_from_line(line)
769 && agent != last_agent
770 {
771 progress_pct = (progress_pct + 5).min(40);
772 on_progress(IndexProgress {
773 stage: IndexStage::Scanning {
774 agent: agent.clone(),
775 },
776 message: format!("Scanning {}...", agent),
777 sessions_found,
778 sessions_indexed: 0,
779 percent: Some(progress_pct),
780 elapsed: start.elapsed(),
781 });
782 last_agent = agent;
783 }
784
785 if let Some(count) = extract_session_count(line) {
787 sessions_found = count;
788 }
789
790 if !saw_building_this_poll
792 && (line.contains("Building") || line.contains("Indexing"))
793 {
794 saw_building_this_poll = true;
795 progress_pct = (progress_pct + 5).min(85);
796 on_progress(IndexProgress {
797 stage: IndexStage::Building,
798 message: "Building search index...".into(),
799 sessions_found,
800 sessions_indexed: 0,
801 percent: Some(progress_pct),
802 elapsed: start.elapsed(),
803 });
804 }
805 }
806 }
807 }
808
809 fn run_ssh_command(&self, script: &str, timeout: Duration) -> Result<String, IndexError> {
811 let command_timeout = effective_ssh_command_timeout(timeout, self.ssh_timeout);
812 let connect_timeout_secs = command_timeout.as_secs().clamp(1, 30);
813
814 let mut cmd = Command::new("ssh");
815 cmd.args(strict_ssh_cli_tokens(connect_timeout_secs))
816 .arg("-o")
817 .arg("LogLevel=ERROR")
818 .arg("--")
819 .arg(&self.host)
820 .arg("bash")
821 .arg("-s");
822
823 cmd.stdin(Stdio::piped())
824 .stdout(Stdio::piped())
825 .stderr(Stdio::piped());
826 configure_child_process_group(&mut cmd);
827
828 let mut child = cmd.spawn()?;
829
830 let write_error = if let Some(mut stdin) = child.stdin.take() {
831 stdin.write_all(script.as_bytes()).err()
832 } else {
833 None
834 };
835
836 let output = wait_for_command_output_with_timeout(child, command_timeout)?;
837
838 if !output.status.success() {
839 let stderr = String::from_utf8_lossy(&output.stderr);
840 if is_host_key_verification_failure(&stderr) {
841 return Err(IndexError::SshFailed(host_key_verification_error(
842 &self.host,
843 )));
844 }
845 if stderr.contains("Connection refused")
846 || stderr.contains("Connection timed out")
847 || stderr.contains("Permission denied")
848 {
849 return Err(IndexError::SshFailed(stderr.trim().to_string()));
850 }
851 let code = output.status.code().unwrap_or(-1);
854 return Err(IndexError::SshFailed(format!(
855 "Remote script exited with code {code}: {}",
856 stderr.trim()
857 )));
858 }
859 if let Some(err) = write_error {
860 return Err(IndexError::Io(err));
861 }
862
863 Ok(String::from_utf8_lossy(&output.stdout).to_string())
864 }
865}
866
/// Pulls an agent name out of a `Scanning ...` log line.
///
/// The text after the first `Scanning` is split on whitespace and `/`; the
/// first piece that is not empty, `~` or `.`, and that still has content after
/// its leading dots are removed, is the agent. `claude` is reported as
/// `claude_code`; every other name is returned unchanged. Returns `None` when
/// the line has no `Scanning` or no usable piece follows it.
fn extract_agent_from_line(line: &str) -> Option<String> {
    let (_, tail) = line.split_once("Scanning")?;

    let raw_name = tail
        .trim()
        .split(|c: char| c == '/' || c.is_whitespace())
        .filter(|piece| !matches!(*piece, "" | "~" | "."))
        .map(|piece| piece.trim_start_matches('.'))
        .find(|piece| !piece.is_empty())?;

    let canonical = if raw_name == "claude" {
        "claude_code"
    } else {
        raw_name
    };
    Some(canonical.to_owned())
}
895
896fn extract_session_count(line: &str) -> Option<u64> {
898 let lower = line.to_lowercase();
902 let tokens: Vec<&str> = lower.split_whitespace().collect();
903
904 for (idx, token) in tokens.iter().enumerate() {
905 let word = token.trim_matches(|c: char| !c.is_ascii_alphabetic());
906 if matches!(
907 word,
908 "session" | "sessions" | "conversation" | "conversations"
909 ) {
910 if idx > 0
911 && let Some(count) = parse_count(tokens[idx - 1])
912 {
913 return Some(count);
914 }
915 if idx + 1 < tokens.len()
916 && let Some(count) = parse_count(tokens[idx + 1])
917 {
918 return Some(count);
919 }
920 }
921 }
922
923 None
924}
925
/// Parses a count out of a single log token.
///
/// Surrounding characters other than digits and `/` are stripped. For an
/// `a/b` progress token only the part before the first `/` is used. Any
/// remaining non-digits (such as thousands separators) are dropped. Returns
/// `None` when no digits remain or the number does not fit in `u64`.
fn parse_count(token: &str) -> Option<u64> {
    let core = token.trim_matches(|c: char| !(c.is_ascii_digit() || c == '/'));
    let head = match core.split_once('/') {
        Some((numerator, _)) => numerator,
        None => core,
    };

    let mut digits = String::with_capacity(head.len());
    digits.extend(head.chars().filter(char::is_ascii_digit));

    if digits.is_empty() {
        return None;
    }
    digits.parse::<u64>().ok()
}
936
// Unit tests for the remote indexer: probe-driven decisions, log-line parsing,
// timeout handling of child processes, and manifest output parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sources::probe::HostProbeResult;
    use std::path::PathBuf;

    // Loads `<CARGO_MANIFEST_DIR>/tests/fixtures/sources/probe/<name>.json`
    // and deserializes it; panics with the path on any failure.
    fn load_probe_fixture(name: &str) -> HostProbeResult {
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/fixtures/sources/probe")
            .join(format!("{}.json", name));
        let content = std::fs::read_to_string(&path)
            .unwrap_or_else(|e| panic!("Failed to read fixture {}: {}", path.display(), e));
        serde_json::from_str(&content)
            .unwrap_or_else(|e| panic!("Failed to parse fixture {}: {}", path.display(), e))
    }

    // The `no_cass_host` fixture must not trigger indexing.
    #[test]
    fn test_no_indexing_when_not_found() {
        let probe = load_probe_fixture("no_cass_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    // The `not_indexed_host` fixture must trigger indexing.
    #[test]
    fn test_needs_indexing_when_not_indexed() {
        let probe = load_probe_fixture("not_indexed_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    // The `empty_index_host` fixture must trigger indexing.
    #[test]
    fn test_needs_indexing_when_empty_index() {
        let probe = load_probe_fixture("empty_index_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    // The `indexed_host` fixture must not trigger indexing.
    #[test]
    fn test_no_indexing_needed_when_has_sessions() {
        let probe = load_probe_fixture("indexed_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    // The `unknown_status_host` fixture must trigger indexing.
    #[test]
    fn test_needs_indexing_when_unknown() {
        let probe = load_probe_fixture("unknown_status_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    // Agent names are taken from the first path/word after "Scanning".
    #[test]
    fn test_extract_agent_from_line() {
        assert_eq!(
            extract_agent_from_line("Scanning ~/.claude/projects..."),
            Some("claude_code".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning ~/.codex/sessions..."),
            Some("codex".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning cursor data..."),
            Some("cursor".into())
        );
        assert_eq!(extract_agent_from_line("Some other line"), None);
    }

    // Counts are found next to "session(s)"/"conversation(s)", before or after.
    #[test]
    fn test_extract_session_count() {
        assert_eq!(extract_session_count("found 234 sessions"), Some(234));
        assert_eq!(extract_session_count("Indexed 291 sessions"), Some(291));
        assert_eq!(
            extract_session_count("Processing 42 conversations"),
            Some(42)
        );
        assert_eq!(
            extract_session_count("2026-01-11 12:00:00 Indexed 291 sessions"),
            Some(291)
        );
        assert_eq!(extract_session_count("Indexed 5/10 conversations"), Some(5));
        assert_eq!(extract_session_count("conversations: 17 total"), Some(17));
        assert_eq!(extract_session_count("Some other line"), None);
    }

    // Display labels for the stages.
    #[test]
    fn test_index_stage_display() {
        assert_eq!(IndexStage::Starting.to_string(), "Starting");
        assert_eq!(
            IndexStage::Scanning {
                agent: "claude_code".into()
            }
            .to_string(),
            "Scanning claude_code"
        );
        assert_eq!(IndexStage::Building.to_string(), "Building index");
        assert_eq!(IndexStage::Complete.to_string(), "Complete");
    }

    // Each dedicated help message contains its key phrase.
    #[test]
    fn test_index_error_help_messages() {
        assert!(IndexError::DiskFull.help_message().contains("Free disk"));
        assert!(IndexError::Timeout(600).help_message().contains("manually"));
        assert!(
            IndexError::PermissionDenied
                .help_message()
                .contains("permissions")
        );
        assert!(
            IndexError::CassNotFound
                .help_message()
                .contains("installed")
        );
        assert!(
            IndexError::HostPressure("load".into())
                .help_message()
                .contains("busy")
        );
    }

    // Both constructors preserve the host name.
    #[test]
    fn test_remote_indexer_new() {
        let indexer = RemoteIndexer::new("laptop", 300);
        assert_eq!(indexer.host(), "laptop");

        let indexer2 = RemoteIndexer::with_defaults("server");
        assert_eq!(indexer2.host(), "server");
    }

    // The effective timeout is the smaller deadline; 0 means "no configured
    // limit"; a zero result is raised to one second.
    #[test]
    fn test_effective_ssh_command_timeout_clamps_to_smaller_deadline() {
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(60), 10),
            Duration::from_secs(10)
        );
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 60),
            Duration::from_secs(15)
        );
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 0),
            Duration::from_secs(15)
        );
        assert_eq!(
            effective_ssh_command_timeout(Duration::ZERO, 0),
            Duration::from_secs(1)
        );
    }

    // Exactly one marker line is required; none or both yield Unknown.
    #[test]
    fn test_parse_remote_cass_presence_requires_unambiguous_status_line() {
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\nCASS_FOUND\n"),
            RemoteCassPresence::Found
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_NOT_FOUND\n"),
            RemoteCassPresence::NotFound
        );
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\n"),
            RemoteCassPresence::Unknown
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_FOUND\nCASS_NOT_FOUND\n"),
            RemoteCassPresence::Unknown
        );
    }

    // Only lines that start with STATUS= count, and the last one wins.
    #[test]
    fn test_poll_status_uses_exact_status_line() {
        assert_eq!(
            poll_status("banner mentions STATUS=ERROR in prose\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        assert_eq!(
            poll_status("STATUS=ERROR\nstartup banner\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        assert_eq!(poll_status(" STATUS=ERROR\npanic\n"), Some("ERROR"));
        assert_eq!(poll_status("no structured status"), None);
    }

    // A child that outlives the deadline yields Timeout(1) in well under a second.
    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_kills_stalled_child() -> anyhow::Result<()> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c")
            .arg("sleep 2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        configure_child_process_group(&mut cmd);
        let child = cmd.spawn()?;

        let started = Instant::now();
        let Err(err) = wait_for_command_output_with_timeout(child, Duration::from_millis(50))
        else {
            return Err(anyhow::anyhow!("stalled command should time out"));
        };
        anyhow::ensure!(
            matches!(err, IndexError::Timeout(1)),
            "unexpected timeout error: {err}"
        );
        anyhow::ensure!(
            started.elapsed() < Duration::from_secs(1),
            "timeout helper waited too long for stalled child"
        );
        Ok(())
    }

    // 200 000 bytes on each of stdout and stderr must be captured in full.
    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_drains_large_output() -> anyhow::Result<()> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c")
            .arg("yes stdout | head -c 200000; yes stderr | head -c 200000 >&2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        configure_child_process_group(&mut cmd);
        let child = cmd.spawn()?;

        let output = wait_for_command_output_with_timeout(child, Duration::from_secs(5))?;
        anyhow::ensure!(output.status.success(), "large-output command failed");
        anyhow::ensure!(
            output.stdout.len() == 200_000,
            "stdout was not fully drained"
        );
        anyhow::ensure!(
            output.stderr.len() == 200_000,
            "stderr was not fully drained"
        );
        Ok(())
    }

    // A background grandchild that inherits the pipes must not extend the wait,
    // and must itself be gone shortly after the timeout.
    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_bounds_inherited_pipe_waits() -> anyhow::Result<()>
    {
        let temp = tempfile::TempDir::new()?;
        let pid_file = temp.path().join("grandchild.pid");
        let mut cmd = Command::new("sh");
        cmd.env("PID_FILE", &pid_file);
        cmd.arg("-c")
            .arg("(sleep 30) & printf '%s\\n' \"$!\" > \"$PID_FILE\"; printf parent-done")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        configure_child_process_group(&mut cmd);
        let child = cmd.spawn()?;

        let started = Instant::now();
        let Err(err) = wait_for_command_output_with_timeout(child, Duration::from_millis(100))
        else {
            return Err(anyhow::anyhow!(
                "inherited pipe should not outlive command deadline"
            ));
        };
        anyhow::ensure!(
            matches!(err, IndexError::Timeout(1)),
            "unexpected inherited-pipe timeout error: {err}"
        );
        anyhow::ensure!(
            started.elapsed() < Duration::from_secs(1),
            "timeout helper waited too long for inherited pipe"
        );

        let grandchild_pid = std::fs::read_to_string(&pid_file)?;
        anyhow::ensure!(
            wait_until_unix_process_stops(grandchild_pid.trim(), Duration::from_secs(1)),
            "timeout must kill inherited-pipe grandchild process {}",
            grandchild_pid.trim()
        );
        Ok(())
    }

    // Polls every 20 ms until the process is no longer running or `timeout`
    // passes; returns whether it stopped.
    #[cfg(unix)]
    fn wait_until_unix_process_stops(pid: &str, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if !unix_process_is_running(pid) {
                return true;
            }
            std::thread::sleep(Duration::from_millis(20));
        }
        !unix_process_is_running(pid)
    }

    // Asks `ps` for the process state. A `ps` that cannot be run counts as
    // "still running"; a failing `ps`, an empty state, or a state starting
    // with `Z` counts as "not running".
    #[cfg(unix)]
    fn unix_process_is_running(pid: &str) -> bool {
        let Ok(output) = Command::new("ps").args(["-o", "stat=", "-p", pid]).output() else {
            return true;
        };
        if !output.status.success() {
            return false;
        }
        let stat = String::from_utf8_lossy(&output.stdout);
        let stat = stat.trim();
        !stat.is_empty() && !stat.starts_with('Z')
    }

    // The manifest script must always pass `--json` after `--write`.
    #[test]
    fn test_artifact_manifest_script_uses_robot_safe_write_command() {
        let script = RemoteIndexer::artifact_manifest_script();
        assert!(script.contains("cass sources artifact-manifest --write --json"));
        assert!(!script.contains("cass sources artifact-manifest --write\n"));
    }

    // The pressure script reads CPU count, load average and MemAvailable.
    #[test]
    fn test_host_pressure_script_reads_cheap_linux_metrics() {
        let script = RemoteIndexer::host_pressure_script();
        assert!(script.contains("_NPROCESSORS_ONLN"));
        assert!(script.contains("/proc/loadavg"));
        assert!(script.contains("MemAvailable"));
    }

    // Missing metrics never defer indexing; the reason says they were incomplete.
    #[test]
    fn test_remote_host_pressure_allows_incomplete_metrics() {
        let decision = RemoteHostPressureSnapshot::from_command_output("CPUS=\nLOAD1=\n").decide();

        assert!(!decision.defer_index);
        assert!(
            decision.reason.contains("metrics incomplete"),
            "{decision:?}"
        );
    }

    // 7.20 load on 4 CPUs (1.80 per CPU) is above the ceiling.
    #[test]
    fn test_remote_host_pressure_defers_high_load() {
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=4\nLOAD1=7.20\nMEM_AVAILABLE_KIB=1048576\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(decision.reason.contains("load_per_cpu"), "{decision:?}");
    }

    // 131072 KiB available is below the memory floor even though load is fine.
    #[test]
    fn test_remote_host_pressure_defers_low_memory() {
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=64\nLOAD1=12.00\nMEM_AVAILABLE_KIB=131072\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(
            decision.reason.contains("mem_available_kib"),
            "{decision:?}"
        );
    }

    // Clean JSON output parses directly and is successful when "complete".
    #[test]
    fn test_remote_artifact_manifest_result_parses_command_output() {
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"{
 "status": "ok",
 "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
 "bundle_id": "cass-lexical-abc",
 "chunk_count": 3,
 "expected_bytes": 42,
 "verification_status": "complete"
 }"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-abc"));
        assert_eq!(result.chunk_count, Some(3));
        assert_eq!(result.expected_bytes, Some(42));
        assert_eq!(result.error, None);
    }

    // Banner text (including a stray `{}`) before the JSON is skipped.
    #[test]
    fn test_remote_artifact_manifest_result_parses_json_with_ssh_noise() {
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host {}
MOTD: maintenance starts at 02:00
{
 "status": "ok",
 "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
 "bundle_id": "cass-lexical-noisy",
 "chunk_count": 2,
 "expected_bytes": 99,
 "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-noisy"));
        assert_eq!(result.chunk_count, Some(2));
        assert_eq!(result.expected_bytes, Some(99));
        assert_eq!(result.error, None);
    }

    // A partial JSON object without identity fields must not shadow the real manifest.
    #[test]
    fn test_remote_artifact_manifest_result_skips_partial_noise_json() {
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host
{"verification_status": "complete"}
{
 "status": "ok",
 "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
 "bundle_id": "cass-lexical-real",
 "chunk_count": 4,
 "expected_bytes": 123,
 "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-real"));
        assert_eq!(result.chunk_count, Some(4));
        assert_eq!(result.expected_bytes, Some(123));
        assert_eq!(result.error, None);
    }
}