1use std::io::Write as IoWrite;
29use std::process::{Child, Command, Output, Stdio};
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use super::{
36 host_key_verification_error, is_host_key_verification_failure,
37 probe::{CassStatus, HostProbeResult},
38 strict_ssh_cli_tokens, wait_for_child_output_with_timeout,
39};
40
/// Default per-command SSH deadline, in seconds, used by `RemoteIndexer::with_defaults`.
pub const DEFAULT_INDEX_TIMEOUT_SECS: u64 = 600;

/// Pause, in seconds, between successive polls of the remote index log.
pub const INDEX_POLL_INTERVAL_SECS: u64 = 5;

/// Overall deadline, in seconds, measured from the start of a run, after which
/// polling gives up and reports a timeout.
pub const MAX_INDEX_WAIT_SECS: u64 = 1800;

/// Highest 1-minute load average per online CPU at which remote indexing may start.
const REMOTE_INDEX_MAX_LOAD_PER_CPU: f64 = 1.5;

/// Lowest `MemAvailable` (KiB; 512 MiB) at which remote indexing may start.
const REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB: u64 = 512 * 1024;
59
/// Errors that can end a remote indexing run.
#[derive(Error, Debug)]
pub enum IndexError {
    /// An SSH invocation failed (connection refused/timed out, authentication,
    /// host-key verification, or a non-zero remote exit); carries the diagnostic text.
    #[error("SSH connection failed: {0}")]
    SshFailed(String),

    /// A remote command or the overall indexing wait exceeded its deadline (seconds).
    #[error("Index operation timed out after {0} seconds")]
    Timeout(u64),

    /// The remote shell could not locate the `cass` binary.
    #[error("cass not found on remote host")]
    CassNotFound,

    /// Indexing failed with captured output and exit code.
    /// NOTE(review): not constructed in this file, and `exit_code` is not part of
    /// the Display message.
    #[error("Indexing failed: {stdout}\n{stderr}")]
    IndexFailed {
        stdout: String,
        stderr: String,
        exit_code: i32,
    },

    /// The remote index log reported a full disk.
    #[error("Disk full on remote host")]
    DiskFull,

    /// The remote index log reported "Permission denied".
    #[error("Permission denied accessing agent data directories")]
    PermissionDenied,

    /// The pre-flight host pressure check postponed indexing; carries the reason.
    #[error("Remote host pressure guard deferred indexing: {0}")]
    HostPressure(String),

    /// NOTE(review): not constructed in this file; presumably raised by callers
    /// that abort a run.
    #[error("Indexing cancelled")]
    Cancelled,

    /// Local I/O failure, e.g. spawning `ssh` or writing the script to its stdin.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
98
99impl IndexError {
100 pub fn help_message(&self) -> &'static str {
102 match self {
103 IndexError::DiskFull => "Free disk space on remote and retry.",
104 IndexError::Timeout(_) => {
105 "Index timed out. Try running manually: ssh host 'cass index'"
106 }
107 IndexError::PermissionDenied => "Check file permissions in agent data directories.",
108 IndexError::CassNotFound => "cass is not installed. Run installation first.",
109 IndexError::SshFailed(_) => "Check SSH connection and credentials.",
110 IndexError::HostPressure(_) => {
111 "Remote host is currently busy. Retry later or run indexing manually when idle."
112 }
113 _ => "See error details above.",
114 }
115 }
116}
117
/// Resource metrics reported by the remote host pressure probe script.
///
/// Each field is `None` when the remote value was missing or could not be parsed.
#[derive(Debug, Clone, PartialEq)]
struct RemoteHostPressureSnapshot {
    /// Online CPU count; zero is treated as missing, so this is never `Some(0)`.
    cpus: Option<u64>,
    /// First field of `/proc/loadavg` (1-minute load average); finite values only.
    load1: Option<f64>,
    /// `MemAvailable` from `/proc/meminfo`, in KiB.
    mem_available_kib: Option<u64>,
}
124
/// Outcome of checking a [`RemoteHostPressureSnapshot`] against the indexing thresholds.
#[derive(Debug, Clone, PartialEq)]
struct RemoteHostPressureDecision {
    /// `true` when at least one metric breached its threshold and indexing must wait.
    defer_index: bool,
    /// Either the breached thresholds joined with `"; "`, or a note saying the
    /// metrics were incomplete or within budget.
    reason: String,
    /// The metrics the decision was based on.
    snapshot: RemoteHostPressureSnapshot,
}
131
132impl RemoteHostPressureSnapshot {
133 fn from_command_output(output: &str) -> Self {
134 let mut snapshot = Self {
135 cpus: None,
136 load1: None,
137 mem_available_kib: None,
138 };
139
140 for line in output.lines() {
141 let Some((key, value)) = line.split_once('=') else {
142 continue;
143 };
144 match key.trim() {
145 "CPUS" => snapshot.cpus = value.trim().parse::<u64>().ok().filter(|v| *v > 0),
146 "LOAD1" => {
147 snapshot.load1 = value.trim().parse::<f64>().ok().filter(|v| v.is_finite())
148 }
149 "MEM_AVAILABLE_KIB" => {
150 snapshot.mem_available_kib = value.trim().parse::<u64>().ok()
151 }
152 _ => {}
153 }
154 }
155
156 snapshot
157 }
158
159 fn decide(self) -> RemoteHostPressureDecision {
160 let mut reasons = Vec::new();
161
162 if let (Some(load1), Some(cpus)) = (self.load1, self.cpus) {
163 let load_per_cpu = load1 / cpus as f64;
164 if load_per_cpu > REMOTE_INDEX_MAX_LOAD_PER_CPU {
165 reasons.push(format!(
166 "load_per_cpu={load_per_cpu:.2} exceeds ceiling {REMOTE_INDEX_MAX_LOAD_PER_CPU:.2}"
167 ));
168 }
169 }
170
171 if let Some(mem_available_kib) = self.mem_available_kib
172 && mem_available_kib < REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB
173 {
174 reasons.push(format!(
175 "mem_available_kib={mem_available_kib} below floor {REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB}"
176 ));
177 }
178
179 let defer_index = !reasons.is_empty();
180 let reason = if defer_index {
181 reasons.join("; ")
182 } else if self.cpus.is_none() || self.load1.is_none() || self.mem_available_kib.is_none() {
183 "remote pressure metrics incomplete; allowing conservative fallback path".to_string()
184 } else {
185 "remote host pressure is within indexing budget".to_string()
186 };
187
188 RemoteHostPressureDecision {
189 defer_index,
190 reason,
191 snapshot: self,
192 }
193 }
194}
195
/// Coarse stage of a remote indexing run, reported inside [`IndexProgress`].
///
/// Serialized as an internally tagged object: the `stage` field holds the
/// snake_case variant name, and variant fields sit alongside it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "stage", rename_all = "snake_case")]
pub enum IndexStage {
    /// The run has begun.
    Starting,
    /// A remote log line reported scanning data for `agent`.
    Scanning { agent: String },
    /// A remote log line reported building or indexing the search index.
    Building,
    /// Indexing finished successfully.
    Complete,
    /// Indexing finished unsuccessfully with the given message.
    Failed { error: String },
}
215
216impl std::fmt::Display for IndexStage {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 match self {
219 IndexStage::Starting => write!(f, "Starting"),
220 IndexStage::Scanning { agent } => write!(f, "Scanning {}", agent),
221 IndexStage::Building => write!(f, "Building index"),
222 IndexStage::Complete => write!(f, "Complete"),
223 IndexStage::Failed { error } => write!(f, "Failed: {}", error),
224 }
225 }
226}
227
/// A progress update passed to the `on_progress` callback of `RemoteIndexer::run_index`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexProgress {
    /// Current stage of the run.
    pub stage: IndexStage,
    /// Human-readable status line.
    pub message: String,
    /// Most recent session count parsed from the remote log (best effort).
    pub sessions_found: u64,
    /// Sessions indexed so far; among updates emitted by `RemoteIndexer`, only the
    /// completion update sets this.
    pub sessions_indexed: u64,
    /// Estimated completion percentage, or `None` when unknown (e.g. on failure).
    pub percent: Option<u8>,
    /// Time elapsed since the run started.
    pub elapsed: Duration,
}
244
/// Final outcome of a remote indexing run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexResult {
    /// Whether the remote `cass index` reported successful completion.
    pub success: bool,
    /// Session count reported by the remote host after indexing (0 when unavailable).
    pub sessions_indexed: u64,
    /// Time from the start of the run until the outcome was observed.
    pub duration: Duration,
    /// Remote log excerpt explaining the failure when `success` is false.
    pub error: Option<String>,
    /// Outcome of writing the remote artifact manifest. It is only attempted after a
    /// successful index. When `None` it is omitted from serialized output, and it
    /// defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub artifact_manifest: Option<RemoteArtifactManifestResult>,
}
260
/// Outcome of asking the remote `cass` to write its artifact manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RemoteArtifactManifestResult {
    /// `true` only when the command output parsed and `verification_status` is `"complete"`.
    pub success: bool,
    /// Manifest path reported by the remote command, if any.
    pub manifest_path: Option<String>,
    /// Bundle identifier reported by the remote command, if any.
    pub bundle_id: Option<String>,
    /// Chunk count reported by the remote command, if any.
    pub chunk_count: Option<usize>,
    /// Expected byte total reported by the remote command, if any.
    pub expected_bytes: Option<u64>,
    /// Verification status string exactly as reported by the remote command.
    pub verification_status: Option<String>,
    /// Why the result is unsuccessful: SSH failure, unparseable output, or
    /// incomplete verification.
    pub error: Option<String>,
}
279
/// Fields this module reads from `cass sources artifact-manifest --write --json`.
///
/// Any other JSON fields (e.g. `status`) are ignored during deserialization.
#[derive(Debug, Deserialize)]
struct RemoteArtifactManifestCommandOutput {
    manifest_path: Option<String>,
    bundle_id: Option<String>,
    chunk_count: Option<usize>,
    expected_bytes: Option<u64>,
    verification_status: Option<String>,
}
288
289impl RemoteArtifactManifestCommandOutput {
290 fn has_manifest_identity(&self) -> bool {
291 self.manifest_path.is_some() || self.bundle_id.is_some()
292 }
293
294 fn has_complete_manifest_shape(&self) -> bool {
295 self.manifest_path.is_some()
296 && self.bundle_id.is_some()
297 && self.verification_status.is_some()
298 }
299}
300
301impl RemoteArtifactManifestResult {
302 fn from_command_output(output: &str) -> Self {
303 match parse_remote_artifact_manifest_output(output) {
304 Ok(parsed) => {
305 let complete = parsed.verification_status.as_deref() == Some("complete");
306 Self {
307 success: complete,
308 manifest_path: parsed.manifest_path,
309 bundle_id: parsed.bundle_id,
310 chunk_count: parsed.chunk_count,
311 expected_bytes: parsed.expected_bytes,
312 verification_status: parsed.verification_status,
313 error: if complete {
314 None
315 } else {
316 Some("remote artifact manifest verification was not complete".to_string())
317 },
318 }
319 }
320 Err(err) => Self {
321 success: false,
322 manifest_path: None,
323 bundle_id: None,
324 chunk_count: None,
325 expected_bytes: None,
326 verification_status: None,
327 error: Some(format!(
328 "failed to parse remote artifact manifest output: {err}"
329 )),
330 },
331 }
332 }
333
334 fn from_error(error: impl Into<String>) -> Self {
335 Self {
336 success: false,
337 manifest_path: None,
338 bundle_id: None,
339 chunk_count: None,
340 expected_bytes: None,
341 verification_status: None,
342 error: Some(error.into()),
343 }
344 }
345}
346
347fn parse_remote_artifact_manifest_output(
348 output: &str,
349) -> serde_json::Result<RemoteArtifactManifestCommandOutput> {
350 let direct = serde_json::from_str::<RemoteArtifactManifestCommandOutput>(output);
351 if direct.is_ok() {
352 return direct;
353 }
354
355 let mut fallback = None;
356 for (idx, _) in output.char_indices().filter(|(_, ch)| *ch == '{') {
357 let mut deserializer = serde_json::Deserializer::from_str(&output[idx..]);
358 if let Ok(parsed) = RemoteArtifactManifestCommandOutput::deserialize(&mut deserializer) {
359 if parsed.has_complete_manifest_shape() {
360 return Ok(parsed);
361 }
362 if fallback.is_none() && parsed.has_manifest_identity() {
363 fallback = Some(parsed);
364 }
365 }
366 }
367
368 fallback.map_or(direct, Ok)
369}
370
/// Picks the deadline for one SSH command.
///
/// The result is the smaller of the step's `requested` deadline and the configured
/// cap. A `configured_secs` of 0 means "no cap". A zero result is raised to one
/// second so the waiter never gets an empty deadline.
fn effective_ssh_command_timeout(requested: Duration, configured_secs: u64) -> Duration {
    let cap = match configured_secs {
        0 => requested,
        secs => Duration::from_secs(secs),
    };
    let chosen = if cap < requested { cap } else { requested };
    if chosen == Duration::ZERO {
        Duration::from_secs(1)
    } else {
        chosen
    }
}
388
389fn wait_for_command_output_with_timeout(
390 child: Child,
391 timeout: Duration,
392) -> Result<Output, IndexError> {
393 let timeout_secs = timeout.as_secs().max(1);
394 wait_for_child_output_with_timeout(child, timeout)?.ok_or(IndexError::Timeout(timeout_secs))
395}
396
/// Result of the remote `command -v cass` probe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteCassPresence {
    /// Only the `CASS_FOUND` marker was printed.
    Found,
    /// Only the `CASS_NOT_FOUND` marker was printed.
    NotFound,
    /// Neither marker, or both, appeared, so the output cannot be trusted.
    Unknown,
}

/// Classifies probe output by looking for whole-line (trimmed) status markers.
///
/// Banner text around the markers is ignored. Exactly one of the two markers must
/// be present for a definite answer.
fn parse_remote_cass_presence(output: &str) -> RemoteCassPresence {
    let has_line = |wanted: &str| output.lines().any(|line| line.trim() == wanted);

    match (has_line("CASS_FOUND"), has_line("CASS_NOT_FOUND")) {
        (true, false) => RemoteCassPresence::Found,
        (false, true) => RemoteCassPresence::NotFound,
        (true, true) | (false, false) => RemoteCassPresence::Unknown,
    }
}
422
/// Condenses remote output into a bounded string suitable for an error message.
///
/// Surrounding whitespace is removed *before* truncating. This way leading blank
/// lines (e.g. from a login banner) do not use up the character budget, and
/// trailing padding alone never triggers the truncation marker.
///
/// If the trimmed text is longer than `MAX_REMOTE_OUTPUT_ERROR_CHARS` characters, it is
/// cut on a character boundary (never inside a code point) and suffixed with `...`.
fn summarize_remote_output(output: &str) -> String {
    const MAX_REMOTE_OUTPUT_ERROR_CHARS: usize = 512;

    let trimmed = output.trim();
    match trimmed.char_indices().nth(MAX_REMOTE_OUTPUT_ERROR_CHARS) {
        // `nth(MAX)` exists only when there are more than MAX characters, and its
        // byte offset is exactly where the first MAX characters end.
        Some((cut, _)) => format!("{}...", trimmed[..cut].trim_end()),
        None => trimmed.to_string(),
    }
}
437
/// Returns the value of the last `STATUS=` line in the poll script's output.
///
/// Lines are trimmed before matching. The *last* marker wins, so stray `STATUS=`
/// lines printed before the script runs (e.g. by a login banner) cannot mask the
/// script's own report. Returns `None` when no line carries the marker.
fn poll_status(output: &str) -> Option<&str> {
    output
        .lines()
        .rev()
        .find_map(|line| line.trim().strip_prefix("STATUS="))
}
444
445pub struct RemoteIndexer {
447 host: String,
449 ssh_timeout: u64,
451}
452
453impl RemoteIndexer {
454 pub fn new(host: impl Into<String>, ssh_timeout: u64) -> Self {
456 Self {
457 host: host.into(),
458 ssh_timeout,
459 }
460 }
461
462 pub fn with_defaults(host: impl Into<String>) -> Self {
464 Self::new(host, DEFAULT_INDEX_TIMEOUT_SECS)
465 }
466
467 pub fn host(&self) -> &str {
469 &self.host
470 }
471
472 pub fn needs_indexing(probe: &HostProbeResult) -> bool {
482 match &probe.cass_status {
483 CassStatus::NotFound => false,
485 CassStatus::InstalledNotIndexed { .. } => true,
487 CassStatus::Indexed { session_count, .. } => *session_count == 0,
489 CassStatus::Unknown => true,
491 }
492 }
493
494 pub fn run_index<F>(&self, on_progress: F) -> Result<IndexResult, IndexError>
500 where
501 F: Fn(IndexProgress) + Send + Sync,
502 {
503 let start = Instant::now();
504
505 on_progress(IndexProgress {
506 stage: IndexStage::Starting,
507 message: format!("Starting index on {}...", self.host),
508 sessions_found: 0,
509 sessions_indexed: 0,
510 percent: Some(0),
511 elapsed: start.elapsed(),
512 });
513
514 self.verify_cass_installed()?;
516 self.verify_remote_host_pressure()?;
517
518 let mut result = self.run_index_with_polling(&on_progress, start)?;
520 if result.success {
521 result.artifact_manifest = Some(self.write_remote_artifact_manifest());
522 }
523
524 if result.success {
526 on_progress(IndexProgress {
527 stage: IndexStage::Complete,
528 message: format!(
529 "Indexed {} sessions on {} ({:.1}s)",
530 result.sessions_indexed,
531 self.host,
532 result.duration.as_secs_f64()
533 ),
534 sessions_found: result.sessions_indexed,
535 sessions_indexed: result.sessions_indexed,
536 percent: Some(100),
537 elapsed: start.elapsed(),
538 });
539 } else {
540 on_progress(IndexProgress {
541 stage: IndexStage::Failed {
542 error: result.error.clone().unwrap_or_default(),
543 },
544 message: result
545 .error
546 .clone()
547 .unwrap_or_else(|| "Unknown error".into()),
548 sessions_found: 0,
549 sessions_indexed: 0,
550 percent: None,
551 elapsed: start.elapsed(),
552 });
553 }
554
555 Ok(result)
556 }
557
558 fn verify_cass_installed(&self) -> Result<(), IndexError> {
560 let script = r#"
561source ~/.cargo/env 2>/dev/null || true
562export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
563command -v cass >/dev/null 2>&1 && echo "CASS_FOUND" || echo "CASS_NOT_FOUND"
564"#;
565
566 let output = self.run_ssh_command(script, Duration::from_secs(30))?;
567
568 match parse_remote_cass_presence(&output) {
569 RemoteCassPresence::Found => Ok(()),
570 RemoteCassPresence::NotFound => Err(IndexError::CassNotFound),
571 RemoteCassPresence::Unknown => Err(IndexError::SshFailed(format!(
572 "Unexpected cass availability probe output: {}",
573 summarize_remote_output(&output)
574 ))),
575 }
576 }
577
578 fn host_pressure_script() -> &'static str {
579 r#"
580CPUS=$(getconf _NPROCESSORS_ONLN 2>/dev/null || nproc 2>/dev/null || echo "")
581LOAD1=$(awk '{print $1}' /proc/loadavg 2>/dev/null || echo "")
582MEM_AVAILABLE_KIB=$(awk '/MemAvailable:/ {print $2}' /proc/meminfo 2>/dev/null || echo "")
583printf 'CPUS=%s\n' "$CPUS"
584printf 'LOAD1=%s\n' "$LOAD1"
585printf 'MEM_AVAILABLE_KIB=%s\n' "$MEM_AVAILABLE_KIB"
586"#
587 }
588
589 fn verify_remote_host_pressure(&self) -> Result<(), IndexError> {
590 let output = self.run_ssh_command(Self::host_pressure_script(), Duration::from_secs(15))?;
591 let decision = RemoteHostPressureSnapshot::from_command_output(&output).decide();
592 if decision.defer_index {
593 Err(IndexError::HostPressure(decision.reason))
594 } else {
595 Ok(())
596 }
597 }
598
599 fn artifact_manifest_script() -> &'static str {
600 r#"
601source ~/.cargo/env 2>/dev/null || true
602export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
603cass sources artifact-manifest --write --json
604"#
605 }
606
607 fn write_remote_artifact_manifest(&self) -> RemoteArtifactManifestResult {
608 match self.run_ssh_command(Self::artifact_manifest_script(), Duration::from_secs(60)) {
609 Ok(output) => RemoteArtifactManifestResult::from_command_output(&output),
610 Err(err) => RemoteArtifactManifestResult::from_error(err.to_string()),
611 }
612 }
613
614 fn run_index_with_polling<F>(
620 &self,
621 on_progress: &F,
622 start: Instant,
623 ) -> Result<IndexResult, IndexError>
624 where
625 F: Fn(IndexProgress),
626 {
627 let start_script = r#"
629source ~/.cargo/env 2>/dev/null || true
630export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
631
632LOG_FILE=~/.cass_index.log
633rm -f "$LOG_FILE"
634
635nohup bash -c '
636set -o pipefail
637source "$HOME/.cargo/env" 2>/dev/null || true
638export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
639cass index --progress 2>&1 | tee "$HOME/.cass_index.log"
640STATUS=${PIPESTATUS[0]}
641if [ "$STATUS" -eq 0 ]; then
642 echo "===INDEX_COMPLETE===" >> "$HOME/.cass_index.log"
643else
644 echo "===INDEX_FAILED:${STATUS}===" >> "$HOME/.cass_index.log"
645fi
646' > /dev/null 2>&1 &
647
648echo "INDEX_PID=$!"
649"#;
650
651 let output = self.run_ssh_command(start_script, Duration::from_secs(30))?;
652
653 let _pid = output
655 .lines()
656 .find(|l| l.starts_with("INDEX_PID="))
657 .and_then(|l| l.strip_prefix("INDEX_PID="))
658 .and_then(|p| p.trim().parse::<u32>().ok());
659
660 self.poll_index_progress(on_progress, start)
662 }
663
664 fn poll_index_progress<F>(
666 &self,
667 on_progress: &F,
668 start: Instant,
669 ) -> Result<IndexResult, IndexError>
670 where
671 F: Fn(IndexProgress),
672 {
673 let poll_script = r#"
674LOG_FILE=~/.cass_index.log
675if [ -f "$LOG_FILE" ]; then
676 if grep -q "===INDEX_FAILED:" "$LOG_FILE"; then
677 echo "STATUS=ERROR"
678 tail -30 "$LOG_FILE"
679 elif grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
680 echo "STATUS=COMPLETE"
681 # Get session count from health
682 source ~/.cargo/env 2>/dev/null || true
683 export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
684 STATS=$(cass stats --json 2>/dev/null || echo '{}')
685 SESSIONS=$(echo "$STATS" | tr -d '\n' | sed -n 's/.*"conversations"[[:space:]]*:[[:space:]]*\\([0-9][0-9]*\\).*/\\1/p')
686 echo "SESSIONS=${SESSIONS:-0}"
687 elif grep -qi "error" "$LOG_FILE" && ! grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
688 # Check if it's a real error or just log noise
689 if grep -qE "(FATAL|panicked|No such file|Permission denied|disk full)" "$LOG_FILE"; then
690 echo "STATUS=ERROR"
691 tail -30 "$LOG_FILE"
692 else
693 echo "STATUS=RUNNING"
694 tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
695 fi
696 else
697 echo "STATUS=RUNNING"
698 tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
699 fi
700else
701 echo "STATUS=NOT_STARTED"
702fi
703"#;
704
705 let max_wait = Duration::from_secs(MAX_INDEX_WAIT_SECS);
706 let poll_interval = Duration::from_secs(INDEX_POLL_INTERVAL_SECS);
707 let mut sessions_found: u64 = 0;
708 let mut last_agent = String::new();
709 let mut progress_pct: u8 = 5;
710
711 loop {
712 if start.elapsed() > max_wait {
713 return Err(IndexError::Timeout(max_wait.as_secs()));
714 }
715
716 std::thread::sleep(poll_interval);
717
718 let output = self.run_ssh_command(poll_script, Duration::from_secs(30))?;
719 let mut saw_building_this_poll = false;
721
722 if poll_status(&output) == Some("COMPLETE") {
723 let sessions = output
725 .lines()
726 .find(|l| l.starts_with("SESSIONS="))
727 .and_then(|l| l.strip_prefix("SESSIONS="))
728 .and_then(|s| s.trim().parse::<u64>().ok())
729 .unwrap_or(0);
730
731 return Ok(IndexResult {
732 success: true,
733 sessions_indexed: sessions,
734 duration: start.elapsed(),
735 error: None,
736 artifact_manifest: None,
737 });
738 }
739
740 if poll_status(&output) == Some("ERROR") {
741 let error_lines: Vec<&str> = output
742 .lines()
743 .filter(|l| !l.trim_start().starts_with("STATUS="))
744 .collect();
745 let error_msg = error_lines.join("\n");
746
747 if error_msg.contains("disk full") || error_msg.contains("No space left") {
749 return Err(IndexError::DiskFull);
750 }
751 if error_msg.contains("Permission denied") {
752 return Err(IndexError::PermissionDenied);
753 }
754
755 return Ok(IndexResult {
756 success: false,
757 sessions_indexed: 0,
758 duration: start.elapsed(),
759 error: Some(error_msg),
760 artifact_manifest: None,
761 });
762 }
763
764 for line in output.lines() {
766 if line.contains("Scanning")
768 && let Some(agent) = extract_agent_from_line(line)
769 && agent != last_agent
770 {
771 progress_pct = (progress_pct + 5).min(40);
772 on_progress(IndexProgress {
773 stage: IndexStage::Scanning {
774 agent: agent.clone(),
775 },
776 message: format!("Scanning {}...", agent),
777 sessions_found,
778 sessions_indexed: 0,
779 percent: Some(progress_pct),
780 elapsed: start.elapsed(),
781 });
782 last_agent = agent;
783 }
784
785 if let Some(count) = extract_session_count(line) {
787 sessions_found = count;
788 }
789
790 if !saw_building_this_poll
792 && (line.contains("Building") || line.contains("Indexing"))
793 {
794 saw_building_this_poll = true;
795 progress_pct = (progress_pct + 5).min(85);
796 on_progress(IndexProgress {
797 stage: IndexStage::Building,
798 message: "Building search index...".into(),
799 sessions_found,
800 sessions_indexed: 0,
801 percent: Some(progress_pct),
802 elapsed: start.elapsed(),
803 });
804 }
805 }
806 }
807 }
808
809 fn run_ssh_command(&self, script: &str, timeout: Duration) -> Result<String, IndexError> {
811 let command_timeout = effective_ssh_command_timeout(timeout, self.ssh_timeout);
812 let connect_timeout_secs = command_timeout.as_secs().clamp(1, 30);
813
814 let mut cmd = Command::new("ssh");
815 cmd.args(strict_ssh_cli_tokens(connect_timeout_secs))
816 .arg("-o")
817 .arg("LogLevel=ERROR")
818 .arg("--")
819 .arg(&self.host)
820 .arg("bash")
821 .arg("-s");
822
823 cmd.stdin(Stdio::piped())
824 .stdout(Stdio::piped())
825 .stderr(Stdio::piped());
826
827 let mut child = cmd.spawn()?;
828
829 let write_error = if let Some(mut stdin) = child.stdin.take() {
830 stdin.write_all(script.as_bytes()).err()
831 } else {
832 None
833 };
834
835 let output = wait_for_command_output_with_timeout(child, command_timeout)?;
836
837 if !output.status.success() {
838 let stderr = String::from_utf8_lossy(&output.stderr);
839 if is_host_key_verification_failure(&stderr) {
840 return Err(IndexError::SshFailed(host_key_verification_error(
841 &self.host,
842 )));
843 }
844 if stderr.contains("Connection refused")
845 || stderr.contains("Connection timed out")
846 || stderr.contains("Permission denied")
847 {
848 return Err(IndexError::SshFailed(stderr.trim().to_string()));
849 }
850 let code = output.status.code().unwrap_or(-1);
853 return Err(IndexError::SshFailed(format!(
854 "Remote script exited with code {code}: {}",
855 stderr.trim()
856 )));
857 }
858 if let Some(err) = write_error {
859 return Err(IndexError::Io(err));
860 }
861
862 Ok(String::from_utf8_lossy(&output.stdout).to_string())
863 }
864}
865
/// Extracts the agent being scanned from a `Scanning ...` progress line.
///
/// The text after `Scanning` is split on whitespace and `/` into path segments.
/// Each segment loses leading dots (hidden dirs) and trailing `.,:;` punctuation,
/// and empty segments and `~` are dropped.
///
/// The first segment naming a known agent (case-insensitive) is returned in
/// canonical form, e.g. `claude` becomes `claude_code`. This is what lets absolute
/// paths like `/home/u/.codex/...` resolve to `codex` rather than `home`. If no
/// segment is a known agent, the first remaining segment is returned as-is.
/// Returns `None` when the line has no `Scanning` marker or nothing usable follows it.
fn extract_agent_from_line(line: &str) -> Option<String> {
    const MARKER: &str = "Scanning";

    /// Maps a path segment to its canonical agent name, if it names a known agent.
    fn canonical_agent(segment: &str) -> Option<&'static str> {
        match segment.to_ascii_lowercase().as_str() {
            "claude" | "claude_code" => Some("claude_code"),
            "codex" => Some("codex"),
            "cursor" => Some("cursor"),
            "gemini" => Some("gemini"),
            "aider" => Some("aider"),
            "goose" => Some("goose"),
            "continue" => Some("continue"),
            _ => None,
        }
    }

    let idx = line.find(MARKER)?;
    // `MARKER` is ASCII, so `idx + MARKER.len()` is always a char boundary.
    let rest = line[idx + MARKER.len()..].trim();

    let segments: Vec<&str> = rest
        .split(|c: char| c.is_whitespace() || c == '/')
        .map(|s| {
            s.trim_start_matches('.')
                .trim_end_matches(|c: char| matches!(c, '.' | ',' | ':' | ';'))
        })
        .filter(|s| !s.is_empty() && *s != "~")
        .collect();

    segments
        .iter()
        .find_map(|segment| canonical_agent(segment))
        .map(str::to_string)
        .or_else(|| segments.first().map(|s| s.to_string()))
}
894
/// Best-effort extraction of a session/conversation count from a progress log line.
///
/// The line is lowercased and split on whitespace. A token counts as a keyword when,
/// after stripping non-alphabetic characters, it is `session(s)` or `conversation(s)`.
/// For each keyword, the token just before it and then the token just after it are
/// tried with [`parse_count`]. The first successful parse wins.
fn extract_session_count(line: &str) -> Option<u64> {
    const COUNT_NOUNS: [&str; 4] = ["session", "sessions", "conversation", "conversations"];

    let lowered = line.to_lowercase();
    let words: Vec<&str> = lowered.split_whitespace().collect();

    words.iter().enumerate().find_map(|(pos, raw)| {
        let bare = raw.trim_matches(|c: char| !c.is_ascii_alphabetic());
        if !COUNT_NOUNS.contains(&bare) {
            return None;
        }
        let preceding = pos.checked_sub(1).and_then(|prev| words.get(prev));
        let following = words.get(pos + 1);
        preceding
            .and_then(|word| parse_count(word))
            .or_else(|| following.and_then(|word| parse_count(word)))
    })
}

/// Parses a count token such as `234`, `1,234`, or `5/10`; the last yields the numerator.
///
/// Characters other than ASCII digits and `/` are stripped from both ends. The part
/// before the first `/` is kept, and its ASCII digits are concatenated and parsed.
/// Returns `None` when no digits remain or the value overflows `u64`.
fn parse_count(token: &str) -> Option<u64> {
    let core = token.trim_matches(|c: char| !(c.is_ascii_digit() || c == '/'));
    let numerator = match core.split_once('/') {
        Some((head, _)) => head,
        None => core,
    };
    let digits: String = numerator.chars().filter(char::is_ascii_digit).collect();
    // An empty digit string fails to parse, so no separate emptiness check is needed.
    digits.parse::<u64>().ok()
}
935
// Unit tests for the remote indexing helpers: probe-driven indexing decisions,
// log-line parsing, status parsing, timeout plumbing, host pressure decisions,
// and artifact manifest parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sources::probe::HostProbeResult;
    use std::path::PathBuf;

    // Loads `tests/fixtures/sources/probe/<name>.json` (relative to the crate root)
    // as a `HostProbeResult`, panicking with the path on read or parse failure.
    fn load_probe_fixture(name: &str) -> HostProbeResult {
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/fixtures/sources/probe")
            .join(format!("{}.json", name));
        let content = std::fs::read_to_string(&path)
            .unwrap_or_else(|e| panic!("Failed to read fixture {}: {}", path.display(), e));
        serde_json::from_str(&content)
            .unwrap_or_else(|e| panic!("Failed to parse fixture {}: {}", path.display(), e))
    }

    // --- needs_indexing decision table, driven by probe fixtures ---

    #[test]
    fn test_no_indexing_when_not_found() {
        let probe = load_probe_fixture("no_cass_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_not_indexed() {
        let probe = load_probe_fixture("not_indexed_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_empty_index() {
        let probe = load_probe_fixture("empty_index_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_no_indexing_needed_when_has_sessions() {
        let probe = load_probe_fixture("indexed_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_unknown() {
        let probe = load_probe_fixture("unknown_status_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    // --- progress log parsing ---

    #[test]
    fn test_extract_agent_from_line() {
        assert_eq!(
            extract_agent_from_line("Scanning ~/.claude/projects..."),
            Some("claude_code".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning ~/.codex/sessions..."),
            Some("codex".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning cursor data..."),
            Some("cursor".into())
        );
        assert_eq!(extract_agent_from_line("Some other line"), None);
    }

    #[test]
    fn test_extract_session_count() {
        assert_eq!(extract_session_count("found 234 sessions"), Some(234));
        assert_eq!(extract_session_count("Indexed 291 sessions"), Some(291));
        assert_eq!(
            extract_session_count("Processing 42 conversations"),
            Some(42)
        );
        assert_eq!(
            extract_session_count("2026-01-11 12:00:00 Indexed 291 sessions"),
            Some(291)
        );
        // For "x/y" tokens the numerator is taken.
        assert_eq!(extract_session_count("Indexed 5/10 conversations"), Some(5));
        // The count may also follow the keyword.
        assert_eq!(extract_session_count("conversations: 17 total"), Some(17));
        assert_eq!(extract_session_count("Some other line"), None);
    }

    // --- user-facing text ---

    #[test]
    fn test_index_stage_display() {
        assert_eq!(IndexStage::Starting.to_string(), "Starting");
        assert_eq!(
            IndexStage::Scanning {
                agent: "claude_code".into()
            }
            .to_string(),
            "Scanning claude_code"
        );
        assert_eq!(IndexStage::Building.to_string(), "Building index");
        assert_eq!(IndexStage::Complete.to_string(), "Complete");
    }

    #[test]
    fn test_index_error_help_messages() {
        assert!(IndexError::DiskFull.help_message().contains("Free disk"));
        assert!(IndexError::Timeout(600).help_message().contains("manually"));
        assert!(
            IndexError::PermissionDenied
                .help_message()
                .contains("permissions")
        );
        assert!(
            IndexError::CassNotFound
                .help_message()
                .contains("installed")
        );
        assert!(
            IndexError::HostPressure("load".into())
                .help_message()
                .contains("busy")
        );
    }

    #[test]
    fn test_remote_indexer_new() {
        let indexer = RemoteIndexer::new("laptop", 300);
        assert_eq!(indexer.host(), "laptop");

        let indexer2 = RemoteIndexer::with_defaults("server");
        assert_eq!(indexer2.host(), "server");
    }

    // --- SSH command deadlines ---

    #[test]
    fn test_effective_ssh_command_timeout_clamps_to_smaller_deadline() {
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(60), 10),
            Duration::from_secs(10)
        );
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 60),
            Duration::from_secs(15)
        );
        // A configured value of 0 means "no cap".
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 0),
            Duration::from_secs(15)
        );
        // A zero deadline is raised to one second.
        assert_eq!(
            effective_ssh_command_timeout(Duration::ZERO, 0),
            Duration::from_secs(1)
        );
    }

    // --- remote output markers ---

    #[test]
    fn test_parse_remote_cass_presence_requires_unambiguous_status_line() {
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\nCASS_FOUND\n"),
            RemoteCassPresence::Found
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_NOT_FOUND\n"),
            RemoteCassPresence::NotFound
        );
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\n"),
            RemoteCassPresence::Unknown
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_FOUND\nCASS_NOT_FOUND\n"),
            RemoteCassPresence::Unknown
        );
    }

    #[test]
    fn test_poll_status_uses_exact_status_line() {
        // Only lines that start with STATUS= (after trimming) count; the last one wins.
        assert_eq!(
            poll_status("banner mentions STATUS=ERROR in prose\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        assert_eq!(
            poll_status("STATUS=ERROR\nstartup banner\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        assert_eq!(poll_status(" STATUS=ERROR\npanic\n"), Some("ERROR"));
        assert_eq!(poll_status("no structured status"), None);
    }

    // --- child-process waiting (unix-only helpers built on `sh`) ---

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_kills_stalled_child() {
        let child = Command::new("sh")
            .arg("-c")
            .arg("sleep 2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn sleep helper");

        let started = Instant::now();
        let err = wait_for_command_output_with_timeout(child, Duration::from_millis(50))
            .expect_err("stalled command should time out");
        // Sub-second timeouts are reported as 1 second.
        assert!(matches!(err, IndexError::Timeout(1)));
        assert!(started.elapsed() < Duration::from_secs(1));
    }

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_drains_large_output() {
        // Output larger than a typical pipe buffer on both streams must not deadlock.
        let child = Command::new("sh")
            .arg("-c")
            .arg("yes stdout | head -c 200000; yes stderr | head -c 200000 >&2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn large-output helper");

        let output = wait_for_command_output_with_timeout(child, Duration::from_secs(5))
            .expect("large-output command should finish without filling pipes");
        assert!(output.status.success());
        assert_eq!(output.stdout.len(), 200_000);
        assert_eq!(output.stderr.len(), 200_000);
    }

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_bounds_inherited_pipe_waits() {
        // A background grandchild keeps the pipes open after the shell exits; the
        // wait must still respect the deadline.
        let child = Command::new("sh")
            .arg("-c")
            .arg("(sleep 2) & printf parent-done")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn inherited-pipe helper");

        let started = Instant::now();
        let err = wait_for_command_output_with_timeout(child, Duration::from_millis(100))
            .expect_err("inherited pipe should not outlive command deadline");
        assert!(matches!(err, IndexError::Timeout(1)));
        assert!(started.elapsed() < Duration::from_secs(1));
    }

    // --- remote scripts ---

    #[test]
    fn test_artifact_manifest_script_uses_robot_safe_write_command() {
        let script = RemoteIndexer::artifact_manifest_script();
        assert!(script.contains("cass sources artifact-manifest --write --json"));
        assert!(!script.contains("cass sources artifact-manifest --write\n"));
    }

    #[test]
    fn test_host_pressure_script_reads_cheap_linux_metrics() {
        let script = RemoteIndexer::host_pressure_script();
        assert!(script.contains("_NPROCESSORS_ONLN"));
        assert!(script.contains("/proc/loadavg"));
        assert!(script.contains("MemAvailable"));
    }

    // --- host pressure decisions ---

    #[test]
    fn test_remote_host_pressure_allows_incomplete_metrics() {
        let decision = RemoteHostPressureSnapshot::from_command_output("CPUS=\nLOAD1=\n").decide();

        assert!(!decision.defer_index);
        assert!(
            decision.reason.contains("metrics incomplete"),
            "{decision:?}"
        );
    }

    #[test]
    fn test_remote_host_pressure_defers_high_load() {
        // 7.20 / 4 CPUs = 1.80 per CPU, above the 1.50 ceiling.
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=4\nLOAD1=7.20\nMEM_AVAILABLE_KIB=1048576\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(decision.reason.contains("load_per_cpu"), "{decision:?}");
    }

    #[test]
    fn test_remote_host_pressure_defers_low_memory() {
        // Load is fine (12 / 64), but 128 MiB available is below the 512 MiB floor.
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=64\nLOAD1=12.00\nMEM_AVAILABLE_KIB=131072\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(
            decision.reason.contains("mem_available_kib"),
            "{decision:?}"
        );
    }

    // --- artifact manifest parsing ---

    #[test]
    fn test_remote_artifact_manifest_result_parses_command_output() {
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"{
                "status": "ok",
                "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
                "bundle_id": "cass-lexical-abc",
                "chunk_count": 3,
                "expected_bytes": 42,
                "verification_status": "complete"
            }"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-abc"));
        assert_eq!(result.chunk_count, Some(3));
        assert_eq!(result.expected_bytes, Some(42));
        assert_eq!(result.error, None);
    }

    #[test]
    fn test_remote_artifact_manifest_result_parses_json_with_ssh_noise() {
        // Banner text (including a stray `{}`) precedes the real JSON object.
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host {}
MOTD: maintenance starts at 02:00
{
  "status": "ok",
  "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
  "bundle_id": "cass-lexical-noisy",
  "chunk_count": 2,
  "expected_bytes": 99,
  "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-noisy"));
        assert_eq!(result.chunk_count, Some(2));
        assert_eq!(result.expected_bytes, Some(99));
        assert_eq!(result.error, None);
    }

    #[test]
    fn test_remote_artifact_manifest_result_skips_partial_noise_json() {
        // A JSON object lacking manifest identity must not be mistaken for the manifest.
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host
{"verification_status": "complete"}
{
  "status": "ok",
  "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
  "bundle_id": "cass-lexical-real",
  "chunk_count": 4,
  "expected_bytes": 123,
  "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-real"));
        assert_eq!(result.chunk_count, Some(4));
        assert_eq!(result.expected_bytes, Some(123));
        assert_eq!(result.error, None);
    }
}
1288}