Skip to main content

coding_agent_search/search/
asset_state.rs

1//! Shared search asset state evaluation for status, health, and fail-open search planning.
2//!
3//! This module centralizes coarse-grained asset truth so callers stop inferring
4//! lexical freshness, active maintenance, and semantic readiness from ad hoc
5//! file checks spread across the CLI.
6//!
7//! The maintenance coordination layer (`evaluate_maintenance_coordination`,
8//! `decide_maintenance_action`, `poll_maintenance_until_idle`) provides
9//! single-flight semantics: foreground cass actors share one coherent truth
10//! for repair/acquisition work and never duplicate basic maintenance jobs.
11
12use std::fs::OpenOptions;
13use std::io::{Read, Write};
14use std::path::{Path, PathBuf};
15use std::time::{Duration, Instant};
16
17use anyhow::{Context, Result};
18use fs2::FileExt;
19
20use crate::indexer::{
21    LEXICAL_REBUILD_PAGE_SIZE_PUBLIC, LexicalRebuildCheckpoint,
22    lexical_rebuild_page_size_is_compatible, lexical_storage_fingerprint_for_db,
23    load_lexical_rebuild_checkpoint,
24};
25use crate::search::ann_index::hnsw_index_path;
26use crate::search::embedder::Embedder;
27use crate::search::fastembed_embedder::FastEmbedder;
28use crate::search::hash_embedder::HashEmbedder;
29use crate::search::model_manager::{
30    SemanticAvailability, probe_hash_semantic_availability, probe_semantic_availability,
31};
32use crate::search::policy::{
33    CHUNKING_STRATEGY_VERSION, CliSemanticOverrides, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
34};
35use crate::search::semantic_manifest::{
36    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
37    TierKind, semantic_shard_artifact_path_is_safe,
38};
39use crate::search::tantivy::SCHEMA_HASH;
40use crate::search::vector_index::{VECTOR_INDEX_DIR, vector_index_path};
41
/// Kind of maintenance run recorded under the `mode` key of the index-run
/// lock metadata.
///
/// The on-disk spelling of each variant is produced by
/// `SearchMaintenanceMode::as_lock_value` and read back by
/// `SearchMaintenanceMode::parse_lock_value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceMode {
    /// Stored in the lock metadata as `index`.
    Index,
    /// Stored in the lock metadata as `watch_startup`.
    WatchStartup,
    /// Stored in the lock metadata as `watch`.
    Watch,
    /// Stored in the lock metadata as `watch_once`.
    WatchOnce,
}
49
50impl SearchMaintenanceMode {
51    pub(crate) fn as_lock_value(self) -> &'static str {
52        match self {
53            Self::Index => "index",
54            Self::WatchStartup => "watch_startup",
55            Self::Watch => "watch",
56            Self::WatchOnce => "watch_once",
57        }
58    }
59
60    pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
61        match raw.trim() {
62            "index" => Some(Self::Index),
63            "watch_startup" => Some(Self::WatchStartup),
64            "watch" => Some(Self::Watch),
65            "watch_once" => Some(Self::WatchOnce),
66            _ => None,
67        }
68    }
69
70    pub(crate) fn watch_active(self) -> bool {
71        matches!(self, Self::WatchStartup | Self::Watch)
72    }
73
74    pub(crate) fn rebuild_active(self) -> bool {
75        !matches!(self, Self::Watch)
76    }
77}
78
/// Kind of maintenance job recorded under the `job_kind` key of the
/// index-run lock metadata.
///
/// The on-disk spelling of each variant is produced by
/// `SearchMaintenanceJobKind::as_lock_value` and read back by
/// `SearchMaintenanceJobKind::parse_lock_value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceJobKind {
    /// Stored in the lock metadata as `lexical_refresh`.
    LexicalRefresh,
    /// Stored in the lock metadata as `semantic_acquire`.
    SemanticAcquire,
}
84
85impl SearchMaintenanceJobKind {
86    pub(crate) fn as_lock_value(self) -> &'static str {
87        match self {
88            Self::LexicalRefresh => "lexical_refresh",
89            Self::SemanticAcquire => "semantic_acquire",
90        }
91    }
92
93    pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
94        match raw.trim() {
95            "lexical_refresh" => Some(Self::LexicalRefresh),
96            "semantic_acquire" => Some(Self::SemanticAcquire),
97            _ => None,
98        }
99    }
100}
101
/// Point-in-time view of the index-run lock and the metadata recorded with it.
///
/// Produced by `read_search_maintenance_snapshot`. The optional fields mirror
/// the `key=value` lines handled by `parse_search_maintenance_snapshot`; the
/// `Default` value (nothing active, no metadata) is what readers get when the
/// lock file cannot be opened or stale metadata has just been cleared.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub(crate) struct SearchMaintenanceSnapshot {
    /// `true` when the index-run lock is treated as currently held.
    pub active: bool,
    /// Value of the `pid` key; `None` when absent or not a valid `u32`.
    pub pid: Option<u32>,
    /// Value of the `started_at_ms` key; `None` when absent or not a valid `i64`.
    pub started_at_ms: Option<i64>,
    /// Trimmed value of the `db_path` key; `None` only when the key is absent.
    pub db_path: Option<PathBuf>,
    /// Value of the `mode` key; `None` when absent or unrecognized.
    pub mode: Option<SearchMaintenanceMode>,
    /// Trimmed value of the `job_id` key; `None` when absent or blank.
    pub job_id: Option<String>,
    /// Value of the `job_kind` key; `None` when absent or unrecognized.
    pub job_kind: Option<SearchMaintenanceJobKind>,
    /// Trimmed value of the `phase` key; `None` when absent or blank.
    pub phase: Option<String>,
    /// Value of the `updated_at_ms` key; `None` when absent or not a valid `i64`.
    pub updated_at_ms: Option<i64>,
    /// Value of the `last_progress_at_ms` key; `None` when absent or not a
    /// valid `i64`. This is the timestamp `maintenance_stall_age_ms` compares
    /// against the current time.
    pub last_progress_at_ms: Option<i64>,
    /// `true` when at least one metadata field was found but the lock is not
    /// active.
    pub orphaned: bool,
}
116
/// Returns the location of the index-run lock file inside `data_dir`.
pub(crate) fn index_run_lock_path(data_dir: &Path) -> PathBuf {
    const LOCK_FILE_NAME: &str = "index-run.lock";
    data_dir.join(LOCK_FILE_NAME)
}
120
/// Returns the path of the metadata sidecar that sits next to `lock_path`.
///
/// The sidecar always uses the fixed file name `index-run.lock.meta`,
/// whatever the final component of `lock_path` is.
pub(crate) fn index_run_lock_metadata_sidecar_path(lock_path: &Path) -> PathBuf {
    const SIDECAR_FILE_NAME: &str = "index-run.lock.meta";
    lock_path.with_file_name(SIDECAR_FILE_NAME)
}
124
125pub(crate) fn write_index_run_lock_metadata_sidecar(
126    lock_path: &Path,
127    contents: &str,
128) -> Result<()> {
129    let sidecar_path = index_run_lock_metadata_sidecar_path(lock_path);
130    let mut sidecar = OpenOptions::new()
131        .create(true)
132        .truncate(true)
133        .write(true)
134        .open(&sidecar_path)
135        .with_context(|| {
136            format!(
137                "opening index-run lock metadata sidecar {}",
138                sidecar_path.display()
139            )
140        })?;
141    sidecar.write_all(contents.as_bytes()).with_context(|| {
142        format!(
143            "writing index-run lock metadata sidecar {}",
144            sidecar_path.display()
145        )
146    })?;
147    sidecar.flush().with_context(|| {
148        format!(
149            "flushing index-run lock metadata sidecar {}",
150            sidecar_path.display()
151        )
152    })?;
153    sidecar.sync_all().with_context(|| {
154        format!(
155            "syncing index-run lock metadata sidecar {}",
156            sidecar_path.display()
157        )
158    })
159}
160
161pub(crate) fn clear_index_run_lock_metadata_sidecar(lock_path: &Path) -> Result<()> {
162    let sidecar_path = index_run_lock_metadata_sidecar_path(lock_path);
163    let sidecar = match OpenOptions::new()
164        .truncate(true)
165        .write(true)
166        .open(&sidecar_path)
167    {
168        Ok(sidecar) => sidecar,
169        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
170        Err(err) => {
171            return Err(err).with_context(|| {
172                format!(
173                    "opening index-run lock metadata sidecar {}",
174                    sidecar_path.display()
175                )
176            });
177        }
178    };
179    sidecar.sync_all().with_context(|| {
180        format!(
181            "syncing cleared index-run lock metadata sidecar {}",
182            sidecar_path.display()
183        )
184    })
185}
186
/// Reads at most `max_len` bytes of UTF-8 text from the file at `path`.
///
/// # Errors
///
/// Returns the I/O error from opening or reading the file; this includes the
/// case where the bytes read are not valid UTF-8.
fn read_capped_metadata_from_path(path: &Path, max_len: u64) -> std::io::Result<String> {
    let mut capped = OpenOptions::new().read(true).open(path)?.take(max_len);
    let mut contents = String::new();
    capped.read_to_string(&mut contents)?;
    Ok(contents)
}
193
/// Reports whether `err` is a Windows file-lock conflict.
///
/// On Windows this is `true` for raw OS error codes 32
/// (`ERROR_SHARING_VIOLATION`) and 33 (`ERROR_LOCK_VIOLATION`). On every other
/// platform it is always `false`.
pub(crate) fn windows_lock_conflict(err: &std::io::Error) -> bool {
    cfg!(windows) && matches!(err.raw_os_error(), Some(32 | 33))
}
203
/// Reads the index-run lock under `data_dir` and reports whether maintenance
/// is currently active, together with whatever metadata the lock carries.
///
/// Metadata is taken from the lock file itself, falling back to the
/// `index-run.lock.meta` sidecar when the lock file reads back empty or when
/// opening/reading it fails with a Windows lock conflict. In the conflict
/// case the snapshot is reported as active without probing the lock.
///
/// Otherwise liveness is decided by a non-blocking exclusive lock attempt on
/// the lock file: if the attempt would block, the lock is active; if it
/// succeeds, no holder exists.
///
/// Side effect: when the lock can be acquired and metadata is present, the
/// metadata is considered stale — the lock file is truncated, the sidecar is
/// cleared, and a default (inactive, empty) snapshot is returned. If the
/// truncation fails, the metadata is kept and the snapshot is flagged
/// `orphaned`.
///
/// Any other failure to open the lock file yields the default snapshot.
pub(crate) fn read_search_maintenance_snapshot(data_dir: &Path) -> SearchMaintenanceSnapshot {
    // Real index-run.lock files written by `acquire_index_run_lock`
    // have a fixed key=value shape under ~1 KiB. Cap the read at 64 KiB
    // so a corrupted or maliciously-large lock file cannot force us to
    // allocate arbitrary memory just to inspect its metadata.
    const MAX_LOCK_FILE_READ: u64 = 64 * 1024;

    let lock_path = index_run_lock_path(data_dir);
    let sidecar_path = index_run_lock_metadata_sidecar_path(&lock_path);
    let file = match OpenOptions::new().read(true).write(true).open(&lock_path) {
        Ok(file) => file,
        // The lock file cannot even be opened because another process holds
        // it: report "active" and take the metadata from the sidecar.
        Err(err) if windows_lock_conflict(&err) => {
            let raw = read_capped_metadata_from_path(&sidecar_path, MAX_LOCK_FILE_READ)
                .unwrap_or_default();
            return parse_search_maintenance_snapshot(raw, true);
        }
        Err(_) => return SearchMaintenanceSnapshot::default(),
    };

    let mut raw = String::new();
    let mut read_blocked_by_lock = false;
    // Read errors other than a Windows lock conflict are ignored here; an
    // empty `raw` simply triggers the sidecar fallback below.
    if let Err(err) = (&file).take(MAX_LOCK_FILE_READ).read_to_string(&mut raw)
        && windows_lock_conflict(&err)
    {
        read_blocked_by_lock = true;
    }
    if raw.trim().is_empty() {
        raw = read_capped_metadata_from_path(&sidecar_path, MAX_LOCK_FILE_READ).unwrap_or_default();
    }

    let mut snapshot = parse_search_maintenance_snapshot(raw, read_blocked_by_lock);
    if read_blocked_by_lock {
        // A blocked read already proves the lock is held; the parse above
        // marked the snapshot active, so skip the lock probe.
        return snapshot;
    }

    let metadata_present = snapshot.pid.is_some()
        || snapshot.started_at_ms.is_some()
        || snapshot.db_path.is_some()
        || snapshot.mode.is_some()
        || snapshot.job_id.is_some()
        || snapshot.job_kind.is_some()
        || snapshot.phase.is_some()
        || snapshot.updated_at_ms.is_some()
        || snapshot.last_progress_at_ms.is_some();

    // On Windows, metadata that names our own pid is treated as an active
    // lock without attempting to re-acquire it.
    // NOTE(review): presumably this avoids probing a lock this process
    // already holds — confirm against the lock acquisition code.
    #[cfg(windows)]
    let current_process_owns_recorded_lock =
        metadata_present && snapshot.pid == Some(std::process::id());
    #[cfg(not(windows))]
    let current_process_owns_recorded_lock = false;

    snapshot.active = if current_process_owns_recorded_lock {
        true
    } else {
        match file.try_lock_exclusive() {
            Ok(()) => {
                // We acquired the exclusive lock with no waiting, which is
                // proof that no process holds it. POSIX flock (via fs2) is
                // released automatically when the owning file description
                // is closed — either explicitly on graceful drop, or by the
                // kernel on process exit / crash. Therefore, if the file
                // contains metadata but no holder is present, the previous
                // owner is gone.
                //
                // Historically this produced a permanent `orphaned: true`
                // state that callers (notably the TUI) interpreted as
                // "rebuild in progress, keep polling" — yielding a tight
                // CPU-bound loop that only cleared when the user manually
                // deleted the lock file (see issue #176).
                //
                // Reap the stale metadata in place while we hold the lock,
                // so that this and every subsequent reader observes a
                // clean state.
                //
                // We deliberately do NOT gate this on a `kill(pid, 0)`
                // liveness probe. Under PID reuse (the recorded pid is
                // reassigned to an unrelated live process), such a probe
                // would refuse to reap and the spin would reappear. Flock
                // acquisition is the stronger and more precise signal.
                if metadata_present {
                    if let Err(err) = file.set_len(0) {
                        tracing::warn!(
                            path = %lock_path.display(),
                            error = %err,
                            "failed to truncate stale index-run lock metadata"
                        );
                    } else {
                        // Best effort: failures to clear the sidecar or to
                        // sync/unlock the lock file are deliberately ignored.
                        let _ = clear_index_run_lock_metadata_sidecar(&lock_path);
                        let _ = file.sync_all();
                        tracing::info!(
                            path = %lock_path.display(),
                            stale_pid = ?snapshot.pid,
                            "cleared stale index-run lock metadata (previous owner gone)"
                        );
                        let _ = file.unlock();
                        return SearchMaintenanceSnapshot::default();
                    }
                }
                let _ = file.unlock();
                false
            }
            // Somebody else holds the lock.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => true,
            Err(err) if windows_lock_conflict(&err) => true,
            // Any other locking failure is reported as "not active".
            Err(_) => false,
        }
    };
    snapshot.orphaned = metadata_present && !snapshot.active;
    snapshot
}
313
314fn parse_search_maintenance_snapshot(
315    raw: String,
316    active_when_metadata_unreadable: bool,
317) -> SearchMaintenanceSnapshot {
318    let mut pid = None;
319    let mut started_at_ms = None;
320    let mut lock_db_path = None::<PathBuf>;
321    let mut mode = None;
322    let mut job_id = None;
323    let mut job_kind = None;
324    let mut phase = None;
325    let mut updated_at_ms = None;
326    let mut last_progress_at_ms = None;
327    for line in raw.lines() {
328        let Some((key, value)) = line.split_once('=') else {
329            continue;
330        };
331        match key.trim() {
332            "pid" => pid = value.trim().parse::<u32>().ok(),
333            "started_at_ms" => started_at_ms = value.trim().parse::<i64>().ok(),
334            "db_path" => lock_db_path = Some(PathBuf::from(value.trim())),
335            "mode" => mode = SearchMaintenanceMode::parse_lock_value(value),
336            "job_id" => job_id = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
337            "job_kind" => job_kind = SearchMaintenanceJobKind::parse_lock_value(value),
338            "phase" => phase = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
339            "updated_at_ms" => updated_at_ms = value.trim().parse::<i64>().ok(),
340            "last_progress_at_ms" => last_progress_at_ms = value.trim().parse::<i64>().ok(),
341            _ => {}
342        }
343    }
344
345    let metadata_present = pid.is_some()
346        || started_at_ms.is_some()
347        || lock_db_path.is_some()
348        || mode.is_some()
349        || job_id.is_some()
350        || job_kind.is_some()
351        || phase.is_some()
352        || updated_at_ms.is_some()
353        || last_progress_at_ms.is_some();
354
355    SearchMaintenanceSnapshot {
356        active: active_when_metadata_unreadable,
357        pid,
358        started_at_ms,
359        db_path: lock_db_path,
360        mode,
361        job_id,
362        job_kind,
363        phase,
364        updated_at_ms,
365        last_progress_at_ms,
366        orphaned: metadata_present && !active_when_metadata_unreadable,
367    }
368}
369
370pub(crate) const REBUILD_STALL_DETECT_SECS_DEFAULT: u64 = 120;
371
372pub(crate) fn rebuild_stall_detect_threshold_ms() -> Option<i64> {
373    let threshold_secs = dotenvy::var("CASS_REBUILD_STALL_DETECT_SECS")
374        .ok()
375        .and_then(|value| value.trim().parse::<u64>().ok())
376        .unwrap_or(REBUILD_STALL_DETECT_SECS_DEFAULT);
377    if threshold_secs == 0 {
378        return None;
379    }
380    let threshold_ms = threshold_secs.saturating_mul(1000);
381    Some(i64::try_from(threshold_ms).unwrap_or(i64::MAX))
382}
383
384pub(crate) fn maintenance_stall_age_ms(
385    snapshot: &SearchMaintenanceSnapshot,
386    now_ms: i64,
387) -> Option<i64> {
388    if !snapshot.active
389        || !snapshot
390            .mode
391            .is_some_and(SearchMaintenanceMode::rebuild_active)
392    {
393        return None;
394    }
395    let last_progress_at_ms = snapshot.last_progress_at_ms?;
396    let age_ms = now_ms.saturating_sub(last_progress_at_ms);
397    let threshold_ms = rebuild_stall_detect_threshold_ms()?;
398    (age_ms >= threshold_ms).then_some(age_ms)
399}
400
/// Which semantic backend an inspection should be evaluated for.
// NOTE(review): the `dead_code` allowance outside tests suggests one variant
// is only constructed from test code — confirm before removing it.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SemanticPreference {
    /// Probe availability with `probe_semantic_availability`; surfaced as
    /// the `fastembed` preferred backend.
    DefaultModel,
    /// Probe availability with `probe_hash_semantic_availability`; surfaced
    /// as the `hash` preferred backend with no model directory.
    HashFallback,
}
407
/// Combined lexical and semantic asset state returned by
/// `inspect_search_assets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SearchAssetSnapshot {
    /// State of the lexical index assets.
    pub lexical: LexicalAssetState,
    /// State of the semantic search assets.
    pub semantic: SemanticAssetState,
}
413
/// Lexical-index portion of a [`SearchAssetSnapshot`].
///
/// NOTE(review): the fields are filled in by `inspect_lexical_assets`, which
/// is not shown next to this definition; field meanings not documented below
/// should be confirmed against that function.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalAssetState {
    pub status: &'static str,
    pub exists: bool,
    pub fresh: bool,
    pub stale: bool,
    pub rebuilding: bool,
    /// `true` when the rebuild is nominally active but the indexing
    /// thread has not posted forward progress within the configured
    /// stall threshold. Driven by `last_progress_at_ms` on the lock
    /// file, NOT by `updated_at_ms` (which the heartbeat thread
    /// refreshes unconditionally). See issue #258 for the regression
    /// this guards against.
    pub stalled: bool,
    /// Wall-clock age of the most recent forward-progress event, when
    /// the indexer thread has posted one. `None` when no
    /// `last_progress_at_ms` is available (legacy lock file, or the
    /// rebuild is not active).
    pub last_progress_age_ms: Option<i64>,
    pub last_progress_at_ms: Option<i64>,
    pub watch_active: bool,
    pub last_indexed_at_ms: Option<i64>,
    pub age_seconds: Option<u64>,
    pub stale_threshold_seconds: u64,
    pub activity_at_ms: Option<i64>,
    pub pending_sessions: u64,
    pub processed_conversations: Option<u64>,
    pub total_conversations: Option<u64>,
    pub indexed_docs: Option<u64>,
    pub status_reason: Option<String>,
    /// Storage-fingerprint details; `current_db_fingerprint` from here is
    /// what `inspect_search_assets` hands to the semantic inspection.
    pub fingerprint: LexicalFingerprintState,
    /// Details about the lexical rebuild checkpoint.
    pub checkpoint: LexicalCheckpointState,
}
447
/// Fingerprint details carried in `LexicalAssetState::fingerprint`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalFingerprintState {
    /// Fingerprint of the current database, when one was computed. This is
    /// the value `inspect_search_assets` passes on to semantic inspection.
    pub current_db_fingerprint: Option<String>,
    /// Fingerprint associated with the checkpoint, when available.
    pub checkpoint_fingerprint: Option<String>,
    /// Whether the two fingerprints above agree; `None` presumably means the
    /// comparison could not be made — TODO confirm against the producer.
    pub matches_current_db_fingerprint: Option<bool>,
}
454
/// Checkpoint details carried in `LexicalAssetState::checkpoint`.
///
/// NOTE(review): the `Option<bool>` fields presumably stay `None` when no
/// checkpoint is present — confirm against the code that builds this value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalCheckpointState {
    pub present: bool,
    pub completed: Option<bool>,
    pub db_matches: Option<bool>,
    pub schema_matches: Option<bool>,
    pub page_size_matches: Option<bool>,
    pub page_size_compatible: Option<bool>,
}
464
/// Semantic-search portion of a [`SearchAssetSnapshot`].
///
/// Built by `semantic_state_from_availability` for a full inspection, or by
/// `semantic_state_not_inspected` when semantic inspection is skipped.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SemanticAssetState {
    /// Status code; `"not_inspected"` when semantic inspection was skipped.
    pub status: &'static str,
    /// Availability code; `"not_inspected"` when semantic inspection was
    /// skipped.
    pub availability: &'static str,
    /// Human-readable summary of the semantic state.
    pub summary: String,
    /// Set to the same value as `can_search` by both constructors.
    pub available: bool,
    pub can_search: bool,
    pub fallback_mode: Option<&'static str>,
    /// `"fastembed"` for `SemanticPreference::DefaultModel`, `"hash"` for
    /// `SemanticPreference::HashFallback`.
    pub preferred_backend: &'static str,
    pub embedder_id: Option<String>,
    pub vector_index_path: Option<PathBuf>,
    pub model_dir: Option<PathBuf>,
    pub hnsw_path: Option<PathBuf>,
    /// `true` when `hnsw_path` is set and points at an existing file.
    pub hnsw_ready: bool,
    pub progressive_ready: bool,
    /// Sub-fix 3 for cass#257: true when a quality-tier vector index is
    /// published, matches the current DB fingerprint, and could serve a
    /// `--mode semantic` search even if the progressive/hybrid stack is
    /// still building (e.g. the fast tier hasn't been backfilled yet).
    ///
    /// Distinct from `progressive_ready`, which only returns true when
    /// BOTH the fast and quality tier index files exist on disk —
    /// useful for the "hybrid stack is good to go" surface but
    /// misleading for operators who only run `--mode semantic`.
    pub quality_tier_published: bool,
    /// Sub-fix 3 for cass#257: true when at least one tier (fast OR
    /// quality) is queryable against the current DB. This collapses
    /// the per-tier readiness into a single flag suitable for the
    /// operator question "can I run `cass search --mode semantic`
    /// right now?". Mirrors `can_search` but is named so it survives
    /// future refactors of the can_search semantics.
    pub semantic_only_search_available: bool,
    pub hint: Option<String>,
    pub fast_tier: SemanticTierAssetState,
    pub quality_tier: SemanticTierAssetState,
    pub backlog: SemanticBacklogProgressState,
    pub checkpoint: SemanticCheckpointProgressState,
}
503
/// Result of `semantic_runtime_surface`.
///
/// `semantic_state_from_availability` copies the status fields into the
/// final [`SemanticAssetState`]. When `embedder_id` is `Some`, the path
/// fields here are used as-is; otherwise each path falls back to the
/// caller's base value when it is `None`.
struct SemanticRuntimeSurface {
    status: &'static str,
    availability: &'static str,
    summary: String,
    can_search: bool,
    fallback_mode: Option<&'static str>,
    hint: Option<String>,
    embedder_id: Option<String>,
    vector_index_path: Option<PathBuf>,
    model_dir: Option<PathBuf>,
    hnsw_path: Option<PathBuf>,
}
516
/// Argument bundle for `semantic_runtime_surface`.
///
/// The borrowed fields are the probe result and manifest-derived progress
/// state; the `base_*` fields are owned copies of the embedder id and paths
/// that `semantic_state_from_availability` derived before calling it.
struct SemanticRuntimeInputs<'a> {
    data_dir: &'a Path,
    availability: &'a SemanticAvailability,
    preference: SemanticPreference,
    fast_tier: &'a SemanticTierAssetState,
    quality_tier: &'a SemanticTierAssetState,
    backlog: &'a SemanticBacklogProgressState,
    checkpoint: &'a SemanticCheckpointProgressState,
    base_embedder_id: Option<String>,
    base_vector_index_path: Option<PathBuf>,
    base_model_dir: Option<PathBuf>,
    base_hnsw_path: Option<PathBuf>,
}
530
/// Result of `semantic_preference_surface`: the backend name and model
/// directory implied by a [`SemanticPreference`].
struct SemanticPreferenceSurface {
    /// `"fastembed"` or `"hash"`.
    preferred_backend: &'static str,
    /// Model directory for the default model; `None` for the hash fallback.
    model_dir: Option<PathBuf>,
}
535
/// State of one semantic tier (fast or quality), as reported in
/// `SemanticAssetState::fast_tier` and `SemanticAssetState::quality_tier`.
///
/// The initial value comes from `semantic_manifest_progress`;
/// `semantic_state_from_availability` may then update it through
/// `promote_complete_shard_generation_state`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticTierAssetState {
    pub present: bool,
    pub ready: bool,
    pub current_db_matches: Option<bool>,
    pub conversation_count: Option<u64>,
    pub doc_count: Option<u64>,
    pub embedder_id: Option<String>,
    pub model_revision: Option<String>,
    pub completed_at_ms: Option<i64>,
    pub size_bytes: Option<u64>,
    pub index_path: Option<PathBuf>,
}
549
/// Backlog counters reported in `SemanticAssetState::backlog`, as returned
/// by `semantic_manifest_progress`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticBacklogProgressState {
    pub total_conversations: u64,
    pub fast_tier_processed: u64,
    pub fast_tier_remaining: u64,
    pub quality_tier_processed: u64,
    pub quality_tier_remaining: u64,
    pub pending_work: bool,
    pub current_db_matches: Option<bool>,
    pub computed_at_ms: Option<i64>,
}
561
/// Build-checkpoint progress reported in `SemanticAssetState::checkpoint`,
/// as returned by `semantic_manifest_progress`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticCheckpointProgressState {
    pub active: bool,
    pub tier: Option<&'static str>,
    pub current_db_matches: Option<bool>,
    pub completed: Option<bool>,
    pub conversations_processed: Option<u64>,
    pub total_conversations: Option<u64>,
    pub progress_pct: Option<u8>,
    pub docs_embedded: Option<u64>,
    pub last_offset: Option<i64>,
    pub saved_at_ms: Option<i64>,
}
575
/// Arguments for `inspect_search_assets`.
pub(crate) struct InspectSearchAssetsInput<'a> {
    /// Data directory that holds the search assets.
    pub data_dir: &'a Path,
    /// Path of the database the assets are evaluated against.
    pub db_path: &'a Path,
    /// Staleness threshold forwarded to the lexical inspection.
    pub stale_threshold: u64,
    /// Timestamp of the last completed index run, if known.
    pub last_indexed_at_ms: Option<i64>,
    /// Full-precision (millisecond) wall clock used for stall-detection
    /// math against `last_progress_at_ms`. Callers should pass
    /// `FrankenStorage::now_millis()` here. F4 (cass tech debt): the
    /// previous shape only carried `now_secs`, which quantised the
    /// comparison to second resolution while `last_progress_at_ms` is
    /// stored at full ms. Down-stream `now_secs` is now derived from
    /// this value so the two clocks remain consistent.
    pub now_ms: i64,
    /// Lock/maintenance snapshot forwarded to the lexical inspection.
    pub maintenance: SearchMaintenanceSnapshot,
    /// Which semantic backend to evaluate.
    pub semantic_preference: SemanticPreference,
    /// Whether the database is available; when `false`, a full semantic
    /// inspection reports the database as unavailable instead of probing.
    pub db_available: bool,
    /// Forwarded to the lexical inspection.
    pub compute_lexical_fingerprint: bool,
    /// When `false`, semantic availability is not probed and the semantic
    /// state is reported as `"not_inspected"`.
    pub inspect_semantic: bool,
}
595
/// Largest mtime difference, in milliseconds, that still counts as "the same
/// file" when comparing two lexical storage fingerprints.
const LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS: i64 = 1_000;

/// Decoded form of a `db_len:db_mtime_ms:wal_len:wal_mtime_ms` fingerprint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParsedLexicalStorageFingerprint {
    db_len: u64,
    db_mtime_ms: i64,
    wal_len: u64,
    wal_mtime_ms: i64,
}

/// Parses a colon-separated fingerprint with exactly four numeric fields.
///
/// Returns `None` when a field is missing, fails to parse as its integer
/// type, or when more than four fields are present.
fn parse_lexical_storage_fingerprint(raw: &str) -> Option<ParsedLexicalStorageFingerprint> {
    let mut fields = raw.split(':');
    let db_len: u64 = fields.next()?.parse().ok()?;
    let db_mtime_ms: i64 = fields.next()?.parse().ok()?;
    let wal_len: u64 = fields.next()?.parse().ok()?;
    let wal_mtime_ms: i64 = fields.next()?.parse().ok()?;

    match fields.next() {
        // Trailing data means this is not a fingerprint we understand.
        Some(_) => None,
        None => Some(ParsedLexicalStorageFingerprint {
            db_len,
            db_mtime_ms,
            wal_len,
            wal_mtime_ms,
        }),
    }
}

/// Reports whether two lexical storage fingerprints describe the same
/// on-disk state.
///
/// When both parse, the lengths must be equal and each mtime may differ by
/// at most `LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS`. When either one
/// does not parse, the raw strings are compared for exact equality instead.
pub(crate) fn lexical_storage_fingerprints_match(current: &str, saved: &str) -> bool {
    let (Some(now), Some(then)) = (
        parse_lexical_storage_fingerprint(current),
        parse_lexical_storage_fingerprint(saved),
    ) else {
        return current == saved;
    };

    let tolerance_ms =
        u64::try_from(LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS).unwrap_or(u64::MAX);
    let mtime_close = |a: i64, b: i64| a.abs_diff(b) <= tolerance_ms;

    now.db_len == then.db_len
        && now.wal_len == then.wal_len
        && mtime_close(now.db_mtime_ms, then.db_mtime_ms)
        && mtime_close(now.wal_mtime_ms, then.wal_mtime_ms)
}
638
639pub(crate) fn inspect_search_assets(
640    input: InspectSearchAssetsInput<'_>,
641) -> Result<SearchAssetSnapshot> {
642    let InspectSearchAssetsInput {
643        data_dir,
644        db_path,
645        stale_threshold,
646        last_indexed_at_ms,
647        now_ms,
648        maintenance,
649        semantic_preference,
650        db_available,
651        compute_lexical_fingerprint,
652        inspect_semantic,
653    } = input;
654
655    let lexical = inspect_lexical_assets(InspectLexicalAssetsInput {
656        data_dir,
657        db_path,
658        stale_threshold,
659        last_indexed_at_ms,
660        now_ms,
661        maintenance,
662        db_available,
663        compute_lexical_fingerprint,
664    })?;
665    let current_db_fingerprint = lexical.fingerprint.current_db_fingerprint.as_deref();
666    let semantic = if inspect_semantic {
667        inspect_semantic_assets(
668            data_dir,
669            db_path,
670            semantic_preference,
671            current_db_fingerprint,
672            db_available,
673        )
674    } else {
675        semantic_state_not_inspected(data_dir, semantic_preference, current_db_fingerprint)
676    };
677
678    Ok(SearchAssetSnapshot { lexical, semantic })
679}
680
681fn semantic_state_not_inspected(
682    data_dir: &Path,
683    preference: SemanticPreference,
684    current_db_fingerprint: Option<&str>,
685) -> SemanticAssetState {
686    let (fast_tier, quality_tier, backlog, checkpoint) =
687        semantic_manifest_progress(data_dir, current_db_fingerprint);
688    let preference_surface = semantic_preference_surface(data_dir, preference);
689
690    SemanticAssetState {
691        status: "not_inspected",
692        availability: "not_inspected",
693        summary: "semantic assets were not inspected for this fast path".to_string(),
694        available: false,
695        can_search: false,
696        fallback_mode: Some("lexical"),
697        preferred_backend: preference_surface.preferred_backend,
698        embedder_id: None,
699        vector_index_path: None,
700        model_dir: preference_surface.model_dir,
701        hnsw_path: None,
702        hnsw_ready: false,
703        progressive_ready: semantic_progressive_assets_ready(data_dir),
704        // The fast-path skip-DB-open lane doesn't have an
705        // `availability` to consult, so we can't honestly call the
706        // tiers queryable here — leave the sub-fix-3 flags false. The
707        // caller upgrades to `semantic_state_from_availability` when
708        // it needs an answer.
709        quality_tier_published: false,
710        semantic_only_search_available: false,
711        hint: Some(
712            "Use 'cass status --json' or 'cass models status --json' for semantic readiness."
713                .to_string(),
714        ),
715        fast_tier,
716        quality_tier,
717        backlog,
718        checkpoint,
719    }
720}
721
722pub(crate) fn inspect_semantic_assets(
723    data_dir: &Path,
724    db_path: &Path,
725    preference: SemanticPreference,
726    current_db_fingerprint: Option<&str>,
727    db_available: bool,
728) -> SemanticAssetState {
729    if !db_available {
730        let availability = SemanticAvailability::DatabaseUnavailable {
731            db_path: db_path.to_path_buf(),
732            error: "database unavailable during asset inspection".to_string(),
733        };
734        return semantic_state_from_availability(
735            data_dir,
736            &availability,
737            preference,
738            current_db_fingerprint,
739        );
740    }
741
742    let availability = match preference {
743        SemanticPreference::DefaultModel => probe_semantic_availability(data_dir),
744        SemanticPreference::HashFallback => probe_hash_semantic_availability(data_dir),
745    };
746    semantic_state_from_availability(data_dir, &availability, preference, current_db_fingerprint)
747}
748
749pub(crate) fn semantic_state_from_availability(
750    data_dir: &Path,
751    availability: &SemanticAvailability,
752    preference: SemanticPreference,
753    current_db_fingerprint: Option<&str>,
754) -> SemanticAssetState {
755    let (mut fast_tier, mut quality_tier, backlog, checkpoint) =
756        semantic_manifest_progress(data_dir, current_db_fingerprint);
757    let preference_surface = semantic_preference_surface(data_dir, preference);
758    let base_embedder_id = semantic_embedder_id(availability, preference);
759    if let (Some(db_fingerprint), Some(embedder_id)) =
760        (current_db_fingerprint, base_embedder_id.as_deref())
761    {
762        promote_complete_shard_generation_state(
763            data_dir,
764            TierKind::Fast,
765            embedder_id,
766            db_fingerprint,
767            &mut fast_tier,
768        );
769        promote_complete_shard_generation_state(
770            data_dir,
771            TierKind::Quality,
772            embedder_id,
773            db_fingerprint,
774            &mut quality_tier,
775        );
776    }
777    let base_vector_index_path = semantic_vector_index_path(data_dir, availability, preference);
778    let base_model_dir = preference_surface.model_dir;
779    let base_hnsw_path = base_embedder_id
780        .as_deref()
781        .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
782    let runtime = semantic_runtime_surface(SemanticRuntimeInputs {
783        data_dir,
784        availability,
785        preference,
786        fast_tier: &fast_tier,
787        quality_tier: &quality_tier,
788        backlog: &backlog,
789        checkpoint: &checkpoint,
790        base_embedder_id: base_embedder_id.clone(),
791        base_vector_index_path: base_vector_index_path.clone(),
792        base_model_dir: base_model_dir.clone(),
793        base_hnsw_path: base_hnsw_path.clone(),
794    });
795    let use_runtime_paths = runtime.embedder_id.is_some();
796    let embedder_id = runtime.embedder_id.or(base_embedder_id);
797    let vector_index_path = if use_runtime_paths {
798        runtime.vector_index_path
799    } else {
800        runtime.vector_index_path.or(base_vector_index_path)
801    };
802    let model_dir = if use_runtime_paths {
803        runtime.model_dir
804    } else {
805        runtime.model_dir.or(base_model_dir)
806    };
807    let hnsw_path = if use_runtime_paths {
808        runtime.hnsw_path
809    } else {
810        runtime.hnsw_path.or(base_hnsw_path)
811    };
812    let hnsw_ready = hnsw_path.as_ref().is_some_and(|path| path.is_file());
813    let progressive_ready = semantic_progressive_assets_ready(data_dir);
814
815    // Sub-fix 3 for cass#257: report quality-tier readiness as a
816    // first-class flag so operators querying `--mode semantic` can
817    // tell when the quality index is usable even while the
818    // progressive/hybrid stack remains incomplete.
819    let quality_tier_published = semantic_tier_queryable(availability, &quality_tier);
820    let fast_tier_queryable = semantic_tier_queryable(availability, &fast_tier);
821    let semantic_only_search_available = quality_tier_published || fast_tier_queryable;
822
823    SemanticAssetState {
824        status: runtime.status,
825        availability: runtime.availability,
826        summary: runtime.summary,
827        available: runtime.can_search,
828        can_search: runtime.can_search,
829        fallback_mode: runtime.fallback_mode,
830        preferred_backend: preference_surface.preferred_backend,
831        embedder_id,
832        vector_index_path,
833        model_dir,
834        hnsw_path,
835        hnsw_ready,
836        progressive_ready,
837        quality_tier_published,
838        semantic_only_search_available,
839        hint: runtime.hint,
840        fast_tier,
841        quality_tier,
842        backlog,
843        checkpoint,
844    }
845}
846
847fn semantic_preference_surface(
848    data_dir: &Path,
849    preference: SemanticPreference,
850) -> SemanticPreferenceSurface {
851    match preference {
852        SemanticPreference::DefaultModel => SemanticPreferenceSurface {
853            preferred_backend: "fastembed",
854            model_dir: active_policy_model_dir(data_dir),
855        },
856        SemanticPreference::HashFallback => SemanticPreferenceSurface {
857            preferred_backend: "hash",
858            model_dir: None,
859        },
860    }
861}
862
863fn semantic_runtime_surface(inputs: SemanticRuntimeInputs<'_>) -> SemanticRuntimeSurface {
864    let SemanticRuntimeInputs {
865        data_dir,
866        availability,
867        preference,
868        fast_tier,
869        quality_tier,
870        backlog,
871        checkpoint,
872        base_embedder_id,
873        base_vector_index_path,
874        base_model_dir,
875        base_hnsw_path,
876    } = inputs;
877    let base_status = semantic_status_from_availability(availability);
878    let base_availability = semantic_availability_code(availability);
879    let base_summary = availability.summary();
880    let base_can_search = availability.can_search();
881    let base_hint = semantic_hint(availability, preference);
882
883    if matches!(
884        availability,
885        SemanticAvailability::Disabled { .. }
886            | SemanticAvailability::DatabaseUnavailable { .. }
887            | SemanticAvailability::LoadFailed { .. }
888    ) {
889        return SemanticRuntimeSurface {
890            status: base_status,
891            availability: base_availability,
892            summary: base_summary,
893            can_search: base_can_search,
894            fallback_mode: (!base_can_search).then_some("lexical"),
895            hint: base_hint,
896            embedder_id: base_embedder_id,
897            vector_index_path: base_vector_index_path,
898            model_dir: base_model_dir,
899            hnsw_path: base_hnsw_path,
900        };
901    }
902
903    let quality_queryable = semantic_tier_queryable(availability, quality_tier);
904    let fast_queryable = semantic_tier_queryable(availability, fast_tier);
905    let checkpoint_active = checkpoint.active;
906    let backlog_pending = backlog.pending_work;
907    let manifest_assets_present = fast_tier.present || quality_tier.present;
908    let backfill_active = checkpoint_active || backlog_pending;
909
910    let effective_embedder_id = if quality_queryable {
911        quality_tier.embedder_id.clone()
912    } else if fast_queryable {
913        fast_tier.embedder_id.clone()
914    } else {
915        None
916    };
917    let effective_vector_index_path = if quality_queryable {
918        quality_tier.index_path.clone()
919    } else if fast_queryable {
920        fast_tier.index_path.clone()
921    } else {
922        None
923    }
924    .or_else(|| {
925        effective_embedder_id
926            .as_deref()
927            .map(|embedder_id| vector_index_path(data_dir, embedder_id))
928    });
929    let effective_model_dir = effective_embedder_id.as_deref().and_then(|embedder_id| {
930        (!semantic_embedder_is_hash(embedder_id))
931            .then(|| model_dir_for_embedder_id(data_dir, embedder_id))
932            .flatten()
933    });
934    let effective_hnsw_path = effective_embedder_id
935        .as_deref()
936        .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
937
938    if quality_queryable || fast_queryable {
939        let fully_ready = (quality_queryable || fast_queryable) && !backfill_active;
940        let summary = if quality_queryable && backfill_active {
941            "semantic quality tier is usable; residual semantic backfill is still finishing"
942                .to_string()
943        } else if quality_queryable {
944            "semantic quality tier ready".to_string()
945        } else if backfill_active {
946            "semantic fast tier is usable; higher-quality semantic backfill is still in progress"
947                .to_string()
948        } else {
949            "semantic fast tier ready".to_string()
950        };
951        let hint = if backfill_active {
952            Some(
953                "Semantic refinement is already usable; continue searching while higher-quality backfill finishes."
954                    .to_string(),
955            )
956        } else {
957            None
958        };
959        return SemanticRuntimeSurface {
960            status: if fully_ready { "ready" } else { "building" },
961            availability: if fully_ready {
962                "ready"
963            } else {
964                "index_building"
965            },
966            summary,
967            can_search: true,
968            fallback_mode: None,
969            hint,
970            embedder_id: effective_embedder_id,
971            vector_index_path: effective_vector_index_path,
972            model_dir: effective_model_dir,
973            hnsw_path: effective_hnsw_path,
974        };
975    }
976
977    if backfill_active {
978        return SemanticRuntimeSurface {
979            status: "building",
980            availability: "index_building",
981            summary: "semantic backfill is in progress for the current database".to_string(),
982            can_search: false,
983            fallback_mode: Some("lexical"),
984            hint: Some(
985                "Run 'cass index --semantic' to finish backfilling current semantic assets; search will use lexical fallback until then."
986                    .to_string(),
987            ),
988            embedder_id: base_embedder_id,
989            vector_index_path: base_vector_index_path,
990            model_dir: base_model_dir,
991            hnsw_path: base_hnsw_path,
992        };
993    }
994
995    if manifest_assets_present {
996        return SemanticRuntimeSurface {
997            status: "stale",
998            availability: "update_available",
999            summary: "semantic artifacts exist but do not match the current database".to_string(),
1000            can_search: false,
1001            fallback_mode: Some("lexical"),
1002            hint: Some(
1003                "Run 'cass index --semantic' to refresh semantic assets for the current database; search will use lexical fallback until then."
1004                    .to_string(),
1005            ),
1006            embedder_id: base_embedder_id,
1007            vector_index_path: base_vector_index_path,
1008            model_dir: base_model_dir,
1009            hnsw_path: base_hnsw_path,
1010        };
1011    }
1012
1013    SemanticRuntimeSurface {
1014        status: base_status,
1015        availability: base_availability,
1016        summary: base_summary,
1017        can_search: base_can_search,
1018        fallback_mode: (!base_can_search).then_some("lexical"),
1019        hint: base_hint,
1020        embedder_id: base_embedder_id,
1021        vector_index_path: base_vector_index_path,
1022        model_dir: base_model_dir,
1023        hnsw_path: base_hnsw_path,
1024    }
1025}
1026
1027fn active_policy_model_dir(data_dir: &Path) -> Option<PathBuf> {
1028    let policy = SemanticPolicy::resolve(&CliSemanticOverrides::default());
1029    let embedder_name = FastEmbedder::canonical_name(&policy.quality_tier_embedder)?;
1030    FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
1031}
1032
1033fn model_dir_for_embedder_id(data_dir: &Path, embedder_id: &str) -> Option<PathBuf> {
1034    let embedder_name = FastEmbedder::canonical_name(embedder_id)?;
1035    FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
1036}
1037
1038fn semantic_tier_queryable(
1039    availability: &SemanticAvailability,
1040    tier: &SemanticTierAssetState,
1041) -> bool {
1042    if !tier.ready || tier.current_db_matches == Some(false) {
1043        return false;
1044    }
1045    let Some(embedder_id) = tier.embedder_id.as_deref() else {
1046        return false;
1047    };
1048    if semantic_embedder_is_hash(embedder_id) {
1049        !matches!(
1050            availability,
1051            SemanticAvailability::Disabled { .. }
1052                | SemanticAvailability::DatabaseUnavailable { .. }
1053                | SemanticAvailability::LoadFailed { .. }
1054        )
1055    } else {
1056        matches!(
1057            availability,
1058            SemanticAvailability::Ready { .. }
1059                | SemanticAvailability::UpdateAvailable { .. }
1060                | SemanticAvailability::IndexBuilding { .. }
1061                | SemanticAvailability::IndexMissing { .. }
1062        )
1063    }
1064}
1065
1066fn semantic_embedder_is_hash(embedder_id: &str) -> bool {
1067    embedder_id == HashEmbedder::default().id()
1068}
1069
1070fn semantic_manifest_progress(
1071    data_dir: &Path,
1072    current_db_fingerprint: Option<&str>,
1073) -> (
1074    SemanticTierAssetState,
1075    SemanticTierAssetState,
1076    SemanticBacklogProgressState,
1077    SemanticCheckpointProgressState,
1078) {
1079    let manifest = SemanticManifest::load_or_default(data_dir).unwrap_or_default();
1080    let fast_tier = semantic_tier_asset_state(manifest.fast_tier.as_ref(), current_db_fingerprint);
1081    let quality_tier =
1082        semantic_tier_asset_state(manifest.quality_tier.as_ref(), current_db_fingerprint);
1083    let backlog = semantic_backlog_progress_state(&manifest, current_db_fingerprint);
1084    let checkpoint =
1085        semantic_checkpoint_progress_state(manifest.checkpoint.as_ref(), current_db_fingerprint);
1086    (fast_tier, quality_tier, backlog, checkpoint)
1087}
1088
1089fn semantic_tier_asset_state(
1090    artifact: Option<&ArtifactRecord>,
1091    current_db_fingerprint: Option<&str>,
1092) -> SemanticTierAssetState {
1093    let Some(artifact) = artifact else {
1094        return SemanticTierAssetState::default();
1095    };
1096
1097    SemanticTierAssetState {
1098        present: true,
1099        ready: artifact.ready,
1100        current_db_matches: current_db_fingerprint.map(|fp| artifact.db_fingerprint == fp),
1101        conversation_count: Some(artifact.conversation_count),
1102        doc_count: Some(artifact.doc_count),
1103        embedder_id: Some(artifact.embedder_id.clone()),
1104        model_revision: Some(artifact.model_revision.clone()),
1105        completed_at_ms: Some(artifact.completed_at_ms),
1106        size_bytes: Some(artifact.size_bytes),
1107        index_path: None,
1108    }
1109}
1110
1111fn resolve_semantic_artifact_path(data_dir: &Path, recorded_path: &str) -> Option<PathBuf> {
1112    semantic_shard_artifact_path_is_safe(recorded_path).then(|| data_dir.join(recorded_path))
1113}
1114
1115fn complete_shard_records_for_state(
1116    data_dir: &Path,
1117    tier: TierKind,
1118    embedder_id: &str,
1119    db_fingerprint: &str,
1120) -> Option<Vec<SemanticShardRecord>> {
1121    let manifest = SemanticShardManifest::load(data_dir).ok().flatten()?;
1122    let summary = manifest.summary(tier, embedder_id, db_fingerprint);
1123    if !summary.complete {
1124        return None;
1125    }
1126    let mut records = manifest
1127        .shards
1128        .into_iter()
1129        .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
1130        .collect::<Vec<_>>();
1131    records.sort_by_key(|shard| shard.shard_index);
1132    if records.len() != usize::try_from(summary.shard_count).unwrap_or(usize::MAX) {
1133        return None;
1134    }
1135    let first = records.first()?;
1136    for (expected_index, shard) in records.iter().enumerate() {
1137        if shard.shard_index != u32::try_from(expected_index).unwrap_or(u32::MAX)
1138            || !shard.ready
1139            || !shard.mmap_ready
1140            || shard.model_revision != first.model_revision
1141            || shard.schema_version != SEMANTIC_SCHEMA_VERSION
1142            || shard.chunking_version != CHUNKING_STRATEGY_VERSION
1143            || shard.dimension == 0
1144            || shard.dimension != first.dimension
1145            || shard.total_conversations != first.total_conversations
1146        {
1147            return None;
1148        }
1149        let artifact_path = resolve_semantic_artifact_path(data_dir, &shard.index_path)?;
1150        if !artifact_path.is_file() {
1151            return None;
1152        }
1153    }
1154    Some(records)
1155}
1156
1157fn promote_complete_shard_generation_state(
1158    data_dir: &Path,
1159    tier: TierKind,
1160    embedder_id: &str,
1161    db_fingerprint: &str,
1162    state: &mut SemanticTierAssetState,
1163) {
1164    if state.ready && state.current_db_matches == Some(true) {
1165        return;
1166    }
1167    let Some(records) =
1168        complete_shard_records_for_state(data_dir, tier, embedder_id, db_fingerprint)
1169    else {
1170        return;
1171    };
1172    let doc_count = records
1173        .iter()
1174        .map(|shard| shard.doc_count)
1175        .fold(0, u64::saturating_add);
1176    let size_bytes = records
1177        .iter()
1178        .map(|shard| shard.size_bytes)
1179        .fold(0, u64::saturating_add);
1180    let completed_at_ms = records
1181        .iter()
1182        .map(|shard| shard.completed_at_ms)
1183        .max()
1184        .unwrap_or(0);
1185    let first = &records[0];
1186    let Some(first_index_path) = resolve_semantic_artifact_path(data_dir, &first.index_path) else {
1187        return;
1188    };
1189    *state = SemanticTierAssetState {
1190        present: true,
1191        ready: true,
1192        current_db_matches: Some(true),
1193        conversation_count: Some(first.total_conversations),
1194        doc_count: Some(doc_count),
1195        embedder_id: Some(first.embedder_id.clone()),
1196        model_revision: Some(first.model_revision.clone()),
1197        completed_at_ms: Some(completed_at_ms),
1198        size_bytes: Some(size_bytes),
1199        index_path: Some(first_index_path),
1200    };
1201}
1202
1203fn semantic_backlog_progress_state(
1204    manifest: &SemanticManifest,
1205    current_db_fingerprint: Option<&str>,
1206) -> SemanticBacklogProgressState {
1207    let backlog = &manifest.backlog;
1208    let current_db_matches = current_db_fingerprint.and_then(|fp| {
1209        (backlog.computed_at_ms > 0 || !backlog.db_fingerprint.is_empty())
1210            .then(|| backlog.is_current(fp))
1211    });
1212
1213    SemanticBacklogProgressState {
1214        total_conversations: backlog.total_conversations,
1215        fast_tier_processed: backlog.fast_tier_processed,
1216        fast_tier_remaining: backlog.fast_tier_remaining(),
1217        quality_tier_processed: backlog.quality_tier_processed,
1218        quality_tier_remaining: backlog.quality_tier_remaining(),
1219        pending_work: backlog.has_pending_work() || manifest.checkpoint.is_some(),
1220        current_db_matches,
1221        computed_at_ms: (backlog.computed_at_ms > 0).then_some(backlog.computed_at_ms),
1222    }
1223}
1224
1225fn semantic_checkpoint_progress_state(
1226    checkpoint: Option<&BuildCheckpoint>,
1227    current_db_fingerprint: Option<&str>,
1228) -> SemanticCheckpointProgressState {
1229    let Some(checkpoint) = checkpoint else {
1230        return SemanticCheckpointProgressState::default();
1231    };
1232
1233    SemanticCheckpointProgressState {
1234        active: true,
1235        tier: Some(checkpoint.tier.as_str()),
1236        current_db_matches: current_db_fingerprint.map(|fp| checkpoint.is_valid(fp)),
1237        completed: Some(checkpoint.is_complete()),
1238        conversations_processed: Some(checkpoint.conversations_processed),
1239        total_conversations: Some(checkpoint.total_conversations),
1240        progress_pct: Some(checkpoint.progress_pct()),
1241        docs_embedded: Some(checkpoint.docs_embedded),
1242        last_offset: Some(checkpoint.last_offset),
1243        saved_at_ms: Some(checkpoint.saved_at_ms),
1244    }
1245}
1246
/// Inputs for [`inspect_lexical_assets`], which loads the lexical rebuild
/// checkpoint (and optionally the database fingerprint) before evaluating
/// lexical index state.
struct InspectLexicalAssetsInput<'a> {
    /// Data directory from which the expected Tantivy index directory is derived.
    data_dir: &'a Path,
    /// Database the lexical index is evaluated against.
    db_path: &'a Path,
    /// Maximum index age before the index is reported stale; compared against
    /// an age measured in seconds.
    stale_threshold: u64,
    /// Millisecond timestamp of the last completed index run, if known.
    last_indexed_at_ms: Option<i64>,
    /// F4 (cass tech debt): full-precision wall clock; the legacy
    /// `now_secs` field was widening the second-precision comparison
    /// against ms-precision `last_progress_at_ms`. Derive `now_secs`
    /// locally from this when needed for age math.
    now_ms: i64,
    /// Snapshot of the maintenance lock state at inspection time.
    maintenance: SearchMaintenanceSnapshot,
    /// Whether the database is usable; the fingerprint is only computed when
    /// this and `compute_lexical_fingerprint` are both true.
    db_available: bool,
    /// Whether the caller wants the lexical storage fingerprint computed.
    compute_lexical_fingerprint: bool,
}
1261
1262fn inspect_lexical_assets(input: InspectLexicalAssetsInput<'_>) -> Result<LexicalAssetState> {
1263    let InspectLexicalAssetsInput {
1264        data_dir,
1265        db_path,
1266        stale_threshold,
1267        last_indexed_at_ms,
1268        now_ms,
1269        maintenance,
1270        db_available,
1271        compute_lexical_fingerprint,
1272    } = input;
1273    let index_path = crate::search::tantivy::expected_index_dir(data_dir);
1274    let checkpoint = load_lexical_rebuild_checkpoint(&index_path)
1275        .with_context(|| format!("loading lexical checkpoint from {}", index_path.display()))?;
1276    let current_db_fingerprint = if db_available && compute_lexical_fingerprint {
1277        Some(
1278            lexical_storage_fingerprint_for_db(db_path).with_context(|| {
1279                format!(
1280                    "computing lexical storage fingerprint for {}",
1281                    db_path.display()
1282                )
1283            })?,
1284        )
1285    } else {
1286        None
1287    };
1288
1289    Ok(lexical_state_from_observations(LexicalObservationInput {
1290        index_path: &index_path,
1291        db_path,
1292        stale_threshold,
1293        last_indexed_at_ms,
1294        now_ms,
1295        maintenance,
1296        checkpoint: checkpoint.as_ref(),
1297        current_db_fingerprint: current_db_fingerprint.as_deref(),
1298    }))
1299}
1300
/// Already-gathered observations consumed by
/// [`lexical_state_from_observations`].
struct LexicalObservationInput<'a> {
    /// Tantivy index directory that is probed for a searchable index.
    index_path: &'a Path,
    /// Database the checkpoint and maintenance lock are compared against.
    db_path: &'a Path,
    /// Maximum index age, in seconds, before the index is reported stale.
    stale_threshold: u64,
    /// Millisecond timestamp of the last completed index run, if known.
    last_indexed_at_ms: Option<i64>,
    /// Full-precision wall clock (F4); see [`InspectSearchAssetsInput::now_ms`].
    now_ms: i64,
    /// Snapshot of the maintenance lock state at inspection time.
    maintenance: SearchMaintenanceSnapshot,
    /// Lexical rebuild checkpoint loaded from the index directory, if any.
    checkpoint: Option<&'a LexicalRebuildCheckpoint>,
    /// Lexical storage fingerprint of the current database, when computed.
    current_db_fingerprint: Option<&'a str>,
}
1312
1313fn lexical_state_from_observations(input: LexicalObservationInput<'_>) -> LexicalAssetState {
1314    let LexicalObservationInput {
1315        index_path,
1316        db_path,
1317        stale_threshold,
1318        last_indexed_at_ms,
1319        now_ms,
1320        maintenance,
1321        checkpoint,
1322        current_db_fingerprint,
1323    } = input;
1324    let exists = crate::search::tantivy::searchable_index_exists(index_path);
1325    let checkpoint_db_matches =
1326        checkpoint.map(|state| crate::stored_path_identity_matches(&state.db_path, db_path));
1327    let schema_matches = checkpoint.map(|state| state.schema_hash == SCHEMA_HASH);
1328    let page_size_matches =
1329        checkpoint.map(|state| state.page_size == LEXICAL_REBUILD_PAGE_SIZE_PUBLIC);
1330    let page_size_compatible =
1331        checkpoint.map(|state| lexical_rebuild_page_size_is_compatible(state.page_size));
1332    let checkpoint_fingerprint = checkpoint.map(|state| state.storage_fingerprint.as_str());
1333    let fingerprint_matches = match (current_db_fingerprint, checkpoint_fingerprint) {
1334        (Some(current), Some(saved)) => Some(lexical_storage_fingerprints_match(current, saved)),
1335        _ => None,
1336    };
1337    let checkpoint_incomplete = checkpoint.is_some_and(|state| !state.completed);
1338    let checkpoint_db_mismatch = checkpoint_db_matches == Some(false);
1339    let contract_mismatch = schema_matches == Some(false) || page_size_compatible == Some(false);
1340    let fingerprint_mismatch = fingerprint_matches == Some(false);
1341    // F4 (cass tech debt): derive the (legacy) second-resolution clock
1342    // from `now_ms` rather than the other way around so the comparison
1343    // against ms-precision `last_progress_at_ms` below is no longer
1344    // forced into second-bin alignment. `as u64` is correct because
1345    // wall-clock millis fits well inside i63 (until year ~292477).
1346    let now_secs: u64 = now_ms.div_euclid(1000).max(0) as u64;
1347    let age_seconds = last_indexed_at_ms
1348        .and_then(|ts| (ts > 0).then(|| now_secs.saturating_sub((ts / 1000) as u64)));
1349    let age_stale = match age_seconds {
1350        Some(age) => age > stale_threshold,
1351        None => true,
1352    };
1353    let maintenance_targets_current_db = maintenance
1354        .db_path
1355        .as_ref()
1356        .is_none_or(|lock_db_path| crate::path_identities_match(lock_db_path, db_path));
1357    let watch_active = maintenance.active
1358        && maintenance_targets_current_db
1359        && maintenance
1360            .mode
1361            .is_some_and(SearchMaintenanceMode::watch_active);
1362    let rebuilding = maintenance.active
1363        && maintenance_targets_current_db
1364        && maintenance
1365            .mode
1366            .is_some_and(SearchMaintenanceMode::rebuild_active);
1367    let active_rebuild_progress = rebuilding;
1368    // Forward-progress liveness check (issue #258): when a rebuild is
1369    // active but the indexing thread has not posted progress within
1370    // `CASS_REBUILD_STALL_DETECT_SECS` (default 120 s), report
1371    // `stalled` rather than `rebuilding`. The lock file's
1372    // `updated_at_ms` is heartbeat-refreshed every ~1 s by a separate
1373    // thread, so it cannot be used as a "work is happening" signal —
1374    // only the indexer-thread-owned `last_progress_at_ms` can.
1375    //
1376    // `now_ms` is now passed in at full ms precision (F4); the old
1377    // shape derived it from `now_secs` and quantised the comparison.
1378    let stall_age_ms = if rebuilding && maintenance_targets_current_db {
1379        maintenance_stall_age_ms(&maintenance, now_ms)
1380    } else {
1381        None
1382    };
1383    let stalled = stall_age_ms.is_some();
1384    let last_progress_at_ms = maintenance
1385        .last_progress_at_ms
1386        .filter(|_| maintenance_targets_current_db);
1387    let last_progress_age_ms = last_progress_at_ms
1388        .filter(|_| rebuilding)
1389        .map(|ts| now_ms.saturating_sub(ts));
1390    let stale = if rebuilding {
1391        // A stalled rebuild leaves the on-disk index unchanged; if it
1392        // existed before the stall it is still searchable, so treat
1393        // `stalled` like the indexer just hadn't gotten there yet.
1394        !exists || contract_mismatch
1395    } else {
1396        exists
1397            && (age_stale
1398                || checkpoint_db_mismatch
1399                || checkpoint_incomplete
1400                || contract_mismatch
1401                || fingerprint_mismatch)
1402    };
1403    let fresh = exists && !stale && !rebuilding;
1404    let status = if stalled {
1405        "stalled"
1406    } else if rebuilding {
1407        "building"
1408    } else if !exists {
1409        "missing"
1410    } else if stale {
1411        "stale"
1412    } else {
1413        "ready"
1414    };
1415    let status_reason = if stalled {
1416        let secs = stall_age_ms.unwrap_or(0) / 1000;
1417        Some(format!(
1418            "indexing thread has not posted forward progress for {secs}s while the lock heartbeat keeps refreshing — see issue #258 for diagnostics (run `cass doctor check --json` and capture a stack trace)"
1419        ))
1420    } else if rebuilding {
1421        Some("lexical rebuild is in progress".to_string())
1422    } else if !exists {
1423        Some("lexical Tantivy metadata missing".to_string())
1424    } else if checkpoint_db_mismatch {
1425        Some("lexical rebuild checkpoint points at a different database".to_string())
1426    } else if contract_mismatch {
1427        Some("lexical rebuild checkpoint no longer matches the active lexical contract".to_string())
1428    } else if fingerprint_mismatch {
1429        Some("database fingerprint changed since the last lexical checkpoint".to_string())
1430    } else if checkpoint_incomplete {
1431        Some("lexical rebuild checkpoint is incomplete".to_string())
1432    } else if age_stale {
1433        Some("lexical index is older than the stale threshold".to_string())
1434    } else {
1435        None
1436    };
1437    let checkpoint_progress_usable = checkpoint.is_some()
1438        && checkpoint_db_matches == Some(true)
1439        && schema_matches == Some(true)
1440        && page_size_compatible == Some(true)
1441        && if active_rebuild_progress {
1442            true
1443        } else {
1444            current_db_fingerprint.is_some() && fingerprint_matches == Some(true)
1445        };
1446    let pending_sessions = checkpoint
1447        .filter(|_| checkpoint_progress_usable)
1448        .map(|state| {
1449            state
1450                .total_conversations
1451                .saturating_sub(state.processed_conversations) as u64
1452        })
1453        .unwrap_or(0);
1454    let maintenance_activity_at_ms = maintenance_targets_current_db
1455        .then_some(())
1456        .and(maintenance.updated_at_ms.or(maintenance.started_at_ms));
1457    let checkpoint_activity_at_ms = checkpoint
1458        .filter(|_| checkpoint_progress_usable)
1459        .and_then(|state| (state.updated_at_ms > 0).then_some(state.updated_at_ms));
1460    let activity_at_ms = match (checkpoint_activity_at_ms, maintenance_activity_at_ms) {
1461        (Some(checkpoint_ts), Some(maintenance_ts)) => Some(checkpoint_ts.max(maintenance_ts)),
1462        (Some(checkpoint_ts), None) => Some(checkpoint_ts),
1463        (None, Some(maintenance_ts)) => Some(maintenance_ts),
1464        (None, None) => None,
1465    };
1466
1467    LexicalAssetState {
1468        status,
1469        exists,
1470        fresh,
1471        stale,
1472        rebuilding,
1473        stalled,
1474        last_progress_age_ms,
1475        last_progress_at_ms,
1476        watch_active,
1477        last_indexed_at_ms,
1478        age_seconds,
1479        stale_threshold_seconds: stale_threshold,
1480        activity_at_ms,
1481        pending_sessions,
1482        processed_conversations: checkpoint
1483            .filter(|_| checkpoint_progress_usable)
1484            .map(|state| state.processed_conversations as u64),
1485        total_conversations: checkpoint
1486            .filter(|_| checkpoint_progress_usable)
1487            .map(|state| state.total_conversations as u64),
1488        indexed_docs: checkpoint
1489            .filter(|_| checkpoint_progress_usable)
1490            .map(|state| state.indexed_docs as u64),
1491        status_reason,
1492        fingerprint: LexicalFingerprintState {
1493            current_db_fingerprint: current_db_fingerprint.map(ToOwned::to_owned),
1494            checkpoint_fingerprint: checkpoint.map(|state| state.storage_fingerprint.clone()),
1495            matches_current_db_fingerprint: fingerprint_matches,
1496        },
1497        checkpoint: LexicalCheckpointState {
1498            present: checkpoint.is_some(),
1499            completed: checkpoint.map(|state| state.completed),
1500            db_matches: checkpoint_db_matches,
1501            schema_matches,
1502            page_size_matches,
1503            page_size_compatible,
1504        },
1505    }
1506}
1507
1508fn semantic_embedder_id(
1509    availability: &SemanticAvailability,
1510    preference: SemanticPreference,
1511) -> Option<String> {
1512    match availability {
1513        SemanticAvailability::Ready { embedder_id }
1514        | SemanticAvailability::UpdateAvailable { embedder_id, .. }
1515        | SemanticAvailability::IndexBuilding { embedder_id, .. } => Some(embedder_id.clone()),
1516        SemanticAvailability::HashFallback => Some(HashEmbedder::default().id().to_string()),
1517        _ => match preference {
1518            SemanticPreference::DefaultModel => {
1519                Some(FastEmbedder::embedder_id_static().to_string())
1520            }
1521            SemanticPreference::HashFallback => Some(HashEmbedder::default().id().to_string()),
1522        },
1523    }
1524}
1525
1526fn semantic_vector_index_path(
1527    data_dir: &Path,
1528    availability: &SemanticAvailability,
1529    preference: SemanticPreference,
1530) -> Option<PathBuf> {
1531    match availability {
1532        SemanticAvailability::IndexMissing { index_path } => Some(index_path.clone()),
1533        _ => semantic_embedder_id(availability, preference)
1534            .map(|embedder_id| vector_index_path(data_dir, &embedder_id)),
1535    }
1536}
1537
1538fn semantic_progressive_assets_ready(data_dir: &Path) -> bool {
1539    let vector_dir = data_dir.join(VECTOR_INDEX_DIR);
1540    vector_dir.join("vector.fast.idx").is_file() && vector_dir.join("vector.quality.idx").is_file()
1541}
1542
1543fn semantic_availability_code(availability: &SemanticAvailability) -> &'static str {
1544    match availability {
1545        SemanticAvailability::Ready { .. } => "ready",
1546        SemanticAvailability::NotInstalled => "not_installed",
1547        SemanticAvailability::NeedsConsent => "needs_consent",
1548        SemanticAvailability::Downloading { .. } => "downloading",
1549        SemanticAvailability::Verifying => "verifying",
1550        SemanticAvailability::IndexBuilding { .. } => "index_building",
1551        SemanticAvailability::HashFallback => "hash_fallback",
1552        SemanticAvailability::Disabled { .. } => "disabled",
1553        SemanticAvailability::ModelMissing { .. } => "model_missing",
1554        SemanticAvailability::IndexMissing { .. } => "index_missing",
1555        SemanticAvailability::DatabaseUnavailable { .. } => "database_unavailable",
1556        SemanticAvailability::LoadFailed { .. } => "load_failed",
1557        SemanticAvailability::UpdateAvailable { .. } => "update_available",
1558    }
1559}
1560
1561fn semantic_status_from_availability(availability: &SemanticAvailability) -> &'static str {
1562    match availability {
1563        SemanticAvailability::Ready { .. } => "ready",
1564        SemanticAvailability::HashFallback => "hash_fallback",
1565        SemanticAvailability::Downloading { .. }
1566        | SemanticAvailability::Verifying
1567        | SemanticAvailability::IndexBuilding { .. } => "building",
1568        SemanticAvailability::Disabled { .. } => "disabled",
1569        SemanticAvailability::UpdateAvailable { .. } => "stale",
1570        SemanticAvailability::NotInstalled
1571        | SemanticAvailability::NeedsConsent
1572        | SemanticAvailability::ModelMissing { .. }
1573        | SemanticAvailability::IndexMissing { .. } => "missing",
1574        SemanticAvailability::DatabaseUnavailable { .. }
1575        | SemanticAvailability::LoadFailed { .. } => "error",
1576    }
1577}
1578
1579fn semantic_hint(
1580    availability: &SemanticAvailability,
1581    preference: SemanticPreference,
1582) -> Option<String> {
1583    let hint = match (preference, availability) {
1584        (SemanticPreference::HashFallback, SemanticAvailability::IndexMissing { .. }) => {
1585            "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available without semantic assets"
1586        }
1587        (SemanticPreference::HashFallback, SemanticAvailability::LoadFailed { .. })
1588        | (SemanticPreference::HashFallback, SemanticAvailability::DatabaseUnavailable { .. }) => {
1589            "Run 'cass index --semantic --embedder hash' after the database is healthy; lexical search remains available"
1590        }
1591        (SemanticPreference::HashFallback, _) => {
1592            "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available"
1593        }
1594        (_, SemanticAvailability::NotInstalled)
1595        | (_, SemanticAvailability::NeedsConsent)
1596        | (_, SemanticAvailability::ModelMissing { .. }) => {
1597            "Run 'cass models install' and then 'cass index --semantic'; lexical search remains available without the model"
1598        }
1599        (_, SemanticAvailability::IndexMissing { .. })
1600        | (_, SemanticAvailability::UpdateAvailable { .. })
1601        | (_, SemanticAvailability::IndexBuilding { .. }) => {
1602            "Run 'cass index --semantic' to build or refresh vector assets; lexical search remains available"
1603        }
1604        (_, SemanticAvailability::Downloading { .. }) | (_, SemanticAvailability::Verifying) => {
1605            "Wait for the semantic model installation to finish; lexical search remains available"
1606        }
1607        (_, SemanticAvailability::Disabled { .. }) => {
1608            "Semantic search is disabled by policy; lexical search remains available, or re-enable semantic search"
1609        }
1610        (_, SemanticAvailability::DatabaseUnavailable { .. })
1611        | (_, SemanticAvailability::LoadFailed { .. }) => {
1612            "Restore the semantic assets and database; lexical search remains available when the archive database is healthy"
1613        }
1614        (_, SemanticAvailability::Ready { .. }) | (_, SemanticAvailability::HashFallback) => {
1615            return None;
1616        }
1617    };
1618    Some(hint.to_string())
1619}
1620
1621// ---------------------------------------------------------------------------
1622// Maintenance coordination: single-flight, attach-to-progress, fail-open
1623// ---------------------------------------------------------------------------
1624
/// Heartbeat age, in milliseconds, beyond which an active maintenance job is
/// classified as stale by `evaluate_maintenance_coordination_from_snapshot`.
#[cfg_attr(not(test), allow(dead_code))]
const HEARTBEAT_STALE_THRESHOLD_MS: i64 = 30_000;
/// Timeout applied by `poll_maintenance_until_idle` when the caller passes
/// `None` for `timeout`.
#[cfg_attr(not(test), allow(dead_code))]
const BOUNDED_WAIT_DEFAULT: Duration = Duration::from_secs(5);
/// Sleep between polls in `poll_maintenance_until_idle` when the caller passes
/// `None` for `poll_interval`.
#[cfg_attr(not(test), allow(dead_code))]
const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);
1631
/// Classification of the current maintenance state, as produced by
/// `evaluate_maintenance_coordination_from_snapshot`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceCoordinationOutcome {
    /// The snapshot does not report an active maintenance job.
    Idle,
    /// A maintenance job is active and passed both liveness checks.
    Active {
        /// Job id from the snapshot, or a synthesized
        /// `{mode}-active-lock-{owner}` id when the snapshot has none.
        job_id: String,
        /// Job kind from the snapshot; `LexicalRefresh` when none is recorded.
        job_kind: SearchMaintenanceJobKind,
        /// Phase copied from the snapshot, if any.
        phase: Option<String>,
        /// Start time from the snapshot; `0` when the snapshot has none.
        started_at_ms: i64,
        /// Last heartbeat from the snapshot; the evaluation time when the
        /// snapshot has none.
        updated_at_ms: i64,
    },
    /// A job holds the maintenance slot but looks dead or wedged: its
    /// heartbeat is older than `HEARTBEAT_STALE_THRESHOLD_MS`, or the stall
    /// check reported a lack of forward progress.
    Stale {
        /// Same derivation as `Active::job_id`.
        job_id: String,
        /// Human-readable explanation of which liveness check failed.
        reason: String,
    },
}
1648
/// What a foreground actor should do given the current maintenance state.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceDecision {
    /// Returned when coordination is idle.
    Launch,
    /// A job already occupies the maintenance slot; the fields describe it.
    AttachOrWait {
        /// Id of the job occupying the slot.
        job_id: String,
        /// Kind of the job occupying the slot.
        job_kind: SearchMaintenanceJobKind,
        /// Phase reported by the job; the decision functions substitute
        /// `"stale-heartbeat"` for a stale job that reports no phase.
        phase: Option<String>,
        /// Milliseconds between the job's recorded start and the evaluation time.
        elapsed_ms: u64,
    },
    /// Returned by the search-facing decision paths when a job is active or
    /// stale but the lexical index is available.
    FailOpen {
        /// Human-readable explanation naming the job that is being bypassed.
        reason: String,
    },
}
1663
1664#[cfg_attr(not(test), allow(dead_code))]
1665pub(crate) fn evaluate_maintenance_coordination(
1666    data_dir: &Path,
1667    now_ms: i64,
1668) -> MaintenanceCoordinationOutcome {
1669    evaluate_maintenance_coordination_from_snapshot(
1670        &read_search_maintenance_snapshot(data_dir),
1671        now_ms,
1672    )
1673}
1674
1675#[cfg_attr(not(test), allow(dead_code))]
1676pub(crate) fn evaluate_maintenance_coordination_from_snapshot(
1677    snapshot: &SearchMaintenanceSnapshot,
1678    now_ms: i64,
1679) -> MaintenanceCoordinationOutcome {
1680    if !snapshot.active {
1681        return MaintenanceCoordinationOutcome::Idle;
1682    }
1683    let job_id = maintenance_snapshot_job_id(snapshot);
1684    if let Some(updated_at_ms) = snapshot.updated_at_ms {
1685        let heartbeat_age_ms = now_ms.saturating_sub(updated_at_ms);
1686        if heartbeat_age_ms > HEARTBEAT_STALE_THRESHOLD_MS {
1687            return MaintenanceCoordinationOutcome::Stale {
1688                job_id,
1689                reason: format!(
1690                    "heartbeat is {heartbeat_age_ms}ms old (threshold {HEARTBEAT_STALE_THRESHOLD_MS}ms)"
1691                ),
1692            };
1693        }
1694    }
1695    // Forward-progress liveness check (issue #258). Even if the
1696    // heartbeat is fresh, treat the job as `Stale` when the indexing
1697    // thread itself has not posted progress for longer than the stall
1698    // threshold. Coordination consumers (search fail-open, attach-or-
1699    // wait) then route around the wedged worker instead of waiting on
1700    // it indefinitely.
1701    if let Some(stall_age_ms) = maintenance_stall_age_ms(snapshot, now_ms) {
1702        let threshold_ms = rebuild_stall_detect_threshold_ms().unwrap_or(0);
1703        return MaintenanceCoordinationOutcome::Stale {
1704            job_id,
1705            reason: format!(
1706                "indexing thread has not posted forward progress for {stall_age_ms}ms while the heartbeat keeps refreshing (stall threshold {threshold_ms}ms) — see issue #258"
1707            ),
1708        };
1709    }
1710    MaintenanceCoordinationOutcome::Active {
1711        job_id,
1712        job_kind: snapshot
1713            .job_kind
1714            .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh),
1715        phase: snapshot.phase.clone(),
1716        started_at_ms: snapshot.started_at_ms.unwrap_or(0),
1717        updated_at_ms: snapshot.updated_at_ms.unwrap_or(now_ms),
1718    }
1719}
1720
1721fn maintenance_snapshot_job_id(snapshot: &SearchMaintenanceSnapshot) -> String {
1722    snapshot
1723        .job_id
1724        .as_ref()
1725        .filter(|job_id| !job_id.is_empty())
1726        .cloned()
1727        .unwrap_or_else(|| {
1728            let mode = snapshot
1729                .mode
1730                .map(|mode| mode.as_lock_value())
1731                .unwrap_or("unknown");
1732            let owner = snapshot
1733                .pid
1734                .map(|pid| pid.to_string())
1735                .unwrap_or_else(|| "unknown-owner".to_string());
1736            format!("{mode}-active-lock-{owner}")
1737        })
1738}
1739
1740fn maintenance_snapshot_job_kind(snapshot: &SearchMaintenanceSnapshot) -> SearchMaintenanceJobKind {
1741    snapshot
1742        .job_kind
1743        .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh)
1744}
1745
1746fn maintenance_elapsed_ms(snapshot: &SearchMaintenanceSnapshot, now_ms: i64) -> u64 {
1747    snapshot
1748        .started_at_ms
1749        .map(|started_at_ms| u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0))
1750        .unwrap_or(0)
1751}
1752
1753fn stale_heartbeat_phase(snapshot: &SearchMaintenanceSnapshot) -> Option<String> {
1754    snapshot
1755        .phase
1756        .clone()
1757        .or_else(|| Some("stale-heartbeat".to_string()))
1758}
1759
1760#[cfg_attr(not(test), allow(dead_code))]
1761pub(crate) fn decide_maintenance_action(data_dir: &Path, now_ms: i64) -> MaintenanceDecision {
1762    decide_maintenance_action_from_snapshot(&read_search_maintenance_snapshot(data_dir), now_ms)
1763}
1764
1765#[cfg_attr(not(test), allow(dead_code))]
1766pub(crate) fn decide_maintenance_action_from_snapshot(
1767    snapshot: &SearchMaintenanceSnapshot,
1768    now_ms: i64,
1769) -> MaintenanceDecision {
1770    match evaluate_maintenance_coordination_from_snapshot(snapshot, now_ms) {
1771        MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1772        MaintenanceCoordinationOutcome::Stale { job_id, .. } => MaintenanceDecision::AttachOrWait {
1773            job_id,
1774            job_kind: maintenance_snapshot_job_kind(snapshot),
1775            phase: stale_heartbeat_phase(snapshot),
1776            elapsed_ms: maintenance_elapsed_ms(snapshot, now_ms),
1777        },
1778        MaintenanceCoordinationOutcome::Active {
1779            job_id,
1780            job_kind,
1781            phase,
1782            started_at_ms,
1783            ..
1784        } => MaintenanceDecision::AttachOrWait {
1785            job_id,
1786            job_kind,
1787            phase,
1788            elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1789        },
1790    }
1791}
1792
1793#[cfg_attr(not(test), allow(dead_code))]
1794pub(crate) fn decide_search_failopen(
1795    data_dir: &Path,
1796    now_ms: i64,
1797    lexical_available: bool,
1798) -> MaintenanceDecision {
1799    let snapshot = read_search_maintenance_snapshot(data_dir);
1800    match evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms) {
1801        MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1802        MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1803            if lexical_available {
1804                MaintenanceDecision::FailOpen {
1805                    reason: format!(
1806                        "maintenance job {job_id} has a stale heartbeat ({reason}); lexical index is available, failing open"
1807                    ),
1808                }
1809            } else {
1810                MaintenanceDecision::AttachOrWait {
1811                    job_id,
1812                    job_kind: maintenance_snapshot_job_kind(&snapshot),
1813                    phase: stale_heartbeat_phase(&snapshot),
1814                    elapsed_ms: maintenance_elapsed_ms(&snapshot, now_ms),
1815                }
1816            }
1817        }
1818        MaintenanceCoordinationOutcome::Active {
1819            job_id,
1820            job_kind,
1821            phase,
1822            started_at_ms,
1823            ..
1824        } => {
1825            if lexical_available {
1826                MaintenanceDecision::FailOpen {
1827                    reason: format!(
1828                        "maintenance job {job_id} is active; lexical index is available, failing open"
1829                    ),
1830                }
1831            } else {
1832                MaintenanceDecision::AttachOrWait {
1833                    job_id,
1834                    job_kind,
1835                    phase,
1836                    elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1837                }
1838            }
1839        }
1840    }
1841}
1842
/// Result of `poll_maintenance_until_idle`.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) struct PollResult {
    /// Coordination outcome observed by the final poll.
    pub outcome: MaintenanceCoordinationOutcome,
    /// Number of times the maintenance state was evaluated.
    pub polls: u32,
    /// Wall-clock time spent inside the polling loop.
    pub elapsed: Duration,
    /// `true` when the deadline passed before the state became idle.
    pub timed_out: bool,
}
1850
1851#[cfg_attr(not(test), allow(dead_code))]
1852pub(crate) fn poll_maintenance_until_idle(
1853    data_dir: &Path,
1854    timeout: Option<Duration>,
1855    poll_interval: Option<Duration>,
1856) -> PollResult {
1857    let timeout = timeout.unwrap_or(BOUNDED_WAIT_DEFAULT);
1858    let interval = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
1859    let start = Instant::now();
1860    let deadline = start + timeout;
1861    let mut polls = 0u32;
1862    loop {
1863        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1864        let outcome = evaluate_maintenance_coordination(data_dir, now_ms);
1865        polls += 1;
1866        if matches!(outcome, MaintenanceCoordinationOutcome::Idle) {
1867            return PollResult {
1868                outcome,
1869                polls,
1870                elapsed: start.elapsed(),
1871                timed_out: false,
1872            };
1873        }
1874
1875        let now = Instant::now();
1876        if now >= deadline {
1877            return PollResult {
1878                outcome,
1879                polls,
1880                elapsed: start.elapsed(),
1881                timed_out: true,
1882            };
1883        }
1884        let remaining = deadline - now;
1885        std::thread::sleep(interval.min(remaining));
1886    }
1887}
1888
1889// ---------------------------------------------------------------------------
1890// Rich multi-actor event log, yield/pause signaling, unified view (ibuuh.22)
1891// ---------------------------------------------------------------------------
1892
/// File name, relative to the data directory, of the JSON-lines maintenance
/// event log.
#[cfg_attr(not(test), allow(dead_code))]
const MAINTENANCE_EVENTS_FILE: &str = ".maintenance-events.jsonl";
/// File name, relative to the data directory, of the yield request signal.
#[cfg_attr(not(test), allow(dead_code))]
const YIELD_SIGNAL_FILE: &str = "maintenance-yield.signal";
/// Number of trailing lines kept by `truncate_maintenance_event_log`, and the
/// default cap on events returned by `read_maintenance_events`.
#[cfg_attr(not(test), allow(dead_code))]
const MAX_EVENT_LOG_ENTRIES: usize = 500;
1899
/// One record in the maintenance event log; serialized as a single JSON line
/// by `append_maintenance_event`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct MaintenanceEvent {
    /// Event time in milliseconds; `read_maintenance_events` filters on this
    /// value against its `after_ms` argument.
    pub timestamp_ms: i64,
    /// Id of the maintenance job the event belongs to.
    pub job_id: String,
    /// Pid of the process that recorded the event — presumably the emitting
    /// actor; confirm against the writers.
    pub actor_pid: u32,
    /// What happened.
    pub kind: MaintenanceEventKind,
}
1908
/// Payload of a `MaintenanceEvent`.
///
/// NOTE(review): the emitters of these variants are outside this section, so
/// the per-variant notes below describe the fields only; confirm exact
/// semantics against the call sites.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum MaintenanceEventKind {
    /// A job started, with its kind and initial phase as strings.
    Started { job_kind: String, phase: String },
    /// A job moved from one phase to another.
    PhaseChanged { from: String, to: String },
    /// Progress counters: items processed out of a total.
    Progress { processed: u64, total: u64 },
    /// A yield was requested by `requester_pid` for `reason`.
    YieldRequested { requester_pid: u32, reason: String },
    /// A job paused, with the reason.
    Paused { reason: String },
    /// A job resumed.
    Resumed,
    /// A job completed, with a free-form summary.
    Completed { summary: String },
    /// A job failed, with the error text.
    Failed { error: String },
    /// A job was cancelled, with the reason.
    Cancelled { reason: String },
}
1922
1923#[cfg_attr(not(test), allow(dead_code))]
1924pub(crate) fn append_maintenance_event(data_dir: &Path, event: &MaintenanceEvent) -> Result<()> {
1925    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1926    let line = serde_json::to_string(event).with_context(|| "serializing maintenance event")?;
1927    let mut file = OpenOptions::new()
1928        .create(true)
1929        .append(true)
1930        .open(&path)
1931        .with_context(|| format!("opening maintenance event log at {}", path.display()))?;
1932    use std::io::Write;
1933    writeln!(file, "{line}")
1934        .with_context(|| format!("appending to maintenance event log at {}", path.display()))?;
1935    Ok(())
1936}
1937
1938#[cfg_attr(not(test), allow(dead_code))]
1939pub(crate) fn read_maintenance_events(
1940    data_dir: &Path,
1941    after_ms: Option<i64>,
1942    limit: Option<usize>,
1943) -> Vec<MaintenanceEvent> {
1944    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1945    let contents = match std::fs::read_to_string(&path) {
1946        Ok(c) => c,
1947        Err(_) => return Vec::new(),
1948    };
1949    let cap = limit.unwrap_or(MAX_EVENT_LOG_ENTRIES);
1950    contents
1951        .lines()
1952        .filter_map(|line| serde_json::from_str::<MaintenanceEvent>(line).ok())
1953        .filter(|e| after_ms.is_none_or(|threshold| e.timestamp_ms > threshold))
1954        .rev()
1955        .take(cap)
1956        .collect::<Vec<_>>()
1957        .into_iter()
1958        .rev()
1959        .collect()
1960}
1961
1962#[cfg_attr(not(test), allow(dead_code))]
1963pub(crate) fn truncate_maintenance_event_log(data_dir: &Path) -> Result<()> {
1964    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1965    let contents = match std::fs::read_to_string(&path) {
1966        Ok(c) => c,
1967        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1968        Err(e) => {
1969            return Err(e).with_context(|| {
1970                format!("reading event log for truncation at {}", path.display())
1971            });
1972        }
1973    };
1974    let lines: Vec<&str> = contents.lines().collect();
1975    if lines.len() <= MAX_EVENT_LOG_ENTRIES {
1976        return Ok(());
1977    }
1978    let keep = &lines[lines.len() - MAX_EVENT_LOG_ENTRIES..];
1979    let mut output = keep.join("\n");
1980    output.push('\n');
1981    std::fs::write(&path, output)
1982        .with_context(|| format!("truncating event log at {}", path.display()))
1983}
1984
1985// ---------------------------------------------------------------------------
1986// Yield/pause signaling
1987// ---------------------------------------------------------------------------
1988
/// Contents of the yield signal file: written as JSON by `request_yield` and
/// parsed back by `check_yield_requested`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct YieldRequest {
    /// Pid of the process that asked for the yield.
    pub requester_pid: u32,
    /// Time the request was written, in milliseconds.
    pub requested_at_ms: i64,
    /// Caller-supplied explanation for the request.
    pub reason: String,
}
1996
1997#[cfg_attr(not(test), allow(dead_code))]
1998pub(crate) fn request_yield(data_dir: &Path, reason: &str) -> Result<()> {
1999    let path = data_dir.join(YIELD_SIGNAL_FILE);
2000    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
2001    let req = YieldRequest {
2002        requester_pid: std::process::id(),
2003        requested_at_ms: now_ms,
2004        reason: reason.to_string(),
2005    };
2006    let payload = serde_json::to_string(&req).with_context(|| "serializing yield request")?;
2007    std::fs::write(&path, payload)
2008        .with_context(|| format!("writing yield signal to {}", path.display()))
2009}
2010
2011#[cfg_attr(not(test), allow(dead_code))]
2012pub(crate) fn check_yield_requested(data_dir: &Path) -> Option<YieldRequest> {
2013    let path = data_dir.join(YIELD_SIGNAL_FILE);
2014    let contents = std::fs::read_to_string(&path).ok()?;
2015    serde_json::from_str::<YieldRequest>(&contents).ok()
2016}
2017
2018#[cfg_attr(not(test), allow(dead_code))]
2019pub(crate) fn clear_yield_signal(data_dir: &Path) -> Result<()> {
2020    let path = data_dir.join(YIELD_SIGNAL_FILE);
2021    match std::fs::remove_file(&path) {
2022        Ok(()) => Ok(()),
2023        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
2024        Err(e) => Err(e).with_context(|| format!("clearing yield signal at {}", path.display())),
2025    }
2026}
2027
2028// ---------------------------------------------------------------------------
2029// Unified maintenance view
2030// ---------------------------------------------------------------------------
2031
/// Single combined picture of maintenance state, assembled by
/// `unified_maintenance_view`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct UnifiedMaintenanceView {
    /// Classification of `snapshot` at the time the view was built.
    pub coordination: MaintenanceCoordinationOutcome,
    /// Raw maintenance snapshot the view was derived from.
    pub snapshot: SearchMaintenanceSnapshot,
    /// Pending yield request, if the signal file held one.
    pub yield_pending: Option<YieldRequest>,
    /// Up to 20 of the most recent events from the maintenance event log.
    pub recent_events: Vec<MaintenanceEvent>,
    /// Recommended action given `coordination` and lexical availability.
    pub decision: MaintenanceDecision,
}
2041
2042#[cfg_attr(not(test), allow(dead_code))]
2043pub(crate) fn unified_maintenance_view(
2044    data_dir: &Path,
2045    lexical_available: bool,
2046) -> UnifiedMaintenanceView {
2047    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
2048    let snapshot = read_search_maintenance_snapshot(data_dir);
2049    let coordination = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2050    let yield_pending = check_yield_requested(data_dir);
2051    let recent_events = read_maintenance_events(data_dir, None, Some(20));
2052    let decision = if lexical_available {
2053        match &coordination {
2054            MaintenanceCoordinationOutcome::Active {
2055                job_id,
2056                job_kind,
2057                phase,
2058                ..
2059            } => MaintenanceDecision::FailOpen {
2060                reason: format!(
2061                    "maintenance job {} ({:?}) is active (phase: {}); lexical available, failing open",
2062                    job_id,
2063                    job_kind,
2064                    phase.as_deref().unwrap_or("unknown")
2065                ),
2066            },
2067            MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
2068                MaintenanceDecision::FailOpen {
2069                    reason: format!(
2070                        "maintenance job {job_id} has a stale heartbeat ({reason}); lexical available, failing open"
2071                    ),
2072                }
2073            }
2074            _ => decide_maintenance_action_from_snapshot(&snapshot, now_ms),
2075        }
2076    } else {
2077        decide_maintenance_action_from_snapshot(&snapshot, now_ms)
2078    };
2079
2080    UnifiedMaintenanceView {
2081        coordination,
2082        snapshot,
2083        yield_pending,
2084        recent_events,
2085        decision,
2086    }
2087}
2088
2089#[cfg(test)]
2090mod tests {
2091    use super::*;
2092
2093    #[cfg(windows)]
2094    fn active_lock_fixture_pid(_non_windows_pid: u32) -> u32 {
2095        std::process::id()
2096    }
2097
2098    #[cfg(not(windows))]
2099    fn active_lock_fixture_pid(non_windows_pid: u32) -> u32 {
2100        non_windows_pid
2101    }
2102
2103    fn write_locked_metadata(
2104        lock_path: &Path,
2105        owner: &mut std::fs::File,
2106        contents: &str,
2107    ) -> std::io::Result<()> {
2108        use std::io::{Seek, SeekFrom, Write};
2109
2110        owner.set_len(0)?;
2111        owner.seek(SeekFrom::Start(0))?;
2112        owner.write_all(contents.as_bytes())?;
2113        owner.flush()?;
2114        owner.sync_all()?;
2115        write_index_run_lock_metadata_sidecar(lock_path, contents)
2116            .map_err(std::io::Error::other)?;
2117        Ok(())
2118    }
2119
2120    #[test]
2121    fn maintenance_mode_round_trips_lock_values() {
2122        for mode in [
2123            SearchMaintenanceMode::Index,
2124            SearchMaintenanceMode::WatchStartup,
2125            SearchMaintenanceMode::Watch,
2126            SearchMaintenanceMode::WatchOnce,
2127        ] {
2128            assert_eq!(
2129                SearchMaintenanceMode::parse_lock_value(mode.as_lock_value()),
2130                Some(mode)
2131            );
2132        }
2133    }
2134
2135    #[test]
2136    fn maintenance_job_kind_round_trips_lock_values() {
2137        for kind in [
2138            SearchMaintenanceJobKind::LexicalRefresh,
2139            SearchMaintenanceJobKind::SemanticAcquire,
2140        ] {
2141            assert_eq!(
2142                SearchMaintenanceJobKind::parse_lock_value(kind.as_lock_value()),
2143                Some(kind)
2144            );
2145        }
2146    }
2147
2148    #[test]
2149    fn stale_lock_metadata_from_dead_owner_is_reaped_on_read() {
2150        // Regression for issue #176: the TUI used to see a permanent
2151        // `orphaned: true` state when the index-run.lock file contained
2152        // metadata from a crashed process, because nothing in the read
2153        // path cleaned it up. That produced a tight CPU-bound poll loop
2154        // on startup. The read path now reaps stale metadata atomically
2155        // while holding the exclusive flock.
2156        let temp = tempfile::tempdir().expect("tempdir");
2157        let lock_path = temp.path().join("index-run.lock");
2158        // The reap path does not probe the recorded pid — POSIX flock
2159        // acquisition is the signal — so the concrete pid value in the
2160        // fixture is irrelevant. We still record one so the parser
2161        // runs through its full happy path.
2162        std::fs::write(
2163            &lock_path,
2164            concat!(
2165                "pid=4242\n",
2166                "started_at_ms=1733000111000\n",
2167                "updated_at_ms=1733000112000\n",
2168                "db_path=/tmp/cass/agent_search.db\n",
2169                "mode=index\n",
2170                "job_id=lexical-refresh-1733000111000-4242\n",
2171                "job_kind=lexical_refresh\n",
2172                "phase=rebuilding\n"
2173            ),
2174        )
2175        .expect("write lock metadata");
2176
2177        let snapshot = read_search_maintenance_snapshot(temp.path());
2178        assert!(!snapshot.active, "no owner, must not be reported active");
2179        assert!(
2180            !snapshot.orphaned,
2181            "stale metadata must be reaped, not reported as orphaned"
2182        );
2183        assert!(snapshot.pid.is_none(), "pid must be cleared after reap");
2184        assert!(
2185            snapshot.job_id.is_none(),
2186            "job_id must be cleared after reap"
2187        );
2188        assert!(snapshot.phase.is_none(), "phase must be cleared after reap");
2189
2190        // File must still exist (to preserve permissions and avoid
2191        // creating/recreating races) but be empty.
2192        let post = std::fs::metadata(&lock_path).expect("lock file still present");
2193        assert_eq!(post.len(), 0, "stale metadata must be truncated in place");
2194
2195        // Second read also returns a clean default snapshot.
2196        let snapshot2 = read_search_maintenance_snapshot(temp.path());
2197        assert!(!snapshot2.active);
2198        assert!(!snapshot2.orphaned);
2199    }
2200
2201    #[test]
2202    fn live_owner_metadata_is_preserved_when_flock_is_held() -> std::io::Result<()> {
2203        // When the lock is actually held by a live owner, the snapshot
2204        // must report the owner faithfully and must NOT reap the file.
2205        use fs2::FileExt;
2206        let temp = tempfile::tempdir()?;
2207        let lock_path = temp.path().join("index-run.lock");
2208        let owner_pid = active_lock_fixture_pid(4242);
2209        let mut owner = OpenOptions::new()
2210            .create(true)
2211            .read(true)
2212            .write(true)
2213            .truncate(true)
2214            .open(&lock_path)?;
2215        owner.try_lock_exclusive()?;
2216        // Write metadata while holding the lock, matching acquire_index_run_lock's order.
2217        write_locked_metadata(
2218            &lock_path,
2219            &mut owner,
2220            format!(
2221                concat!(
2222                    "pid={}\n",
2223                    "started_at_ms=1733000111000\n",
2224                    "updated_at_ms=1733000112000\n",
2225                    "db_path=/tmp/cass/agent_search.db\n",
2226                    "mode=index\n",
2227                    "job_id=lexical-refresh-1733000111000-4242\n",
2228                    "job_kind=lexical_refresh\n",
2229                    "phase=rebuilding\n"
2230                ),
2231                owner_pid
2232            )
2233            .as_str(),
2234        )?;
2235
2236        let snapshot = read_search_maintenance_snapshot(temp.path());
2237        assert!(snapshot.active, "live owner must be reported active");
2238        assert!(!snapshot.orphaned);
2239        assert_eq!(snapshot.pid, Some(owner_pid));
2240        assert_eq!(
2241            snapshot.job_id.as_deref(),
2242            Some("lexical-refresh-1733000111000-4242")
2243        );
2244        assert_eq!(
2245            snapshot.job_kind,
2246            Some(SearchMaintenanceJobKind::LexicalRefresh)
2247        );
2248        assert_eq!(snapshot.phase.as_deref(), Some("rebuilding"));
2249        assert_eq!(snapshot.updated_at_ms, Some(1_733_000_112_000));
2250
2251        // Metadata must still be present — reaping must NOT have happened.
2252        let post = std::fs::metadata(&lock_path)?;
2253        assert!(post.len() > 0, "live-owner metadata must not be truncated");
2254
2255        FileExt::unlock(&owner)?;
2256        Ok(())
2257    }
2258
2259    #[test]
2260    fn lexical_storage_fingerprint_matching_handles_jitter_and_size_drift() {
2261        let cases = [
2262            (
2263                "small mtime settle jitter",
2264                "323584:1776310228000:329632:1776310227824",
2265                "323584:1776310227832:329632:1776310227824",
2266                true,
2267            ),
2268            (
2269                "wal size drift",
2270                "323584:1776310228000:329632:1776310227824",
2271                "323584:1776310227832:400000:1776310227824",
2272                false,
2273            ),
2274        ];
2275
2276        for (label, current, saved, expected) in cases {
2277            assert_eq!(
2278                lexical_storage_fingerprints_match(current, saved),
2279                expected,
2280                "{label}"
2281            );
2282        }
2283    }
2284
2285    #[test]
2286    fn lexical_state_marks_fingerprint_mismatch_stale() {
2287        let temp = tempfile::tempdir().expect("tempdir");
2288        let index_path = temp.path().join("index").join("v4");
2289        std::fs::create_dir_all(&index_path).expect("create index dir");
2290        // Simulate an existing tantivy index (meta.json present) so the
2291        // "missing" branch in lexical_state_from_observations doesn't short
2292        // circuit before the fingerprint check we want to exercise.
2293        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2294        let db_path = temp.path().join("agent_search.db");
2295        std::fs::write(&db_path, b"db").expect("write db file");
2296
2297        let checkpoint = LexicalRebuildCheckpoint {
2298            db_path: db_path.display().to_string(),
2299            total_conversations: 10,
2300            storage_fingerprint: "before".to_string(),
2301            committed_offset: 10,
2302            committed_conversation_id: Some(10),
2303            processed_conversations: 10,
2304            indexed_docs: 100,
2305            schema_hash: SCHEMA_HASH.to_string(),
2306            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2307            completed: true,
2308            updated_at_ms: 1_733_000_000_000,
2309        };
2310
2311        let state = lexical_state_from_observations(LexicalObservationInput {
2312            index_path: &index_path,
2313            db_path: &db_path,
2314            stale_threshold: 60,
2315            last_indexed_at_ms: Some(1_733_000_000_000),
2316            now_ms: 1_733_000_001_000,
2317            maintenance: SearchMaintenanceSnapshot::default(),
2318            checkpoint: Some(&checkpoint),
2319            current_db_fingerprint: Some("after"),
2320        });
2321
2322        assert_eq!(state.status, "stale");
2323        assert_eq!(
2324            state.fingerprint.matches_current_db_fingerprint,
2325            Some(false)
2326        );
2327        assert!(
2328            state
2329                .status_reason
2330                .as_deref()
2331                .is_some_and(|reason| reason.contains("fingerprint"))
2332        );
2333        assert_eq!(state.pending_sessions, 0);
2334        assert_eq!(state.processed_conversations, None);
2335        assert_eq!(state.total_conversations, None);
2336        assert_eq!(state.indexed_docs, None);
2337    }
2338
2339    #[test]
2340    fn lexical_state_marks_checkpoint_db_mismatch_stale_without_fingerprint_probe() {
2341        let temp = tempfile::tempdir().expect("tempdir");
2342        let index_path = temp.path().join("index").join("v4");
2343        std::fs::create_dir_all(&index_path).expect("create index dir");
2344        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2345        let db_path = temp.path().join("agent_search.db");
2346        let other_db_path = temp.path().join("other_agent_search.db");
2347        std::fs::write(&db_path, b"db").expect("write db file");
2348        std::fs::write(&other_db_path, b"other db").expect("write other db file");
2349
2350        let checkpoint = LexicalRebuildCheckpoint {
2351            db_path: other_db_path.display().to_string(),
2352            total_conversations: 10,
2353            storage_fingerprint: "old-db-fingerprint".to_string(),
2354            committed_offset: 10,
2355            committed_conversation_id: Some(10),
2356            processed_conversations: 10,
2357            indexed_docs: 100,
2358            schema_hash: SCHEMA_HASH.to_string(),
2359            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2360            completed: true,
2361            updated_at_ms: 1_733_000_000_000,
2362        };
2363
2364        let state = lexical_state_from_observations(LexicalObservationInput {
2365            index_path: &index_path,
2366            db_path: &db_path,
2367            stale_threshold: 60,
2368            last_indexed_at_ms: Some(1_733_000_000_000),
2369            now_ms: 1_733_000_001_000,
2370            maintenance: SearchMaintenanceSnapshot::default(),
2371            checkpoint: Some(&checkpoint),
2372            current_db_fingerprint: None,
2373        });
2374
2375        assert_eq!(state.status, "stale");
2376        assert!(state.stale);
2377        assert!(!state.fresh);
2378        assert_eq!(state.checkpoint.db_matches, Some(false));
2379        assert_eq!(state.fingerprint.matches_current_db_fingerprint, None);
2380        assert_eq!(state.pending_sessions, 0);
2381        assert_eq!(state.processed_conversations, None);
2382        assert_eq!(state.total_conversations, None);
2383        assert_eq!(state.indexed_docs, None);
2384        assert!(
2385            state
2386                .status_reason
2387                .as_deref()
2388                .is_some_and(|reason| reason.contains("different database"))
2389        );
2390    }
2391
2392    #[test]
2393    fn lexical_state_missing_index_is_not_marked_stale_until_initialized() {
2394        let temp = tempfile::tempdir().expect("tempdir");
2395        let index_path = temp.path().join("index").join("v4");
2396        std::fs::create_dir_all(&index_path).expect("create index dir");
2397        let db_path = temp.path().join("agent_search.db");
2398        std::fs::write(&db_path, b"db").expect("write db file");
2399
2400        let state = lexical_state_from_observations(LexicalObservationInput {
2401            index_path: &index_path,
2402            db_path: &db_path,
2403            stale_threshold: 60,
2404            last_indexed_at_ms: None,
2405            now_ms: 1_733_000_001_000,
2406            maintenance: SearchMaintenanceSnapshot::default(),
2407            checkpoint: None,
2408            current_db_fingerprint: None,
2409        });
2410
2411        assert_eq!(state.status, "missing");
2412        assert!(!state.exists);
2413        assert!(!state.stale);
2414        assert!(!state.fresh);
2415        assert_eq!(
2416            state.status_reason.as_deref(),
2417            Some("lexical Tantivy metadata missing")
2418        );
2419    }
2420
2421    #[test]
2422    fn lexical_state_keeps_progress_visible_during_active_rebuild_despite_fingerprint_drift() {
2423        let temp = tempfile::tempdir().expect("tempdir");
2424        let index_path = temp.path().join("index").join("v4");
2425        std::fs::create_dir_all(&index_path).expect("create index dir");
2426        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2427        let db_path = temp.path().join("agent_search.db");
2428        std::fs::write(&db_path, b"db").expect("write db file");
2429
2430        let checkpoint = LexicalRebuildCheckpoint {
2431            db_path: db_path.display().to_string(),
2432            total_conversations: 10,
2433            storage_fingerprint: "before".to_string(),
2434            committed_offset: 4,
2435            committed_conversation_id: Some(4),
2436            processed_conversations: 4,
2437            indexed_docs: 20,
2438            schema_hash: SCHEMA_HASH.to_string(),
2439            page_size: 200,
2440            completed: false,
2441            updated_at_ms: 1_733_000_123_000,
2442        };
2443
2444        let state = lexical_state_from_observations(LexicalObservationInput {
2445            index_path: &index_path,
2446            db_path: &db_path,
2447            stale_threshold: 60,
2448            last_indexed_at_ms: Some(1_733_000_000_000),
2449            now_ms: 1_733_000_001_000,
2450            maintenance: SearchMaintenanceSnapshot {
2451                active: true,
2452                pid: Some(std::process::id()),
2453                started_at_ms: Some(1_733_000_111_000),
2454                db_path: Some(db_path.clone()),
2455                mode: Some(SearchMaintenanceMode::Index),
2456                job_id: None,
2457                job_kind: None,
2458                phase: None,
2459                updated_at_ms: None,
2460                last_progress_at_ms: None,
2461                orphaned: false,
2462            },
2463            checkpoint: Some(&checkpoint),
2464            current_db_fingerprint: Some("after"),
2465        });
2466
2467        assert_eq!(state.status, "building");
2468        assert!(!state.stale);
2469        assert!(!state.fresh);
2470        assert_eq!(state.pending_sessions, 6);
2471        assert_eq!(state.processed_conversations, Some(4));
2472        assert_eq!(state.total_conversations, Some(10));
2473        assert_eq!(state.indexed_docs, Some(20));
2474        assert_eq!(state.checkpoint.page_size_matches, Some(false));
2475        assert_eq!(state.checkpoint.page_size_compatible, Some(true));
2476        assert_eq!(
2477            state.status_reason.as_deref(),
2478            Some("lexical rebuild is in progress")
2479        );
2480    }
2481
2482    #[test]
2483    fn lexical_state_hides_progress_for_incompatible_page_size_checkpoint() {
2484        let temp = tempfile::tempdir().expect("tempdir");
2485        let index_path = temp.path().join("index").join("v4");
2486        std::fs::create_dir_all(&index_path).expect("create index dir");
2487        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2488        let db_path = temp.path().join("agent_search.db");
2489        std::fs::write(&db_path, b"db").expect("write db file");
2490
2491        let checkpoint = LexicalRebuildCheckpoint {
2492            db_path: db_path.display().to_string(),
2493            total_conversations: 10,
2494            storage_fingerprint: "before".to_string(),
2495            committed_offset: 4,
2496            committed_conversation_id: Some(4),
2497            processed_conversations: 4,
2498            indexed_docs: 20,
2499            schema_hash: SCHEMA_HASH.to_string(),
2500            page_size: 13,
2501            completed: false,
2502            updated_at_ms: 1_733_000_123_000,
2503        };
2504
2505        let state = lexical_state_from_observations(LexicalObservationInput {
2506            index_path: &index_path,
2507            db_path: &db_path,
2508            stale_threshold: 60,
2509            last_indexed_at_ms: Some(1_733_000_000_000),
2510            now_ms: 1_733_000_001_000,
2511            maintenance: SearchMaintenanceSnapshot::default(),
2512            checkpoint: Some(&checkpoint),
2513            current_db_fingerprint: Some("before"),
2514        });
2515
2516        assert_eq!(state.status, "stale");
2517        assert!(state.stale);
2518        assert_eq!(state.pending_sessions, 0);
2519        assert_eq!(state.processed_conversations, None);
2520        assert_eq!(state.total_conversations, None);
2521        assert_eq!(state.indexed_docs, None);
2522        assert_eq!(state.checkpoint.page_size_matches, Some(false));
2523        assert_eq!(state.checkpoint.page_size_compatible, Some(false));
2524        assert!(
2525            state
2526                .status_reason
2527                .as_deref()
2528                .is_some_and(|reason| reason.contains("contract"))
2529        );
2530    }
2531
2532    #[test]
2533    fn lexical_state_prefers_newer_maintenance_heartbeat_over_stale_checkpoint_timestamp() {
2534        let temp = tempfile::tempdir().expect("tempdir");
2535        let index_path = temp.path().join("index").join("v4");
2536        std::fs::create_dir_all(&index_path).expect("create index dir");
2537        let db_path = temp.path().join("agent_search.db");
2538        std::fs::write(&db_path, b"db").expect("write db file");
2539
2540        let checkpoint = LexicalRebuildCheckpoint {
2541            db_path: db_path.display().to_string(),
2542            total_conversations: 10,
2543            storage_fingerprint: "before".to_string(),
2544            committed_offset: 4,
2545            committed_conversation_id: Some(4),
2546            processed_conversations: 4,
2547            indexed_docs: 20,
2548            schema_hash: SCHEMA_HASH.to_string(),
2549            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2550            completed: false,
2551            updated_at_ms: 1_733_000_123_000,
2552        };
2553
2554        let state = lexical_state_from_observations(LexicalObservationInput {
2555            index_path: &index_path,
2556            db_path: &db_path,
2557            stale_threshold: 60,
2558            last_indexed_at_ms: Some(1_733_000_000_000),
2559            now_ms: 1_733_000_001_000,
2560            maintenance: SearchMaintenanceSnapshot {
2561                active: true,
2562                pid: Some(std::process::id()),
2563                started_at_ms: Some(1_733_000_111_000),
2564                db_path: Some(db_path.clone()),
2565                mode: Some(SearchMaintenanceMode::Index),
2566                job_id: None,
2567                job_kind: None,
2568                phase: None,
2569                updated_at_ms: Some(1_733_000_456_000),
2570                last_progress_at_ms: None,
2571                orphaned: false,
2572            },
2573            checkpoint: Some(&checkpoint),
2574            current_db_fingerprint: Some("after"),
2575        });
2576
2577        assert_eq!(state.status, "building");
2578        assert_eq!(state.activity_at_ms, Some(1_733_000_456_000));
2579    }
2580
2581    #[test]
2582    fn lexical_state_ignores_rebuild_lock_for_different_database() {
2583        let temp = tempfile::tempdir().expect("tempdir");
2584        let index_path = temp.path().join("index").join("v4");
2585        std::fs::create_dir_all(&index_path).expect("create index dir");
2586        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2587        let db_path = temp.path().join("agent_search.db");
2588        std::fs::write(&db_path, b"db").expect("write db file");
2589        let other_db_path = temp.path().join("other.db");
2590        std::fs::write(&other_db_path, b"other").expect("write other db file");
2591
2592        let checkpoint = LexicalRebuildCheckpoint {
2593            db_path: db_path.display().to_string(),
2594            total_conversations: 10,
2595            storage_fingerprint: "before".to_string(),
2596            committed_offset: 4,
2597            committed_conversation_id: Some(4),
2598            processed_conversations: 4,
2599            indexed_docs: 20,
2600            schema_hash: SCHEMA_HASH.to_string(),
2601            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2602            completed: false,
2603            updated_at_ms: 1_733_000_123_000,
2604        };
2605
2606        let state = lexical_state_from_observations(LexicalObservationInput {
2607            index_path: &index_path,
2608            db_path: &db_path,
2609            stale_threshold: 60,
2610            last_indexed_at_ms: Some(1_733_000_000_000),
2611            now_ms: 1_733_000_001_000,
2612            maintenance: SearchMaintenanceSnapshot {
2613                active: true,
2614                pid: Some(std::process::id()),
2615                started_at_ms: Some(1_733_000_111_000),
2616                db_path: Some(other_db_path),
2617                mode: Some(SearchMaintenanceMode::Index),
2618                job_id: None,
2619                job_kind: None,
2620                phase: None,
2621                updated_at_ms: None,
2622                last_progress_at_ms: None,
2623                orphaned: false,
2624            },
2625            checkpoint: Some(&checkpoint),
2626            current_db_fingerprint: Some("after"),
2627        });
2628
2629        assert_eq!(state.status, "stale");
2630        assert!(state.stale);
2631        assert!(!state.fresh);
2632        assert!(!state.rebuilding);
2633        assert!(!state.watch_active);
2634        assert_eq!(state.activity_at_ms, None);
2635        assert_eq!(state.pending_sessions, 0);
2636        assert_eq!(state.processed_conversations, None);
2637        assert_eq!(state.total_conversations, None);
2638        assert_eq!(state.indexed_docs, None);
2639        assert!(
2640            state
2641                .status_reason
2642                .as_deref()
2643                .is_some_and(|reason| reason.contains("fingerprint"))
2644        );
2645    }
2646
2647    #[test]
2648    fn lexical_state_ignores_watch_lock_for_different_database() {
2649        let temp = tempfile::tempdir().expect("tempdir");
2650        let index_path = temp.path().join("index").join("v4");
2651        std::fs::create_dir_all(&index_path).expect("create index dir");
2652        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2653        let db_path = temp.path().join("agent_search.db");
2654        std::fs::write(&db_path, b"db").expect("write db file");
2655        let other_db_path = temp.path().join("other.db");
2656        std::fs::write(&other_db_path, b"other").expect("write other db file");
2657
2658        let state = lexical_state_from_observations(LexicalObservationInput {
2659            index_path: &index_path,
2660            db_path: &db_path,
2661            stale_threshold: 60,
2662            last_indexed_at_ms: Some(1_733_000_000_000),
2663            now_ms: 1_733_000_020_000,
2664            maintenance: SearchMaintenanceSnapshot {
2665                active: true,
2666                pid: Some(std::process::id()),
2667                started_at_ms: Some(1_733_000_111_000),
2668                db_path: Some(other_db_path),
2669                mode: Some(SearchMaintenanceMode::Watch),
2670                job_id: None,
2671                job_kind: None,
2672                phase: None,
2673                updated_at_ms: None,
2674                last_progress_at_ms: None,
2675                orphaned: false,
2676            },
2677            checkpoint: None,
2678            current_db_fingerprint: None,
2679        });
2680
2681        assert_eq!(state.status, "ready");
2682        assert!(state.fresh);
2683        assert!(!state.stale);
2684        assert!(!state.rebuilding);
2685        assert!(!state.watch_active);
2686        assert_eq!(state.activity_at_ms, None);
2687    }
2688
2689    // ---- Forward-progress liveness / stall detection (issue #258) ----
2690
2691    /// `last_progress_at_ms` is older than the default 120 s stall
2692    /// threshold; the heartbeat `updated_at_ms` is fresh (the heartbeat
2693    /// thread kept refreshing it independently). Status must flip to
2694    /// `stalled`, NOT remain `building` — this is the regression #258
2695    /// guards against.
2696    #[test]
2697    fn lexical_state_reports_stalled_when_progress_is_stale_despite_fresh_heartbeat() {
2698        let temp = tempfile::tempdir().expect("tempdir");
2699        let index_path = temp.path().join("index").join("v4");
2700        std::fs::create_dir_all(&index_path).expect("create index dir");
2701        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2702        let db_path = temp.path().join("agent_search.db");
2703        std::fs::write(&db_path, b"db").expect("write db file");
2704
2705        // now = 1_733_000_300 s (= 1_733_000_300_000 ms).
2706        // heartbeat updated 500 ms ago: fresh.
2707        // forward progress posted 300 s ago: well past the 120 s default stall threshold.
2708        // F4 (cass tech debt): the input now carries full-precision
2709        // `now_ms` end-to-end. Tests that previously needed
2710        // `now_secs as i64 * 1000` no longer have to round-trip.
2711        let now_ms: i64 = 1_733_000_300_000;
2712
2713        let state = lexical_state_from_observations(LexicalObservationInput {
2714            index_path: &index_path,
2715            db_path: &db_path,
2716            stale_threshold: 60,
2717            last_indexed_at_ms: Some(1_733_000_000_000),
2718            now_ms,
2719            maintenance: SearchMaintenanceSnapshot {
2720                active: true,
2721                pid: Some(std::process::id()),
2722                started_at_ms: Some(now_ms - 600_000),
2723                db_path: Some(db_path.clone()),
2724                mode: Some(SearchMaintenanceMode::WatchStartup),
2725                job_id: Some("lexical_refresh-1-1".to_string()),
2726                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2727                phase: Some("watch_startup".to_string()),
2728                updated_at_ms: Some(now_ms - 500),
2729                last_progress_at_ms: Some(now_ms - 300_000),
2730                orphaned: false,
2731            },
2732            checkpoint: None,
2733            current_db_fingerprint: None,
2734        });
2735
2736        assert!(state.rebuilding, "active rebuild lock must still register");
2737        assert!(
2738            state.stalled,
2739            "stale forward-progress timestamp must flip stalled=true",
2740        );
2741        assert_eq!(state.status, "stalled");
2742        assert_eq!(
2743            state.last_progress_at_ms,
2744            Some(now_ms - 300_000),
2745            "last_progress_at_ms must be surfaced to status callers",
2746        );
2747        let age = state
2748            .last_progress_age_ms
2749            .expect("last_progress_age_ms must be computed");
2750        assert!(
2751            (299_900..=300_100).contains(&age),
2752            "computed last_progress_age_ms ({age}ms) should equal now - last_progress_at_ms (300_000ms)",
2753        );
2754        let reason = state
2755            .status_reason
2756            .as_deref()
2757            .expect("stalled state should populate status_reason");
2758        assert!(
2759            reason.contains("forward progress")
2760                && (reason.contains("#258") || reason.contains("issue #258")),
2761            "status_reason should mention forward progress and reference #258 ({reason})",
2762        );
2763    }
2764
2765    /// Heartbeat is fresh AND forward-progress is fresh: no stall.
2766    /// Status remains `building`. Ensures the new gate does not regress
2767    /// the happy path.
2768    #[test]
2769    fn lexical_state_stays_building_when_progress_is_recent() {
2770        let temp = tempfile::tempdir().expect("tempdir");
2771        let index_path = temp.path().join("index").join("v4");
2772        std::fs::create_dir_all(&index_path).expect("create index dir");
2773        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2774        let db_path = temp.path().join("agent_search.db");
2775        std::fs::write(&db_path, b"db").expect("write db file");
2776
2777        // F4 (cass tech debt): the input now carries full-precision
2778        // `now_ms` end-to-end. Tests that previously needed
2779        // `now_secs as i64 * 1000` no longer have to round-trip.
2780        let now_ms: i64 = 1_733_000_300_000;
2781
2782        let state = lexical_state_from_observations(LexicalObservationInput {
2783            index_path: &index_path,
2784            db_path: &db_path,
2785            stale_threshold: 60,
2786            last_indexed_at_ms: Some(1_733_000_000_000),
2787            now_ms,
2788            maintenance: SearchMaintenanceSnapshot {
2789                active: true,
2790                pid: Some(std::process::id()),
2791                started_at_ms: Some(now_ms - 30_000),
2792                db_path: Some(db_path.clone()),
2793                mode: Some(SearchMaintenanceMode::Index),
2794                job_id: Some("lexical_refresh-1-1".to_string()),
2795                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2796                phase: Some("scanning".to_string()),
2797                updated_at_ms: Some(now_ms - 500),
2798                last_progress_at_ms: Some(now_ms - 1_000),
2799                orphaned: false,
2800            },
2801            checkpoint: None,
2802            current_db_fingerprint: None,
2803        });
2804
2805        assert!(state.rebuilding);
2806        assert!(!state.stalled, "fresh progress must not flip stalled");
2807        assert_eq!(state.status, "building");
2808    }
2809
2810    /// Legacy lock files (older cass that didn't write
2811    /// `last_progress_at_ms`) must not be misreported as stalled. The
2812    /// stall check only fires when an explicit `last_progress_at_ms`
2813    /// is present; absent that, we fall back to the previous behavior.
2814    #[test]
2815    fn lexical_state_does_not_stall_when_legacy_lock_omits_progress_field() {
2816        let temp = tempfile::tempdir().expect("tempdir");
2817        let index_path = temp.path().join("index").join("v4");
2818        std::fs::create_dir_all(&index_path).expect("create index dir");
2819        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2820        let db_path = temp.path().join("agent_search.db");
2821        std::fs::write(&db_path, b"db").expect("write db file");
2822
2823        // F4 (cass tech debt): the input now carries full-precision
2824        // `now_ms` end-to-end. Tests that previously needed
2825        // `now_secs as i64 * 1000` no longer have to round-trip.
2826        let now_ms: i64 = 1_733_000_300_000;
2827
2828        let state = lexical_state_from_observations(LexicalObservationInput {
2829            index_path: &index_path,
2830            db_path: &db_path,
2831            stale_threshold: 60,
2832            last_indexed_at_ms: Some(1_733_000_000_000),
2833            now_ms,
2834            maintenance: SearchMaintenanceSnapshot {
2835                active: true,
2836                pid: Some(std::process::id()),
2837                started_at_ms: Some(now_ms - 30_000),
2838                db_path: Some(db_path.clone()),
2839                mode: Some(SearchMaintenanceMode::Index),
2840                job_id: None,
2841                job_kind: None,
2842                phase: None,
2843                updated_at_ms: Some(now_ms - 500),
2844                last_progress_at_ms: None,
2845                orphaned: false,
2846            },
2847            checkpoint: None,
2848            current_db_fingerprint: None,
2849        });
2850
2851        assert!(state.rebuilding);
2852        assert!(
2853            !state.stalled,
2854            "legacy lock without last_progress_at_ms must NOT be misreported as stalled",
2855        );
2856        assert_eq!(state.status, "building");
2857        assert!(state.last_progress_age_ms.is_none());
2858    }
2859
2860    /// F4 (cass tech debt): the stall-age comparison must operate at
2861    /// full millisecond precision. Pre-fix, `now_ms` was derived from
2862    /// `now_secs * 1000` inside `lexical_state_from_observations`, which
2863    /// quantised the comparison to second resolution and made a
2864    /// 119_900 ms-old progress timestamp indistinguishable from a 119
2865    /// 000 ms-old one. This test pins a 119_500 ms age so that the only
2866    /// way it surfaces correctly as a `last_progress_age_ms` close to
2867    /// 119_500 (and NOT 119_000 or 120_000) is full-ms plumbing.
2868    #[test]
2869    fn lexical_state_progress_age_is_ms_precision_not_seconds_quantised() {
2870        let temp = tempfile::tempdir().expect("tempdir");
2871        let index_path = temp.path().join("index").join("v4");
2872        std::fs::create_dir_all(&index_path).expect("create index dir");
2873        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2874        let db_path = temp.path().join("agent_search.db");
2875        std::fs::write(&db_path, b"db").expect("write db file");
2876
2877        // `now_ms` deliberately lands at .700 of a second so any
2878        // second-quantisation upstream would shave 700 ms off the diff.
2879        let now_ms: i64 = 1_733_000_300_700;
2880        let last_progress_at_ms = now_ms - 119_500;
2881
2882        let state = lexical_state_from_observations(LexicalObservationInput {
2883            index_path: &index_path,
2884            db_path: &db_path,
2885            stale_threshold: 60,
2886            last_indexed_at_ms: Some(1_733_000_000_000),
2887            now_ms,
2888            maintenance: SearchMaintenanceSnapshot {
2889                active: true,
2890                pid: Some(std::process::id()),
2891                started_at_ms: Some(now_ms - 600_000),
2892                db_path: Some(db_path.clone()),
2893                mode: Some(SearchMaintenanceMode::WatchStartup),
2894                job_id: Some("lexical_refresh-1-1".to_string()),
2895                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2896                phase: Some("watch_startup".to_string()),
2897                updated_at_ms: Some(now_ms - 500),
2898                last_progress_at_ms: Some(last_progress_at_ms),
2899                orphaned: false,
2900            },
2901            checkpoint: None,
2902            current_db_fingerprint: None,
2903        });
2904
2905        let age = state
2906            .last_progress_age_ms
2907            .expect("forward-progress age must be computed");
2908        assert_eq!(
2909            age, 119_500,
2910            "ms-precision plumbing must surface the exact diff (no second-quantisation)"
2911        );
2912        // 119_500 ms is still under the default 120 s stall threshold,
2913        // so we should be `building`, not `stalled`. Pre-F4, a
2914        // second-quantised clock could either floor the diff to 119_000
2915        // (still building, OK) OR — on different `.fff` ms suffixes —
2916        // round it to 120_000 (false-positive stall). Pinning the
2917        // expected status here protects both edges.
2918        assert!(
2919            !state.stalled,
2920            "119.5 s lag must remain `building`, not flip to `stalled`",
2921        );
2922        assert_eq!(state.status, "building");
2923    }
2924
2925    /// Coordination outcome layer must also degrade to `Stale` when
2926    /// forward progress is stuck — search-side single-flight callers
2927    /// then route around the wedged worker instead of attaching to it.
2928    #[test]
2929    fn coordination_reports_stale_when_forward_progress_is_stuck() {
2930        let now_ms: i64 = 1_733_000_300_000;
2931        let snapshot = SearchMaintenanceSnapshot {
2932            active: true,
2933            pid: Some(12345),
2934            started_at_ms: Some(now_ms - 600_000),
2935            db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
2936            mode: Some(SearchMaintenanceMode::WatchStartup),
2937            job_id: Some("lexical_refresh-1-12345".to_string()),
2938            job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2939            phase: Some("watch_startup".to_string()),
2940            updated_at_ms: Some(now_ms - 500),
2941            last_progress_at_ms: Some(now_ms - 300_000),
2942            orphaned: false,
2943        };
2944
2945        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2946        match outcome {
2947            MaintenanceCoordinationOutcome::Stale { ref reason, .. } => {
2948                assert!(
2949                    reason.contains("forward progress")
2950                        && (reason.contains("#258") || reason.contains("issue #258")),
2951                    "stalled coordination reason should mention forward progress and #258: {reason}",
2952                );
2953            }
2954            other => assert!(
2955                matches!(other, MaintenanceCoordinationOutcome::Stale { .. }),
2956                "stalled forward-progress snapshot must coordinate as Stale, got {other:?}",
2957            ),
2958        }
2959    }
2960
2961    #[test]
2962    fn inspect_search_assets_preserves_semantic_database_unavailable_signal() {
2963        let temp = tempfile::tempdir().expect("tempdir");
2964        let index_path = temp.path().join("index").join("v4");
2965        std::fs::create_dir_all(&index_path).expect("create index dir");
2966        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2967
2968        let db_path = temp.path().join("agent_search.db");
2969        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2970
2971        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2972        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2973            .expect("create vector dir");
2974        std::fs::write(&vector_path, b"index").expect("write vector index");
2975
2976        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2977            data_dir: temp.path(),
2978            db_path: &db_path,
2979            stale_threshold: 60,
2980            last_indexed_at_ms: Some(1_733_000_000_000),
2981            now_ms: 1_733_000_001_000,
2982            maintenance: SearchMaintenanceSnapshot::default(),
2983            semantic_preference: SemanticPreference::HashFallback,
2984            db_available: false,
2985            compute_lexical_fingerprint: false,
2986            inspect_semantic: true,
2987        })
2988        .expect("asset inspection should not fail when db availability is already known");
2989
2990        assert_ne!(snapshot.lexical.status, "error");
2991        assert_eq!(snapshot.semantic.status, "error");
2992        assert_eq!(snapshot.semantic.availability, "database_unavailable");
2993        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
2994        assert!(snapshot.semantic.summary.contains("db unavailable"));
2995    }
2996
2997    #[test]
2998    fn inspect_search_assets_can_skip_semantic_db_open_for_fast_paths() {
2999        let temp = tempfile::tempdir().expect("tempdir");
3000        let index_path = temp.path().join("index").join("v4");
3001        std::fs::create_dir_all(&index_path).expect("create index dir");
3002        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
3003
3004        let db_path = temp.path().join("agent_search.db");
3005        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
3006
3007        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3008        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3009            .expect("create vector dir");
3010        std::fs::write(&vector_path, b"index").expect("write vector index");
3011
3012        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
3013            data_dir: temp.path(),
3014            db_path: &db_path,
3015            stale_threshold: 60,
3016            last_indexed_at_ms: Some(1_733_000_000_000),
3017            now_ms: 1_733_000_001_000,
3018            maintenance: SearchMaintenanceSnapshot::default(),
3019            semantic_preference: SemanticPreference::HashFallback,
3020            db_available: false,
3021            compute_lexical_fingerprint: false,
3022            inspect_semantic: false,
3023        })
3024        .expect("asset inspection should not open semantic DB when semantic inspection is skipped");
3025
3026        assert_ne!(snapshot.lexical.status, "error");
3027        assert_eq!(snapshot.semantic.status, "not_inspected");
3028        assert_eq!(snapshot.semantic.availability, "not_inspected");
3029        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
3030    }
3031
3032    #[test]
3033    fn inspect_search_assets_trusts_db_probe_for_semantic_metadata_probe() {
3034        let temp = tempfile::tempdir().expect("tempdir");
3035        let index_path = temp.path().join("index").join("v4");
3036        std::fs::create_dir_all(&index_path).expect("create index dir");
3037        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
3038
3039        let db_path = temp.path().join("agent_search.db");
3040        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
3041
3042        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3043        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3044            .expect("create vector dir");
3045        std::fs::write(&vector_path, b"index").expect("write vector index");
3046
3047        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
3048            data_dir: temp.path(),
3049            db_path: &db_path,
3050            stale_threshold: 60,
3051            last_indexed_at_ms: Some(1_733_000_000_000),
3052            now_ms: 1_733_000_001_000,
3053            maintenance: SearchMaintenanceSnapshot::default(),
3054            semantic_preference: SemanticPreference::HashFallback,
3055            db_available: true,
3056            compute_lexical_fingerprint: false,
3057            inspect_semantic: true,
3058        })
3059        .expect("semantic metadata probe should trust the existing DB availability signal");
3060
3061        assert_eq!(snapshot.semantic.status, "hash_fallback");
3062        assert_eq!(snapshot.semantic.availability, "hash_fallback");
3063        assert!(snapshot.semantic.can_search);
3064    }
3065
3066    #[test]
3067    fn semantic_state_reports_hash_fallback_as_searchable() {
3068        let state = semantic_state_from_availability(
3069            Path::new("/tmp/cass"),
3070            &SemanticAvailability::HashFallback,
3071            SemanticPreference::HashFallback,
3072            None,
3073        );
3074
3075        assert_eq!(state.status, "hash_fallback");
3076        assert_eq!(state.availability, "hash_fallback");
3077        assert!(state.available);
3078        assert!(state.can_search);
3079        assert_eq!(state.fallback_mode, None);
3080    }
3081
3082    #[test]
3083    fn semantic_preference_surface_preserves_backend_and_model_dir_projection() {
3084        let data_dir = Path::new("/tmp/cass");
3085        let cases = [
3086            (
3087                SemanticPreference::DefaultModel,
3088                "fastembed",
3089                Some(FastEmbedder::default_model_dir(data_dir)),
3090            ),
3091            (SemanticPreference::HashFallback, "hash", None),
3092        ];
3093
3094        for (preference, expected_backend, expected_model_dir) in cases {
3095            let surface = semantic_preference_surface(data_dir, preference);
3096
3097            assert_eq!(surface.preferred_backend, expected_backend);
3098            assert_eq!(surface.model_dir, expected_model_dir);
3099        }
3100    }
3101
3102    #[test]
3103    fn semantic_state_detects_progressive_and_hnsw_assets() {
3104        let temp = tempfile::tempdir().expect("tempdir");
3105        let vector_dir = temp.path().join(VECTOR_INDEX_DIR);
3106        std::fs::create_dir_all(&vector_dir).expect("create vector dir");
3107        std::fs::write(vector_dir.join("vector.fast.idx"), b"fast").expect("write fast tier");
3108        std::fs::write(vector_dir.join("vector.quality.idx"), b"quality")
3109            .expect("write quality tier");
3110        let hnsw_path = hnsw_index_path(temp.path(), FastEmbedder::embedder_id_static());
3111        std::fs::write(&hnsw_path, b"hnsw").expect("write hnsw");
3112
3113        let state = semantic_state_from_availability(
3114            temp.path(),
3115            &SemanticAvailability::Ready {
3116                embedder_id: FastEmbedder::embedder_id_static().to_string(),
3117            },
3118            SemanticPreference::DefaultModel,
3119            None,
3120        );
3121
3122        assert_eq!(state.status, "ready");
3123        assert!(state.progressive_ready);
3124        assert!(state.hnsw_ready);
3125        assert_eq!(
3126            state.embedder_id.as_deref(),
3127            Some(FastEmbedder::embedder_id_static())
3128        );
3129    }
3130
3131    #[test]
3132    fn semantic_state_reports_backfill_when_manifest_only_has_stale_assets() {
3133        let temp = tempfile::tempdir().expect("tempdir");
3134        let mut manifest = SemanticManifest {
3135            fast_tier: Some(ArtifactRecord {
3136                tier: crate::search::semantic_manifest::TierKind::Fast,
3137                embedder_id: HashEmbedder::default().id().to_string(),
3138                model_revision: "hash".to_string(),
3139                schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3140                chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3141                dimension: 256,
3142                doc_count: 12,
3143                conversation_count: 3,
3144                db_fingerprint: "stale-db".to_string(),
3145                index_path: "vector_index/vector.fast.idx".to_string(),
3146                size_bytes: 4096,
3147                started_at_ms: 1_733_100_000_000,
3148                completed_at_ms: 1_733_100_100_000,
3149                ready: true,
3150            }),
3151            backlog: crate::search::semantic_manifest::BacklogLedger {
3152                total_conversations: 20,
3153                fast_tier_processed: 3,
3154                quality_tier_processed: 0,
3155                db_fingerprint: "current-db".to_string(),
3156                computed_at_ms: 1_733_100_200_000,
3157            },
3158            checkpoint: Some(BuildCheckpoint {
3159                tier: crate::search::semantic_manifest::TierKind::Fast,
3160                embedder_id: HashEmbedder::default().id().to_string(),
3161                last_offset: 77,
3162                docs_embedded: 66,
3163                conversations_processed: 3,
3164                total_conversations: 20,
3165                db_fingerprint: "current-db".to_string(),
3166                schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3167                chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3168                saved_at_ms: 1_733_100_300_000,
3169                last_message_id: None,
3170                cursor_exhausted: false,
3171            }),
3172            ..Default::default()
3173        };
3174        manifest.save(temp.path()).expect("save semantic manifest");
3175
3176        let state = semantic_state_from_availability(
3177            temp.path(),
3178            &SemanticAvailability::NeedsConsent,
3179            SemanticPreference::DefaultModel,
3180            Some("current-db"),
3181        );
3182
3183        assert_eq!(state.status, "building");
3184        assert_eq!(state.availability, "index_building");
3185        assert!(!state.can_search);
3186        assert_eq!(state.fallback_mode, Some("lexical"));
3187        assert!(state.summary.contains("backfill"));
3188        assert!(
3189            state
3190                .hint
3191                .as_deref()
3192                .is_some_and(|hint| hint.contains("finish backfilling"))
3193        );
3194    }
3195
3196    #[test]
3197    fn semantic_state_prefers_current_hash_tier_over_missing_model() {
3198        let temp = tempfile::tempdir().expect("tempdir");
3199        let mut manifest = SemanticManifest {
3200            fast_tier: Some(ArtifactRecord {
3201                tier: crate::search::semantic_manifest::TierKind::Fast,
3202                embedder_id: HashEmbedder::default().id().to_string(),
3203                model_revision: "hash".to_string(),
3204                schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3205                chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3206                dimension: 256,
3207                doc_count: 12,
3208                conversation_count: 3,
3209                db_fingerprint: "current-db".to_string(),
3210                index_path: "vector_index/vector.fast.idx".to_string(),
3211                size_bytes: 4096,
3212                started_at_ms: 1_733_100_000_000,
3213                completed_at_ms: 1_733_100_100_000,
3214                ready: true,
3215            }),
3216            ..Default::default()
3217        };
3218        manifest.save(temp.path()).expect("save semantic manifest");
3219        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3220        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3221            .expect("create vector dir");
3222        std::fs::write(&vector_path, b"fast").expect("write fast vector index");
3223
3224        let state = semantic_state_from_availability(
3225            temp.path(),
3226            &SemanticAvailability::NeedsConsent,
3227            SemanticPreference::DefaultModel,
3228            Some("current-db"),
3229        );
3230
3231        assert_eq!(state.status, "ready");
3232        assert_eq!(state.availability, "ready");
3233        assert!(state.can_search);
3234        assert_eq!(state.fallback_mode, None);
3235        assert_eq!(
3236            state.embedder_id.as_deref(),
3237            Some(HashEmbedder::default().id())
3238        );
3239        assert_eq!(state.model_dir, None);
3240        assert_eq!(
3241            state.vector_index_path.as_deref(),
3242            Some(vector_path.as_path())
3243        );
3244        assert_eq!(state.hint, None);
3245    }
3246
3247    #[test]
3248    fn semantic_state_treats_ready_quality_tier_with_unknown_db_match_as_queryable() {
3249        let temp = tempfile::tempdir().expect("tempdir");
3250        let mut manifest = SemanticManifest {
3251            quality_tier: Some(ArtifactRecord {
3252                tier: crate::search::semantic_manifest::TierKind::Quality,
3253                embedder_id: HashEmbedder::default().id().to_string(),
3254                model_revision: "hash".to_string(),
3255                schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3256                chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3257                dimension: 256,
3258                doc_count: 249,
3259                conversation_count: 21,
3260                db_fingerprint: "boxed-db".to_string(),
3261                index_path: "vector_index/vector.quality.idx".to_string(),
3262                size_bytes: 221_824,
3263                started_at_ms: 1_733_100_000_000,
3264                completed_at_ms: 1_733_100_100_000,
3265                ready: true,
3266            }),
3267            ..Default::default()
3268        };
3269        manifest.save(temp.path()).expect("save semantic manifest");
3270
3271        let state = semantic_state_from_availability(
3272            temp.path(),
3273            &SemanticAvailability::NeedsConsent,
3274            SemanticPreference::DefaultModel,
3275            None,
3276        );
3277
3278        assert_eq!(state.quality_tier.current_db_matches, None);
3279        assert!(
3280            state.quality_tier_published,
3281            "a ready quality tier with unknown DB match should remain visible as published in boxed data-dir status"
3282        );
3283        assert!(
3284            state.semantic_only_search_available,
3285            "semantic-only search can still run when the quality tier is ready and DB match is unknown"
3286        );
3287        assert!(state.can_search);
3288        assert_eq!(state.fallback_mode, None);
3289    }
3290
3291    #[test]
3292    fn semantic_state_promotes_complete_current_shard_generation() {
3293        let temp = tempfile::tempdir().expect("tempdir");
3294        let embedder_id = HashEmbedder::default().id().to_string();
3295        let mut records = Vec::new();
3296        for shard_index in 0..2_u32 {
3297            let relative_path = format!("vector_index/shards/fast-hash/shard-{shard_index}.fsvi");
3298            let path = temp.path().join(&relative_path);
3299            std::fs::create_dir_all(path.parent().expect("shard parent"))
3300                .expect("create shard parent");
3301            std::fs::write(&path, b"fsvi").expect("write shard placeholder");
3302            records.push(SemanticShardRecord {
3303                tier: TierKind::Fast,
3304                embedder_id: embedder_id.clone(),
3305                model_revision: "hash".to_string(),
3306                schema_version: SEMANTIC_SCHEMA_VERSION,
3307                chunking_version: CHUNKING_STRATEGY_VERSION,
3308                dimension: HashEmbedder::default().dimension(),
3309                shard_index,
3310                shard_count: 2,
3311                doc_count: 10 + u64::from(shard_index),
3312                total_conversations: 7,
3313                db_fingerprint: "current-db".to_string(),
3314                index_path: relative_path,
3315                quantization: "f16".to_string(),
3316                mmap_ready: true,
3317                ann_index_path: None,
3318                ann_size_bytes: 0,
3319                ann_ready: false,
3320                size_bytes: 100 + u64::from(shard_index),
3321                started_at_ms: 1_733_100_000_000,
3322                completed_at_ms: 1_733_100_000_000 + i64::from(shard_index),
3323                ready: true,
3324            });
3325        }
3326        let mut shards = SemanticShardManifest {
3327            shards: records,
3328            ..Default::default()
3329        };
3330        shards.save(temp.path()).expect("save shard manifest");
3331
3332        let state = semantic_state_from_availability(
3333            temp.path(),
3334            &SemanticAvailability::IndexMissing {
3335                index_path: vector_index_path(temp.path(), &embedder_id),
3336            },
3337            SemanticPreference::HashFallback,
3338            Some("current-db"),
3339        );
3340
3341        assert_eq!(state.status, "ready");
3342        assert_eq!(state.availability, "ready");
3343        assert!(state.can_search);
3344        assert_eq!(state.fallback_mode, None);
3345        assert_eq!(state.fast_tier.doc_count, Some(21));
3346        let expected_path = temp
3347            .path()
3348            .join("vector_index/shards/fast-hash/shard-0.fsvi");
3349        assert_eq!(
3350            state.vector_index_path.as_deref(),
3351            Some(expected_path.as_path())
3352        );
3353    }
3354
3355    #[test]
3356    fn semantic_state_rejects_complete_shard_generation_with_unsafe_path() {
3357        let temp = tempfile::tempdir().expect("tempdir");
3358        let outside = tempfile::tempdir().expect("outside tempdir");
3359        let outside_path = outside.path().join("outside.fsvi");
3360        std::fs::write(&outside_path, b"fsvi").expect("write outside placeholder");
3361        let embedder_id = HashEmbedder::default().id().to_string();
3362        let mut shards = SemanticShardManifest {
3363            shards: vec![SemanticShardRecord {
3364                tier: TierKind::Fast,
3365                embedder_id: embedder_id.clone(),
3366                model_revision: "hash".to_string(),
3367                schema_version: SEMANTIC_SCHEMA_VERSION,
3368                chunking_version: CHUNKING_STRATEGY_VERSION,
3369                dimension: HashEmbedder::default().dimension(),
3370                shard_index: 0,
3371                shard_count: 1,
3372                doc_count: 10,
3373                total_conversations: 7,
3374                db_fingerprint: "current-db".to_string(),
3375                index_path: outside_path.to_string_lossy().to_string(),
3376                quantization: "f16".to_string(),
3377                mmap_ready: true,
3378                ann_index_path: None,
3379                ann_size_bytes: 0,
3380                ann_ready: false,
3381                size_bytes: 100,
3382                started_at_ms: 1_733_100_000_000,
3383                completed_at_ms: 1_733_100_000_001,
3384                ready: true,
3385            }],
3386            ..Default::default()
3387        };
3388        shards.save(temp.path()).expect("save shard manifest");
3389
3390        let base_vector_path = vector_index_path(temp.path(), &embedder_id);
3391        let state = semantic_state_from_availability(
3392            temp.path(),
3393            &SemanticAvailability::IndexMissing {
3394                index_path: base_vector_path.clone(),
3395            },
3396            SemanticPreference::HashFallback,
3397            Some("current-db"),
3398        );
3399
3400        assert_ne!(state.status, "ready");
3401        assert!(!state.can_search);
3402        assert_eq!(
3403            state.vector_index_path.as_deref(),
3404            Some(base_vector_path.as_path())
3405        );
3406        assert_ne!(
3407            state.vector_index_path.as_deref(),
3408            Some(outside_path.as_path())
3409        );
3410    }
3411
3412    // -----------------------------------------------------------------------
3413    // Maintenance coordination tests
3414    // -----------------------------------------------------------------------
3415
3416    fn make_active_snapshot(now_ms: i64) -> SearchMaintenanceSnapshot {
3417        SearchMaintenanceSnapshot {
3418            active: true,
3419            pid: Some(12345),
3420            started_at_ms: Some(now_ms - 5_000),
3421            db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
3422            mode: Some(SearchMaintenanceMode::Index),
3423            job_id: Some("lexical_refresh-1000-12345".to_string()),
3424            job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
3425            phase: Some("scanning".to_string()),
3426            updated_at_ms: Some(now_ms - 500),
3427            last_progress_at_ms: Some(now_ms - 500),
3428            orphaned: false,
3429        }
3430    }
3431
3432    #[test]
3433    fn coordination_no_active_job_when_snapshot_inactive() {
3434        let snapshot = SearchMaintenanceSnapshot::default();
3435        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3436        assert_eq!(outcome, MaintenanceCoordinationOutcome::Idle);
3437    }
3438
3439    #[test]
3440    fn coordination_tracks_active_legacy_lock_without_job_id() {
3441        let snapshot = SearchMaintenanceSnapshot {
3442            active: true,
3443            pid: Some(12345),
3444            job_id: None,
3445            mode: Some(SearchMaintenanceMode::Index),
3446            ..Default::default()
3447        };
3448        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3449        if let MaintenanceCoordinationOutcome::Active {
3450            ref job_id,
3451            job_kind,
3452            ..
3453        } = outcome
3454        {
3455            assert_eq!(job_id, "index-active-lock-12345");
3456            assert_eq!(job_kind, SearchMaintenanceJobKind::LexicalRefresh);
3457        } else {
3458            assert!(
3459                matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3460                "legacy active lock must remain active, got {outcome:?}"
3461            );
3462        }
3463    }
3464
3465    #[test]
3466    fn coordination_active_job_with_fresh_heartbeat() {
3467        let now_ms = 1_733_000_000_000i64;
3468        let snapshot = make_active_snapshot(now_ms);
3469        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3470        if let MaintenanceCoordinationOutcome::Active {
3471            ref job_id,
3472            ref phase,
3473            ..
3474        } = outcome
3475        {
3476            assert_eq!(job_id, "lexical_refresh-1000-12345");
3477            assert_eq!(phase.as_deref(), Some("scanning"));
3478        } else {
3479            assert!(
3480                matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3481                "expected ActiveJob, got {outcome:?}"
3482            );
3483        }
3484    }
3485
3486    #[test]
3487    fn coordination_stale_job_with_old_heartbeat() {
3488        let now_ms = 1_733_000_000_000i64;
3489        let snapshot = SearchMaintenanceSnapshot {
3490            updated_at_ms: Some(now_ms - 60_000),
3491            ..make_active_snapshot(now_ms)
3492        };
3493        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3494        if let MaintenanceCoordinationOutcome::Stale {
3495            ref job_id,
3496            ref reason,
3497        } = outcome
3498        {
3499            assert_eq!(job_id, "lexical_refresh-1000-12345");
3500            assert!(reason.contains("60000ms"), "reason={reason}");
3501        } else {
3502            assert!(
3503                matches!(outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3504                "expected StaleJob, got {outcome:?}"
3505            );
3506        }
3507    }
3508
3509    #[test]
3510    fn coordination_missing_heartbeat_timestamp_still_respects_active_flock() {
3511        let now_ms = 1_733_000_000_000i64;
3512        let snapshot = SearchMaintenanceSnapshot {
3513            updated_at_ms: None,
3514            ..make_active_snapshot(now_ms)
3515        };
3516        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3517        if let MaintenanceCoordinationOutcome::Active { updated_at_ms, .. } = outcome {
3518            assert_eq!(updated_at_ms, now_ms);
3519        } else {
3520            assert!(
3521                matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3522                "missing heartbeat metadata must not hide an active flock, got {outcome:?}"
3523            );
3524        }
3525    }
3526
3527    #[test]
3528    fn decision_launch_when_no_job() {
3529        let snapshot = SearchMaintenanceSnapshot::default();
3530        let decision = decide_maintenance_action_from_snapshot(&snapshot, 1_733_000_000_000);
3531        assert_eq!(decision, MaintenanceDecision::Launch);
3532    }
3533
3534    #[test]
3535    fn decision_launch_when_no_lock_file() {
3536        let temp = tempfile::tempdir().expect("tempdir");
3537        let decision = decide_maintenance_action(temp.path(), 1_733_000_000_000);
3538        assert_eq!(decision, MaintenanceDecision::Launch);
3539    }
3540
3541    #[test]
3542    fn decision_attaches_when_active_lock_has_stale_heartbeat() {
3543        let now_ms = 1_733_000_000_000i64;
3544        let snapshot = SearchMaintenanceSnapshot {
3545            updated_at_ms: Some(now_ms - 60_000),
3546            ..make_active_snapshot(now_ms)
3547        };
3548        let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3549        if let MaintenanceDecision::AttachOrWait {
3550            ref job_id,
3551            ref phase,
3552            elapsed_ms,
3553            ..
3554        } = decision
3555        {
3556            assert_eq!(job_id, "lexical_refresh-1000-12345");
3557            assert_eq!(phase.as_deref(), Some("scanning"));
3558            assert_eq!(elapsed_ms, 5_000);
3559        } else {
3560            assert!(
3561                matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3562                "stale heartbeat still has an active lock, got {decision:?}"
3563            );
3564        }
3565    }
3566
3567    #[test]
3568    fn decision_attach_when_active_fresh_job() {
3569        let now_ms = 1_733_000_000_000i64;
3570        let snapshot = make_active_snapshot(now_ms);
3571        let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3572        if let MaintenanceDecision::AttachOrWait {
3573            ref job_id,
3574            elapsed_ms,
3575            ..
3576        } = decision
3577        {
3578            assert_eq!(job_id, "lexical_refresh-1000-12345");
3579            assert_eq!(elapsed_ms, 5_000);
3580        } else {
3581            assert!(
3582                matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3583                "expected AttachOrWait, got {decision:?}"
3584            );
3585        }
3586    }
3587
3588    #[test]
3589    fn poll_returns_immediately_when_no_active_job() {
3590        let temp = tempfile::tempdir().expect("tempdir");
3591        let result = poll_maintenance_until_idle(
3592            temp.path(),
3593            Some(Duration::from_millis(500)),
3594            Some(Duration::from_millis(50)),
3595        );
3596        assert!(!result.timed_out);
3597        assert_eq!(result.polls, 1);
3598        assert!(
3599            matches!(result.outcome, MaintenanceCoordinationOutcome::Idle),
3600            "expected NoActiveJob"
3601        );
3602        assert!(
3603            result.elapsed <= Duration::from_millis(500),
3604            "immediate idle poll should finish before timeout, elapsed={:?}",
3605            result.elapsed
3606        );
3607    }
3608
3609    #[test]
3610    fn poll_returns_active_on_timeout_when_lock_held() {
3611        use fs2::FileExt;
3612        let temp = tempfile::tempdir().expect("tempdir");
3613        let lock_path = temp.path().join("index-run.lock");
3614        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3615        let pid = active_lock_fixture_pid(99999);
3616        let mut owner = OpenOptions::new()
3617            .create(true)
3618            .read(true)
3619            .write(true)
3620            .truncate(true)
3621            .open(&lock_path)
3622            .expect("open owner handle");
3623        owner.try_lock_exclusive().expect("acquire lock");
3624        write_locked_metadata(
3625            &lock_path,
3626            &mut owner,
3627            format!(
3628                "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-1\njob_kind=lexical_refresh\nphase=scanning\n",
3629                now_ms - 1_000,
3630                now_ms,
3631            )
3632            .as_str(),
3633        )
3634        .expect("write lock metadata");
3635
3636        let result = poll_maintenance_until_idle(
3637            temp.path(),
3638            Some(Duration::from_millis(300)),
3639            Some(Duration::from_millis(50)),
3640        );
3641        assert!(result.timed_out, "should time out when lock is held");
3642        assert!(result.polls >= 2, "should have polled multiple times");
3643        assert!(
3644            matches!(
3645                result.outcome,
3646                MaintenanceCoordinationOutcome::Active { .. }
3647            ),
3648            "expected ActiveJob on timeout"
3649        );
3650
3651        let _ = FileExt::unlock(&owner);
3652    }
3653
3654    #[test]
3655    fn poll_times_out_instead_of_declaring_stale_held_lock_idle() {
3656        use fs2::FileExt;
3657        let temp = tempfile::tempdir().expect("tempdir");
3658        let lock_path = temp.path().join("index-run.lock");
3659        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3660        let pid = active_lock_fixture_pid(99999);
3661        let mut owner = OpenOptions::new()
3662            .create(true)
3663            .read(true)
3664            .write(true)
3665            .truncate(true)
3666            .open(&lock_path)
3667            .expect("open owner handle");
3668        owner.try_lock_exclusive().expect("acquire lock");
3669        write_locked_metadata(
3670            &lock_path,
3671            &mut owner,
3672            format!(
3673                "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-stale\njob_kind=lexical_refresh\nphase=scanning\n",
3674                now_ms - 120_000,
3675                now_ms - 120_000,
3676            )
3677            .as_str(),
3678        )
3679        .expect("write lock metadata");
3680
3681        let result = poll_maintenance_until_idle(
3682            temp.path(),
3683            Some(Duration::from_millis(150)),
3684            Some(Duration::from_millis(25)),
3685        );
3686        assert!(result.timed_out, "held stale lock is still not idle");
3687        assert!(
3688            matches!(result.outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3689            "expected stale held lock on timeout, got {:?}",
3690            result.outcome
3691        );
3692
3693        let _ = FileExt::unlock(&owner);
3694    }
3695
3696    #[test]
3697    fn poll_detects_release_mid_wait() {
3698        use fs2::FileExt;
3699        let temp = tempfile::tempdir().expect("tempdir");
3700        let lock_path = temp.path().join("index-run.lock");
3701        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3702        let pid = active_lock_fixture_pid(99999);
3703        let mut owner = OpenOptions::new()
3704            .create(true)
3705            .read(true)
3706            .write(true)
3707            .truncate(true)
3708            .open(&lock_path)
3709            .expect("open owner handle");
3710        owner.try_lock_exclusive().expect("acquire lock");
3711        write_locked_metadata(
3712            &lock_path,
3713            &mut owner,
3714            format!(
3715                "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-2\njob_kind=lexical_refresh\nphase=committing\n",
3716                now_ms - 1_000,
3717                now_ms,
3718            )
3719            .as_str(),
3720        )
3721        .expect("write lock metadata");
3722
3723        let temp_path = temp.path().to_path_buf();
3724        let lock_path_for_release = lock_path.clone();
3725        let release_thread = std::thread::spawn(move || {
3726            std::thread::sleep(Duration::from_millis(150));
3727            let _ = owner.set_len(0);
3728            let _ = clear_index_run_lock_metadata_sidecar(&lock_path_for_release);
3729            let _ = FileExt::unlock(&owner);
3730            drop(owner);
3731        });
3732
3733        let result = poll_maintenance_until_idle(
3734            &temp_path,
3735            Some(Duration::from_secs(2)),
3736            Some(Duration::from_millis(50)),
3737        );
3738        assert!(!result.timed_out, "should detect release before timeout");
3739        release_thread.join().expect("release thread");
3740    }
3741
/// While the index-run lock is held with a fresh heartbeat, the fail-open
/// decision is expected to be `FailOpen` (mentioning the job id) when lexical
/// search is available and `AttachOrWait` when it is not.
#[test]
fn failopen_returns_failopen_when_lexical_available_and_job_active() {
    use fs2::FileExt;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();
    let lock_path = root.join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
    let pid = active_lock_fixture_pid(99999);

    // Keep the exclusive lock on this handle until the end of the test.
    let mut owner = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&lock_path)
        .expect("open owner handle");
    owner.try_lock_exclusive().expect("acquire lock");

    // Heartbeat (`updated_at_ms`) equals `now_ms`; the job started one second earlier.
    let metadata = format!(
        "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-job-1\njob_kind=lexical_refresh\nphase=indexing\n",
        now_ms - 1_000,
        now_ms,
    );
    write_locked_metadata(&lock_path, &mut owner, metadata.as_str())
        .expect("write lock metadata");

    let decision = decide_search_failopen(root, now_ms, true);
    match &decision {
        MaintenanceDecision::FailOpen { reason } => {
            assert!(reason.contains("fo-job-1"), "reason={reason}");
            assert!(reason.contains("failing open"), "reason={reason}");
        }
        _ => panic!("expected FailOpen, got {decision:?}"),
    }

    let decision_no_lexical = decide_search_failopen(root, now_ms, false);
    let attaches = matches!(
        decision_no_lexical,
        MaintenanceDecision::AttachOrWait { .. }
    );
    assert!(
        attaches,
        "without lexical must attach, got {decision_no_lexical:?}"
    );

    let _ = FileExt::unlock(&owner);
}
3792
/// The lock is still held but its heartbeat is 120 seconds old. With lexical
/// search available the decision is expected to be `FailOpen` and to mention
/// the job id and the stale heartbeat; without lexical it is expected to be
/// `AttachOrWait`.
#[test]
fn failopen_handles_active_stale_heartbeat_without_launching_repair() {
    use fs2::FileExt;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();
    let lock_path = root.join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
    let pid = active_lock_fixture_pid(99999);

    // Keep the exclusive lock on this handle until the end of the test.
    let mut owner = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&lock_path)
        .expect("open owner handle");
    owner.try_lock_exclusive().expect("acquire lock");

    // Both the start time and the heartbeat are written as 120 seconds in the past.
    let stale_ms = now_ms - 120_000;
    let metadata = format!(
        "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-stale-1\njob_kind=lexical_refresh\nphase=indexing\n",
        stale_ms,
        stale_ms,
    );
    write_locked_metadata(&lock_path, &mut owner, metadata.as_str())
        .expect("write lock metadata");

    let decision = decide_search_failopen(root, now_ms, true);
    match &decision {
        MaintenanceDecision::FailOpen { reason } => {
            assert!(reason.contains("fo-stale-1"), "reason={reason}");
            assert!(reason.contains("stale heartbeat"), "reason={reason}");
            assert!(reason.contains("failing open"), "reason={reason}");
        }
        _ => panic!("expected FailOpen for searchable stale active lock, got {decision:?}"),
    }

    let decision_no_lexical = decide_search_failopen(root, now_ms, false);
    let waits = matches!(
        decision_no_lexical,
        MaintenanceDecision::AttachOrWait { .. }
    );
    assert!(
        waits,
        "without lexical must wait for the held lock, got {decision_no_lexical:?}"
    );

    let _ = FileExt::unlock(&owner);
}
3844
3845    // -----------------------------------------------------------------------
3846    // ibuuh.22: Event log, yield signaling, unified view tests
3847    // -----------------------------------------------------------------------
3848
/// Appends one `Started` event and checks that reading the log back yields
/// exactly that event with its job id, actor pid and kind.
#[test]
fn event_log_append_and_read() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    let started = MaintenanceEvent {
        timestamp_ms: 1_733_000_000_000,
        job_id: "test-job-1".to_string(),
        actor_pid: 42,
        kind: MaintenanceEventKind::Started {
            job_kind: "lexical_refresh".to_string(),
            phase: "scanning".to_string(),
        },
    };
    append_maintenance_event(root, &started).expect("append");

    // No timestamp filter and no limit.
    let events = read_maintenance_events(root, None, None);
    assert_eq!(events.len(), 1);

    let only = &events[0];
    assert_eq!(only.job_id, "test-job-1");
    assert_eq!(only.actor_pid, 42);
    assert!(matches!(
        events[0].kind,
        MaintenanceEventKind::Started { .. }
    ));
}
3871
/// Writes five `Progress` events stamped 1_000..=1_004 and expects a read
/// filtered with `Some(1_002)` to return only the events stamped 1_003 and 1_004.
#[test]
fn event_log_filters_by_timestamp() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    for i in 0..5 {
        let progress_event = MaintenanceEvent {
            timestamp_ms: 1_000 + i,
            job_id: format!("job-{i}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Progress {
                processed: i as u64,
                total: 5,
            },
        };
        append_maintenance_event(root, &progress_event).expect("append");
    }

    let events = read_maintenance_events(root, Some(1_002), None);
    assert_eq!(events.len(), 2, "should only get events after ts 1002");

    let (first, second) = (&events[0], &events[1]);
    assert_eq!(first.timestamp_ms, 1_003);
    assert_eq!(second.timestamp_ms, 1_004);
}
3892
/// Writes ten `Resumed` events stamped 1_000..=1_009 and expects a read with
/// limit `Some(3)` to return three events, running from 1_007 to 1_009.
#[test]
fn event_log_respects_limit() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    for i in 0..10 {
        let resumed = MaintenanceEvent {
            timestamp_ms: 1_000 + i,
            job_id: format!("job-{i}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Resumed,
        };
        append_maintenance_event(root, &resumed).expect("append");
    }

    let events = read_maintenance_events(root, None, Some(3));
    assert_eq!(events.len(), 3);

    let (first, last) = (&events[0], &events[2]);
    assert_eq!(first.timestamp_ms, 1_007);
    assert_eq!(last.timestamp_ms, 1_009);
}
3910
/// Reading events from a fresh temp directory, where nothing has been
/// appended, is expected to produce an empty result.
#[test]
fn event_log_returns_empty_when_missing() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    let events = read_maintenance_events(root, None, None);
    assert!(events.is_empty());
}
3917
/// Appends 550 events stamped 0..550, truncates the log, and expects
/// `MAX_EVENT_LOG_ENTRIES` entries to remain, spanning timestamps 50 through
/// 549 (i.e. the newest entries are the ones kept).
#[test]
fn event_log_truncation_retains_tail() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    for i in 0..550 {
        let resumed = MaintenanceEvent {
            timestamp_ms: i,
            job_id: format!("job-{i}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Resumed,
        };
        append_maintenance_event(root, &resumed).expect("append");
    }

    // Read with a limit of 600 both times so the limit never hides entries.
    let before = read_maintenance_events(root, None, Some(600));
    assert_eq!(before.len(), 550);

    truncate_maintenance_event_log(root).expect("truncate");

    let after = read_maintenance_events(root, None, Some(600));
    assert_eq!(after.len(), MAX_EVENT_LOG_ENTRIES);

    let oldest_kept = &after[0];
    assert_eq!(oldest_kept.timestamp_ms, 50);
    let newest_kept = &after[MAX_EVENT_LOG_ENTRIES - 1];
    assert_eq!(newest_kept.timestamp_ms, 549);
}
3938
/// Walks a yield signal through its lifecycle: absent at first, present after
/// `request_yield` (carrying this process's pid, the reason and a positive
/// timestamp), and absent again after `clear_yield_signal`.
#[test]
fn yield_signal_round_trip() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    let initial = check_yield_requested(root);
    assert!(initial.is_none(), "no signal initially");

    request_yield(root, "foreground search pressure").expect("request yield");

    let req = check_yield_requested(root).expect("yield should be present");
    assert_eq!(req.requester_pid, std::process::id());
    assert_eq!(req.reason, "foreground search pressure");
    assert!(req.requested_at_ms > 0);

    clear_yield_signal(root).expect("clear");

    let after_clear = check_yield_requested(root);
    assert!(after_clear.is_none(), "signal cleared");
}
3957
/// Clearing the yield signal twice in a directory where none was ever
/// requested must succeed both times.
#[test]
fn clear_yield_signal_is_idempotent() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    for failure_context in ["clear nonexistent", "clear again"] {
        clear_yield_signal(root).expect(failure_context);
    }
}
3964
/// For a fresh directory the unified view is expected to report idle
/// coordination, no pending yield, no recent events, and a `Launch` decision.
#[test]
fn unified_view_idle_no_events() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    let view = unified_maintenance_view(root, true);

    assert!(matches!(
        view.coordination,
        MaintenanceCoordinationOutcome::Idle
    ));
    assert!(view.yield_pending.is_none());
    assert!(view.recent_events.is_empty());
    assert_eq!(view.decision, MaintenanceDecision::Launch);
}
3977
/// With the index-run lock held, fresh metadata written and one `Started`
/// event logged, the unified view is expected to report `Active`
/// coordination, a `FailOpen` decision, and exactly one recent event.
#[test]
fn unified_view_active_with_lexical_fails_open() {
    use fs2::FileExt;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();
    let lock_path = root.join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
    let pid = active_lock_fixture_pid(99999);

    // Keep the exclusive lock on this handle until the end of the test.
    let mut owner = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&lock_path)
        .expect("open");
    owner.try_lock_exclusive().expect("lock");

    let metadata = format!(
        "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/t.db\nmode=index\njob_id=uv-1\njob_kind=lexical_refresh\nphase=indexing\n",
        now_ms - 1_000,
        now_ms,
    );
    write_locked_metadata(&lock_path, &mut owner, metadata.as_str()).expect("write metadata");

    // Log a matching `Started` event for the same job id and pid.
    let started = MaintenanceEvent {
        timestamp_ms: now_ms,
        job_id: "uv-1".to_string(),
        actor_pid: pid,
        kind: MaintenanceEventKind::Started {
            job_kind: "lexical_refresh".to_string(),
            phase: "indexing".to_string(),
        },
    };
    append_maintenance_event(root, &started).expect("append");

    let view = unified_maintenance_view(root, true);
    assert!(matches!(
        view.coordination,
        MaintenanceCoordinationOutcome::Active { .. }
    ));
    assert!(matches!(
        view.decision,
        MaintenanceDecision::FailOpen { .. }
    ));
    assert_eq!(view.recent_events.len(), 1);

    let _ = FileExt::unlock(&owner);
}
4029
/// After a yield has been requested, the unified view is expected to expose
/// it through `yield_pending` with the original reason.
#[test]
fn unified_view_includes_yield_signal() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    request_yield(root, "test yield").expect("yield");

    let view = unified_maintenance_view(root, true);
    assert!(view.yield_pending.is_some());
    let pending = view.yield_pending.as_ref().unwrap();
    assert_eq!(pending.reason, "test yield");

    clear_yield_signal(root).expect("clear");
}
4039
/// Appends one event for each of the nine event kinds listed below, reads the
/// log back, and spot-checks that the first, sixth and last entries come back
/// with the kind they were written with.
#[test]
fn event_kinds_serialize_round_trip() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path();

    let all_kinds = vec![
        MaintenanceEventKind::Started {
            job_kind: "lexical_refresh".to_string(),
            phase: "init".to_string(),
        },
        MaintenanceEventKind::PhaseChanged {
            from: "init".to_string(),
            to: "scanning".to_string(),
        },
        MaintenanceEventKind::Progress {
            processed: 50,
            total: 100,
        },
        MaintenanceEventKind::YieldRequested {
            requester_pid: 42,
            reason: "foreground".to_string(),
        },
        MaintenanceEventKind::Paused {
            reason: "yield".to_string(),
        },
        MaintenanceEventKind::Resumed,
        MaintenanceEventKind::Completed {
            summary: "done".to_string(),
        },
        MaintenanceEventKind::Failed {
            error: "oops".to_string(),
        },
        MaintenanceEventKind::Cancelled {
            reason: "user".to_string(),
        },
    ];

    // The position in the list doubles as the event timestamp.
    for (seq, kind) in all_kinds.into_iter().enumerate() {
        let entry = MaintenanceEvent {
            timestamp_ms: seq as i64,
            job_id: "rt-test".to_string(),
            actor_pid: 1,
            kind,
        };
        append_maintenance_event(root, &entry).expect("append");
    }

    let events = read_maintenance_events(root, None, None);
    assert_eq!(events.len(), 9);
    assert!(matches!(
        events[0].kind,
        MaintenanceEventKind::Started { .. }
    ));
    assert!(matches!(events[5].kind, MaintenanceEventKind::Resumed));
    assert!(matches!(
        events[8].kind,
        MaintenanceEventKind::Cancelled { .. }
    ));
}
4095}