Skip to main content

coding_agent_search/sources/
index.rs

1//! Remote cass indexing via SSH.
2//!
3//! This module provides functionality to trigger `cass index` on remote machines
4//! after installation, ensuring session data is ready to sync.
5//!
6//! # Why This Matters
7//!
8//! Syncing works by pulling from the remote's indexed data. If the remote has
9//! never run `cass index`, there's nothing meaningful to sync. This module
10//! ensures remotes are indexed before attempting sync.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use coding_agent_search::sources::index::{RemoteIndexer, IndexProgress};
16//! use coding_agent_search::sources::probe::HostProbeResult;
17//!
18//! // Check if indexing is needed
19//! if RemoteIndexer::needs_indexing(&probe_result) {
20//!     let indexer = RemoteIndexer::new("laptop", 600);
21//!
22//!     indexer.run_index(|progress| {
23//!         println!("{}: {}", progress.stage, progress.message);
24//!     })?;
25//! }
26//! ```
27
28use std::io::Write as IoWrite;
29use std::process::{Child, Command, Output, Stdio};
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use super::{
36    host_key_verification_error, is_host_key_verification_failure,
37    probe::{CassStatus, HostProbeResult},
38    strict_ssh_cli_tokens, wait_for_child_output_with_timeout,
39};
40
41// =============================================================================
42// Constants
43// =============================================================================
44
/// Default per-command SSH timeout cap, in seconds, used by
/// `RemoteIndexer::with_defaults` (ten minutes).
pub const DEFAULT_INDEX_TIMEOUT_SECS: u64 = 10 * 60;

/// Seconds to sleep between polls of the remote index log.
pub const INDEX_POLL_INTERVAL_SECS: u64 = 5;

/// Upper bound on total indexing wall time, in seconds (thirty minutes, to
/// leave room for large session histories).
pub const MAX_INDEX_WAIT_SECS: u64 = 30 * 60;

/// Remote 1-minute load average per online CPU above which offloaded
/// indexing is deferred.
const REMOTE_INDEX_MAX_LOAD_PER_CPU: f64 = 1.5;

/// Remote `MemAvailable` floor, in KiB (512 MiB), below which offloaded
/// indexing is deferred.
const REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB: u64 = 512 * 1024;
59
60// =============================================================================
61// Error Types
62// =============================================================================
63
/// Errors that can occur during remote indexing.
#[derive(Error, Debug)]
pub enum IndexError {
    /// The SSH transport or remote script failed; carries a diagnostic message.
    #[error("SSH connection failed: {0}")]
    SshFailed(String),

    /// An operation exceeded its time budget; the value is in seconds.
    #[error("Index operation timed out after {0} seconds")]
    Timeout(u64),

    /// The remote availability probe reported that `cass` is not installed.
    #[error("cass not found on remote host")]
    CassNotFound,

    /// The remote `cass index` run failed with captured output and exit code.
    #[error("Indexing failed: {stdout}\n{stderr}")]
    IndexFailed {
        stdout: String,
        stderr: String,
        exit_code: i32,
    },

    /// The remote log indicated the disk is full.
    #[error("Disk full on remote host")]
    DiskFull,

    /// The remote log indicated a permission failure.
    #[error("Permission denied accessing agent data directories")]
    PermissionDenied,

    /// The remote load/memory guard deferred indexing; carries the reason.
    #[error("Remote host pressure guard deferred indexing: {0}")]
    HostPressure(String),

    /// Indexing was cancelled.
    #[error("Indexing cancelled")]
    Cancelled,

    /// A local I/O error (e.g. spawning `ssh` or writing its stdin).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}

impl IndexError {
    /// Get a user-friendly help message for this error.
    pub fn help_message(&self) -> &'static str {
        match self {
            IndexError::DiskFull => "Free disk space on remote and retry.",
            IndexError::Timeout(_) => {
                "Index timed out. Try running manually: ssh host 'cass index'"
            }
            IndexError::PermissionDenied => "Check file permissions in agent data directories.",
            IndexError::CassNotFound => "cass is not installed. Run installation first.",
            IndexError::SshFailed(_) => "Check SSH connection and credentials.",
            IndexError::HostPressure(_) => {
                "Remote host is currently busy. Retry later or run indexing manually when idle."
            }
            // Listed explicitly instead of `_` so that adding a variant forces a
            // deliberate choice of help text at compile time.
            IndexError::IndexFailed { .. } | IndexError::Cancelled | IndexError::Io(_) => {
                "See error details above."
            }
        }
    }
}
117
118#[derive(Debug, Clone, PartialEq)]
119struct RemoteHostPressureSnapshot {
120    cpus: Option<u64>,
121    load1: Option<f64>,
122    mem_available_kib: Option<u64>,
123}
124
125#[derive(Debug, Clone, PartialEq)]
126struct RemoteHostPressureDecision {
127    defer_index: bool,
128    reason: String,
129    snapshot: RemoteHostPressureSnapshot,
130}
131
132impl RemoteHostPressureSnapshot {
133    fn from_command_output(output: &str) -> Self {
134        let mut snapshot = Self {
135            cpus: None,
136            load1: None,
137            mem_available_kib: None,
138        };
139
140        for line in output.lines() {
141            let Some((key, value)) = line.split_once('=') else {
142                continue;
143            };
144            match key.trim() {
145                "CPUS" => snapshot.cpus = value.trim().parse::<u64>().ok().filter(|v| *v > 0),
146                "LOAD1" => {
147                    snapshot.load1 = value.trim().parse::<f64>().ok().filter(|v| v.is_finite())
148                }
149                "MEM_AVAILABLE_KIB" => {
150                    snapshot.mem_available_kib = value.trim().parse::<u64>().ok()
151                }
152                _ => {}
153            }
154        }
155
156        snapshot
157    }
158
159    fn decide(self) -> RemoteHostPressureDecision {
160        let mut reasons = Vec::new();
161
162        if let (Some(load1), Some(cpus)) = (self.load1, self.cpus) {
163            let load_per_cpu = load1 / cpus as f64;
164            if load_per_cpu > REMOTE_INDEX_MAX_LOAD_PER_CPU {
165                reasons.push(format!(
166                    "load_per_cpu={load_per_cpu:.2} exceeds ceiling {REMOTE_INDEX_MAX_LOAD_PER_CPU:.2}"
167                ));
168            }
169        }
170
171        if let Some(mem_available_kib) = self.mem_available_kib
172            && mem_available_kib < REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB
173        {
174            reasons.push(format!(
175                "mem_available_kib={mem_available_kib} below floor {REMOTE_INDEX_MIN_AVAILABLE_MEM_KIB}"
176            ));
177        }
178
179        let defer_index = !reasons.is_empty();
180        let reason = if defer_index {
181            reasons.join("; ")
182        } else if self.cpus.is_none() || self.load1.is_none() || self.mem_available_kib.is_none() {
183            "remote pressure metrics incomplete; allowing conservative fallback path".to_string()
184        } else {
185            "remote host pressure is within indexing budget".to_string()
186        };
187
188        RemoteHostPressureDecision {
189            defer_index,
190            reason,
191            snapshot: self,
192        }
193    }
194}
195
196// =============================================================================
197// Progress Types
198// =============================================================================
199
200/// Current stage of indexing.
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
202#[serde(tag = "stage", rename_all = "snake_case")]
203pub enum IndexStage {
204    /// Starting the index process.
205    Starting,
206    /// Scanning agent directories for sessions.
207    Scanning { agent: String },
208    /// Building the search index.
209    Building,
210    /// Index complete.
211    Complete,
212    /// Index failed.
213    Failed { error: String },
214}
215
216impl std::fmt::Display for IndexStage {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        match self {
219            IndexStage::Starting => write!(f, "Starting"),
220            IndexStage::Scanning { agent } => write!(f, "Scanning {}", agent),
221            IndexStage::Building => write!(f, "Building index"),
222            IndexStage::Complete => write!(f, "Complete"),
223            IndexStage::Failed { error } => write!(f, "Failed: {}", error),
224        }
225    }
226}
227
228/// Progress update during indexing.
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct IndexProgress {
231    /// Current stage.
232    pub stage: IndexStage,
233    /// Human-readable message.
234    pub message: String,
235    /// Number of sessions found during scanning.
236    pub sessions_found: u64,
237    /// Number of sessions indexed so far.
238    pub sessions_indexed: u64,
239    /// Optional progress percentage (0-100).
240    pub percent: Option<u8>,
241    /// Elapsed time since start.
242    pub elapsed: Duration,
243}
244
245/// Result of a successful indexing operation.
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct IndexResult {
248    /// Whether indexing completed successfully.
249    pub success: bool,
250    /// Total sessions indexed.
251    pub sessions_indexed: u64,
252    /// Total indexing time.
253    pub duration: Duration,
254    /// Error message if failed.
255    pub error: Option<String>,
256    /// Remote lexical artifact proof written after a successful index run.
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub artifact_manifest: Option<RemoteArtifactManifestResult>,
259}
260
261/// Result of writing a remote lexical artifact evidence manifest.
262#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
263pub struct RemoteArtifactManifestResult {
264    /// Whether the proof command completed and produced a complete manifest.
265    pub success: bool,
266    /// Path to evidence-bundle-manifest.json on the remote host.
267    pub manifest_path: Option<String>,
268    /// Deterministic content-addressed bundle id.
269    pub bundle_id: Option<String>,
270    /// Number of files described by the manifest.
271    pub chunk_count: Option<usize>,
272    /// Total bytes expected by the evidence report.
273    pub expected_bytes: Option<u64>,
274    /// Verification status reported by the remote command.
275    pub verification_status: Option<String>,
276    /// Error message when the proof command failed.
277    pub error: Option<String>,
278}
279
280#[derive(Debug, Deserialize)]
281struct RemoteArtifactManifestCommandOutput {
282    manifest_path: Option<String>,
283    bundle_id: Option<String>,
284    chunk_count: Option<usize>,
285    expected_bytes: Option<u64>,
286    verification_status: Option<String>,
287}
288
289impl RemoteArtifactManifestCommandOutput {
290    fn has_manifest_identity(&self) -> bool {
291        self.manifest_path.is_some() || self.bundle_id.is_some()
292    }
293
294    fn has_complete_manifest_shape(&self) -> bool {
295        self.manifest_path.is_some()
296            && self.bundle_id.is_some()
297            && self.verification_status.is_some()
298    }
299}
300
301impl RemoteArtifactManifestResult {
302    fn from_command_output(output: &str) -> Self {
303        match parse_remote_artifact_manifest_output(output) {
304            Ok(parsed) => {
305                let complete = parsed.verification_status.as_deref() == Some("complete");
306                Self {
307                    success: complete,
308                    manifest_path: parsed.manifest_path,
309                    bundle_id: parsed.bundle_id,
310                    chunk_count: parsed.chunk_count,
311                    expected_bytes: parsed.expected_bytes,
312                    verification_status: parsed.verification_status,
313                    error: if complete {
314                        None
315                    } else {
316                        Some("remote artifact manifest verification was not complete".to_string())
317                    },
318                }
319            }
320            Err(err) => Self {
321                success: false,
322                manifest_path: None,
323                bundle_id: None,
324                chunk_count: None,
325                expected_bytes: None,
326                verification_status: None,
327                error: Some(format!(
328                    "failed to parse remote artifact manifest output: {err}"
329                )),
330            },
331        }
332    }
333
334    fn from_error(error: impl Into<String>) -> Self {
335        Self {
336            success: false,
337            manifest_path: None,
338            bundle_id: None,
339            chunk_count: None,
340            expected_bytes: None,
341            verification_status: None,
342            error: Some(error.into()),
343        }
344    }
345}
346
347fn parse_remote_artifact_manifest_output(
348    output: &str,
349) -> serde_json::Result<RemoteArtifactManifestCommandOutput> {
350    let direct = serde_json::from_str::<RemoteArtifactManifestCommandOutput>(output);
351    if direct.is_ok() {
352        return direct;
353    }
354
355    let mut fallback = None;
356    for (idx, _) in output.char_indices().filter(|(_, ch)| *ch == '{') {
357        let mut deserializer = serde_json::Deserializer::from_str(&output[idx..]);
358        if let Ok(parsed) = RemoteArtifactManifestCommandOutput::deserialize(&mut deserializer) {
359            if parsed.has_complete_manifest_shape() {
360                return Ok(parsed);
361            }
362            if fallback.is_none() && parsed.has_manifest_identity() {
363                fallback = Some(parsed);
364            }
365        }
366    }
367
368    fallback.map_or(direct, Ok)
369}
370
371// =============================================================================
372// RemoteIndexer
373// =============================================================================
374
/// Combine a per-call timeout with the indexer-wide cap.
///
/// `configured_secs == 0` means "no cap", so only `requested` applies. The
/// result is never zero; a zero outcome is raised to one second.
fn effective_ssh_command_timeout(requested: Duration, configured_secs: u64) -> Duration {
    let capped = match configured_secs {
        0 => requested,
        secs => requested.min(Duration::from_secs(secs)),
    };

    if capped.is_zero() {
        Duration::from_secs(1)
    } else {
        capped
    }
}
388
389fn wait_for_command_output_with_timeout(
390    child: Child,
391    timeout: Duration,
392) -> Result<Output, IndexError> {
393    let timeout_secs = timeout.as_secs().max(1);
394    wait_for_child_output_with_timeout(child, timeout)?.ok_or(IndexError::Timeout(timeout_secs))
395}
396
/// Result of the remote `command -v cass` probe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteCassPresence {
    /// Only the `CASS_FOUND` marker was seen.
    Found,
    /// Only the `CASS_NOT_FOUND` marker was seen.
    NotFound,
    /// Neither marker, or both, was seen; the probe output is ambiguous.
    Unknown,
}

/// Classify probe output by looking for whole-line presence markers.
///
/// Each line is trimmed before comparison, so surrounding whitespace and
/// unrelated lines (e.g. shell startup noise) are tolerated.
fn parse_remote_cass_presence(output: &str) -> RemoteCassPresence {
    let has_marker = |marker: &str| output.lines().any(|line| line.trim() == marker);

    match (has_marker("CASS_FOUND"), has_marker("CASS_NOT_FOUND")) {
        (true, false) => RemoteCassPresence::Found,
        (false, true) => RemoteCassPresence::NotFound,
        (true, true) | (false, false) => RemoteCassPresence::Unknown,
    }
}
422
/// Shorten remote output for inclusion in an error message.
///
/// Surrounding whitespace is trimmed *before* truncating. Otherwise a long run
/// of leading blank lines would consume the whole budget and hide the actual
/// message. Output longer than 512 characters (counted as `char`s, so
/// multi-byte text is never split) is cut and suffixed with `...`.
fn summarize_remote_output(output: &str) -> String {
    const MAX_REMOTE_OUTPUT_ERROR_CHARS: usize = 512;
    let trimmed = output.trim();
    match trimmed.char_indices().nth(MAX_REMOTE_OUTPUT_ERROR_CHARS) {
        // `cut` is the byte offset of the first character past the budget.
        Some((cut, _)) => format!("{}...", trimmed[..cut].trim_end()),
        None => trimmed.to_string(),
    }
}
437
/// Return the value of the last `STATUS=` line in poll output, if any.
///
/// Lines are trimmed before the prefix check.
fn poll_status(output: &str) -> Option<&str> {
    output
        .lines()
        .rev()
        .find_map(|line| line.trim().strip_prefix("STATUS="))
}
444
445/// Indexer for triggering cass index on remote machines.
446pub struct RemoteIndexer {
447    /// SSH host alias.
448    host: String,
449    /// SSH timeout in seconds.
450    ssh_timeout: u64,
451}
452
453impl RemoteIndexer {
454    /// Create a new indexer for a remote host.
455    pub fn new(host: impl Into<String>, ssh_timeout: u64) -> Self {
456        Self {
457            host: host.into(),
458            ssh_timeout,
459        }
460    }
461
462    /// Create an indexer with default timeout.
463    pub fn with_defaults(host: impl Into<String>) -> Self {
464        Self::new(host, DEFAULT_INDEX_TIMEOUT_SECS)
465    }
466
467    /// Get the host name.
468    pub fn host(&self) -> &str {
469        &self.host
470    }
471
472    /// Check if indexing is needed based on probe result.
473    ///
474    /// Returns true if the remote should be indexed:
475    /// - cass installed but never indexed
476    /// - Index exists but has zero sessions
477    ///
478    /// Returns false if:
479    /// - cass not found (can't index without cass)
480    /// - Already has indexed sessions
481    pub fn needs_indexing(probe: &HostProbeResult) -> bool {
482        match &probe.cass_status {
483            // Not found - can't index without cass installed
484            CassStatus::NotFound => false,
485            // Explicitly not indexed - needs indexing
486            CassStatus::InstalledNotIndexed { .. } => true,
487            // Indexed but empty - try indexing again
488            CassStatus::Indexed { session_count, .. } => *session_count == 0,
489            // Unknown status - assume we should try
490            CassStatus::Unknown => true,
491        }
492    }
493
494    /// Run indexing on the remote host.
495    ///
496    /// Streams progress updates via the callback as indexing proceeds.
497    /// For hosts with large session histories (100k+), uses background
498    /// execution with polling to avoid SSH timeout.
499    pub fn run_index<F>(&self, on_progress: F) -> Result<IndexResult, IndexError>
500    where
501        F: Fn(IndexProgress) + Send + Sync,
502    {
503        let start = Instant::now();
504
505        on_progress(IndexProgress {
506            stage: IndexStage::Starting,
507            message: format!("Starting index on {}...", self.host),
508            sessions_found: 0,
509            sessions_indexed: 0,
510            percent: Some(0),
511            elapsed: start.elapsed(),
512        });
513
514        // First check if cass is available
515        self.verify_cass_installed()?;
516        self.verify_remote_host_pressure()?;
517
518        // Run indexing in background with log file for progress tracking
519        let mut result = self.run_index_with_polling(&on_progress, start)?;
520        if result.success {
521            result.artifact_manifest = Some(self.write_remote_artifact_manifest());
522        }
523
524        // Report final result
525        if result.success {
526            on_progress(IndexProgress {
527                stage: IndexStage::Complete,
528                message: format!(
529                    "Indexed {} sessions on {} ({:.1}s)",
530                    result.sessions_indexed,
531                    self.host,
532                    result.duration.as_secs_f64()
533                ),
534                sessions_found: result.sessions_indexed,
535                sessions_indexed: result.sessions_indexed,
536                percent: Some(100),
537                elapsed: start.elapsed(),
538            });
539        } else {
540            on_progress(IndexProgress {
541                stage: IndexStage::Failed {
542                    error: result.error.clone().unwrap_or_default(),
543                },
544                message: result
545                    .error
546                    .clone()
547                    .unwrap_or_else(|| "Unknown error".into()),
548                sessions_found: 0,
549                sessions_indexed: 0,
550                percent: None,
551                elapsed: start.elapsed(),
552            });
553        }
554
555        Ok(result)
556    }
557
558    /// Verify cass is installed on the remote.
559    fn verify_cass_installed(&self) -> Result<(), IndexError> {
560        let script = r#"
561source ~/.cargo/env 2>/dev/null || true
562export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
563command -v cass >/dev/null 2>&1 && echo "CASS_FOUND" || echo "CASS_NOT_FOUND"
564"#;
565
566        let output = self.run_ssh_command(script, Duration::from_secs(30))?;
567
568        match parse_remote_cass_presence(&output) {
569            RemoteCassPresence::Found => Ok(()),
570            RemoteCassPresence::NotFound => Err(IndexError::CassNotFound),
571            RemoteCassPresence::Unknown => Err(IndexError::SshFailed(format!(
572                "Unexpected cass availability probe output: {}",
573                summarize_remote_output(&output)
574            ))),
575        }
576    }
577
578    fn host_pressure_script() -> &'static str {
579        r#"
580CPUS=$(getconf _NPROCESSORS_ONLN 2>/dev/null || nproc 2>/dev/null || echo "")
581LOAD1=$(awk '{print $1}' /proc/loadavg 2>/dev/null || echo "")
582MEM_AVAILABLE_KIB=$(awk '/MemAvailable:/ {print $2}' /proc/meminfo 2>/dev/null || echo "")
583printf 'CPUS=%s\n' "$CPUS"
584printf 'LOAD1=%s\n' "$LOAD1"
585printf 'MEM_AVAILABLE_KIB=%s\n' "$MEM_AVAILABLE_KIB"
586"#
587    }
588
589    fn verify_remote_host_pressure(&self) -> Result<(), IndexError> {
590        let output = self.run_ssh_command(Self::host_pressure_script(), Duration::from_secs(15))?;
591        let decision = RemoteHostPressureSnapshot::from_command_output(&output).decide();
592        if decision.defer_index {
593            Err(IndexError::HostPressure(decision.reason))
594        } else {
595            Ok(())
596        }
597    }
598
599    fn artifact_manifest_script() -> &'static str {
600        r#"
601source ~/.cargo/env 2>/dev/null || true
602export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
603cass sources artifact-manifest --write --json
604"#
605    }
606
607    fn write_remote_artifact_manifest(&self) -> RemoteArtifactManifestResult {
608        match self.run_ssh_command(Self::artifact_manifest_script(), Duration::from_secs(60)) {
609            Ok(output) => RemoteArtifactManifestResult::from_command_output(&output),
610            Err(err) => RemoteArtifactManifestResult::from_error(err.to_string()),
611        }
612    }
613
614    /// Run indexing with background execution and polling.
615    ///
616    /// This approach prevents SSH timeout for large indexes:
617    /// 1. Start `cass index` in background with nohup, logging to file
618    /// 2. Poll log file for progress and completion
619    fn run_index_with_polling<F>(
620        &self,
621        on_progress: &F,
622        start: Instant,
623    ) -> Result<IndexResult, IndexError>
624    where
625        F: Fn(IndexProgress),
626    {
627        // Start indexing in background
628        let start_script = r#"
629source ~/.cargo/env 2>/dev/null || true
630export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
631
632LOG_FILE=~/.cass_index.log
633rm -f "$LOG_FILE"
634
635nohup bash -c '
636set -o pipefail
637source "$HOME/.cargo/env" 2>/dev/null || true
638export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
639cass index --progress 2>&1 | tee "$HOME/.cass_index.log"
640STATUS=${PIPESTATUS[0]}
641if [ "$STATUS" -eq 0 ]; then
642    echo "===INDEX_COMPLETE===" >> "$HOME/.cass_index.log"
643else
644    echo "===INDEX_FAILED:${STATUS}===" >> "$HOME/.cass_index.log"
645fi
646' > /dev/null 2>&1 &
647
648echo "INDEX_PID=$!"
649"#;
650
651        let output = self.run_ssh_command(start_script, Duration::from_secs(30))?;
652
653        // Extract PID (for potential future use)
654        let _pid = output
655            .lines()
656            .find(|l| l.starts_with("INDEX_PID="))
657            .and_then(|l| l.strip_prefix("INDEX_PID="))
658            .and_then(|p| p.trim().parse::<u32>().ok());
659
660        // Poll for progress and completion
661        self.poll_index_progress(on_progress, start)
662    }
663
664    /// Poll the remote log file for indexing progress.
665    fn poll_index_progress<F>(
666        &self,
667        on_progress: &F,
668        start: Instant,
669    ) -> Result<IndexResult, IndexError>
670    where
671        F: Fn(IndexProgress),
672    {
673        let poll_script = r#"
674LOG_FILE=~/.cass_index.log
675if [ -f "$LOG_FILE" ]; then
676    if grep -q "===INDEX_FAILED:" "$LOG_FILE"; then
677        echo "STATUS=ERROR"
678        tail -30 "$LOG_FILE"
679    elif grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
680        echo "STATUS=COMPLETE"
681        # Get session count from health
682        source ~/.cargo/env 2>/dev/null || true
683        export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
684        STATS=$(cass stats --json 2>/dev/null || echo '{}')
685        SESSIONS=$(echo "$STATS" | tr -d '\n' | sed -n 's/.*"conversations"[[:space:]]*:[[:space:]]*\\([0-9][0-9]*\\).*/\\1/p')
686        echo "SESSIONS=${SESSIONS:-0}"
687    elif grep -qi "error" "$LOG_FILE" && ! grep -q "===INDEX_COMPLETE===" "$LOG_FILE"; then
688        # Check if it's a real error or just log noise
689        if grep -qE "(FATAL|panicked|No such file|Permission denied|disk full)" "$LOG_FILE"; then
690            echo "STATUS=ERROR"
691            tail -30 "$LOG_FILE"
692        else
693            echo "STATUS=RUNNING"
694            tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
695        fi
696    else
697        echo "STATUS=RUNNING"
698        tail -10 "$LOG_FILE" | grep -E "(Scanning|Building|Indexed|Processing)" | tail -3
699    fi
700else
701    echo "STATUS=NOT_STARTED"
702fi
703"#;
704
705        let max_wait = Duration::from_secs(MAX_INDEX_WAIT_SECS);
706        let poll_interval = Duration::from_secs(INDEX_POLL_INTERVAL_SECS);
707        let mut sessions_found: u64 = 0;
708        let mut last_agent = String::new();
709        let mut progress_pct: u8 = 5;
710
711        loop {
712            if start.elapsed() > max_wait {
713                return Err(IndexError::Timeout(max_wait.as_secs()));
714            }
715
716            std::thread::sleep(poll_interval);
717
718            let output = self.run_ssh_command(poll_script, Duration::from_secs(30))?;
719            // Track if we've seen Building this poll cycle (avoid multiple increments per poll)
720            let mut saw_building_this_poll = false;
721
722            if poll_status(&output) == Some("COMPLETE") {
723                // Extract session count
724                let sessions = output
725                    .lines()
726                    .find(|l| l.starts_with("SESSIONS="))
727                    .and_then(|l| l.strip_prefix("SESSIONS="))
728                    .and_then(|s| s.trim().parse::<u64>().ok())
729                    .unwrap_or(0);
730
731                return Ok(IndexResult {
732                    success: true,
733                    sessions_indexed: sessions,
734                    duration: start.elapsed(),
735                    error: None,
736                    artifact_manifest: None,
737                });
738            }
739
740            if poll_status(&output) == Some("ERROR") {
741                let error_lines: Vec<&str> = output
742                    .lines()
743                    .filter(|l| !l.trim_start().starts_with("STATUS="))
744                    .collect();
745                let error_msg = error_lines.join("\n");
746
747                // Detect specific errors
748                if error_msg.contains("disk full") || error_msg.contains("No space left") {
749                    return Err(IndexError::DiskFull);
750                }
751                if error_msg.contains("Permission denied") {
752                    return Err(IndexError::PermissionDenied);
753                }
754
755                return Ok(IndexResult {
756                    success: false,
757                    sessions_indexed: 0,
758                    duration: start.elapsed(),
759                    error: Some(error_msg),
760                    artifact_manifest: None,
761                });
762            }
763
764            // Parse progress from output
765            for line in output.lines() {
766                // Look for scanning progress
767                if line.contains("Scanning")
768                    && let Some(agent) = extract_agent_from_line(line)
769                    && agent != last_agent
770                {
771                    progress_pct = (progress_pct + 5).min(40);
772                    on_progress(IndexProgress {
773                        stage: IndexStage::Scanning {
774                            agent: agent.clone(),
775                        },
776                        message: format!("Scanning {}...", agent),
777                        sessions_found,
778                        sessions_indexed: 0,
779                        percent: Some(progress_pct),
780                        elapsed: start.elapsed(),
781                    });
782                    last_agent = agent;
783                }
784
785                // Look for session count updates
786                if let Some(count) = extract_session_count(line) {
787                    sessions_found = count;
788                }
789
790                // Look for building phase (only report once per poll to avoid racing progress)
791                if !saw_building_this_poll
792                    && (line.contains("Building") || line.contains("Indexing"))
793                {
794                    saw_building_this_poll = true;
795                    progress_pct = (progress_pct + 5).min(85);
796                    on_progress(IndexProgress {
797                        stage: IndexStage::Building,
798                        message: "Building search index...".into(),
799                        sessions_found,
800                        sessions_indexed: 0,
801                        percent: Some(progress_pct),
802                        elapsed: start.elapsed(),
803                    });
804                }
805            }
806        }
807    }
808
809    /// Run an SSH command on the remote host.
810    fn run_ssh_command(&self, script: &str, timeout: Duration) -> Result<String, IndexError> {
811        let command_timeout = effective_ssh_command_timeout(timeout, self.ssh_timeout);
812        let connect_timeout_secs = command_timeout.as_secs().clamp(1, 30);
813
814        let mut cmd = Command::new("ssh");
815        cmd.args(strict_ssh_cli_tokens(connect_timeout_secs))
816            .arg("-o")
817            .arg("LogLevel=ERROR")
818            .arg("--")
819            .arg(&self.host)
820            .arg("bash")
821            .arg("-s");
822
823        cmd.stdin(Stdio::piped())
824            .stdout(Stdio::piped())
825            .stderr(Stdio::piped());
826
827        let mut child = cmd.spawn()?;
828
829        let write_error = if let Some(mut stdin) = child.stdin.take() {
830            stdin.write_all(script.as_bytes()).err()
831        } else {
832            None
833        };
834
835        let output = wait_for_command_output_with_timeout(child, command_timeout)?;
836
837        if !output.status.success() {
838            let stderr = String::from_utf8_lossy(&output.stderr);
839            if is_host_key_verification_failure(&stderr) {
840                return Err(IndexError::SshFailed(host_key_verification_error(
841                    &self.host,
842                )));
843            }
844            if stderr.contains("Connection refused")
845                || stderr.contains("Connection timed out")
846                || stderr.contains("Permission denied")
847            {
848                return Err(IndexError::SshFailed(stderr.trim().to_string()));
849            }
850            // Fail fast on any other non-zero exit — surface the exit code and
851            // stderr so operators can diagnose the root cause immediately.
852            let code = output.status.code().unwrap_or(-1);
853            return Err(IndexError::SshFailed(format!(
854                "Remote script exited with code {code}: {}",
855                stderr.trim()
856            )));
857        }
858        if let Some(err) = write_error {
859            return Err(IndexError::Io(err));
860        }
861
862        Ok(String::from_utf8_lossy(&output.stdout).to_string())
863    }
864}
865
/// Extract agent name from a scanning log line.
///
/// Looks for the `Scanning` marker and inspects the text after it, e.g.
/// `Scanning ~/.claude/projects...`, `Scanning /home/me/.codex/sessions` or
/// `Scanning cursor data...`. The first whitespace-delimited token that yields a
/// usable path segment is used. Within that token, a segment naming a known agent
/// directory wins and is mapped to its canonical name (`claude` → `claude_code`).
/// Otherwise the token's first segment is returned verbatim.
///
/// Segments have hidden-dir dots, trailing ellipses and similar punctuation or
/// quotes stripped. `~` and `.` segments are skipped.
///
/// Returns `None` when the line has no `Scanning` marker or nothing usable
/// follows it.
fn extract_agent_from_line(line: &str) -> Option<String> {
    /// Map a directory / agent segment to its canonical agent name, if known.
    fn canonical_agent(segment: &str) -> Option<&str> {
        match segment {
            "claude" | "claude_code" => Some("claude_code"),
            "codex" => Some("codex"),
            "cursor" => Some("cursor"),
            "gemini" => Some("gemini"),
            "aider" => Some("aider"),
            "goose" => Some("goose"),
            "continue" => Some("continue"),
            _ => None,
        }
    }

    const MARKER: &str = "Scanning";
    let idx = line.find(MARKER)?;
    // MARKER is ASCII, so `idx + MARKER.len()` is always a char boundary.
    let rest = line[idx + MARKER.len()..].trim();

    let segments: Vec<&str> = rest
        .split_whitespace()
        .map(|token| {
            token
                .split('/')
                // Strip hidden-dir dots (`.claude`), trailing punctuation such as
                // the ellipsis in `Scanning cursor...`, and surrounding quotes.
                .map(|s| {
                    s.trim_matches(|c: char| matches!(c, '.' | ',' | ':' | ';' | '"' | '\''))
                })
                .filter(|s| !s.is_empty() && *s != "~")
                .collect::<Vec<_>>()
        })
        .find(|segs| !segs.is_empty())?;

    // Prefer a recognisable agent directory anywhere in the path (absolute paths
    // start with `home`, `Users`, ...), else keep the original first-segment rule.
    let agent = segments
        .iter()
        .copied()
        .find_map(canonical_agent)
        .or_else(|| segments.first().copied())?;

    Some(agent.to_string())
}
894
/// Extract session count from a log line.
///
/// Matches phrasings such as `found 234 sessions`, `Indexed 291 sessions`,
/// `Processing 42 conversations`, `Indexed 5/10 conversations` (→ 5) and
/// `conversations: 17 total`. To avoid picking up unrelated numbers
/// (timestamps, IDs), only the tokens immediately before and after a
/// `session(s)` / `conversation(s)` keyword are considered, the preceding token
/// first. Returns the first count found, or `None`.
fn extract_session_count(line: &str) -> Option<u64> {
    let lower = line.to_lowercase();
    let tokens: Vec<&str> = lower.split_whitespace().collect();

    for (idx, token) in tokens.iter().enumerate() {
        // Ignore punctuation glued to the keyword, e.g. `sessions:`.
        let word = token.trim_matches(|c: char| !c.is_ascii_alphabetic());
        if !matches!(
            word,
            "session" | "sessions" | "conversation" | "conversations"
        ) {
            continue;
        }

        let before = idx.checked_sub(1).and_then(|prev| tokens.get(prev));
        let after = tokens.get(idx + 1);
        let count = before
            .and_then(|t| parse_count(t))
            .or_else(|| after.and_then(|t| parse_count(t)));
        if count.is_some() {
            return count;
        }
    }

    None
}

/// Parse a single count token such as `234`, `1,234`, `(42)` or `5/10` (→ 5).
///
/// Leading and trailing punctuation is ignored, only the part before the first
/// `/` is used, and `,` is accepted as a thousands separator. Any other embedded
/// non-digit makes the token unparseable rather than gluing its scattered digits
/// into a bogus number. Examples are hex IDs like `7f3a2b` (formerly 732), times
/// like `12:00:00` and decimals like `1.5k`.
///
/// Returns `None` when there are no digits or the value overflows `u64`.
fn parse_count(token: &str) -> Option<u64> {
    let trimmed = token.trim_matches(|c: char| !c.is_ascii_digit() && c != '/');
    let candidate = trimmed.split('/').next().unwrap_or(trimmed);
    if !candidate.chars().all(|c| c.is_ascii_digit() || c == ',') {
        return None;
    }
    let digits: String = candidate.chars().filter(|c| c.is_ascii_digit()).collect();
    if digits.is_empty() {
        None
    } else {
        digits.parse::<u64>().ok()
    }
}
935
// Unit tests for the remote indexing helpers: `needs_indexing` decisions from
// probe fixtures, log-line parsing, SSH timeout/output handling, remote
// status parsing, host-pressure gating and artifact-manifest parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sources::probe::HostProbeResult;
    use std::path::PathBuf;

    /// Load a probe fixture from the tests/fixtures/sources/probe directory.
    ///
    /// `name` is the fixture file stem; `.json` is appended.
    ///
    /// # Panics
    ///
    /// Panics if the fixture file cannot be read or does not deserialize into a
    /// `HostProbeResult`.
    fn load_probe_fixture(name: &str) -> HostProbeResult {
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/fixtures/sources/probe")
            .join(format!("{}.json", name));
        let content = std::fs::read_to_string(&path)
            .unwrap_or_else(|e| panic!("Failed to read fixture {}: {}", path.display(), e));
        serde_json::from_str(&content)
            .unwrap_or_else(|e| panic!("Failed to parse fixture {}: {}", path.display(), e))
    }

    // --- needs_indexing decisions against recorded probe fixtures ---

    #[test]
    fn test_no_indexing_when_not_found() {
        // Can't index if cass isn't installed
        let probe = load_probe_fixture("no_cass_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_not_indexed() {
        let probe = load_probe_fixture("not_indexed_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_empty_index() {
        let probe = load_probe_fixture("empty_index_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_no_indexing_needed_when_has_sessions() {
        let probe = load_probe_fixture("indexed_host");
        assert!(!RemoteIndexer::needs_indexing(&probe));
    }

    #[test]
    fn test_needs_indexing_when_unknown() {
        // An unknown probe status errs on the side of indexing.
        let probe = load_probe_fixture("unknown_status_host");
        assert!(RemoteIndexer::needs_indexing(&probe));
    }

    // --- progress-log parsing ---

    #[test]
    fn test_extract_agent_from_line() {
        // Hidden agent directories map to canonical agent names.
        assert_eq!(
            extract_agent_from_line("Scanning ~/.claude/projects..."),
            Some("claude_code".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning ~/.codex/sessions..."),
            Some("codex".into())
        );
        assert_eq!(
            extract_agent_from_line("Scanning cursor data..."),
            Some("cursor".into())
        );
        // Lines without the `Scanning` marker yield nothing.
        assert_eq!(extract_agent_from_line("Some other line"), None);
    }

    #[test]
    fn test_extract_session_count() {
        assert_eq!(extract_session_count("found 234 sessions"), Some(234));
        assert_eq!(extract_session_count("Indexed 291 sessions"), Some(291));
        assert_eq!(
            extract_session_count("Processing 42 conversations"),
            Some(42)
        );
        // A leading timestamp must not be mistaken for the count.
        assert_eq!(
            extract_session_count("2026-01-11 12:00:00 Indexed 291 sessions"),
            Some(291)
        );
        // `done/total` progress counters report the `done` part.
        assert_eq!(extract_session_count("Indexed 5/10 conversations"), Some(5));
        // The count may also follow the keyword.
        assert_eq!(extract_session_count("conversations: 17 total"), Some(17));
        assert_eq!(extract_session_count("Some other line"), None);
    }

    #[test]
    fn test_index_stage_display() {
        assert_eq!(IndexStage::Starting.to_string(), "Starting");
        assert_eq!(
            IndexStage::Scanning {
                agent: "claude_code".into()
            }
            .to_string(),
            "Scanning claude_code"
        );
        assert_eq!(IndexStage::Building.to_string(), "Building index");
        assert_eq!(IndexStage::Complete.to_string(), "Complete");
    }

    #[test]
    fn test_index_error_help_messages() {
        // Each error variant carries actionable guidance for the operator.
        assert!(IndexError::DiskFull.help_message().contains("Free disk"));
        assert!(IndexError::Timeout(600).help_message().contains("manually"));
        assert!(
            IndexError::PermissionDenied
                .help_message()
                .contains("permissions")
        );
        assert!(
            IndexError::CassNotFound
                .help_message()
                .contains("installed")
        );
        assert!(
            IndexError::HostPressure("load".into())
                .help_message()
                .contains("busy")
        );
    }

    #[test]
    fn test_remote_indexer_new() {
        let indexer = RemoteIndexer::new("laptop", 300);
        assert_eq!(indexer.host(), "laptop");

        let indexer2 = RemoteIndexer::with_defaults("server");
        assert_eq!(indexer2.host(), "server");
    }

    // --- SSH deadline handling ---

    #[test]
    fn test_effective_ssh_command_timeout_clamps_to_smaller_deadline() {
        // The smaller of the two deadlines wins ...
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(60), 10),
            Duration::from_secs(10)
        );
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 60),
            Duration::from_secs(15)
        );
        // ... an ssh timeout of 0 imposes no additional cap ...
        assert_eq!(
            effective_ssh_command_timeout(Duration::from_secs(15), 0),
            Duration::from_secs(15)
        );
        // ... and the result is never below one second.
        assert_eq!(
            effective_ssh_command_timeout(Duration::ZERO, 0),
            Duration::from_secs(1)
        );
    }

    // --- structured remote output parsing ---

    #[test]
    fn test_parse_remote_cass_presence_requires_unambiguous_status_line() {
        // Banner noise before the marker line is tolerated.
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\nCASS_FOUND\n"),
            RemoteCassPresence::Found
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_NOT_FOUND\n"),
            RemoteCassPresence::NotFound
        );
        // No marker, or contradictory markers, must not be guessed at.
        assert_eq!(
            parse_remote_cass_presence("Welcome to host\n"),
            RemoteCassPresence::Unknown
        );
        assert_eq!(
            parse_remote_cass_presence("CASS_FOUND\nCASS_NOT_FOUND\n"),
            RemoteCassPresence::Unknown
        );
    }

    #[test]
    fn test_poll_status_uses_exact_status_line() {
        // `STATUS=` mentioned mid-line in prose is ignored.
        assert_eq!(
            poll_status("banner mentions STATUS=ERROR in prose\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        // A later STATUS=COMPLETE line overrides an earlier STATUS=ERROR line.
        assert_eq!(
            poll_status("STATUS=ERROR\nstartup banner\nSTATUS=COMPLETE\nSESSIONS=7\n"),
            Some("COMPLETE")
        );
        // Leading whitespace before the status line is tolerated.
        assert_eq!(poll_status("  STATUS=ERROR\npanic\n"), Some("ERROR"));
        assert_eq!(poll_status("no structured status"), None);
    }

    // --- bounded child-process waiting (unix-only helpers via `sh`) ---

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_kills_stalled_child() {
        // A child that outlives the deadline is abandoned promptly; the
        // sub-second deadline is reported as `Timeout(1)`.
        let child = Command::new("sh")
            .arg("-c")
            .arg("sleep 2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn sleep helper");

        let started = Instant::now();
        let err = wait_for_command_output_with_timeout(child, Duration::from_millis(50))
            .expect_err("stalled command should time out");
        assert!(matches!(err, IndexError::Timeout(1)));
        assert!(started.elapsed() < Duration::from_secs(1));
    }

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_drains_large_output() {
        // 200 kB on each stream is written so that an implementation that does
        // not drain both pipes while waiting would stall instead of finishing.
        let child = Command::new("sh")
            .arg("-c")
            .arg("yes stdout | head -c 200000; yes stderr | head -c 200000 >&2")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn large-output helper");

        let output = wait_for_command_output_with_timeout(child, Duration::from_secs(5))
            .expect("large-output command should finish without filling pipes");
        assert!(output.status.success());
        assert_eq!(output.stdout.len(), 200_000);
        assert_eq!(output.stderr.len(), 200_000);
    }

    #[cfg(unix)]
    #[test]
    fn test_wait_for_command_output_with_timeout_bounds_inherited_pipe_waits() {
        // The shell exits immediately, but a backgrounded `sleep` may keep the
        // stdout/stderr pipes open; the wait must still respect the deadline.
        let child = Command::new("sh")
            .arg("-c")
            .arg("(sleep 2) & printf parent-done")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn inherited-pipe helper");

        let started = Instant::now();
        let err = wait_for_command_output_with_timeout(child, Duration::from_millis(100))
            .expect_err("inherited pipe should not outlive command deadline");
        assert!(matches!(err, IndexError::Timeout(1)));
        assert!(started.elapsed() < Duration::from_secs(1));
    }

    // --- generated remote scripts ---

    #[test]
    fn test_artifact_manifest_script_uses_robot_safe_write_command() {
        // The manifest write must request JSON output rather than the bare form.
        let script = RemoteIndexer::artifact_manifest_script();
        assert!(script.contains("cass sources artifact-manifest --write --json"));
        assert!(!script.contains("cass sources artifact-manifest --write\n"));
    }

    #[test]
    fn test_host_pressure_script_reads_cheap_linux_metrics() {
        // CPU count, load average and available memory are all sampled.
        let script = RemoteIndexer::host_pressure_script();
        assert!(script.contains("_NPROCESSORS_ONLN"));
        assert!(script.contains("/proc/loadavg"));
        assert!(script.contains("MemAvailable"));
    }

    // --- host-pressure gating ---

    #[test]
    fn test_remote_host_pressure_allows_incomplete_metrics() {
        // Missing metrics do not block indexing; the reason says why.
        let decision = RemoteHostPressureSnapshot::from_command_output("CPUS=\nLOAD1=\n").decide();

        assert!(!decision.defer_index);
        assert!(
            decision.reason.contains("metrics incomplete"),
            "{decision:?}"
        );
    }

    #[test]
    fn test_remote_host_pressure_defers_high_load() {
        // 7.20 / 4 CPUs = 1.8 load per CPU, above REMOTE_INDEX_MAX_LOAD_PER_CPU (1.50).
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=4\nLOAD1=7.20\nMEM_AVAILABLE_KIB=1048576\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(decision.reason.contains("load_per_cpu"), "{decision:?}");
    }

    #[test]
    fn test_remote_host_pressure_defers_low_memory() {
        // Load is modest (12 / 64 CPUs), so deferral here comes from the
        // 131072 KiB of available memory.
        let decision = RemoteHostPressureSnapshot::from_command_output(
            "CPUS=64\nLOAD1=12.00\nMEM_AVAILABLE_KIB=131072\n",
        )
        .decide();

        assert!(decision.defer_index);
        assert!(
            decision.reason.contains("mem_available_kib"),
            "{decision:?}"
        );
    }

    // --- artifact-manifest command output parsing ---

    #[test]
    fn test_remote_artifact_manifest_result_parses_command_output() {
        // Clean JSON output with every field populated.
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"{
              "status": "ok",
              "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
              "bundle_id": "cass-lexical-abc",
              "chunk_count": 3,
              "expected_bytes": 42,
              "verification_status": "complete"
            }"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-abc"));
        assert_eq!(result.chunk_count, Some(3));
        assert_eq!(result.expected_bytes, Some(42));
        assert_eq!(result.error, None);
    }

    #[test]
    fn test_remote_artifact_manifest_result_parses_json_with_ssh_noise() {
        // Login banner / MOTD text (including a stray `{}`) precedes the JSON.
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host {}
MOTD: maintenance starts at 02:00
{
  "status": "ok",
  "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
  "bundle_id": "cass-lexical-noisy",
  "chunk_count": 2,
  "expected_bytes": 99,
  "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-noisy"));
        assert_eq!(result.chunk_count, Some(2));
        assert_eq!(result.expected_bytes, Some(99));
        assert_eq!(result.error, None);
    }

    #[test]
    fn test_remote_artifact_manifest_result_skips_partial_noise_json() {
        // A partial JSON object in the noise must not be taken as the result;
        // the fields asserted below come from the later, complete object.
        let result = RemoteArtifactManifestResult::from_command_output(
            r#"
Welcome to remote host
{"verification_status": "complete"}
{
  "status": "ok",
  "manifest_path": "/home/user/.local/share/cass/index/v1/evidence-bundle-manifest.json",
  "bundle_id": "cass-lexical-real",
  "chunk_count": 4,
  "expected_bytes": 123,
  "verification_status": "complete"
}
"#,
        );

        assert!(result.success);
        assert_eq!(result.bundle_id.as_deref(), Some("cass-lexical-real"));
        assert_eq!(result.chunk_count, Some(4));
        assert_eq!(result.expected_bytes, Some(123));
        assert_eq!(result.error, None);
    }
}