Skip to main content

coding_agent_search/search/
asset_state.rs

1//! Shared search asset state evaluation for status, health, and fail-open search planning.
2//!
3//! This module centralizes coarse-grained asset truth so callers stop inferring
4//! lexical freshness, active maintenance, and semantic readiness from ad hoc
5//! file checks spread across the CLI.
6//!
7//! The maintenance coordination layer (`evaluate_maintenance_coordination`,
8//! `decide_maintenance_action`, `poll_maintenance_until_idle`) provides
9//! single-flight semantics: foreground cass actors share one coherent truth
10//! for repair/acquisition work and never duplicate basic maintenance jobs.
11
12use std::fs::OpenOptions;
13use std::io::Read;
14use std::path::{Path, PathBuf};
15use std::time::{Duration, Instant};
16
17use anyhow::{Context, Result};
18use fs2::FileExt;
19
20use crate::indexer::{
21    LEXICAL_REBUILD_PAGE_SIZE_PUBLIC, LexicalRebuildCheckpoint,
22    lexical_rebuild_page_size_is_compatible, lexical_storage_fingerprint_for_db,
23    load_lexical_rebuild_checkpoint,
24};
25use crate::search::ann_index::hnsw_index_path;
26use crate::search::embedder::Embedder;
27use crate::search::fastembed_embedder::FastEmbedder;
28use crate::search::hash_embedder::HashEmbedder;
29use crate::search::model_manager::{
30    SemanticAvailability, probe_hash_semantic_availability, probe_semantic_availability,
31};
32use crate::search::policy::{
33    CHUNKING_STRATEGY_VERSION, CliSemanticOverrides, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
34};
35use crate::search::semantic_manifest::{
36    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
37    TierKind, semantic_shard_artifact_path_is_safe,
38};
39use crate::search::tantivy::SCHEMA_HASH;
40use crate::search::vector_index::{VECTOR_INDEX_DIR, vector_index_path};
41
/// Maintenance mode recorded under the `mode` key of the `index-run.lock`
/// file.
///
/// The lock-file spelling of each variant is defined by
/// [`SearchMaintenanceMode::as_lock_value`] and parsed back by
/// [`SearchMaintenanceMode::parse_lock_value`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceMode {
    /// Lock value `index`. Counts as rebuild-active, not as watch-active.
    Index,
    /// Lock value `watch_startup`. Counts as both watch-active and
    /// rebuild-active.
    WatchStartup,
    /// Lock value `watch`. Counts as watch-active; the only mode that is
    /// NOT rebuild-active.
    Watch,
    /// Lock value `watch_once`. Counts as rebuild-active, not as
    /// watch-active.
    WatchOnce,
}
49
50impl SearchMaintenanceMode {
51    pub(crate) fn as_lock_value(self) -> &'static str {
52        match self {
53            Self::Index => "index",
54            Self::WatchStartup => "watch_startup",
55            Self::Watch => "watch",
56            Self::WatchOnce => "watch_once",
57        }
58    }
59
60    pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
61        match raw.trim() {
62            "index" => Some(Self::Index),
63            "watch_startup" => Some(Self::WatchStartup),
64            "watch" => Some(Self::Watch),
65            "watch_once" => Some(Self::WatchOnce),
66            _ => None,
67        }
68    }
69
70    pub(crate) fn watch_active(self) -> bool {
71        matches!(self, Self::WatchStartup | Self::Watch)
72    }
73
74    pub(crate) fn rebuild_active(self) -> bool {
75        !matches!(self, Self::Watch)
76    }
77}
78
/// Kind of maintenance job recorded under the `job_kind` key of the
/// `index-run.lock` file.
///
/// The lock-file spelling of each variant is defined by
/// [`SearchMaintenanceJobKind::as_lock_value`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceJobKind {
    /// Lock value `lexical_refresh`.
    LexicalRefresh,
    /// Lock value `semantic_acquire`.
    SemanticAcquire,
}
84
85impl SearchMaintenanceJobKind {
86    pub(crate) fn as_lock_value(self) -> &'static str {
87        match self {
88            Self::LexicalRefresh => "lexical_refresh",
89            Self::SemanticAcquire => "semantic_acquire",
90        }
91    }
92
93    pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
94        match raw.trim() {
95            "lexical_refresh" => Some(Self::LexicalRefresh),
96            "semantic_acquire" => Some(Self::SemanticAcquire),
97            _ => None,
98        }
99    }
100}
101
/// Point-in-time view of the `index-run.lock` file as produced by
/// [`read_search_maintenance_snapshot`].
///
/// The `Default` value (everything `None`/`false`) is what readers get when
/// the lock file cannot be opened or when stale metadata was just cleared.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub(crate) struct SearchMaintenanceSnapshot {
    /// `true` when a non-blocking exclusive lock attempt on the lock file
    /// reported contention, i.e. some other holder currently has the lock.
    pub active: bool,
    /// Parsed `pid` entry (`None` when missing or not a valid `u32`).
    pub pid: Option<u32>,
    /// Parsed `started_at_ms` entry.
    pub started_at_ms: Option<i64>,
    /// Trimmed `db_path` entry.
    pub db_path: Option<PathBuf>,
    /// Parsed `mode` entry; `None` for unknown spellings.
    pub mode: Option<SearchMaintenanceMode>,
    /// Trimmed `job_id` entry; empty values are reported as `None`.
    pub job_id: Option<String>,
    /// Parsed `job_kind` entry; `None` for unknown spellings.
    pub job_kind: Option<SearchMaintenanceJobKind>,
    /// Trimmed `phase` entry; empty values are reported as `None`.
    pub phase: Option<String>,
    /// Parsed `updated_at_ms` entry.
    pub updated_at_ms: Option<i64>,
    /// Parsed `last_progress_at_ms` entry; this is the timestamp
    /// `maintenance_stall_age_ms` measures against.
    pub last_progress_at_ms: Option<i64>,
    /// `true` when the file carried metadata but no holder was detected
    /// (`metadata_present && !active`).
    pub orphaned: bool,
}
116
117pub(crate) fn read_search_maintenance_snapshot(data_dir: &Path) -> SearchMaintenanceSnapshot {
118    // Real index-run.lock files written by `acquire_index_run_lock`
119    // have a fixed key=value shape under ~1 KiB. Cap the read at 64 KiB
120    // so a corrupted or maliciously-large lock file cannot force us to
121    // allocate arbitrary memory just to inspect its metadata.
122    const MAX_LOCK_FILE_READ: u64 = 64 * 1024;
123
124    let lock_path = data_dir.join("index-run.lock");
125    let file = match OpenOptions::new().read(true).write(true).open(&lock_path) {
126        Ok(file) => file,
127        Err(_) => return SearchMaintenanceSnapshot::default(),
128    };
129
130    let mut raw = String::new();
131    let _ = (&file).take(MAX_LOCK_FILE_READ).read_to_string(&mut raw);
132
133    let mut pid = None;
134    let mut started_at_ms = None;
135    let mut lock_db_path = None::<PathBuf>;
136    let mut mode = None;
137    let mut job_id = None;
138    let mut job_kind = None;
139    let mut phase = None;
140    let mut updated_at_ms = None;
141    let mut last_progress_at_ms = None;
142    for line in raw.lines() {
143        let Some((key, value)) = line.split_once('=') else {
144            continue;
145        };
146        match key.trim() {
147            "pid" => pid = value.trim().parse::<u32>().ok(),
148            "started_at_ms" => started_at_ms = value.trim().parse::<i64>().ok(),
149            "db_path" => lock_db_path = Some(PathBuf::from(value.trim())),
150            "mode" => mode = SearchMaintenanceMode::parse_lock_value(value),
151            "job_id" => job_id = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
152            "job_kind" => job_kind = SearchMaintenanceJobKind::parse_lock_value(value),
153            "phase" => phase = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
154            "updated_at_ms" => updated_at_ms = value.trim().parse::<i64>().ok(),
155            "last_progress_at_ms" => last_progress_at_ms = value.trim().parse::<i64>().ok(),
156            _ => {}
157        }
158    }
159
160    let metadata_present = pid.is_some()
161        || started_at_ms.is_some()
162        || lock_db_path.is_some()
163        || mode.is_some()
164        || job_id.is_some()
165        || job_kind.is_some()
166        || phase.is_some()
167        || updated_at_ms.is_some()
168        || last_progress_at_ms.is_some();
169
170    let active = match file.try_lock_exclusive() {
171        Ok(()) => {
172            // We acquired the exclusive lock with no waiting, which is
173            // proof that no process holds it. POSIX flock (via fs2) is
174            // released automatically when the owning file description
175            // is closed — either explicitly on graceful drop, or by the
176            // kernel on process exit / crash. Therefore, if the file
177            // contains metadata but no holder is present, the previous
178            // owner is gone.
179            //
180            // Historically this produced a permanent `orphaned: true`
181            // state that callers (notably the TUI) interpreted as
182            // "rebuild in progress, keep polling" — yielding a tight
183            // CPU-bound loop that only cleared when the user manually
184            // deleted the lock file (see issue #176).
185            //
186            // Reap the stale metadata in place while we hold the lock,
187            // so that this and every subsequent reader observes a
188            // clean state.
189            //
190            // We deliberately do NOT gate this on a `kill(pid, 0)`
191            // liveness probe. Under PID reuse (the recorded pid is
192            // reassigned to an unrelated live process), such a probe
193            // would refuse to reap and the spin would reappear. Flock
194            // acquisition is the stronger and more precise signal.
195            if metadata_present {
196                if let Err(err) = file.set_len(0) {
197                    tracing::warn!(
198                        path = %lock_path.display(),
199                        error = %err,
200                        "failed to truncate stale index-run lock metadata"
201                    );
202                } else {
203                    let _ = file.sync_all();
204                    tracing::info!(
205                        path = %lock_path.display(),
206                        stale_pid = ?pid,
207                        "cleared stale index-run lock metadata (previous owner gone)"
208                    );
209                    let _ = file.unlock();
210                    return SearchMaintenanceSnapshot::default();
211                }
212            }
213            let _ = file.unlock();
214            false
215        }
216        Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => true,
217        Err(_) => false,
218    };
219
220    SearchMaintenanceSnapshot {
221        active,
222        pid,
223        started_at_ms,
224        db_path: lock_db_path,
225        mode,
226        job_id,
227        job_kind,
228        phase,
229        updated_at_ms,
230        last_progress_at_ms,
231        orphaned: metadata_present && !active,
232    }
233}
234
235pub(crate) const REBUILD_STALL_DETECT_SECS_DEFAULT: u64 = 120;
236
237pub(crate) fn rebuild_stall_detect_threshold_ms() -> Option<i64> {
238    let threshold_secs = dotenvy::var("CASS_REBUILD_STALL_DETECT_SECS")
239        .ok()
240        .and_then(|value| value.trim().parse::<u64>().ok())
241        .unwrap_or(REBUILD_STALL_DETECT_SECS_DEFAULT);
242    if threshold_secs == 0 {
243        return None;
244    }
245    let threshold_ms = threshold_secs.saturating_mul(1000);
246    Some(i64::try_from(threshold_ms).unwrap_or(i64::MAX))
247}
248
249pub(crate) fn maintenance_stall_age_ms(
250    snapshot: &SearchMaintenanceSnapshot,
251    now_ms: i64,
252) -> Option<i64> {
253    if !snapshot.active
254        || !snapshot
255            .mode
256            .is_some_and(SearchMaintenanceMode::rebuild_active)
257    {
258        return None;
259    }
260    let last_progress_at_ms = snapshot.last_progress_at_ms?;
261    let age_ms = now_ms.saturating_sub(last_progress_at_ms);
262    let threshold_ms = rebuild_stall_detect_threshold_ms()?;
263    (age_ms >= threshold_ms).then_some(age_ms)
264}
265
/// Which semantic backend a caller wants inspected.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SemanticPreference {
    /// Probe via `probe_semantic_availability`; surfaced as preferred
    /// backend `"fastembed"`.
    DefaultModel,
    /// Probe via `probe_hash_semantic_availability`; surfaced as preferred
    /// backend `"hash"` with no model directory.
    HashFallback,
}
272
/// Combined result of [`inspect_search_assets`]: one lexical and one
/// semantic asset state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SearchAssetSnapshot {
    /// State returned by `inspect_lexical_assets`.
    pub lexical: LexicalAssetState,
    /// State from `inspect_semantic_assets`, or the "not inspected"
    /// placeholder when semantic inspection was skipped.
    pub semantic: SemanticAssetState,
}
278
/// Lexical half of [`SearchAssetSnapshot`], as returned by
/// `inspect_lexical_assets`.
// NOTE(review): the code that populates these fields is outside this part
// of the file; fields without a doc comment are left undocumented rather
// than guessed at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalAssetState {
    pub status: &'static str,
    pub exists: bool,
    pub fresh: bool,
    pub stale: bool,
    pub rebuilding: bool,
    /// `true` when the rebuild is nominally active but the indexing
    /// thread has not posted forward progress within the configured
    /// stall threshold. Driven by `last_progress_at_ms` on the lock
    /// file, NOT by `updated_at_ms` (which the heartbeat thread
    /// refreshes unconditionally). See issue #258 for the regression
    /// this guards against.
    pub stalled: bool,
    /// Wall-clock age of the most recent forward-progress event, when
    /// the indexer thread has posted one. `None` when no
    /// `last_progress_at_ms` is available (legacy lock file, or the
    /// rebuild is not active).
    pub last_progress_age_ms: Option<i64>,
    pub last_progress_at_ms: Option<i64>,
    pub watch_active: bool,
    pub last_indexed_at_ms: Option<i64>,
    pub age_seconds: Option<u64>,
    pub stale_threshold_seconds: u64,
    pub activity_at_ms: Option<i64>,
    pub pending_sessions: u64,
    pub processed_conversations: Option<u64>,
    pub total_conversations: Option<u64>,
    pub indexed_docs: Option<u64>,
    pub status_reason: Option<String>,
    /// Fingerprint comparison details; `current_db_fingerprint` from here is
    /// what `inspect_search_assets` forwards to semantic inspection.
    pub fingerprint: LexicalFingerprintState,
    pub checkpoint: LexicalCheckpointState,
}
312
/// Storage-fingerprint details carried in [`LexicalAssetState::fingerprint`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalFingerprintState {
    /// Fingerprint of the current database, when one was computed.
    /// `inspect_search_assets` passes this on to semantic inspection.
    pub current_db_fingerprint: Option<String>,
    // NOTE(review): presumably the fingerprint stored in the lexical rebuild
    // checkpoint — confirm against the code that fills it in.
    pub checkpoint_fingerprint: Option<String>,
    // NOTE(review): presumably `None` when either fingerprint is missing —
    // confirm against the code that fills it in.
    pub matches_current_db_fingerprint: Option<bool>,
}
319
/// Lexical rebuild checkpoint details carried in
/// [`LexicalAssetState::checkpoint`].
// NOTE(review): the code that fills these fields is outside this part of the
// file. The `Option<bool>` fields are presumably `None` when no checkpoint
// is present — confirm before relying on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalCheckpointState {
    pub present: bool,
    pub completed: Option<bool>,
    pub db_matches: Option<bool>,
    pub schema_matches: Option<bool>,
    pub page_size_matches: Option<bool>,
    pub page_size_compatible: Option<bool>,
}
329
/// Semantic half of [`SearchAssetSnapshot`].
///
/// Built either by `semantic_state_from_availability` (full inspection) or
/// by `semantic_state_not_inspected` (fast path, where `status` and
/// `availability` are both `"not_inspected"`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SemanticAssetState {
    /// Coarse status code taken from the runtime surface (for example
    /// `"ready"`, `"building"`, `"stale"`), or `"not_inspected"`.
    pub status: &'static str,
    /// Availability code taken from the runtime surface, or
    /// `"not_inspected"`.
    pub availability: &'static str,
    /// Human-readable one-line description of the state.
    pub summary: String,
    /// Set to the same value as `can_search` by both constructors.
    pub available: bool,
    /// Whether semantic search can be served right now.
    pub can_search: bool,
    /// Search mode to fall back to when semantic search is not usable
    /// (`Some("lexical")`), or `None` when no fallback is needed.
    pub fallback_mode: Option<&'static str>,
    /// `"fastembed"` or `"hash"`, derived from the requested
    /// [`SemanticPreference`].
    pub preferred_backend: &'static str,
    pub embedder_id: Option<String>,
    pub vector_index_path: Option<PathBuf>,
    pub model_dir: Option<PathBuf>,
    pub hnsw_path: Option<PathBuf>,
    /// `true` when `hnsw_path` is set and points at an existing file.
    /// Always `false` on the not-inspected fast path.
    pub hnsw_ready: bool,
    /// Result of `semantic_progressive_assets_ready` for the data dir.
    pub progressive_ready: bool,
    /// Sub-fix 3 for cass#257: true when a quality-tier vector index is
    /// published, matches the current DB fingerprint, and could serve a
    /// `--mode semantic` search even if the progressive/hybrid stack is
    /// still building (e.g. the fast tier hasn't been backfilled yet).
    ///
    /// Distinct from `progressive_ready`, which only returns true when
    /// BOTH the fast and quality tier index files exist on disk —
    /// useful for the "hybrid stack is good to go" surface but
    /// misleading for operators who only run `--mode semantic`.
    pub quality_tier_published: bool,
    /// Sub-fix 3 for cass#257: true when at least one tier (fast OR
    /// quality) is queryable against the current DB. This collapses
    /// the per-tier readiness into a single flag suitable for the
    /// operator question "can I run `cass search --mode semantic`
    /// right now?". Mirrors `can_search` but is named so it survives
    /// future refactors of the can_search semantics.
    pub semantic_only_search_available: bool,
    /// Optional operator-facing suggestion for what to do next.
    pub hint: Option<String>,
    /// Fast-tier state reported by `semantic_manifest_progress`.
    pub fast_tier: SemanticTierAssetState,
    /// Quality-tier state reported by `semantic_manifest_progress`.
    pub quality_tier: SemanticTierAssetState,
    /// Backlog progress reported by `semantic_manifest_progress`.
    pub backlog: SemanticBacklogProgressState,
    /// Checkpoint progress reported by `semantic_manifest_progress`.
    pub checkpoint: SemanticCheckpointProgressState,
}
368
/// Intermediate result of `semantic_runtime_surface`, consumed by
/// `semantic_state_from_availability` to fill in [`SemanticAssetState`].
struct SemanticRuntimeSurface {
    status: &'static str,
    availability: &'static str,
    summary: String,
    can_search: bool,
    fallback_mode: Option<&'static str>,
    hint: Option<String>,
    /// When `Some`, the consumer uses this surface's path fields as-is
    /// instead of falling back to the base values it computed itself.
    embedder_id: Option<String>,
    vector_index_path: Option<PathBuf>,
    model_dir: Option<PathBuf>,
    hnsw_path: Option<PathBuf>,
}
381
/// Argument bundle for `semantic_runtime_surface`.
///
/// The `base_*` fields carry the values derived from the availability probe
/// and preference; they are passed through unchanged whenever the runtime
/// surface does not substitute tier-specific ones.
struct SemanticRuntimeInputs<'a> {
    data_dir: &'a Path,
    availability: &'a SemanticAvailability,
    preference: SemanticPreference,
    fast_tier: &'a SemanticTierAssetState,
    quality_tier: &'a SemanticTierAssetState,
    backlog: &'a SemanticBacklogProgressState,
    checkpoint: &'a SemanticCheckpointProgressState,
    base_embedder_id: Option<String>,
    base_vector_index_path: Option<PathBuf>,
    base_model_dir: Option<PathBuf>,
    base_hnsw_path: Option<PathBuf>,
}
395
/// What a [`SemanticPreference`] translates to for reporting purposes; built
/// by `semantic_preference_surface`.
struct SemanticPreferenceSurface {
    /// `"fastembed"` for `DefaultModel`, `"hash"` for `HashFallback`.
    preferred_backend: &'static str,
    /// Model directory from `active_policy_model_dir` for `DefaultModel`;
    /// always `None` for `HashFallback`.
    model_dir: Option<PathBuf>,
}
400
/// State of a single semantic tier (fast or quality).
///
/// `semantic_manifest_progress` yields one value per tier, and
/// `semantic_state_from_availability` hands each to
/// `promote_complete_shard_generation_state` by mutable reference before
/// reporting it.
// NOTE(review): the code that fills most fields is outside this part of the
// file; only usages visible here are documented.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticTierAssetState {
    /// Either tier being `present` is what the runtime surface treats as
    /// "manifest assets exist".
    pub present: bool,
    pub ready: bool,
    pub current_db_matches: Option<bool>,
    pub conversation_count: Option<u64>,
    pub doc_count: Option<u64>,
    /// Reported as the effective embedder when this tier is queryable.
    pub embedder_id: Option<String>,
    pub model_revision: Option<String>,
    pub completed_at_ms: Option<i64>,
    pub size_bytes: Option<u64>,
    /// Reported as the effective vector index path when this tier is
    /// queryable.
    pub index_path: Option<PathBuf>,
}
414
/// Semantic backlog progress as reported by `semantic_manifest_progress`.
// NOTE(review): the code that computes these counters is outside this part
// of the file; presumably `*_remaining = total - *_processed` — confirm.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticBacklogProgressState {
    pub total_conversations: u64,
    pub fast_tier_processed: u64,
    pub fast_tier_remaining: u64,
    pub quality_tier_processed: u64,
    pub quality_tier_remaining: u64,
    /// When `true`, the runtime surface considers a backfill to be in
    /// progress (same effect as an active checkpoint).
    pub pending_work: bool,
    pub current_db_matches: Option<bool>,
    pub computed_at_ms: Option<i64>,
}
426
/// Semantic build-checkpoint progress as reported by
/// `semantic_manifest_progress`.
// NOTE(review): the code that fills these fields is outside this part of the
// file; only usages visible here are documented.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticCheckpointProgressState {
    /// When `true`, the runtime surface considers a backfill to be in
    /// progress (same effect as pending backlog work).
    pub active: bool,
    pub tier: Option<&'static str>,
    pub current_db_matches: Option<bool>,
    pub completed: Option<bool>,
    pub conversations_processed: Option<u64>,
    pub total_conversations: Option<u64>,
    pub progress_pct: Option<u8>,
    pub docs_embedded: Option<u64>,
    pub last_offset: Option<i64>,
    pub saved_at_ms: Option<i64>,
}
440
/// Everything [`inspect_search_assets`] needs to evaluate lexical and
/// semantic asset state in one call.
pub(crate) struct InspectSearchAssetsInput<'a> {
    /// Data directory holding the search assets.
    pub data_dir: &'a Path,
    /// Path of the database the assets are evaluated against.
    pub db_path: &'a Path,
    /// Forwarded to lexical inspection.
    // NOTE(review): presumably seconds, going by
    // `LexicalAssetState::stale_threshold_seconds` — confirm.
    pub stale_threshold: u64,
    /// Forwarded to lexical inspection.
    pub last_indexed_at_ms: Option<i64>,
    /// Full-precision (millisecond) wall clock used for stall-detection
    /// math against `last_progress_at_ms`. Callers should pass
    /// `FrankenStorage::now_millis()` here. F4 (cass tech debt): the
    /// previous shape only carried `now_secs`, which quantised the
    /// comparison to second resolution while `last_progress_at_ms` is
    /// stored at full ms. Down-stream `now_secs` is now derived from
    /// this value so the two clocks remain consistent.
    pub now_ms: i64,
    /// Lock-file snapshot, forwarded to lexical inspection.
    pub maintenance: SearchMaintenanceSnapshot,
    /// Which semantic backend to probe and report as preferred.
    pub semantic_preference: SemanticPreference,
    /// When `false`, semantic inspection reports the database as
    /// unavailable without probing; also forwarded to lexical inspection.
    pub db_available: bool,
    /// Forwarded to lexical inspection.
    pub compute_lexical_fingerprint: bool,
    /// When `false`, the semantic probe is skipped and a "not inspected"
    /// placeholder state is returned instead.
    pub inspect_semantic: bool,
}
460
/// Maximum mtime drift, in milliseconds, still treated as "same file" when
/// comparing two lexical storage fingerprints.
const LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS: i64 = 1_000;

/// Decoded form of a `db_len:db_mtime_ms:wal_len:wal_mtime_ms` fingerprint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParsedLexicalStorageFingerprint {
    db_len: u64,
    db_mtime_ms: i64,
    wal_len: u64,
    wal_mtime_ms: i64,
}

/// Decodes a fingerprint made of exactly four `:`-separated integers.
///
/// Returns `None` when there are fewer or more than four fields, or when
/// any field fails to parse as its target integer type.
fn parse_lexical_storage_fingerprint(raw: &str) -> Option<ParsedLexicalStorageFingerprint> {
    let fields: Vec<&str> = raw.split(':').collect();
    let [db_len, db_mtime_ms, wal_len, wal_mtime_ms] = fields.as_slice() else {
        return None;
    };
    Some(ParsedLexicalStorageFingerprint {
        db_len: db_len.parse().ok()?,
        db_mtime_ms: db_mtime_ms.parse().ok()?,
        wal_len: wal_len.parse().ok()?,
        wal_mtime_ms: wal_mtime_ms.parse().ok()?,
    })
}

/// Compares two lexical storage fingerprints.
///
/// When both decode, the lengths must be equal and each mtime may differ by
/// at most [`LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS`]. When either
/// one does not decode, the comparison degrades to exact string equality.
pub(crate) fn lexical_storage_fingerprints_match(current: &str, saved: &str) -> bool {
    let (Some(now), Some(then)) = (
        parse_lexical_storage_fingerprint(current),
        parse_lexical_storage_fingerprint(saved),
    ) else {
        return current == saved;
    };

    let tolerance_ms =
        u64::try_from(LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS).unwrap_or(u64::MAX);
    let mtime_close_enough = |a: i64, b: i64| a.abs_diff(b) <= tolerance_ms;

    now.db_len == then.db_len
        && now.wal_len == then.wal_len
        && mtime_close_enough(now.db_mtime_ms, then.db_mtime_ms)
        && mtime_close_enough(now.wal_mtime_ms, then.wal_mtime_ms)
}
503
504pub(crate) fn inspect_search_assets(
505    input: InspectSearchAssetsInput<'_>,
506) -> Result<SearchAssetSnapshot> {
507    let InspectSearchAssetsInput {
508        data_dir,
509        db_path,
510        stale_threshold,
511        last_indexed_at_ms,
512        now_ms,
513        maintenance,
514        semantic_preference,
515        db_available,
516        compute_lexical_fingerprint,
517        inspect_semantic,
518    } = input;
519
520    let lexical = inspect_lexical_assets(InspectLexicalAssetsInput {
521        data_dir,
522        db_path,
523        stale_threshold,
524        last_indexed_at_ms,
525        now_ms,
526        maintenance,
527        db_available,
528        compute_lexical_fingerprint,
529    })?;
530    let current_db_fingerprint = lexical.fingerprint.current_db_fingerprint.as_deref();
531    let semantic = if inspect_semantic {
532        inspect_semantic_assets(
533            data_dir,
534            db_path,
535            semantic_preference,
536            current_db_fingerprint,
537            db_available,
538        )
539    } else {
540        semantic_state_not_inspected(data_dir, semantic_preference, current_db_fingerprint)
541    };
542
543    Ok(SearchAssetSnapshot { lexical, semantic })
544}
545
546fn semantic_state_not_inspected(
547    data_dir: &Path,
548    preference: SemanticPreference,
549    current_db_fingerprint: Option<&str>,
550) -> SemanticAssetState {
551    let (fast_tier, quality_tier, backlog, checkpoint) =
552        semantic_manifest_progress(data_dir, current_db_fingerprint);
553    let preference_surface = semantic_preference_surface(data_dir, preference);
554
555    SemanticAssetState {
556        status: "not_inspected",
557        availability: "not_inspected",
558        summary: "semantic assets were not inspected for this fast path".to_string(),
559        available: false,
560        can_search: false,
561        fallback_mode: Some("lexical"),
562        preferred_backend: preference_surface.preferred_backend,
563        embedder_id: None,
564        vector_index_path: None,
565        model_dir: preference_surface.model_dir,
566        hnsw_path: None,
567        hnsw_ready: false,
568        progressive_ready: semantic_progressive_assets_ready(data_dir),
569        // The fast-path skip-DB-open lane doesn't have an
570        // `availability` to consult, so we can't honestly call the
571        // tiers queryable here — leave the sub-fix-3 flags false. The
572        // caller upgrades to `semantic_state_from_availability` when
573        // it needs an answer.
574        quality_tier_published: false,
575        semantic_only_search_available: false,
576        hint: Some(
577            "Use 'cass status --json' or 'cass models status --json' for semantic readiness."
578                .to_string(),
579        ),
580        fast_tier,
581        quality_tier,
582        backlog,
583        checkpoint,
584    }
585}
586
587pub(crate) fn inspect_semantic_assets(
588    data_dir: &Path,
589    db_path: &Path,
590    preference: SemanticPreference,
591    current_db_fingerprint: Option<&str>,
592    db_available: bool,
593) -> SemanticAssetState {
594    if !db_available {
595        let availability = SemanticAvailability::DatabaseUnavailable {
596            db_path: db_path.to_path_buf(),
597            error: "database unavailable during asset inspection".to_string(),
598        };
599        return semantic_state_from_availability(
600            data_dir,
601            &availability,
602            preference,
603            current_db_fingerprint,
604        );
605    }
606
607    let availability = match preference {
608        SemanticPreference::DefaultModel => probe_semantic_availability(data_dir),
609        SemanticPreference::HashFallback => probe_hash_semantic_availability(data_dir),
610    };
611    semantic_state_from_availability(data_dir, &availability, preference, current_db_fingerprint)
612}
613
614pub(crate) fn semantic_state_from_availability(
615    data_dir: &Path,
616    availability: &SemanticAvailability,
617    preference: SemanticPreference,
618    current_db_fingerprint: Option<&str>,
619) -> SemanticAssetState {
620    let (mut fast_tier, mut quality_tier, backlog, checkpoint) =
621        semantic_manifest_progress(data_dir, current_db_fingerprint);
622    let preference_surface = semantic_preference_surface(data_dir, preference);
623    let base_embedder_id = semantic_embedder_id(availability, preference);
624    if let (Some(db_fingerprint), Some(embedder_id)) =
625        (current_db_fingerprint, base_embedder_id.as_deref())
626    {
627        promote_complete_shard_generation_state(
628            data_dir,
629            TierKind::Fast,
630            embedder_id,
631            db_fingerprint,
632            &mut fast_tier,
633        );
634        promote_complete_shard_generation_state(
635            data_dir,
636            TierKind::Quality,
637            embedder_id,
638            db_fingerprint,
639            &mut quality_tier,
640        );
641    }
642    let base_vector_index_path = semantic_vector_index_path(data_dir, availability, preference);
643    let base_model_dir = preference_surface.model_dir;
644    let base_hnsw_path = base_embedder_id
645        .as_deref()
646        .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
647    let runtime = semantic_runtime_surface(SemanticRuntimeInputs {
648        data_dir,
649        availability,
650        preference,
651        fast_tier: &fast_tier,
652        quality_tier: &quality_tier,
653        backlog: &backlog,
654        checkpoint: &checkpoint,
655        base_embedder_id: base_embedder_id.clone(),
656        base_vector_index_path: base_vector_index_path.clone(),
657        base_model_dir: base_model_dir.clone(),
658        base_hnsw_path: base_hnsw_path.clone(),
659    });
660    let use_runtime_paths = runtime.embedder_id.is_some();
661    let embedder_id = runtime.embedder_id.or(base_embedder_id);
662    let vector_index_path = if use_runtime_paths {
663        runtime.vector_index_path
664    } else {
665        runtime.vector_index_path.or(base_vector_index_path)
666    };
667    let model_dir = if use_runtime_paths {
668        runtime.model_dir
669    } else {
670        runtime.model_dir.or(base_model_dir)
671    };
672    let hnsw_path = if use_runtime_paths {
673        runtime.hnsw_path
674    } else {
675        runtime.hnsw_path.or(base_hnsw_path)
676    };
677    let hnsw_ready = hnsw_path.as_ref().is_some_and(|path| path.is_file());
678    let progressive_ready = semantic_progressive_assets_ready(data_dir);
679
680    // Sub-fix 3 for cass#257: report quality-tier readiness as a
681    // first-class flag so operators querying `--mode semantic` can
682    // tell when the quality index is usable even while the
683    // progressive/hybrid stack remains incomplete.
684    let quality_tier_published = semantic_tier_queryable(availability, &quality_tier);
685    let fast_tier_queryable = semantic_tier_queryable(availability, &fast_tier);
686    let semantic_only_search_available = quality_tier_published || fast_tier_queryable;
687
688    SemanticAssetState {
689        status: runtime.status,
690        availability: runtime.availability,
691        summary: runtime.summary,
692        available: runtime.can_search,
693        can_search: runtime.can_search,
694        fallback_mode: runtime.fallback_mode,
695        preferred_backend: preference_surface.preferred_backend,
696        embedder_id,
697        vector_index_path,
698        model_dir,
699        hnsw_path,
700        hnsw_ready,
701        progressive_ready,
702        quality_tier_published,
703        semantic_only_search_available,
704        hint: runtime.hint,
705        fast_tier,
706        quality_tier,
707        backlog,
708        checkpoint,
709    }
710}
711
712fn semantic_preference_surface(
713    data_dir: &Path,
714    preference: SemanticPreference,
715) -> SemanticPreferenceSurface {
716    match preference {
717        SemanticPreference::DefaultModel => SemanticPreferenceSurface {
718            preferred_backend: "fastembed",
719            model_dir: active_policy_model_dir(data_dir),
720        },
721        SemanticPreference::HashFallback => SemanticPreferenceSurface {
722            preferred_backend: "hash",
723            model_dir: None,
724        },
725    }
726}
727
728fn semantic_runtime_surface(inputs: SemanticRuntimeInputs<'_>) -> SemanticRuntimeSurface {
729    let SemanticRuntimeInputs {
730        data_dir,
731        availability,
732        preference,
733        fast_tier,
734        quality_tier,
735        backlog,
736        checkpoint,
737        base_embedder_id,
738        base_vector_index_path,
739        base_model_dir,
740        base_hnsw_path,
741    } = inputs;
742    let base_status = semantic_status_from_availability(availability);
743    let base_availability = semantic_availability_code(availability);
744    let base_summary = availability.summary();
745    let base_can_search = availability.can_search();
746    let base_hint = semantic_hint(availability, preference);
747
748    if matches!(
749        availability,
750        SemanticAvailability::Disabled { .. }
751            | SemanticAvailability::DatabaseUnavailable { .. }
752            | SemanticAvailability::LoadFailed { .. }
753    ) {
754        return SemanticRuntimeSurface {
755            status: base_status,
756            availability: base_availability,
757            summary: base_summary,
758            can_search: base_can_search,
759            fallback_mode: (!base_can_search).then_some("lexical"),
760            hint: base_hint,
761            embedder_id: base_embedder_id,
762            vector_index_path: base_vector_index_path,
763            model_dir: base_model_dir,
764            hnsw_path: base_hnsw_path,
765        };
766    }
767
768    let quality_queryable = semantic_tier_queryable(availability, quality_tier);
769    let fast_queryable = semantic_tier_queryable(availability, fast_tier);
770    let checkpoint_active = checkpoint.active;
771    let backlog_pending = backlog.pending_work;
772    let manifest_assets_present = fast_tier.present || quality_tier.present;
773    let backfill_active = checkpoint_active || backlog_pending;
774
775    let effective_embedder_id = if quality_queryable {
776        quality_tier.embedder_id.clone()
777    } else if fast_queryable {
778        fast_tier.embedder_id.clone()
779    } else {
780        None
781    };
782    let effective_vector_index_path = if quality_queryable {
783        quality_tier.index_path.clone()
784    } else if fast_queryable {
785        fast_tier.index_path.clone()
786    } else {
787        None
788    }
789    .or_else(|| {
790        effective_embedder_id
791            .as_deref()
792            .map(|embedder_id| vector_index_path(data_dir, embedder_id))
793    });
794    let effective_model_dir = effective_embedder_id.as_deref().and_then(|embedder_id| {
795        (!semantic_embedder_is_hash(embedder_id))
796            .then(|| model_dir_for_embedder_id(data_dir, embedder_id))
797            .flatten()
798    });
799    let effective_hnsw_path = effective_embedder_id
800        .as_deref()
801        .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
802
803    if quality_queryable || fast_queryable {
804        let fully_ready = (quality_queryable || fast_queryable) && !backfill_active;
805        let summary = if quality_queryable && backfill_active {
806            "semantic quality tier is usable; residual semantic backfill is still finishing"
807                .to_string()
808        } else if quality_queryable {
809            "semantic quality tier ready".to_string()
810        } else if backfill_active {
811            "semantic fast tier is usable; higher-quality semantic backfill is still in progress"
812                .to_string()
813        } else {
814            "semantic fast tier ready".to_string()
815        };
816        let hint = if backfill_active {
817            Some(
818                "Semantic refinement is already usable; continue searching while higher-quality backfill finishes."
819                    .to_string(),
820            )
821        } else {
822            None
823        };
824        return SemanticRuntimeSurface {
825            status: if fully_ready { "ready" } else { "building" },
826            availability: if fully_ready {
827                "ready"
828            } else {
829                "index_building"
830            },
831            summary,
832            can_search: true,
833            fallback_mode: None,
834            hint,
835            embedder_id: effective_embedder_id,
836            vector_index_path: effective_vector_index_path,
837            model_dir: effective_model_dir,
838            hnsw_path: effective_hnsw_path,
839        };
840    }
841
842    if backfill_active {
843        return SemanticRuntimeSurface {
844            status: "building",
845            availability: "index_building",
846            summary: "semantic backfill is in progress for the current database".to_string(),
847            can_search: false,
848            fallback_mode: Some("lexical"),
849            hint: Some(
850                "Run 'cass index --semantic' to finish backfilling current semantic assets; search will use lexical fallback until then."
851                    .to_string(),
852            ),
853            embedder_id: base_embedder_id,
854            vector_index_path: base_vector_index_path,
855            model_dir: base_model_dir,
856            hnsw_path: base_hnsw_path,
857        };
858    }
859
860    if manifest_assets_present {
861        return SemanticRuntimeSurface {
862            status: "stale",
863            availability: "update_available",
864            summary: "semantic artifacts exist but do not match the current database".to_string(),
865            can_search: false,
866            fallback_mode: Some("lexical"),
867            hint: Some(
868                "Run 'cass index --semantic' to refresh semantic assets for the current database; search will use lexical fallback until then."
869                    .to_string(),
870            ),
871            embedder_id: base_embedder_id,
872            vector_index_path: base_vector_index_path,
873            model_dir: base_model_dir,
874            hnsw_path: base_hnsw_path,
875        };
876    }
877
878    SemanticRuntimeSurface {
879        status: base_status,
880        availability: base_availability,
881        summary: base_summary,
882        can_search: base_can_search,
883        fallback_mode: (!base_can_search).then_some("lexical"),
884        hint: base_hint,
885        embedder_id: base_embedder_id,
886        vector_index_path: base_vector_index_path,
887        model_dir: base_model_dir,
888        hnsw_path: base_hnsw_path,
889    }
890}
891
892fn active_policy_model_dir(data_dir: &Path) -> Option<PathBuf> {
893    let policy = SemanticPolicy::resolve(&CliSemanticOverrides::default());
894    let embedder_name = FastEmbedder::canonical_name(&policy.quality_tier_embedder)?;
895    FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
896}
897
898fn model_dir_for_embedder_id(data_dir: &Path, embedder_id: &str) -> Option<PathBuf> {
899    let embedder_name = FastEmbedder::canonical_name(embedder_id)?;
900    FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
901}
902
903fn semantic_tier_queryable(
904    availability: &SemanticAvailability,
905    tier: &SemanticTierAssetState,
906) -> bool {
907    if !tier.ready || tier.current_db_matches == Some(false) {
908        return false;
909    }
910    let Some(embedder_id) = tier.embedder_id.as_deref() else {
911        return false;
912    };
913    if semantic_embedder_is_hash(embedder_id) {
914        !matches!(
915            availability,
916            SemanticAvailability::Disabled { .. }
917                | SemanticAvailability::DatabaseUnavailable { .. }
918                | SemanticAvailability::LoadFailed { .. }
919        )
920    } else {
921        matches!(
922            availability,
923            SemanticAvailability::Ready { .. }
924                | SemanticAvailability::UpdateAvailable { .. }
925                | SemanticAvailability::IndexBuilding { .. }
926                | SemanticAvailability::IndexMissing { .. }
927        )
928    }
929}
930
931fn semantic_embedder_is_hash(embedder_id: &str) -> bool {
932    embedder_id == HashEmbedder::default().id()
933}
934
935fn semantic_manifest_progress(
936    data_dir: &Path,
937    current_db_fingerprint: Option<&str>,
938) -> (
939    SemanticTierAssetState,
940    SemanticTierAssetState,
941    SemanticBacklogProgressState,
942    SemanticCheckpointProgressState,
943) {
944    let manifest = SemanticManifest::load_or_default(data_dir).unwrap_or_default();
945    let fast_tier = semantic_tier_asset_state(manifest.fast_tier.as_ref(), current_db_fingerprint);
946    let quality_tier =
947        semantic_tier_asset_state(manifest.quality_tier.as_ref(), current_db_fingerprint);
948    let backlog = semantic_backlog_progress_state(&manifest, current_db_fingerprint);
949    let checkpoint =
950        semantic_checkpoint_progress_state(manifest.checkpoint.as_ref(), current_db_fingerprint);
951    (fast_tier, quality_tier, backlog, checkpoint)
952}
953
954fn semantic_tier_asset_state(
955    artifact: Option<&ArtifactRecord>,
956    current_db_fingerprint: Option<&str>,
957) -> SemanticTierAssetState {
958    let Some(artifact) = artifact else {
959        return SemanticTierAssetState::default();
960    };
961
962    SemanticTierAssetState {
963        present: true,
964        ready: artifact.ready,
965        current_db_matches: current_db_fingerprint.map(|fp| artifact.db_fingerprint == fp),
966        conversation_count: Some(artifact.conversation_count),
967        doc_count: Some(artifact.doc_count),
968        embedder_id: Some(artifact.embedder_id.clone()),
969        model_revision: Some(artifact.model_revision.clone()),
970        completed_at_ms: Some(artifact.completed_at_ms),
971        size_bytes: Some(artifact.size_bytes),
972        index_path: None,
973    }
974}
975
976fn resolve_semantic_artifact_path(data_dir: &Path, recorded_path: &str) -> Option<PathBuf> {
977    semantic_shard_artifact_path_is_safe(recorded_path).then(|| data_dir.join(recorded_path))
978}
979
980fn complete_shard_records_for_state(
981    data_dir: &Path,
982    tier: TierKind,
983    embedder_id: &str,
984    db_fingerprint: &str,
985) -> Option<Vec<SemanticShardRecord>> {
986    let manifest = SemanticShardManifest::load(data_dir).ok().flatten()?;
987    let summary = manifest.summary(tier, embedder_id, db_fingerprint);
988    if !summary.complete {
989        return None;
990    }
991    let mut records = manifest
992        .shards
993        .into_iter()
994        .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
995        .collect::<Vec<_>>();
996    records.sort_by_key(|shard| shard.shard_index);
997    if records.len() != usize::try_from(summary.shard_count).unwrap_or(usize::MAX) {
998        return None;
999    }
1000    let first = records.first()?;
1001    for (expected_index, shard) in records.iter().enumerate() {
1002        if shard.shard_index != u32::try_from(expected_index).unwrap_or(u32::MAX)
1003            || !shard.ready
1004            || !shard.mmap_ready
1005            || shard.model_revision != first.model_revision
1006            || shard.schema_version != SEMANTIC_SCHEMA_VERSION
1007            || shard.chunking_version != CHUNKING_STRATEGY_VERSION
1008            || shard.dimension == 0
1009            || shard.dimension != first.dimension
1010            || shard.total_conversations != first.total_conversations
1011        {
1012            return None;
1013        }
1014        let artifact_path = resolve_semantic_artifact_path(data_dir, &shard.index_path)?;
1015        if !artifact_path.is_file() {
1016            return None;
1017        }
1018    }
1019    Some(records)
1020}
1021
1022fn promote_complete_shard_generation_state(
1023    data_dir: &Path,
1024    tier: TierKind,
1025    embedder_id: &str,
1026    db_fingerprint: &str,
1027    state: &mut SemanticTierAssetState,
1028) {
1029    if state.ready && state.current_db_matches == Some(true) {
1030        return;
1031    }
1032    let Some(records) =
1033        complete_shard_records_for_state(data_dir, tier, embedder_id, db_fingerprint)
1034    else {
1035        return;
1036    };
1037    let doc_count = records
1038        .iter()
1039        .map(|shard| shard.doc_count)
1040        .fold(0, u64::saturating_add);
1041    let size_bytes = records
1042        .iter()
1043        .map(|shard| shard.size_bytes)
1044        .fold(0, u64::saturating_add);
1045    let completed_at_ms = records
1046        .iter()
1047        .map(|shard| shard.completed_at_ms)
1048        .max()
1049        .unwrap_or(0);
1050    let first = &records[0];
1051    let Some(first_index_path) = resolve_semantic_artifact_path(data_dir, &first.index_path) else {
1052        return;
1053    };
1054    *state = SemanticTierAssetState {
1055        present: true,
1056        ready: true,
1057        current_db_matches: Some(true),
1058        conversation_count: Some(first.total_conversations),
1059        doc_count: Some(doc_count),
1060        embedder_id: Some(first.embedder_id.clone()),
1061        model_revision: Some(first.model_revision.clone()),
1062        completed_at_ms: Some(completed_at_ms),
1063        size_bytes: Some(size_bytes),
1064        index_path: Some(first_index_path),
1065    };
1066}
1067
1068fn semantic_backlog_progress_state(
1069    manifest: &SemanticManifest,
1070    current_db_fingerprint: Option<&str>,
1071) -> SemanticBacklogProgressState {
1072    let backlog = &manifest.backlog;
1073    let current_db_matches = current_db_fingerprint.and_then(|fp| {
1074        (backlog.computed_at_ms > 0 || !backlog.db_fingerprint.is_empty())
1075            .then(|| backlog.is_current(fp))
1076    });
1077
1078    SemanticBacklogProgressState {
1079        total_conversations: backlog.total_conversations,
1080        fast_tier_processed: backlog.fast_tier_processed,
1081        fast_tier_remaining: backlog.fast_tier_remaining(),
1082        quality_tier_processed: backlog.quality_tier_processed,
1083        quality_tier_remaining: backlog.quality_tier_remaining(),
1084        pending_work: backlog.has_pending_work() || manifest.checkpoint.is_some(),
1085        current_db_matches,
1086        computed_at_ms: (backlog.computed_at_ms > 0).then_some(backlog.computed_at_ms),
1087    }
1088}
1089
1090fn semantic_checkpoint_progress_state(
1091    checkpoint: Option<&BuildCheckpoint>,
1092    current_db_fingerprint: Option<&str>,
1093) -> SemanticCheckpointProgressState {
1094    let Some(checkpoint) = checkpoint else {
1095        return SemanticCheckpointProgressState::default();
1096    };
1097
1098    SemanticCheckpointProgressState {
1099        active: true,
1100        tier: Some(checkpoint.tier.as_str()),
1101        current_db_matches: current_db_fingerprint.map(|fp| checkpoint.is_valid(fp)),
1102        completed: Some(checkpoint.is_complete()),
1103        conversations_processed: Some(checkpoint.conversations_processed),
1104        total_conversations: Some(checkpoint.total_conversations),
1105        progress_pct: Some(checkpoint.progress_pct()),
1106        docs_embedded: Some(checkpoint.docs_embedded),
1107        last_offset: Some(checkpoint.last_offset),
1108        saved_at_ms: Some(checkpoint.saved_at_ms),
1109    }
1110}
1111
1112struct InspectLexicalAssetsInput<'a> {
1113    data_dir: &'a Path,
1114    db_path: &'a Path,
1115    stale_threshold: u64,
1116    last_indexed_at_ms: Option<i64>,
1117    /// F4 (cass tech debt): full-precision wall clock; the legacy
1118    /// `now_secs` field was widening the second-precision comparison
1119    /// against ms-precision `last_progress_at_ms`. Derive `now_secs`
1120    /// locally from this when needed for age math.
1121    now_ms: i64,
1122    maintenance: SearchMaintenanceSnapshot,
1123    db_available: bool,
1124    compute_lexical_fingerprint: bool,
1125}
1126
1127fn inspect_lexical_assets(input: InspectLexicalAssetsInput<'_>) -> Result<LexicalAssetState> {
1128    let InspectLexicalAssetsInput {
1129        data_dir,
1130        db_path,
1131        stale_threshold,
1132        last_indexed_at_ms,
1133        now_ms,
1134        maintenance,
1135        db_available,
1136        compute_lexical_fingerprint,
1137    } = input;
1138    let index_path = crate::search::tantivy::expected_index_dir(data_dir);
1139    let checkpoint = load_lexical_rebuild_checkpoint(&index_path)
1140        .with_context(|| format!("loading lexical checkpoint from {}", index_path.display()))?;
1141    let current_db_fingerprint = if db_available && compute_lexical_fingerprint {
1142        Some(
1143            lexical_storage_fingerprint_for_db(db_path).with_context(|| {
1144                format!(
1145                    "computing lexical storage fingerprint for {}",
1146                    db_path.display()
1147                )
1148            })?,
1149        )
1150    } else {
1151        None
1152    };
1153
1154    Ok(lexical_state_from_observations(LexicalObservationInput {
1155        index_path: &index_path,
1156        db_path,
1157        stale_threshold,
1158        last_indexed_at_ms,
1159        now_ms,
1160        maintenance,
1161        checkpoint: checkpoint.as_ref(),
1162        current_db_fingerprint: current_db_fingerprint.as_deref(),
1163    }))
1164}
1165
1166struct LexicalObservationInput<'a> {
1167    index_path: &'a Path,
1168    db_path: &'a Path,
1169    stale_threshold: u64,
1170    last_indexed_at_ms: Option<i64>,
1171    /// Full-precision wall clock (F4); see [`InspectSearchAssetsInput::now_ms`].
1172    now_ms: i64,
1173    maintenance: SearchMaintenanceSnapshot,
1174    checkpoint: Option<&'a LexicalRebuildCheckpoint>,
1175    current_db_fingerprint: Option<&'a str>,
1176}
1177
1178fn lexical_state_from_observations(input: LexicalObservationInput<'_>) -> LexicalAssetState {
1179    let LexicalObservationInput {
1180        index_path,
1181        db_path,
1182        stale_threshold,
1183        last_indexed_at_ms,
1184        now_ms,
1185        maintenance,
1186        checkpoint,
1187        current_db_fingerprint,
1188    } = input;
1189    let exists = crate::search::tantivy::searchable_index_exists(index_path);
1190    let checkpoint_db_matches =
1191        checkpoint.map(|state| crate::stored_path_identity_matches(&state.db_path, db_path));
1192    let schema_matches = checkpoint.map(|state| state.schema_hash == SCHEMA_HASH);
1193    let page_size_matches =
1194        checkpoint.map(|state| state.page_size == LEXICAL_REBUILD_PAGE_SIZE_PUBLIC);
1195    let page_size_compatible =
1196        checkpoint.map(|state| lexical_rebuild_page_size_is_compatible(state.page_size));
1197    let checkpoint_fingerprint = checkpoint.map(|state| state.storage_fingerprint.as_str());
1198    let fingerprint_matches = match (current_db_fingerprint, checkpoint_fingerprint) {
1199        (Some(current), Some(saved)) => Some(lexical_storage_fingerprints_match(current, saved)),
1200        _ => None,
1201    };
1202    let checkpoint_incomplete = checkpoint.is_some_and(|state| !state.completed);
1203    let checkpoint_db_mismatch = checkpoint_db_matches == Some(false);
1204    let contract_mismatch = schema_matches == Some(false) || page_size_compatible == Some(false);
1205    let fingerprint_mismatch = fingerprint_matches == Some(false);
1206    // F4 (cass tech debt): derive the (legacy) second-resolution clock
1207    // from `now_ms` rather than the other way around so the comparison
1208    // against ms-precision `last_progress_at_ms` below is no longer
1209    // forced into second-bin alignment. `as u64` is correct because
1210    // wall-clock millis fits well inside i63 (until year ~292477).
1211    let now_secs: u64 = now_ms.div_euclid(1000).max(0) as u64;
1212    let age_seconds = last_indexed_at_ms
1213        .and_then(|ts| (ts > 0).then(|| now_secs.saturating_sub((ts / 1000) as u64)));
1214    let age_stale = match age_seconds {
1215        Some(age) => age > stale_threshold,
1216        None => true,
1217    };
1218    let maintenance_targets_current_db = maintenance
1219        .db_path
1220        .as_ref()
1221        .is_none_or(|lock_db_path| crate::path_identities_match(lock_db_path, db_path));
1222    let watch_active = maintenance.active
1223        && maintenance_targets_current_db
1224        && maintenance
1225            .mode
1226            .is_some_and(SearchMaintenanceMode::watch_active);
1227    let rebuilding = maintenance.active
1228        && maintenance_targets_current_db
1229        && maintenance
1230            .mode
1231            .is_some_and(SearchMaintenanceMode::rebuild_active);
1232    let active_rebuild_progress = rebuilding;
1233    // Forward-progress liveness check (issue #258): when a rebuild is
1234    // active but the indexing thread has not posted progress within
1235    // `CASS_REBUILD_STALL_DETECT_SECS` (default 120 s), report
1236    // `stalled` rather than `rebuilding`. The lock file's
1237    // `updated_at_ms` is heartbeat-refreshed every ~1 s by a separate
1238    // thread, so it cannot be used as a "work is happening" signal —
1239    // only the indexer-thread-owned `last_progress_at_ms` can.
1240    //
1241    // `now_ms` is now passed in at full ms precision (F4); the old
1242    // shape derived it from `now_secs` and quantised the comparison.
1243    let stall_age_ms = if rebuilding && maintenance_targets_current_db {
1244        maintenance_stall_age_ms(&maintenance, now_ms)
1245    } else {
1246        None
1247    };
1248    let stalled = stall_age_ms.is_some();
1249    let last_progress_at_ms = maintenance
1250        .last_progress_at_ms
1251        .filter(|_| maintenance_targets_current_db);
1252    let last_progress_age_ms = last_progress_at_ms
1253        .filter(|_| rebuilding)
1254        .map(|ts| now_ms.saturating_sub(ts));
1255    let stale = if rebuilding {
1256        // A stalled rebuild leaves the on-disk index unchanged; if it
1257        // existed before the stall it is still searchable, so treat
1258        // `stalled` like the indexer just hadn't gotten there yet.
1259        !exists || contract_mismatch
1260    } else {
1261        exists
1262            && (age_stale
1263                || checkpoint_db_mismatch
1264                || checkpoint_incomplete
1265                || contract_mismatch
1266                || fingerprint_mismatch)
1267    };
1268    let fresh = exists && !stale && !rebuilding;
1269    let status = if stalled {
1270        "stalled"
1271    } else if rebuilding {
1272        "building"
1273    } else if !exists {
1274        "missing"
1275    } else if stale {
1276        "stale"
1277    } else {
1278        "ready"
1279    };
1280    let status_reason = if stalled {
1281        let secs = stall_age_ms.unwrap_or(0) / 1000;
1282        Some(format!(
1283            "indexing thread has not posted forward progress for {secs}s while the lock heartbeat keeps refreshing — see issue #258 for diagnostics (run `cass doctor check --json` and capture a stack trace)"
1284        ))
1285    } else if rebuilding {
1286        Some("lexical rebuild is in progress".to_string())
1287    } else if !exists {
1288        Some("lexical Tantivy metadata missing".to_string())
1289    } else if checkpoint_db_mismatch {
1290        Some("lexical rebuild checkpoint points at a different database".to_string())
1291    } else if contract_mismatch {
1292        Some("lexical rebuild checkpoint no longer matches the active lexical contract".to_string())
1293    } else if fingerprint_mismatch {
1294        Some("database fingerprint changed since the last lexical checkpoint".to_string())
1295    } else if checkpoint_incomplete {
1296        Some("lexical rebuild checkpoint is incomplete".to_string())
1297    } else if age_stale {
1298        Some("lexical index is older than the stale threshold".to_string())
1299    } else {
1300        None
1301    };
1302    let checkpoint_progress_usable = checkpoint.is_some()
1303        && checkpoint_db_matches == Some(true)
1304        && schema_matches == Some(true)
1305        && page_size_compatible == Some(true)
1306        && if active_rebuild_progress {
1307            true
1308        } else {
1309            current_db_fingerprint.is_some() && fingerprint_matches == Some(true)
1310        };
1311    let pending_sessions = checkpoint
1312        .filter(|_| checkpoint_progress_usable)
1313        .map(|state| {
1314            state
1315                .total_conversations
1316                .saturating_sub(state.processed_conversations) as u64
1317        })
1318        .unwrap_or(0);
1319    let maintenance_activity_at_ms = maintenance_targets_current_db
1320        .then_some(())
1321        .and(maintenance.updated_at_ms.or(maintenance.started_at_ms));
1322    let checkpoint_activity_at_ms = checkpoint
1323        .filter(|_| checkpoint_progress_usable)
1324        .and_then(|state| (state.updated_at_ms > 0).then_some(state.updated_at_ms));
1325    let activity_at_ms = match (checkpoint_activity_at_ms, maintenance_activity_at_ms) {
1326        (Some(checkpoint_ts), Some(maintenance_ts)) => Some(checkpoint_ts.max(maintenance_ts)),
1327        (Some(checkpoint_ts), None) => Some(checkpoint_ts),
1328        (None, Some(maintenance_ts)) => Some(maintenance_ts),
1329        (None, None) => None,
1330    };
1331
1332    LexicalAssetState {
1333        status,
1334        exists,
1335        fresh,
1336        stale,
1337        rebuilding,
1338        stalled,
1339        last_progress_age_ms,
1340        last_progress_at_ms,
1341        watch_active,
1342        last_indexed_at_ms,
1343        age_seconds,
1344        stale_threshold_seconds: stale_threshold,
1345        activity_at_ms,
1346        pending_sessions,
1347        processed_conversations: checkpoint
1348            .filter(|_| checkpoint_progress_usable)
1349            .map(|state| state.processed_conversations as u64),
1350        total_conversations: checkpoint
1351            .filter(|_| checkpoint_progress_usable)
1352            .map(|state| state.total_conversations as u64),
1353        indexed_docs: checkpoint
1354            .filter(|_| checkpoint_progress_usable)
1355            .map(|state| state.indexed_docs as u64),
1356        status_reason,
1357        fingerprint: LexicalFingerprintState {
1358            current_db_fingerprint: current_db_fingerprint.map(ToOwned::to_owned),
1359            checkpoint_fingerprint: checkpoint.map(|state| state.storage_fingerprint.clone()),
1360            matches_current_db_fingerprint: fingerprint_matches,
1361        },
1362        checkpoint: LexicalCheckpointState {
1363            present: checkpoint.is_some(),
1364            completed: checkpoint.map(|state| state.completed),
1365            db_matches: checkpoint_db_matches,
1366            schema_matches,
1367            page_size_matches,
1368            page_size_compatible,
1369        },
1370    }
1371}
1372
1373fn semantic_embedder_id(
1374    availability: &SemanticAvailability,
1375    preference: SemanticPreference,
1376) -> Option<String> {
1377    match availability {
1378        SemanticAvailability::Ready { embedder_id }
1379        | SemanticAvailability::UpdateAvailable { embedder_id, .. }
1380        | SemanticAvailability::IndexBuilding { embedder_id, .. } => Some(embedder_id.clone()),
1381        SemanticAvailability::HashFallback => Some(HashEmbedder::default().id().to_string()),
1382        _ => match preference {
1383            SemanticPreference::DefaultModel => {
1384                Some(FastEmbedder::embedder_id_static().to_string())
1385            }
1386            SemanticPreference::HashFallback => Some(HashEmbedder::default().id().to_string()),
1387        },
1388    }
1389}
1390
1391fn semantic_vector_index_path(
1392    data_dir: &Path,
1393    availability: &SemanticAvailability,
1394    preference: SemanticPreference,
1395) -> Option<PathBuf> {
1396    match availability {
1397        SemanticAvailability::IndexMissing { index_path } => Some(index_path.clone()),
1398        _ => semantic_embedder_id(availability, preference)
1399            .map(|embedder_id| vector_index_path(data_dir, &embedder_id)),
1400    }
1401}
1402
1403fn semantic_progressive_assets_ready(data_dir: &Path) -> bool {
1404    let vector_dir = data_dir.join(VECTOR_INDEX_DIR);
1405    vector_dir.join("vector.fast.idx").is_file() && vector_dir.join("vector.quality.idx").is_file()
1406}
1407
1408fn semantic_availability_code(availability: &SemanticAvailability) -> &'static str {
1409    match availability {
1410        SemanticAvailability::Ready { .. } => "ready",
1411        SemanticAvailability::NotInstalled => "not_installed",
1412        SemanticAvailability::NeedsConsent => "needs_consent",
1413        SemanticAvailability::Downloading { .. } => "downloading",
1414        SemanticAvailability::Verifying => "verifying",
1415        SemanticAvailability::IndexBuilding { .. } => "index_building",
1416        SemanticAvailability::HashFallback => "hash_fallback",
1417        SemanticAvailability::Disabled { .. } => "disabled",
1418        SemanticAvailability::ModelMissing { .. } => "model_missing",
1419        SemanticAvailability::IndexMissing { .. } => "index_missing",
1420        SemanticAvailability::DatabaseUnavailable { .. } => "database_unavailable",
1421        SemanticAvailability::LoadFailed { .. } => "load_failed",
1422        SemanticAvailability::UpdateAvailable { .. } => "update_available",
1423    }
1424}
1425
1426fn semantic_status_from_availability(availability: &SemanticAvailability) -> &'static str {
1427    match availability {
1428        SemanticAvailability::Ready { .. } => "ready",
1429        SemanticAvailability::HashFallback => "hash_fallback",
1430        SemanticAvailability::Downloading { .. }
1431        | SemanticAvailability::Verifying
1432        | SemanticAvailability::IndexBuilding { .. } => "building",
1433        SemanticAvailability::Disabled { .. } => "disabled",
1434        SemanticAvailability::UpdateAvailable { .. } => "stale",
1435        SemanticAvailability::NotInstalled
1436        | SemanticAvailability::NeedsConsent
1437        | SemanticAvailability::ModelMissing { .. }
1438        | SemanticAvailability::IndexMissing { .. } => "missing",
1439        SemanticAvailability::DatabaseUnavailable { .. }
1440        | SemanticAvailability::LoadFailed { .. } => "error",
1441    }
1442}
1443
1444fn semantic_hint(
1445    availability: &SemanticAvailability,
1446    preference: SemanticPreference,
1447) -> Option<String> {
1448    let hint = match (preference, availability) {
1449        (SemanticPreference::HashFallback, SemanticAvailability::IndexMissing { .. }) => {
1450            "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available without semantic assets"
1451        }
1452        (SemanticPreference::HashFallback, SemanticAvailability::LoadFailed { .. })
1453        | (SemanticPreference::HashFallback, SemanticAvailability::DatabaseUnavailable { .. }) => {
1454            "Run 'cass index --semantic --embedder hash' after the database is healthy; lexical search remains available"
1455        }
1456        (SemanticPreference::HashFallback, _) => {
1457            "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available"
1458        }
1459        (_, SemanticAvailability::NotInstalled)
1460        | (_, SemanticAvailability::NeedsConsent)
1461        | (_, SemanticAvailability::ModelMissing { .. }) => {
1462            "Run 'cass models install' and then 'cass index --semantic'; lexical search remains available without the model"
1463        }
1464        (_, SemanticAvailability::IndexMissing { .. })
1465        | (_, SemanticAvailability::UpdateAvailable { .. })
1466        | (_, SemanticAvailability::IndexBuilding { .. }) => {
1467            "Run 'cass index --semantic' to build or refresh vector assets; lexical search remains available"
1468        }
1469        (_, SemanticAvailability::Downloading { .. }) | (_, SemanticAvailability::Verifying) => {
1470            "Wait for the semantic model installation to finish; lexical search remains available"
1471        }
1472        (_, SemanticAvailability::Disabled { .. }) => {
1473            "Semantic search is disabled by policy; lexical search remains available, or re-enable semantic search"
1474        }
1475        (_, SemanticAvailability::DatabaseUnavailable { .. })
1476        | (_, SemanticAvailability::LoadFailed { .. }) => {
1477            "Restore the semantic assets and database; lexical search remains available when the archive database is healthy"
1478        }
1479        (_, SemanticAvailability::Ready { .. }) | (_, SemanticAvailability::HashFallback) => {
1480            return None;
1481        }
1482    };
1483    Some(hint.to_string())
1484}
1485
1486// ---------------------------------------------------------------------------
1487// Maintenance coordination: single-flight, attach-to-progress, fail-open
1488// ---------------------------------------------------------------------------
1489
/// Heartbeat age, in milliseconds, above which an active maintenance job is
/// reported as stale (30 seconds).
#[cfg_attr(not(test), allow(dead_code))]
const HEARTBEAT_STALE_THRESHOLD_MS: i64 = 30 * 1_000;
/// Default bounded wait of five seconds.
// NOTE(review): presumably the default budget for waiting on an in-flight
// maintenance job; the consumer is outside this view — confirm.
#[cfg_attr(not(test), allow(dead_code))]
const BOUNDED_WAIT_DEFAULT: Duration = Duration::from_secs(5);
/// Default poll interval of 250 milliseconds.
// NOTE(review): presumably the sleep between maintenance-state polls; the
// consumer is outside this view — confirm.
#[cfg_attr(not(test), allow(dead_code))]
const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);
1496
1497#[cfg_attr(not(test), allow(dead_code))]
1498#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
1499pub(crate) enum MaintenanceCoordinationOutcome {
1500    Idle,
1501    Active {
1502        job_id: String,
1503        job_kind: SearchMaintenanceJobKind,
1504        phase: Option<String>,
1505        started_at_ms: i64,
1506        updated_at_ms: i64,
1507    },
1508    Stale {
1509        job_id: String,
1510        reason: String,
1511    },
1512}
1513
1514#[cfg_attr(not(test), allow(dead_code))]
1515#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
1516pub(crate) enum MaintenanceDecision {
1517    Launch,
1518    AttachOrWait {
1519        job_id: String,
1520        job_kind: SearchMaintenanceJobKind,
1521        phase: Option<String>,
1522        elapsed_ms: u64,
1523    },
1524    FailOpen {
1525        reason: String,
1526    },
1527}
1528
1529#[cfg_attr(not(test), allow(dead_code))]
1530pub(crate) fn evaluate_maintenance_coordination(
1531    data_dir: &Path,
1532    now_ms: i64,
1533) -> MaintenanceCoordinationOutcome {
1534    evaluate_maintenance_coordination_from_snapshot(
1535        &read_search_maintenance_snapshot(data_dir),
1536        now_ms,
1537    )
1538}
1539
1540#[cfg_attr(not(test), allow(dead_code))]
1541pub(crate) fn evaluate_maintenance_coordination_from_snapshot(
1542    snapshot: &SearchMaintenanceSnapshot,
1543    now_ms: i64,
1544) -> MaintenanceCoordinationOutcome {
1545    if !snapshot.active {
1546        return MaintenanceCoordinationOutcome::Idle;
1547    }
1548    let job_id = maintenance_snapshot_job_id(snapshot);
1549    if let Some(updated_at_ms) = snapshot.updated_at_ms {
1550        let heartbeat_age_ms = now_ms.saturating_sub(updated_at_ms);
1551        if heartbeat_age_ms > HEARTBEAT_STALE_THRESHOLD_MS {
1552            return MaintenanceCoordinationOutcome::Stale {
1553                job_id,
1554                reason: format!(
1555                    "heartbeat is {heartbeat_age_ms}ms old (threshold {HEARTBEAT_STALE_THRESHOLD_MS}ms)"
1556                ),
1557            };
1558        }
1559    }
1560    // Forward-progress liveness check (issue #258). Even if the
1561    // heartbeat is fresh, treat the job as `Stale` when the indexing
1562    // thread itself has not posted progress for longer than the stall
1563    // threshold. Coordination consumers (search fail-open, attach-or-
1564    // wait) then route around the wedged worker instead of waiting on
1565    // it indefinitely.
1566    if let Some(stall_age_ms) = maintenance_stall_age_ms(snapshot, now_ms) {
1567        let threshold_ms = rebuild_stall_detect_threshold_ms().unwrap_or(0);
1568        return MaintenanceCoordinationOutcome::Stale {
1569            job_id,
1570            reason: format!(
1571                "indexing thread has not posted forward progress for {stall_age_ms}ms while the heartbeat keeps refreshing (stall threshold {threshold_ms}ms) — see issue #258"
1572            ),
1573        };
1574    }
1575    MaintenanceCoordinationOutcome::Active {
1576        job_id,
1577        job_kind: snapshot
1578            .job_kind
1579            .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh),
1580        phase: snapshot.phase.clone(),
1581        started_at_ms: snapshot.started_at_ms.unwrap_or(0),
1582        updated_at_ms: snapshot.updated_at_ms.unwrap_or(now_ms),
1583    }
1584}
1585
1586fn maintenance_snapshot_job_id(snapshot: &SearchMaintenanceSnapshot) -> String {
1587    snapshot
1588        .job_id
1589        .as_ref()
1590        .filter(|job_id| !job_id.is_empty())
1591        .cloned()
1592        .unwrap_or_else(|| {
1593            let mode = snapshot
1594                .mode
1595                .map(|mode| mode.as_lock_value())
1596                .unwrap_or("unknown");
1597            let owner = snapshot
1598                .pid
1599                .map(|pid| pid.to_string())
1600                .unwrap_or_else(|| "unknown-owner".to_string());
1601            format!("{mode}-active-lock-{owner}")
1602        })
1603}
1604
1605fn maintenance_snapshot_job_kind(snapshot: &SearchMaintenanceSnapshot) -> SearchMaintenanceJobKind {
1606    snapshot
1607        .job_kind
1608        .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh)
1609}
1610
1611fn maintenance_elapsed_ms(snapshot: &SearchMaintenanceSnapshot, now_ms: i64) -> u64 {
1612    snapshot
1613        .started_at_ms
1614        .map(|started_at_ms| u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0))
1615        .unwrap_or(0)
1616}
1617
1618fn stale_heartbeat_phase(snapshot: &SearchMaintenanceSnapshot) -> Option<String> {
1619    snapshot
1620        .phase
1621        .clone()
1622        .or_else(|| Some("stale-heartbeat".to_string()))
1623}
1624
1625#[cfg_attr(not(test), allow(dead_code))]
1626pub(crate) fn decide_maintenance_action(data_dir: &Path, now_ms: i64) -> MaintenanceDecision {
1627    decide_maintenance_action_from_snapshot(&read_search_maintenance_snapshot(data_dir), now_ms)
1628}
1629
1630#[cfg_attr(not(test), allow(dead_code))]
1631pub(crate) fn decide_maintenance_action_from_snapshot(
1632    snapshot: &SearchMaintenanceSnapshot,
1633    now_ms: i64,
1634) -> MaintenanceDecision {
1635    match evaluate_maintenance_coordination_from_snapshot(snapshot, now_ms) {
1636        MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1637        MaintenanceCoordinationOutcome::Stale { job_id, .. } => MaintenanceDecision::AttachOrWait {
1638            job_id,
1639            job_kind: maintenance_snapshot_job_kind(snapshot),
1640            phase: stale_heartbeat_phase(snapshot),
1641            elapsed_ms: maintenance_elapsed_ms(snapshot, now_ms),
1642        },
1643        MaintenanceCoordinationOutcome::Active {
1644            job_id,
1645            job_kind,
1646            phase,
1647            started_at_ms,
1648            ..
1649        } => MaintenanceDecision::AttachOrWait {
1650            job_id,
1651            job_kind,
1652            phase,
1653            elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1654        },
1655    }
1656}
1657
1658#[cfg_attr(not(test), allow(dead_code))]
1659pub(crate) fn decide_search_failopen(
1660    data_dir: &Path,
1661    now_ms: i64,
1662    lexical_available: bool,
1663) -> MaintenanceDecision {
1664    let snapshot = read_search_maintenance_snapshot(data_dir);
1665    match evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms) {
1666        MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1667        MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1668            if lexical_available {
1669                MaintenanceDecision::FailOpen {
1670                    reason: format!(
1671                        "maintenance job {job_id} has a stale heartbeat ({reason}); lexical index is available, failing open"
1672                    ),
1673                }
1674            } else {
1675                MaintenanceDecision::AttachOrWait {
1676                    job_id,
1677                    job_kind: maintenance_snapshot_job_kind(&snapshot),
1678                    phase: stale_heartbeat_phase(&snapshot),
1679                    elapsed_ms: maintenance_elapsed_ms(&snapshot, now_ms),
1680                }
1681            }
1682        }
1683        MaintenanceCoordinationOutcome::Active {
1684            job_id,
1685            job_kind,
1686            phase,
1687            started_at_ms,
1688            ..
1689        } => {
1690            if lexical_available {
1691                MaintenanceDecision::FailOpen {
1692                    reason: format!(
1693                        "maintenance job {job_id} is active; lexical index is available, failing open"
1694                    ),
1695                }
1696            } else {
1697                MaintenanceDecision::AttachOrWait {
1698                    job_id,
1699                    job_kind,
1700                    phase,
1701                    elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1702                }
1703            }
1704        }
1705    }
1706}
1707
/// Result of `poll_maintenance_until_idle`.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) struct PollResult {
    /// The last coordination outcome observed before returning.
    pub outcome: MaintenanceCoordinationOutcome,
    /// Number of times the maintenance state was evaluated.
    pub polls: u32,
    /// Time elapsed since polling started.
    pub elapsed: Duration,
    /// `true` when the deadline passed before the state became `Idle`.
    pub timed_out: bool,
}
1715
1716#[cfg_attr(not(test), allow(dead_code))]
1717pub(crate) fn poll_maintenance_until_idle(
1718    data_dir: &Path,
1719    timeout: Option<Duration>,
1720    poll_interval: Option<Duration>,
1721) -> PollResult {
1722    let timeout = timeout.unwrap_or(BOUNDED_WAIT_DEFAULT);
1723    let interval = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
1724    let start = Instant::now();
1725    let deadline = start + timeout;
1726    let mut polls = 0u32;
1727    loop {
1728        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1729        let outcome = evaluate_maintenance_coordination(data_dir, now_ms);
1730        polls += 1;
1731        if matches!(outcome, MaintenanceCoordinationOutcome::Idle) {
1732            return PollResult {
1733                outcome,
1734                polls,
1735                elapsed: start.elapsed(),
1736                timed_out: false,
1737            };
1738        }
1739
1740        let now = Instant::now();
1741        if now >= deadline {
1742            return PollResult {
1743                outcome,
1744                polls,
1745                elapsed: start.elapsed(),
1746                timed_out: true,
1747            };
1748        }
1749        let remaining = deadline - now;
1750        std::thread::sleep(interval.min(remaining));
1751    }
1752}
1753
1754// ---------------------------------------------------------------------------
1755// Rich multi-actor event log, yield/pause signaling, unified view (ibuuh.22)
1756// ---------------------------------------------------------------------------
1757
/// File name (joined onto the data dir) of the JSON-lines maintenance event log.
#[cfg_attr(not(test), allow(dead_code))]
const MAINTENANCE_EVENTS_FILE: &str = ".maintenance-events.jsonl";
/// File name (joined onto the data dir) of the yield-request signal file.
#[cfg_attr(not(test), allow(dead_code))]
const YIELD_SIGNAL_FILE: &str = "maintenance-yield.signal";
/// Default cap on events returned by `read_maintenance_events`, and the number
/// of trailing lines kept by `truncate_maintenance_event_log`.
#[cfg_attr(not(test), allow(dead_code))]
const MAX_EVENT_LOG_ENTRIES: usize = 500;
1764
/// One record of the maintenance event log; each record is stored as a single
/// JSON line.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct MaintenanceEvent {
    /// Event time in milliseconds; `read_maintenance_events` filters on it.
    pub timestamp_ms: i64,
    /// Identifier of the maintenance job the event belongs to.
    pub job_id: String,
    /// Presumably the pid of the process that emitted the event — confirm against writers.
    pub actor_pid: u32,
    /// What happened.
    pub kind: MaintenanceEventKind,
}
1773
/// The kinds of event a maintenance job can record.
///
/// No serde attributes are applied, so variants use serde's default enum
/// representation. Renaming a variant or field changes the on-disk format.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum MaintenanceEventKind {
    // NOTE(review): variant meanings below are read off the names only;
    // the emitters are not visible from this part of the file.
    Started { job_kind: String, phase: String },
    PhaseChanged { from: String, to: String },
    Progress { processed: u64, total: u64 },
    YieldRequested { requester_pid: u32, reason: String },
    Paused { reason: String },
    Resumed,
    Completed { summary: String },
    Failed { error: String },
    Cancelled { reason: String },
}
1787
1788#[cfg_attr(not(test), allow(dead_code))]
1789pub(crate) fn append_maintenance_event(data_dir: &Path, event: &MaintenanceEvent) -> Result<()> {
1790    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1791    let line = serde_json::to_string(event).with_context(|| "serializing maintenance event")?;
1792    let mut file = OpenOptions::new()
1793        .create(true)
1794        .append(true)
1795        .open(&path)
1796        .with_context(|| format!("opening maintenance event log at {}", path.display()))?;
1797    use std::io::Write;
1798    writeln!(file, "{line}")
1799        .with_context(|| format!("appending to maintenance event log at {}", path.display()))?;
1800    Ok(())
1801}
1802
1803#[cfg_attr(not(test), allow(dead_code))]
1804pub(crate) fn read_maintenance_events(
1805    data_dir: &Path,
1806    after_ms: Option<i64>,
1807    limit: Option<usize>,
1808) -> Vec<MaintenanceEvent> {
1809    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1810    let contents = match std::fs::read_to_string(&path) {
1811        Ok(c) => c,
1812        Err(_) => return Vec::new(),
1813    };
1814    let cap = limit.unwrap_or(MAX_EVENT_LOG_ENTRIES);
1815    contents
1816        .lines()
1817        .filter_map(|line| serde_json::from_str::<MaintenanceEvent>(line).ok())
1818        .filter(|e| after_ms.is_none_or(|threshold| e.timestamp_ms > threshold))
1819        .rev()
1820        .take(cap)
1821        .collect::<Vec<_>>()
1822        .into_iter()
1823        .rev()
1824        .collect()
1825}
1826
1827#[cfg_attr(not(test), allow(dead_code))]
1828pub(crate) fn truncate_maintenance_event_log(data_dir: &Path) -> Result<()> {
1829    let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1830    let contents = match std::fs::read_to_string(&path) {
1831        Ok(c) => c,
1832        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1833        Err(e) => {
1834            return Err(e).with_context(|| {
1835                format!("reading event log for truncation at {}", path.display())
1836            });
1837        }
1838    };
1839    let lines: Vec<&str> = contents.lines().collect();
1840    if lines.len() <= MAX_EVENT_LOG_ENTRIES {
1841        return Ok(());
1842    }
1843    let keep = &lines[lines.len() - MAX_EVENT_LOG_ENTRIES..];
1844    let mut output = keep.join("\n");
1845    output.push('\n');
1846    std::fs::write(&path, output)
1847        .with_context(|| format!("truncating event log at {}", path.display()))
1848}
1849
1850// ---------------------------------------------------------------------------
1851// Yield/pause signaling
1852// ---------------------------------------------------------------------------
1853
/// Contents of the yield signal file, serialized as JSON by `request_yield`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct YieldRequest {
    /// Pid of the process that asked for the yield.
    pub requester_pid: u32,
    /// Time the request was written, in milliseconds.
    pub requested_at_ms: i64,
    /// Free-form reason supplied by the requester.
    pub reason: String,
}
1861
1862#[cfg_attr(not(test), allow(dead_code))]
1863pub(crate) fn request_yield(data_dir: &Path, reason: &str) -> Result<()> {
1864    let path = data_dir.join(YIELD_SIGNAL_FILE);
1865    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1866    let req = YieldRequest {
1867        requester_pid: std::process::id(),
1868        requested_at_ms: now_ms,
1869        reason: reason.to_string(),
1870    };
1871    let payload = serde_json::to_string(&req).with_context(|| "serializing yield request")?;
1872    std::fs::write(&path, payload)
1873        .with_context(|| format!("writing yield signal to {}", path.display()))
1874}
1875
1876#[cfg_attr(not(test), allow(dead_code))]
1877pub(crate) fn check_yield_requested(data_dir: &Path) -> Option<YieldRequest> {
1878    let path = data_dir.join(YIELD_SIGNAL_FILE);
1879    let contents = std::fs::read_to_string(&path).ok()?;
1880    serde_json::from_str::<YieldRequest>(&contents).ok()
1881}
1882
1883#[cfg_attr(not(test), allow(dead_code))]
1884pub(crate) fn clear_yield_signal(data_dir: &Path) -> Result<()> {
1885    let path = data_dir.join(YIELD_SIGNAL_FILE);
1886    match std::fs::remove_file(&path) {
1887        Ok(()) => Ok(()),
1888        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1889        Err(e) => Err(e).with_context(|| format!("clearing yield signal at {}", path.display())),
1890    }
1891}
1892
1893// ---------------------------------------------------------------------------
1894// Unified maintenance view
1895// ---------------------------------------------------------------------------
1896
/// Combined view of the maintenance state, assembled by `unified_maintenance_view`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct UnifiedMaintenanceView {
    /// Classification of `snapshot` at the time the view was built.
    pub coordination: MaintenanceCoordinationOutcome,
    /// The raw maintenance snapshot the view was derived from.
    pub snapshot: SearchMaintenanceSnapshot,
    /// The pending yield request, if a valid signal file was present.
    pub yield_pending: Option<YieldRequest>,
    /// The most recent events from the event log, oldest first.
    pub recent_events: Vec<MaintenanceEvent>,
    /// The action chosen for the caller.
    pub decision: MaintenanceDecision,
}
1906
1907#[cfg_attr(not(test), allow(dead_code))]
1908pub(crate) fn unified_maintenance_view(
1909    data_dir: &Path,
1910    lexical_available: bool,
1911) -> UnifiedMaintenanceView {
1912    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1913    let snapshot = read_search_maintenance_snapshot(data_dir);
1914    let coordination = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
1915    let yield_pending = check_yield_requested(data_dir);
1916    let recent_events = read_maintenance_events(data_dir, None, Some(20));
1917    let decision = if lexical_available {
1918        match &coordination {
1919            MaintenanceCoordinationOutcome::Active {
1920                job_id,
1921                job_kind,
1922                phase,
1923                ..
1924            } => MaintenanceDecision::FailOpen {
1925                reason: format!(
1926                    "maintenance job {} ({:?}) is active (phase: {}); lexical available, failing open",
1927                    job_id,
1928                    job_kind,
1929                    phase.as_deref().unwrap_or("unknown")
1930                ),
1931            },
1932            MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1933                MaintenanceDecision::FailOpen {
1934                    reason: format!(
1935                        "maintenance job {job_id} has a stale heartbeat ({reason}); lexical available, failing open"
1936                    ),
1937                }
1938            }
1939            _ => decide_maintenance_action_from_snapshot(&snapshot, now_ms),
1940        }
1941    } else {
1942        decide_maintenance_action_from_snapshot(&snapshot, now_ms)
1943    };
1944
1945    UnifiedMaintenanceView {
1946        coordination,
1947        snapshot,
1948        yield_pending,
1949        recent_events,
1950        decision,
1951    }
1952}
1953
1954#[cfg(test)]
1955mod tests {
1956    use super::*;
1957
1958    #[test]
1959    fn maintenance_mode_round_trips_lock_values() {
1960        for mode in [
1961            SearchMaintenanceMode::Index,
1962            SearchMaintenanceMode::WatchStartup,
1963            SearchMaintenanceMode::Watch,
1964            SearchMaintenanceMode::WatchOnce,
1965        ] {
1966            assert_eq!(
1967                SearchMaintenanceMode::parse_lock_value(mode.as_lock_value()),
1968                Some(mode)
1969            );
1970        }
1971    }
1972
1973    #[test]
1974    fn maintenance_job_kind_round_trips_lock_values() {
1975        for kind in [
1976            SearchMaintenanceJobKind::LexicalRefresh,
1977            SearchMaintenanceJobKind::SemanticAcquire,
1978        ] {
1979            assert_eq!(
1980                SearchMaintenanceJobKind::parse_lock_value(kind.as_lock_value()),
1981                Some(kind)
1982            );
1983        }
1984    }
1985
1986    #[test]
1987    fn stale_lock_metadata_from_dead_owner_is_reaped_on_read() {
1988        // Regression for issue #176: the TUI used to see a permanent
1989        // `orphaned: true` state when the index-run.lock file contained
1990        // metadata from a crashed process, because nothing in the read
1991        // path cleaned it up. That produced a tight CPU-bound poll loop
1992        // on startup. The read path now reaps stale metadata atomically
1993        // while holding the exclusive flock.
1994        let temp = tempfile::tempdir().expect("tempdir");
1995        let lock_path = temp.path().join("index-run.lock");
1996        // The reap path does not probe the recorded pid — POSIX flock
1997        // acquisition is the signal — so the concrete pid value in the
1998        // fixture is irrelevant. We still record one so the parser
1999        // runs through its full happy path.
2000        std::fs::write(
2001            &lock_path,
2002            concat!(
2003                "pid=4242\n",
2004                "started_at_ms=1733000111000\n",
2005                "updated_at_ms=1733000112000\n",
2006                "db_path=/tmp/cass/agent_search.db\n",
2007                "mode=index\n",
2008                "job_id=lexical-refresh-1733000111000-4242\n",
2009                "job_kind=lexical_refresh\n",
2010                "phase=rebuilding\n"
2011            ),
2012        )
2013        .expect("write lock metadata");
2014
2015        let snapshot = read_search_maintenance_snapshot(temp.path());
2016        assert!(!snapshot.active, "no owner, must not be reported active");
2017        assert!(
2018            !snapshot.orphaned,
2019            "stale metadata must be reaped, not reported as orphaned"
2020        );
2021        assert!(snapshot.pid.is_none(), "pid must be cleared after reap");
2022        assert!(
2023            snapshot.job_id.is_none(),
2024            "job_id must be cleared after reap"
2025        );
2026        assert!(snapshot.phase.is_none(), "phase must be cleared after reap");
2027
2028        // File must still exist (to preserve permissions and avoid
2029        // creating/recreating races) but be empty.
2030        let post = std::fs::metadata(&lock_path).expect("lock file still present");
2031        assert_eq!(post.len(), 0, "stale metadata must be truncated in place");
2032
2033        // Second read also returns a clean default snapshot.
2034        let snapshot2 = read_search_maintenance_snapshot(temp.path());
2035        assert!(!snapshot2.active);
2036        assert!(!snapshot2.orphaned);
2037    }
2038
2039    #[test]
2040    fn live_owner_metadata_is_preserved_when_flock_is_held() {
2041        // When the lock is actually held by a live owner, the snapshot
2042        // must report the owner faithfully and must NOT reap the file.
2043        use fs2::FileExt;
2044        let temp = tempfile::tempdir().expect("tempdir");
2045        let lock_path = temp.path().join("index-run.lock");
2046        let owner = OpenOptions::new()
2047            .create(true)
2048            .read(true)
2049            .write(true)
2050            .truncate(true)
2051            .open(&lock_path)
2052            .expect("open owner handle");
2053        owner
2054            .try_lock_exclusive()
2055            .expect("owner acquires exclusive lock");
2056        // Write metadata while holding the lock, matching acquire_index_run_lock's order.
2057        std::fs::write(
2058            &lock_path,
2059            concat!(
2060                "pid=4242\n",
2061                "started_at_ms=1733000111000\n",
2062                "updated_at_ms=1733000112000\n",
2063                "db_path=/tmp/cass/agent_search.db\n",
2064                "mode=index\n",
2065                "job_id=lexical-refresh-1733000111000-4242\n",
2066                "job_kind=lexical_refresh\n",
2067                "phase=rebuilding\n"
2068            ),
2069        )
2070        .expect("write lock metadata");
2071
2072        let snapshot = read_search_maintenance_snapshot(temp.path());
2073        assert!(snapshot.active, "live owner must be reported active");
2074        assert!(!snapshot.orphaned);
2075        assert_eq!(snapshot.pid, Some(4242));
2076        assert_eq!(
2077            snapshot.job_id.as_deref(),
2078            Some("lexical-refresh-1733000111000-4242")
2079        );
2080        assert_eq!(
2081            snapshot.job_kind,
2082            Some(SearchMaintenanceJobKind::LexicalRefresh)
2083        );
2084        assert_eq!(snapshot.phase.as_deref(), Some("rebuilding"));
2085        assert_eq!(snapshot.updated_at_ms, Some(1_733_000_112_000));
2086
2087        // Metadata must still be present — reaping must NOT have happened.
2088        let post = std::fs::metadata(&lock_path).expect("lock file still present");
2089        assert!(post.len() > 0, "live-owner metadata must not be truncated");
2090
2091        let _ = FileExt::unlock(&owner);
2092    }
2093
2094    #[test]
2095    fn lexical_storage_fingerprint_matching_handles_jitter_and_size_drift() {
2096        let cases = [
2097            (
2098                "small mtime settle jitter",
2099                "323584:1776310228000:329632:1776310227824",
2100                "323584:1776310227832:329632:1776310227824",
2101                true,
2102            ),
2103            (
2104                "wal size drift",
2105                "323584:1776310228000:329632:1776310227824",
2106                "323584:1776310227832:400000:1776310227824",
2107                false,
2108            ),
2109        ];
2110
2111        for (label, current, saved, expected) in cases {
2112            assert_eq!(
2113                lexical_storage_fingerprints_match(current, saved),
2114                expected,
2115                "{label}"
2116            );
2117        }
2118    }
2119
2120    #[test]
2121    fn lexical_state_marks_fingerprint_mismatch_stale() {
2122        let temp = tempfile::tempdir().expect("tempdir");
2123        let index_path = temp.path().join("index").join("v4");
2124        std::fs::create_dir_all(&index_path).expect("create index dir");
2125        // Simulate an existing tantivy index (meta.json present) so the
2126        // "missing" branch in lexical_state_from_observations doesn't short
2127        // circuit before the fingerprint check we want to exercise.
2128        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2129        let db_path = temp.path().join("agent_search.db");
2130        std::fs::write(&db_path, b"db").expect("write db file");
2131
2132        let checkpoint = LexicalRebuildCheckpoint {
2133            db_path: db_path.display().to_string(),
2134            total_conversations: 10,
2135            storage_fingerprint: "before".to_string(),
2136            committed_offset: 10,
2137            committed_conversation_id: Some(10),
2138            processed_conversations: 10,
2139            indexed_docs: 100,
2140            schema_hash: SCHEMA_HASH.to_string(),
2141            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2142            completed: true,
2143            updated_at_ms: 1_733_000_000_000,
2144        };
2145
2146        let state = lexical_state_from_observations(LexicalObservationInput {
2147            index_path: &index_path,
2148            db_path: &db_path,
2149            stale_threshold: 60,
2150            last_indexed_at_ms: Some(1_733_000_000_000),
2151            now_ms: 1_733_000_001_000,
2152            maintenance: SearchMaintenanceSnapshot::default(),
2153            checkpoint: Some(&checkpoint),
2154            current_db_fingerprint: Some("after"),
2155        });
2156
2157        assert_eq!(state.status, "stale");
2158        assert_eq!(
2159            state.fingerprint.matches_current_db_fingerprint,
2160            Some(false)
2161        );
2162        assert!(
2163            state
2164                .status_reason
2165                .as_deref()
2166                .is_some_and(|reason| reason.contains("fingerprint"))
2167        );
2168        assert_eq!(state.pending_sessions, 0);
2169        assert_eq!(state.processed_conversations, None);
2170        assert_eq!(state.total_conversations, None);
2171        assert_eq!(state.indexed_docs, None);
2172    }
2173
2174    #[test]
2175    fn lexical_state_marks_checkpoint_db_mismatch_stale_without_fingerprint_probe() {
2176        let temp = tempfile::tempdir().expect("tempdir");
2177        let index_path = temp.path().join("index").join("v4");
2178        std::fs::create_dir_all(&index_path).expect("create index dir");
2179        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2180        let db_path = temp.path().join("agent_search.db");
2181        let other_db_path = temp.path().join("other_agent_search.db");
2182        std::fs::write(&db_path, b"db").expect("write db file");
2183        std::fs::write(&other_db_path, b"other db").expect("write other db file");
2184
2185        let checkpoint = LexicalRebuildCheckpoint {
2186            db_path: other_db_path.display().to_string(),
2187            total_conversations: 10,
2188            storage_fingerprint: "old-db-fingerprint".to_string(),
2189            committed_offset: 10,
2190            committed_conversation_id: Some(10),
2191            processed_conversations: 10,
2192            indexed_docs: 100,
2193            schema_hash: SCHEMA_HASH.to_string(),
2194            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2195            completed: true,
2196            updated_at_ms: 1_733_000_000_000,
2197        };
2198
2199        let state = lexical_state_from_observations(LexicalObservationInput {
2200            index_path: &index_path,
2201            db_path: &db_path,
2202            stale_threshold: 60,
2203            last_indexed_at_ms: Some(1_733_000_000_000),
2204            now_ms: 1_733_000_001_000,
2205            maintenance: SearchMaintenanceSnapshot::default(),
2206            checkpoint: Some(&checkpoint),
2207            current_db_fingerprint: None,
2208        });
2209
2210        assert_eq!(state.status, "stale");
2211        assert!(state.stale);
2212        assert!(!state.fresh);
2213        assert_eq!(state.checkpoint.db_matches, Some(false));
2214        assert_eq!(state.fingerprint.matches_current_db_fingerprint, None);
2215        assert_eq!(state.pending_sessions, 0);
2216        assert_eq!(state.processed_conversations, None);
2217        assert_eq!(state.total_conversations, None);
2218        assert_eq!(state.indexed_docs, None);
2219        assert!(
2220            state
2221                .status_reason
2222                .as_deref()
2223                .is_some_and(|reason| reason.contains("different database"))
2224        );
2225    }
2226
2227    #[test]
2228    fn lexical_state_missing_index_is_not_marked_stale_until_initialized() {
2229        let temp = tempfile::tempdir().expect("tempdir");
2230        let index_path = temp.path().join("index").join("v4");
2231        std::fs::create_dir_all(&index_path).expect("create index dir");
2232        let db_path = temp.path().join("agent_search.db");
2233        std::fs::write(&db_path, b"db").expect("write db file");
2234
2235        let state = lexical_state_from_observations(LexicalObservationInput {
2236            index_path: &index_path,
2237            db_path: &db_path,
2238            stale_threshold: 60,
2239            last_indexed_at_ms: None,
2240            now_ms: 1_733_000_001_000,
2241            maintenance: SearchMaintenanceSnapshot::default(),
2242            checkpoint: None,
2243            current_db_fingerprint: None,
2244        });
2245
2246        assert_eq!(state.status, "missing");
2247        assert!(!state.exists);
2248        assert!(!state.stale);
2249        assert!(!state.fresh);
2250        assert_eq!(
2251            state.status_reason.as_deref(),
2252            Some("lexical Tantivy metadata missing")
2253        );
2254    }
2255
2256    #[test]
2257    fn lexical_state_keeps_progress_visible_during_active_rebuild_despite_fingerprint_drift() {
2258        let temp = tempfile::tempdir().expect("tempdir");
2259        let index_path = temp.path().join("index").join("v4");
2260        std::fs::create_dir_all(&index_path).expect("create index dir");
2261        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2262        let db_path = temp.path().join("agent_search.db");
2263        std::fs::write(&db_path, b"db").expect("write db file");
2264
2265        let checkpoint = LexicalRebuildCheckpoint {
2266            db_path: db_path.display().to_string(),
2267            total_conversations: 10,
2268            storage_fingerprint: "before".to_string(),
2269            committed_offset: 4,
2270            committed_conversation_id: Some(4),
2271            processed_conversations: 4,
2272            indexed_docs: 20,
2273            schema_hash: SCHEMA_HASH.to_string(),
2274            page_size: 200,
2275            completed: false,
2276            updated_at_ms: 1_733_000_123_000,
2277        };
2278
2279        let state = lexical_state_from_observations(LexicalObservationInput {
2280            index_path: &index_path,
2281            db_path: &db_path,
2282            stale_threshold: 60,
2283            last_indexed_at_ms: Some(1_733_000_000_000),
2284            now_ms: 1_733_000_001_000,
2285            maintenance: SearchMaintenanceSnapshot {
2286                active: true,
2287                pid: Some(std::process::id()),
2288                started_at_ms: Some(1_733_000_111_000),
2289                db_path: Some(db_path.clone()),
2290                mode: Some(SearchMaintenanceMode::Index),
2291                job_id: None,
2292                job_kind: None,
2293                phase: None,
2294                updated_at_ms: None,
2295                last_progress_at_ms: None,
2296                orphaned: false,
2297            },
2298            checkpoint: Some(&checkpoint),
2299            current_db_fingerprint: Some("after"),
2300        });
2301
2302        assert_eq!(state.status, "building");
2303        assert!(!state.stale);
2304        assert!(!state.fresh);
2305        assert_eq!(state.pending_sessions, 6);
2306        assert_eq!(state.processed_conversations, Some(4));
2307        assert_eq!(state.total_conversations, Some(10));
2308        assert_eq!(state.indexed_docs, Some(20));
2309        assert_eq!(state.checkpoint.page_size_matches, Some(false));
2310        assert_eq!(state.checkpoint.page_size_compatible, Some(true));
2311        assert_eq!(
2312            state.status_reason.as_deref(),
2313            Some("lexical rebuild is in progress")
2314        );
2315    }
2316
2317    #[test]
2318    fn lexical_state_hides_progress_for_incompatible_page_size_checkpoint() {
2319        let temp = tempfile::tempdir().expect("tempdir");
2320        let index_path = temp.path().join("index").join("v4");
2321        std::fs::create_dir_all(&index_path).expect("create index dir");
2322        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2323        let db_path = temp.path().join("agent_search.db");
2324        std::fs::write(&db_path, b"db").expect("write db file");
2325
2326        let checkpoint = LexicalRebuildCheckpoint {
2327            db_path: db_path.display().to_string(),
2328            total_conversations: 10,
2329            storage_fingerprint: "before".to_string(),
2330            committed_offset: 4,
2331            committed_conversation_id: Some(4),
2332            processed_conversations: 4,
2333            indexed_docs: 20,
2334            schema_hash: SCHEMA_HASH.to_string(),
2335            page_size: 13,
2336            completed: false,
2337            updated_at_ms: 1_733_000_123_000,
2338        };
2339
2340        let state = lexical_state_from_observations(LexicalObservationInput {
2341            index_path: &index_path,
2342            db_path: &db_path,
2343            stale_threshold: 60,
2344            last_indexed_at_ms: Some(1_733_000_000_000),
2345            now_ms: 1_733_000_001_000,
2346            maintenance: SearchMaintenanceSnapshot::default(),
2347            checkpoint: Some(&checkpoint),
2348            current_db_fingerprint: Some("before"),
2349        });
2350
2351        assert_eq!(state.status, "stale");
2352        assert!(state.stale);
2353        assert_eq!(state.pending_sessions, 0);
2354        assert_eq!(state.processed_conversations, None);
2355        assert_eq!(state.total_conversations, None);
2356        assert_eq!(state.indexed_docs, None);
2357        assert_eq!(state.checkpoint.page_size_matches, Some(false));
2358        assert_eq!(state.checkpoint.page_size_compatible, Some(false));
2359        assert!(
2360            state
2361                .status_reason
2362                .as_deref()
2363                .is_some_and(|reason| reason.contains("contract"))
2364        );
2365    }
2366
2367    #[test]
2368    fn lexical_state_prefers_newer_maintenance_heartbeat_over_stale_checkpoint_timestamp() {
2369        let temp = tempfile::tempdir().expect("tempdir");
2370        let index_path = temp.path().join("index").join("v4");
2371        std::fs::create_dir_all(&index_path).expect("create index dir");
2372        let db_path = temp.path().join("agent_search.db");
2373        std::fs::write(&db_path, b"db").expect("write db file");
2374
2375        let checkpoint = LexicalRebuildCheckpoint {
2376            db_path: db_path.display().to_string(),
2377            total_conversations: 10,
2378            storage_fingerprint: "before".to_string(),
2379            committed_offset: 4,
2380            committed_conversation_id: Some(4),
2381            processed_conversations: 4,
2382            indexed_docs: 20,
2383            schema_hash: SCHEMA_HASH.to_string(),
2384            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2385            completed: false,
2386            updated_at_ms: 1_733_000_123_000,
2387        };
2388
2389        let state = lexical_state_from_observations(LexicalObservationInput {
2390            index_path: &index_path,
2391            db_path: &db_path,
2392            stale_threshold: 60,
2393            last_indexed_at_ms: Some(1_733_000_000_000),
2394            now_ms: 1_733_000_001_000,
2395            maintenance: SearchMaintenanceSnapshot {
2396                active: true,
2397                pid: Some(std::process::id()),
2398                started_at_ms: Some(1_733_000_111_000),
2399                db_path: Some(db_path.clone()),
2400                mode: Some(SearchMaintenanceMode::Index),
2401                job_id: None,
2402                job_kind: None,
2403                phase: None,
2404                updated_at_ms: Some(1_733_000_456_000),
2405                last_progress_at_ms: None,
2406                orphaned: false,
2407            },
2408            checkpoint: Some(&checkpoint),
2409            current_db_fingerprint: Some("after"),
2410        });
2411
2412        assert_eq!(state.status, "building");
2413        assert_eq!(state.activity_at_ms, Some(1_733_000_456_000));
2414    }
2415
2416    #[test]
2417    fn lexical_state_ignores_rebuild_lock_for_different_database() {
2418        let temp = tempfile::tempdir().expect("tempdir");
2419        let index_path = temp.path().join("index").join("v4");
2420        std::fs::create_dir_all(&index_path).expect("create index dir");
2421        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2422        let db_path = temp.path().join("agent_search.db");
2423        std::fs::write(&db_path, b"db").expect("write db file");
2424        let other_db_path = temp.path().join("other.db");
2425        std::fs::write(&other_db_path, b"other").expect("write other db file");
2426
2427        let checkpoint = LexicalRebuildCheckpoint {
2428            db_path: db_path.display().to_string(),
2429            total_conversations: 10,
2430            storage_fingerprint: "before".to_string(),
2431            committed_offset: 4,
2432            committed_conversation_id: Some(4),
2433            processed_conversations: 4,
2434            indexed_docs: 20,
2435            schema_hash: SCHEMA_HASH.to_string(),
2436            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2437            completed: false,
2438            updated_at_ms: 1_733_000_123_000,
2439        };
2440
2441        let state = lexical_state_from_observations(LexicalObservationInput {
2442            index_path: &index_path,
2443            db_path: &db_path,
2444            stale_threshold: 60,
2445            last_indexed_at_ms: Some(1_733_000_000_000),
2446            now_ms: 1_733_000_001_000,
2447            maintenance: SearchMaintenanceSnapshot {
2448                active: true,
2449                pid: Some(std::process::id()),
2450                started_at_ms: Some(1_733_000_111_000),
2451                db_path: Some(other_db_path),
2452                mode: Some(SearchMaintenanceMode::Index),
2453                job_id: None,
2454                job_kind: None,
2455                phase: None,
2456                updated_at_ms: None,
2457                last_progress_at_ms: None,
2458                orphaned: false,
2459            },
2460            checkpoint: Some(&checkpoint),
2461            current_db_fingerprint: Some("after"),
2462        });
2463
2464        assert_eq!(state.status, "stale");
2465        assert!(state.stale);
2466        assert!(!state.fresh);
2467        assert!(!state.rebuilding);
2468        assert!(!state.watch_active);
2469        assert_eq!(state.activity_at_ms, None);
2470        assert_eq!(state.pending_sessions, 0);
2471        assert_eq!(state.processed_conversations, None);
2472        assert_eq!(state.total_conversations, None);
2473        assert_eq!(state.indexed_docs, None);
2474        assert!(
2475            state
2476                .status_reason
2477                .as_deref()
2478                .is_some_and(|reason| reason.contains("fingerprint"))
2479        );
2480    }
2481
2482    #[test]
2483    fn lexical_state_ignores_watch_lock_for_different_database() {
2484        let temp = tempfile::tempdir().expect("tempdir");
2485        let index_path = temp.path().join("index").join("v4");
2486        std::fs::create_dir_all(&index_path).expect("create index dir");
2487        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2488        let db_path = temp.path().join("agent_search.db");
2489        std::fs::write(&db_path, b"db").expect("write db file");
2490        let other_db_path = temp.path().join("other.db");
2491        std::fs::write(&other_db_path, b"other").expect("write other db file");
2492
2493        let state = lexical_state_from_observations(LexicalObservationInput {
2494            index_path: &index_path,
2495            db_path: &db_path,
2496            stale_threshold: 60,
2497            last_indexed_at_ms: Some(1_733_000_000_000),
2498            now_ms: 1_733_000_020_000,
2499            maintenance: SearchMaintenanceSnapshot {
2500                active: true,
2501                pid: Some(std::process::id()),
2502                started_at_ms: Some(1_733_000_111_000),
2503                db_path: Some(other_db_path),
2504                mode: Some(SearchMaintenanceMode::Watch),
2505                job_id: None,
2506                job_kind: None,
2507                phase: None,
2508                updated_at_ms: None,
2509                last_progress_at_ms: None,
2510                orphaned: false,
2511            },
2512            checkpoint: None,
2513            current_db_fingerprint: None,
2514        });
2515
2516        assert_eq!(state.status, "ready");
2517        assert!(state.fresh);
2518        assert!(!state.stale);
2519        assert!(!state.rebuilding);
2520        assert!(!state.watch_active);
2521        assert_eq!(state.activity_at_ms, None);
2522    }
2523
2524    // ---- Forward-progress liveness / stall detection (issue #258) ----
2525
2526    /// `last_progress_at_ms` is older than the default 120 s stall
2527    /// threshold; the heartbeat `updated_at_ms` is fresh (the heartbeat
2528    /// thread kept refreshing it independently). Status must flip to
2529    /// `stalled`, NOT remain `building` — this is the regression #258
2530    /// guards against.
2531    #[test]
2532    fn lexical_state_reports_stalled_when_progress_is_stale_despite_fresh_heartbeat() {
2533        let temp = tempfile::tempdir().expect("tempdir");
2534        let index_path = temp.path().join("index").join("v4");
2535        std::fs::create_dir_all(&index_path).expect("create index dir");
2536        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2537        let db_path = temp.path().join("agent_search.db");
2538        std::fs::write(&db_path, b"db").expect("write db file");
2539
2540        // now = 1_733_000_300 s (= 1_733_000_300_000 ms).
2541        // heartbeat updated 500 ms ago: fresh.
2542        // forward progress posted 300 s ago: well past the 120 s default stall threshold.
2543        // F4 (cass tech debt): the input now carries full-precision
2544        // `now_ms` end-to-end. Tests that previously needed
2545        // `now_secs as i64 * 1000` no longer have to round-trip.
2546        let now_ms: i64 = 1_733_000_300_000;
2547
2548        let state = lexical_state_from_observations(LexicalObservationInput {
2549            index_path: &index_path,
2550            db_path: &db_path,
2551            stale_threshold: 60,
2552            last_indexed_at_ms: Some(1_733_000_000_000),
2553            now_ms,
2554            maintenance: SearchMaintenanceSnapshot {
2555                active: true,
2556                pid: Some(std::process::id()),
2557                started_at_ms: Some(now_ms - 600_000),
2558                db_path: Some(db_path.clone()),
2559                mode: Some(SearchMaintenanceMode::WatchStartup),
2560                job_id: Some("lexical_refresh-1-1".to_string()),
2561                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2562                phase: Some("watch_startup".to_string()),
2563                updated_at_ms: Some(now_ms - 500),
2564                last_progress_at_ms: Some(now_ms - 300_000),
2565                orphaned: false,
2566            },
2567            checkpoint: None,
2568            current_db_fingerprint: None,
2569        });
2570
2571        assert!(state.rebuilding, "active rebuild lock must still register");
2572        assert!(
2573            state.stalled,
2574            "stale forward-progress timestamp must flip stalled=true",
2575        );
2576        assert_eq!(state.status, "stalled");
2577        assert_eq!(
2578            state.last_progress_at_ms,
2579            Some(now_ms - 300_000),
2580            "last_progress_at_ms must be surfaced to status callers",
2581        );
2582        let age = state
2583            .last_progress_age_ms
2584            .expect("last_progress_age_ms must be computed");
2585        assert!(
2586            (299_900..=300_100).contains(&age),
2587            "computed last_progress_age_ms ({age}ms) should equal now - last_progress_at_ms (300_000ms)",
2588        );
2589        let reason = state
2590            .status_reason
2591            .as_deref()
2592            .expect("stalled state should populate status_reason");
2593        assert!(
2594            reason.contains("forward progress")
2595                && (reason.contains("#258") || reason.contains("issue #258")),
2596            "status_reason should mention forward progress and reference #258 ({reason})",
2597        );
2598    }
2599
2600    /// Heartbeat is fresh AND forward-progress is fresh: no stall.
2601    /// Status remains `building`. Ensures the new gate does not regress
2602    /// the happy path.
2603    #[test]
2604    fn lexical_state_stays_building_when_progress_is_recent() {
2605        let temp = tempfile::tempdir().expect("tempdir");
2606        let index_path = temp.path().join("index").join("v4");
2607        std::fs::create_dir_all(&index_path).expect("create index dir");
2608        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2609        let db_path = temp.path().join("agent_search.db");
2610        std::fs::write(&db_path, b"db").expect("write db file");
2611
2612        // F4 (cass tech debt): the input now carries full-precision
2613        // `now_ms` end-to-end. Tests that previously needed
2614        // `now_secs as i64 * 1000` no longer have to round-trip.
2615        let now_ms: i64 = 1_733_000_300_000;
2616
2617        let state = lexical_state_from_observations(LexicalObservationInput {
2618            index_path: &index_path,
2619            db_path: &db_path,
2620            stale_threshold: 60,
2621            last_indexed_at_ms: Some(1_733_000_000_000),
2622            now_ms,
2623            maintenance: SearchMaintenanceSnapshot {
2624                active: true,
2625                pid: Some(std::process::id()),
2626                started_at_ms: Some(now_ms - 30_000),
2627                db_path: Some(db_path.clone()),
2628                mode: Some(SearchMaintenanceMode::Index),
2629                job_id: Some("lexical_refresh-1-1".to_string()),
2630                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2631                phase: Some("scanning".to_string()),
2632                updated_at_ms: Some(now_ms - 500),
2633                last_progress_at_ms: Some(now_ms - 1_000),
2634                orphaned: false,
2635            },
2636            checkpoint: None,
2637            current_db_fingerprint: None,
2638        });
2639
2640        assert!(state.rebuilding);
2641        assert!(!state.stalled, "fresh progress must not flip stalled");
2642        assert_eq!(state.status, "building");
2643    }
2644
2645    /// Legacy lock files (older cass that didn't write
2646    /// `last_progress_at_ms`) must not be misreported as stalled. The
2647    /// stall check only fires when an explicit `last_progress_at_ms`
2648    /// is present; absent that, we fall back to the previous behavior.
2649    #[test]
2650    fn lexical_state_does_not_stall_when_legacy_lock_omits_progress_field() {
2651        let temp = tempfile::tempdir().expect("tempdir");
2652        let index_path = temp.path().join("index").join("v4");
2653        std::fs::create_dir_all(&index_path).expect("create index dir");
2654        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2655        let db_path = temp.path().join("agent_search.db");
2656        std::fs::write(&db_path, b"db").expect("write db file");
2657
2658        // F4 (cass tech debt): the input now carries full-precision
2659        // `now_ms` end-to-end. Tests that previously needed
2660        // `now_secs as i64 * 1000` no longer have to round-trip.
2661        let now_ms: i64 = 1_733_000_300_000;
2662
2663        let state = lexical_state_from_observations(LexicalObservationInput {
2664            index_path: &index_path,
2665            db_path: &db_path,
2666            stale_threshold: 60,
2667            last_indexed_at_ms: Some(1_733_000_000_000),
2668            now_ms,
2669            maintenance: SearchMaintenanceSnapshot {
2670                active: true,
2671                pid: Some(std::process::id()),
2672                started_at_ms: Some(now_ms - 30_000),
2673                db_path: Some(db_path.clone()),
2674                mode: Some(SearchMaintenanceMode::Index),
2675                job_id: None,
2676                job_kind: None,
2677                phase: None,
2678                updated_at_ms: Some(now_ms - 500),
2679                last_progress_at_ms: None,
2680                orphaned: false,
2681            },
2682            checkpoint: None,
2683            current_db_fingerprint: None,
2684        });
2685
2686        assert!(state.rebuilding);
2687        assert!(
2688            !state.stalled,
2689            "legacy lock without last_progress_at_ms must NOT be misreported as stalled",
2690        );
2691        assert_eq!(state.status, "building");
2692        assert!(state.last_progress_age_ms.is_none());
2693    }
2694
2695    /// F4 (cass tech debt): the stall-age comparison must operate at
2696    /// full millisecond precision. Pre-fix, `now_ms` was derived from
2697    /// `now_secs * 1000` inside `lexical_state_from_observations`, which
2698    /// quantised the comparison to second resolution and made a
2699    /// 119_900 ms-old progress timestamp indistinguishable from a 119
2700    /// 000 ms-old one. This test pins a 119_500 ms age so that the only
2701    /// way it surfaces correctly as a `last_progress_age_ms` close to
2702    /// 119_500 (and NOT 119_000 or 120_000) is full-ms plumbing.
2703    #[test]
2704    fn lexical_state_progress_age_is_ms_precision_not_seconds_quantised() {
2705        let temp = tempfile::tempdir().expect("tempdir");
2706        let index_path = temp.path().join("index").join("v4");
2707        std::fs::create_dir_all(&index_path).expect("create index dir");
2708        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2709        let db_path = temp.path().join("agent_search.db");
2710        std::fs::write(&db_path, b"db").expect("write db file");
2711
2712        // `now_ms` deliberately lands at .700 of a second so any
2713        // second-quantisation upstream would shave 700 ms off the diff.
2714        let now_ms: i64 = 1_733_000_300_700;
2715        let last_progress_at_ms = now_ms - 119_500;
2716
2717        let state = lexical_state_from_observations(LexicalObservationInput {
2718            index_path: &index_path,
2719            db_path: &db_path,
2720            stale_threshold: 60,
2721            last_indexed_at_ms: Some(1_733_000_000_000),
2722            now_ms,
2723            maintenance: SearchMaintenanceSnapshot {
2724                active: true,
2725                pid: Some(std::process::id()),
2726                started_at_ms: Some(now_ms - 600_000),
2727                db_path: Some(db_path.clone()),
2728                mode: Some(SearchMaintenanceMode::WatchStartup),
2729                job_id: Some("lexical_refresh-1-1".to_string()),
2730                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2731                phase: Some("watch_startup".to_string()),
2732                updated_at_ms: Some(now_ms - 500),
2733                last_progress_at_ms: Some(last_progress_at_ms),
2734                orphaned: false,
2735            },
2736            checkpoint: None,
2737            current_db_fingerprint: None,
2738        });
2739
2740        let age = state
2741            .last_progress_age_ms
2742            .expect("forward-progress age must be computed");
2743        assert_eq!(
2744            age, 119_500,
2745            "ms-precision plumbing must surface the exact diff (no second-quantisation)"
2746        );
2747        // 119_500 ms is still under the default 120 s stall threshold,
2748        // so we should be `building`, not `stalled`. Pre-F4, a
2749        // second-quantised clock could either floor the diff to 119_000
2750        // (still building, OK) OR — on different `.fff` ms suffixes —
2751        // round it to 120_000 (false-positive stall). Pinning the
2752        // expected status here protects both edges.
2753        assert!(
2754            !state.stalled,
2755            "119.5 s lag must remain `building`, not flip to `stalled`",
2756        );
2757        assert_eq!(state.status, "building");
2758    }
2759
2760    /// Coordination outcome layer must also degrade to `Stale` when
2761    /// forward progress is stuck — search-side single-flight callers
2762    /// then route around the wedged worker instead of attaching to it.
2763    #[test]
2764    fn coordination_reports_stale_when_forward_progress_is_stuck() {
2765        let now_ms: i64 = 1_733_000_300_000;
2766        let snapshot = SearchMaintenanceSnapshot {
2767            active: true,
2768            pid: Some(12345),
2769            started_at_ms: Some(now_ms - 600_000),
2770            db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
2771            mode: Some(SearchMaintenanceMode::WatchStartup),
2772            job_id: Some("lexical_refresh-1-12345".to_string()),
2773            job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2774            phase: Some("watch_startup".to_string()),
2775            updated_at_ms: Some(now_ms - 500),
2776            last_progress_at_ms: Some(now_ms - 300_000),
2777            orphaned: false,
2778        };
2779
2780        let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2781        match outcome {
2782            MaintenanceCoordinationOutcome::Stale { ref reason, .. } => {
2783                assert!(
2784                    reason.contains("forward progress")
2785                        && (reason.contains("#258") || reason.contains("issue #258")),
2786                    "stalled coordination reason should mention forward progress and #258: {reason}",
2787                );
2788            }
2789            other => {
2790                panic!("stalled forward-progress snapshot must coordinate as Stale, got {other:?}",)
2791            }
2792        }
2793    }
2794
2795    #[test]
2796    fn inspect_search_assets_preserves_semantic_database_unavailable_signal() {
2797        let temp = tempfile::tempdir().expect("tempdir");
2798        let index_path = temp.path().join("index").join("v4");
2799        std::fs::create_dir_all(&index_path).expect("create index dir");
2800        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2801
2802        let db_path = temp.path().join("agent_search.db");
2803        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2804
2805        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2806        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2807            .expect("create vector dir");
2808        std::fs::write(&vector_path, b"index").expect("write vector index");
2809
2810        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2811            data_dir: temp.path(),
2812            db_path: &db_path,
2813            stale_threshold: 60,
2814            last_indexed_at_ms: Some(1_733_000_000_000),
2815            now_ms: 1_733_000_001_000,
2816            maintenance: SearchMaintenanceSnapshot::default(),
2817            semantic_preference: SemanticPreference::HashFallback,
2818            db_available: false,
2819            compute_lexical_fingerprint: false,
2820            inspect_semantic: true,
2821        })
2822        .expect("asset inspection should not fail when db availability is already known");
2823
2824        assert_ne!(snapshot.lexical.status, "error");
2825        assert_eq!(snapshot.semantic.status, "error");
2826        assert_eq!(snapshot.semantic.availability, "database_unavailable");
2827        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
2828        assert!(snapshot.semantic.summary.contains("db unavailable"));
2829    }
2830
2831    #[test]
2832    fn inspect_search_assets_can_skip_semantic_db_open_for_fast_paths() {
2833        let temp = tempfile::tempdir().expect("tempdir");
2834        let index_path = temp.path().join("index").join("v4");
2835        std::fs::create_dir_all(&index_path).expect("create index dir");
2836        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2837
2838        let db_path = temp.path().join("agent_search.db");
2839        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2840
2841        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2842        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2843            .expect("create vector dir");
2844        std::fs::write(&vector_path, b"index").expect("write vector index");
2845
2846        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2847            data_dir: temp.path(),
2848            db_path: &db_path,
2849            stale_threshold: 60,
2850            last_indexed_at_ms: Some(1_733_000_000_000),
2851            now_ms: 1_733_000_001_000,
2852            maintenance: SearchMaintenanceSnapshot::default(),
2853            semantic_preference: SemanticPreference::HashFallback,
2854            db_available: false,
2855            compute_lexical_fingerprint: false,
2856            inspect_semantic: false,
2857        })
2858        .expect("asset inspection should not open semantic DB when semantic inspection is skipped");
2859
2860        assert_ne!(snapshot.lexical.status, "error");
2861        assert_eq!(snapshot.semantic.status, "not_inspected");
2862        assert_eq!(snapshot.semantic.availability, "not_inspected");
2863        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
2864    }
2865
2866    #[test]
2867    fn inspect_search_assets_trusts_db_probe_for_semantic_metadata_probe() {
2868        let temp = tempfile::tempdir().expect("tempdir");
2869        let index_path = temp.path().join("index").join("v4");
2870        std::fs::create_dir_all(&index_path).expect("create index dir");
2871        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2872
2873        let db_path = temp.path().join("agent_search.db");
2874        std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2875
2876        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2877        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2878            .expect("create vector dir");
2879        std::fs::write(&vector_path, b"index").expect("write vector index");
2880
2881        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2882            data_dir: temp.path(),
2883            db_path: &db_path,
2884            stale_threshold: 60,
2885            last_indexed_at_ms: Some(1_733_000_000_000),
2886            now_ms: 1_733_000_001_000,
2887            maintenance: SearchMaintenanceSnapshot::default(),
2888            semantic_preference: SemanticPreference::HashFallback,
2889            db_available: true,
2890            compute_lexical_fingerprint: false,
2891            inspect_semantic: true,
2892        })
2893        .expect("semantic metadata probe should trust the existing DB availability signal");
2894
2895        assert_eq!(snapshot.semantic.status, "hash_fallback");
2896        assert_eq!(snapshot.semantic.availability, "hash_fallback");
2897        assert!(snapshot.semantic.can_search);
2898    }
2899
2900    #[test]
2901    fn semantic_state_reports_hash_fallback_as_searchable() {
2902        let state = semantic_state_from_availability(
2903            Path::new("/tmp/cass"),
2904            &SemanticAvailability::HashFallback,
2905            SemanticPreference::HashFallback,
2906            None,
2907        );
2908
2909        assert_eq!(state.status, "hash_fallback");
2910        assert_eq!(state.availability, "hash_fallback");
2911        assert!(state.available);
2912        assert!(state.can_search);
2913        assert_eq!(state.fallback_mode, None);
2914    }
2915
2916    #[test]
2917    fn semantic_preference_surface_preserves_backend_and_model_dir_projection() {
2918        let data_dir = Path::new("/tmp/cass");
2919        let cases = [
2920            (
2921                SemanticPreference::DefaultModel,
2922                "fastembed",
2923                Some(FastEmbedder::default_model_dir(data_dir)),
2924            ),
2925            (SemanticPreference::HashFallback, "hash", None),
2926        ];
2927
2928        for (preference, expected_backend, expected_model_dir) in cases {
2929            let surface = semantic_preference_surface(data_dir, preference);
2930
2931            assert_eq!(surface.preferred_backend, expected_backend);
2932            assert_eq!(surface.model_dir, expected_model_dir);
2933        }
2934    }
2935
2936    #[test]
2937    fn semantic_state_detects_progressive_and_hnsw_assets() {
2938        let temp = tempfile::tempdir().expect("tempdir");
2939        let vector_dir = temp.path().join(VECTOR_INDEX_DIR);
2940        std::fs::create_dir_all(&vector_dir).expect("create vector dir");
2941        std::fs::write(vector_dir.join("vector.fast.idx"), b"fast").expect("write fast tier");
2942        std::fs::write(vector_dir.join("vector.quality.idx"), b"quality")
2943            .expect("write quality tier");
2944        let hnsw_path = hnsw_index_path(temp.path(), FastEmbedder::embedder_id_static());
2945        std::fs::write(&hnsw_path, b"hnsw").expect("write hnsw");
2946
2947        let state = semantic_state_from_availability(
2948            temp.path(),
2949            &SemanticAvailability::Ready {
2950                embedder_id: FastEmbedder::embedder_id_static().to_string(),
2951            },
2952            SemanticPreference::DefaultModel,
2953            None,
2954        );
2955
2956        assert_eq!(state.status, "ready");
2957        assert!(state.progressive_ready);
2958        assert!(state.hnsw_ready);
2959        assert_eq!(
2960            state.embedder_id.as_deref(),
2961            Some(FastEmbedder::embedder_id_static())
2962        );
2963    }
2964
/// A manifest whose only ready fast-tier artifact was built for the
/// `"stale-db"` fingerprint, while the backlog ledger and build checkpoint
/// refer to `"current-db"`, must be reported as a backfill in progress:
/// status `building`, not searchable, with lexical fallback.
#[test]
fn semantic_state_reports_backfill_when_manifest_only_has_stale_assets() {
    let data_dir = tempfile::tempdir().expect("tempdir");
    let hash_embedder_id = HashEmbedder::default().id().to_string();

    // Ready artifact, but fingerprinted against a database that is no longer current.
    let stale_fast_tier = ArtifactRecord {
        tier: TierKind::Fast,
        embedder_id: hash_embedder_id.clone(),
        model_revision: "hash".to_string(),
        schema_version: SEMANTIC_SCHEMA_VERSION,
        chunking_version: CHUNKING_STRATEGY_VERSION,
        dimension: 256,
        doc_count: 12,
        conversation_count: 3,
        db_fingerprint: "stale-db".to_string(),
        index_path: "vector_index/vector.fast.idx".to_string(),
        size_bytes: 4096,
        started_at_ms: 1_733_100_000_000,
        completed_at_ms: 1_733_100_100_000,
        ready: true,
    };
    let backlog = crate::search::semantic_manifest::BacklogLedger {
        total_conversations: 20,
        fast_tier_processed: 3,
        quality_tier_processed: 0,
        db_fingerprint: "current-db".to_string(),
        computed_at_ms: 1_733_100_200_000,
    };
    // In-flight build checkpoint for the current database.
    let checkpoint = BuildCheckpoint {
        tier: TierKind::Fast,
        embedder_id: hash_embedder_id,
        last_offset: 77,
        docs_embedded: 66,
        conversations_processed: 3,
        total_conversations: 20,
        db_fingerprint: "current-db".to_string(),
        schema_version: SEMANTIC_SCHEMA_VERSION,
        chunking_version: CHUNKING_STRATEGY_VERSION,
        saved_at_ms: 1_733_100_300_000,
        last_message_id: None,
    };

    let mut manifest = SemanticManifest {
        fast_tier: Some(stale_fast_tier),
        backlog,
        checkpoint: Some(checkpoint),
        ..Default::default()
    };
    manifest
        .save(data_dir.path())
        .expect("save semantic manifest");

    let state = semantic_state_from_availability(
        data_dir.path(),
        &SemanticAvailability::NeedsConsent,
        SemanticPreference::DefaultModel,
        Some("current-db"),
    );

    assert_eq!(state.status, "building");
    assert_eq!(state.availability, "index_building");
    assert!(!state.can_search);
    assert_eq!(state.fallback_mode, Some("lexical"));
    assert!(state.summary.contains("backfill"));

    let hint_mentions_backfill = state
        .hint
        .as_deref()
        .is_some_and(|text| text.contains("finish backfilling"));
    assert!(hint_mentions_backfill);
}
3028
/// When the manifest holds a ready hash-embedder fast tier for the current DB
/// fingerprint and its vector index file exists on disk, the state must be
/// `ready` and searchable even though availability is `NeedsConsent`.
#[test]
fn semantic_state_prefers_current_hash_tier_over_missing_model() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    let current_fast_tier = ArtifactRecord {
        tier: TierKind::Fast,
        embedder_id: HashEmbedder::default().id().to_string(),
        model_revision: "hash".to_string(),
        schema_version: SEMANTIC_SCHEMA_VERSION,
        chunking_version: CHUNKING_STRATEGY_VERSION,
        dimension: 256,
        doc_count: 12,
        conversation_count: 3,
        db_fingerprint: "current-db".to_string(),
        index_path: "vector_index/vector.fast.idx".to_string(),
        size_bytes: 4096,
        started_at_ms: 1_733_100_000_000,
        completed_at_ms: 1_733_100_100_000,
        ready: true,
    };
    let mut manifest = SemanticManifest {
        fast_tier: Some(current_fast_tier),
        ..Default::default()
    };
    manifest
        .save(data_dir.path())
        .expect("save semantic manifest");

    // Materialize a placeholder vector index where the hash embedder expects it.
    let vector_path = vector_index_path(data_dir.path(), HashEmbedder::default().id());
    let vector_dir = vector_path.parent().expect("vector parent");
    std::fs::create_dir_all(vector_dir).expect("create vector dir");
    std::fs::write(&vector_path, b"fast").expect("write fast vector index");

    let state = semantic_state_from_availability(
        data_dir.path(),
        &SemanticAvailability::NeedsConsent,
        SemanticPreference::DefaultModel,
        Some("current-db"),
    );

    assert_eq!(state.status, "ready");
    assert_eq!(state.availability, "ready");
    assert!(state.can_search);
    assert_eq!(state.fallback_mode, None);

    let reported_embedder = state.embedder_id.as_deref();
    assert_eq!(reported_embedder, Some(HashEmbedder::default().id()));
    assert_eq!(state.model_dir, None);

    let reported_index = state.vector_index_path.as_deref();
    assert_eq!(reported_index, Some(vector_path.as_path()));
    assert_eq!(state.hint, None);
}
3079
/// A ready quality tier evaluated with `None` as the final argument (the tests
/// in this module pass a DB fingerprint string there) must report an unknown
/// DB match (`current_db_matches == None`) and still be published and
/// searchable without a fallback mode.
#[test]
fn semantic_state_treats_ready_quality_tier_with_unknown_db_match_as_queryable() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    let quality_record = ArtifactRecord {
        tier: TierKind::Quality,
        embedder_id: HashEmbedder::default().id().to_string(),
        model_revision: "hash".to_string(),
        schema_version: SEMANTIC_SCHEMA_VERSION,
        chunking_version: CHUNKING_STRATEGY_VERSION,
        dimension: 256,
        doc_count: 249,
        conversation_count: 21,
        db_fingerprint: "boxed-db".to_string(),
        index_path: "vector_index/vector.quality.idx".to_string(),
        size_bytes: 221_824,
        started_at_ms: 1_733_100_000_000,
        completed_at_ms: 1_733_100_100_000,
        ready: true,
    };
    let mut manifest = SemanticManifest {
        quality_tier: Some(quality_record),
        ..Default::default()
    };
    manifest
        .save(data_dir.path())
        .expect("save semantic manifest");

    // No current DB fingerprint is passed, so the match cannot be determined.
    let state = semantic_state_from_availability(
        data_dir.path(),
        &SemanticAvailability::NeedsConsent,
        SemanticPreference::DefaultModel,
        None,
    );

    assert_eq!(state.quality_tier.current_db_matches, None);
    assert!(
        state.quality_tier_published,
        "a ready quality tier with unknown DB match should remain visible as published in boxed data-dir status"
    );
    assert!(
        state.semantic_only_search_available,
        "semantic-only search can still run when the quality tier is ready and DB match is unknown"
    );
    assert!(state.can_search);
    assert_eq!(state.fallback_mode, None);
}
3123
/// Two ready fast-tier shard records (shard_count = 2) for the current DB
/// fingerprint, each with a file on disk, must make the state `ready`, report
/// the summed doc count (10 + 11 = 21) and point `vector_index_path` at
/// shard 0, even though availability is `IndexMissing`.
#[test]
fn semantic_state_promotes_complete_current_shard_generation() {
    let data_dir = tempfile::tempdir().expect("tempdir");
    let hasher = HashEmbedder::default();
    let embedder_id = hasher.id().to_string();
    let dimension = hasher.dimension();

    // Write a placeholder file per shard and describe it in a manifest record.
    let records: Vec<SemanticShardRecord> = (0..2_u32)
        .map(|shard_index| {
            let relative_path = format!("vector_index/shards/fast-hash/shard-{shard_index}.fsvi");
            let shard_file = data_dir.path().join(&relative_path);
            let shard_dir = shard_file.parent().expect("shard parent");
            std::fs::create_dir_all(shard_dir).expect("create shard parent");
            std::fs::write(&shard_file, b"fsvi").expect("write shard placeholder");

            SemanticShardRecord {
                tier: TierKind::Fast,
                embedder_id: embedder_id.clone(),
                model_revision: "hash".to_string(),
                schema_version: SEMANTIC_SCHEMA_VERSION,
                chunking_version: CHUNKING_STRATEGY_VERSION,
                dimension,
                shard_index,
                shard_count: 2,
                doc_count: 10 + u64::from(shard_index),
                total_conversations: 7,
                db_fingerprint: "current-db".to_string(),
                index_path: relative_path,
                quantization: "f16".to_string(),
                mmap_ready: true,
                ann_index_path: None,
                ann_size_bytes: 0,
                ann_ready: false,
                size_bytes: 100 + u64::from(shard_index),
                started_at_ms: 1_733_100_000_000,
                completed_at_ms: 1_733_100_000_000 + i64::from(shard_index),
                ready: true,
            }
        })
        .collect();

    let mut shard_manifest = SemanticShardManifest {
        shards: records,
        ..Default::default()
    };
    shard_manifest
        .save(data_dir.path())
        .expect("save shard manifest");

    let missing_index = SemanticAvailability::IndexMissing {
        index_path: vector_index_path(data_dir.path(), &embedder_id),
    };
    let state = semantic_state_from_availability(
        data_dir.path(),
        &missing_index,
        SemanticPreference::HashFallback,
        Some("current-db"),
    );

    assert_eq!(state.status, "ready");
    assert_eq!(state.availability, "ready");
    assert!(state.can_search);
    assert_eq!(state.fallback_mode, None);
    assert_eq!(state.fast_tier.doc_count, Some(21));

    let first_shard = data_dir
        .path()
        .join("vector_index/shards/fast-hash/shard-0.fsvi");
    assert_eq!(
        state.vector_index_path.as_deref(),
        Some(first_shard.as_path())
    );
}
3187
/// A single-shard generation whose `index_path` points at a file in a
/// different temp directory (outside the data dir) must not be promoted: the
/// state is not `ready`, is not searchable, and keeps reporting the base
/// vector index path rather than the outside file.
#[test]
fn semantic_state_rejects_complete_shard_generation_with_unsafe_path() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    // Shard artifact that really exists, but lives outside the data directory.
    let foreign_dir = tempfile::tempdir().expect("outside tempdir");
    let outside_path = foreign_dir.path().join("outside.fsvi");
    std::fs::write(&outside_path, b"fsvi").expect("write outside placeholder");

    let embedder_id = HashEmbedder::default().id().to_string();
    let escaping_record = SemanticShardRecord {
        tier: TierKind::Fast,
        embedder_id: embedder_id.clone(),
        model_revision: "hash".to_string(),
        schema_version: SEMANTIC_SCHEMA_VERSION,
        chunking_version: CHUNKING_STRATEGY_VERSION,
        dimension: HashEmbedder::default().dimension(),
        shard_index: 0,
        shard_count: 1,
        doc_count: 10,
        total_conversations: 7,
        db_fingerprint: "current-db".to_string(),
        index_path: outside_path.to_string_lossy().to_string(),
        quantization: "f16".to_string(),
        mmap_ready: true,
        ann_index_path: None,
        ann_size_bytes: 0,
        ann_ready: false,
        size_bytes: 100,
        started_at_ms: 1_733_100_000_000,
        completed_at_ms: 1_733_100_000_001,
        ready: true,
    };
    let mut shard_manifest = SemanticShardManifest {
        shards: vec![escaping_record],
        ..Default::default()
    };
    shard_manifest
        .save(data_dir.path())
        .expect("save shard manifest");

    let base_vector_path = vector_index_path(data_dir.path(), &embedder_id);
    let missing_index = SemanticAvailability::IndexMissing {
        index_path: base_vector_path.clone(),
    };
    let state = semantic_state_from_availability(
        data_dir.path(),
        &missing_index,
        SemanticPreference::HashFallback,
        Some("current-db"),
    );

    assert_ne!(state.status, "ready");
    assert!(!state.can_search);

    let reported_index = state.vector_index_path.as_deref();
    assert_eq!(reported_index, Some(base_vector_path.as_path()));
    assert_ne!(reported_index, Some(outside_path.as_path()));
}
3244
3245    // -----------------------------------------------------------------------
3246    // Maintenance coordination tests
3247    // -----------------------------------------------------------------------
3248
3249    fn make_active_snapshot(now_ms: i64) -> SearchMaintenanceSnapshot {
3250        SearchMaintenanceSnapshot {
3251            active: true,
3252            pid: Some(12345),
3253            started_at_ms: Some(now_ms - 5_000),
3254            db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
3255            mode: Some(SearchMaintenanceMode::Index),
3256            job_id: Some("lexical_refresh-1000-12345".to_string()),
3257            job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
3258            phase: Some("scanning".to_string()),
3259            updated_at_ms: Some(now_ms - 500),
3260            last_progress_at_ms: Some(now_ms - 500),
3261            orphaned: false,
3262        }
3263    }
3264
/// A default snapshot must evaluate to `Idle`.
#[test]
fn coordination_no_active_job_when_snapshot_inactive() {
    let inactive = SearchMaintenanceSnapshot::default();
    assert_eq!(
        evaluate_maintenance_coordination_from_snapshot(&inactive, 1_733_000_000_000),
        MaintenanceCoordinationOutcome::Idle
    );
}
3271
/// An active snapshot that carries a pid and mode but no `job_id` must still
/// be reported as `Active`, with the job id `index-active-lock-12345` and the
/// `LexicalRefresh` job kind.
#[test]
fn coordination_tracks_active_legacy_lock_without_job_id() {
    let legacy_lock = SearchMaintenanceSnapshot {
        active: true,
        pid: Some(12345),
        job_id: None,
        mode: Some(SearchMaintenanceMode::Index),
        ..Default::default()
    };

    let outcome =
        evaluate_maintenance_coordination_from_snapshot(&legacy_lock, 1_733_000_000_000);
    match &outcome {
        MaintenanceCoordinationOutcome::Active {
            job_id, job_kind, ..
        } => {
            assert_eq!(job_id, "index-active-lock-12345");
            assert_eq!(*job_kind, SearchMaintenanceJobKind::LexicalRefresh);
        }
        // Any other variant fails here, with the observed outcome in the message.
        _ => assert!(
            matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
            "legacy active lock must remain active, got {outcome:?}"
        ),
    }
}
3297
/// A snapshot whose heartbeat is 500 ms old must evaluate to `Active` and
/// carry the snapshot's job id and phase through unchanged.
#[test]
fn coordination_active_job_with_fresh_heartbeat() {
    let now_ms = 1_733_000_000_000i64;
    let snapshot = make_active_snapshot(now_ms);

    let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
    match &outcome {
        MaintenanceCoordinationOutcome::Active { job_id, phase, .. } => {
            assert_eq!(job_id, "lexical_refresh-1000-12345");
            assert_eq!(phase.as_deref(), Some("scanning"));
        }
        // The message names the real variant (`Active`), so a failure points
        // at an identifier that exists in the code.
        _ => assert!(
            matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
            "expected Active, got {outcome:?}"
        ),
    }
}
3318
/// A snapshot whose heartbeat is 60 000 ms old must evaluate to `Stale`, keep
/// the job id, and mention the heartbeat age (`60000ms`) in the reason.
#[test]
fn coordination_stale_job_with_old_heartbeat() {
    let now_ms = 1_733_000_000_000i64;
    let snapshot = SearchMaintenanceSnapshot {
        updated_at_ms: Some(now_ms - 60_000),
        ..make_active_snapshot(now_ms)
    };

    let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
    match &outcome {
        MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
            assert_eq!(job_id, "lexical_refresh-1000-12345");
            assert!(reason.contains("60000ms"), "reason={reason}");
        }
        // The message names the real variant (`Stale`), so a failure points
        // at an identifier that exists in the code.
        _ => assert!(
            matches!(outcome, MaintenanceCoordinationOutcome::Stale { .. }),
            "expected Stale, got {outcome:?}"
        ),
    }
}
3341
/// An active snapshot with no `updated_at_ms` must still evaluate to `Active`,
/// and the reported `updated_at_ms` must equal the `now_ms` passed in.
#[test]
fn coordination_missing_heartbeat_timestamp_still_respects_active_flock() {
    let now_ms = 1_733_000_000_000i64;
    let without_heartbeat = SearchMaintenanceSnapshot {
        updated_at_ms: None,
        ..make_active_snapshot(now_ms)
    };

    let outcome = evaluate_maintenance_coordination_from_snapshot(&without_heartbeat, now_ms);
    match &outcome {
        MaintenanceCoordinationOutcome::Active { updated_at_ms, .. } => {
            assert_eq!(*updated_at_ms, now_ms);
        }
        _ => assert!(
            matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
            "missing heartbeat metadata must not hide an active flock, got {outcome:?}"
        ),
    }
}
3359
/// A default snapshot must yield the `Launch` decision.
#[test]
fn decision_launch_when_no_job() {
    let idle_snapshot = SearchMaintenanceSnapshot::default();
    assert_eq!(
        decide_maintenance_action_from_snapshot(&idle_snapshot, 1_733_000_000_000),
        MaintenanceDecision::Launch
    );
}
3366
/// Deciding against a freshly created, empty data directory must yield `Launch`.
#[test]
fn decision_launch_when_no_lock_file() {
    let empty_dir = tempfile::tempdir().expect("tempdir");
    assert_eq!(
        decide_maintenance_action(empty_dir.path(), 1_733_000_000_000),
        MaintenanceDecision::Launch
    );
}
3373
/// An active snapshot whose heartbeat is 60 000 ms old must still produce
/// `AttachOrWait`, carrying the job id, the phase, and an `elapsed_ms` of
/// 5 000 (the snapshot's start offset).
#[test]
fn decision_attaches_when_active_lock_has_stale_heartbeat() {
    let now_ms = 1_733_000_000_000i64;
    let stale_heartbeat = SearchMaintenanceSnapshot {
        updated_at_ms: Some(now_ms - 60_000),
        ..make_active_snapshot(now_ms)
    };

    let decision = decide_maintenance_action_from_snapshot(&stale_heartbeat, now_ms);
    match &decision {
        MaintenanceDecision::AttachOrWait {
            job_id,
            phase,
            elapsed_ms,
            ..
        } => {
            assert_eq!(job_id, "lexical_refresh-1000-12345");
            assert_eq!(phase.as_deref(), Some("scanning"));
            assert_eq!(*elapsed_ms, 5_000);
        }
        _ => assert!(
            matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
            "stale heartbeat still has an active lock, got {decision:?}"
        ),
    }
}
3399
/// An active snapshot with a fresh heartbeat must produce `AttachOrWait`
/// with the snapshot's job id and an `elapsed_ms` of 5 000.
#[test]
fn decision_attach_when_active_fresh_job() {
    let now_ms = 1_733_000_000_000i64;
    let fresh = make_active_snapshot(now_ms);

    let decision = decide_maintenance_action_from_snapshot(&fresh, now_ms);
    match &decision {
        MaintenanceDecision::AttachOrWait {
            job_id, elapsed_ms, ..
        } => {
            assert_eq!(job_id, "lexical_refresh-1000-12345");
            assert_eq!(*elapsed_ms, 5_000);
        }
        _ => assert!(
            matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
            "expected AttachOrWait, got {decision:?}"
        ),
    }
}
3420
/// Polling an empty data directory must finish on the first poll, report
/// `Idle`, and not be flagged as timed out.
///
/// The outcome assertion names the real variant (`Idle`) and prints the
/// observed outcome on failure.
#[test]
fn poll_returns_immediately_when_no_active_job() {
    let temp = tempfile::tempdir().expect("tempdir");
    let timeout = Duration::from_millis(500);
    let interval = Duration::from_millis(50);

    let result = poll_maintenance_until_idle(temp.path(), Some(timeout), Some(interval));

    assert!(!result.timed_out);
    assert_eq!(result.polls, 1);
    assert!(
        matches!(result.outcome, MaintenanceCoordinationOutcome::Idle),
        "expected Idle, got {:?}",
        result.outcome
    );
    assert!(
        result.elapsed <= timeout,
        "immediate idle poll should finish before timeout, elapsed={:?}",
        result.elapsed
    );
}
3441
/// While this test holds an exclusive lock on `index-run.lock` with fresh
/// heartbeat metadata, polling with a 300 ms timeout must time out after at
/// least two polls and report `Active`.
///
/// Failure messages name the real variant (`Active`) and include the observed
/// outcome and poll count.
#[test]
fn poll_returns_active_on_timeout_when_lock_held() {
    use fs2::FileExt;

    let temp = tempfile::tempdir().expect("tempdir");
    let lock_path = temp.path().join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();

    let owner = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&lock_path)
        .expect("open owner handle");
    owner.try_lock_exclusive().expect("acquire lock");

    // Heartbeat (`updated_at_ms`) is "now", so the metadata describes a fresh job.
    let metadata = format!(
        "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-1\njob_kind=lexical_refresh\nphase=scanning\n",
        now_ms - 1_000,
        now_ms,
    );
    std::fs::write(&lock_path, metadata).expect("write lock metadata");

    let result = poll_maintenance_until_idle(
        temp.path(),
        Some(Duration::from_millis(300)),
        Some(Duration::from_millis(50)),
    );
    assert!(result.timed_out, "should time out when lock is held");
    assert!(
        result.polls >= 2,
        "should have polled multiple times, polls={}",
        result.polls
    );
    assert!(
        matches!(
            result.outcome,
            MaintenanceCoordinationOutcome::Active { .. }
        ),
        "expected Active on timeout, got {:?}",
        result.outcome
    );

    // Explicit path: calls the `fs2::FileExt` trait function directly.
    let _ = FileExt::unlock(&owner);
}
3483
/// While this test holds an exclusive lock on `index-run.lock` whose metadata
/// is 120 s old, polling with a 150 ms timeout must time out and report
/// `Stale` rather than returning idle.
#[test]
fn poll_times_out_instead_of_declaring_stale_held_lock_idle() {
    use fs2::FileExt;

    let data_dir = tempfile::tempdir().expect("tempdir");
    let lock_file = data_dir.path().join("index-run.lock");
    // Start time and heartbeat are both two minutes in the past.
    let stale_ms = crate::storage::sqlite::FrankenStorage::now_millis() - 120_000;

    let holder = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&lock_file)
        .expect("open owner handle");
    holder.try_lock_exclusive().expect("acquire lock");

    let metadata = format!(
        "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-stale\njob_kind=lexical_refresh\nphase=scanning\n",
        stale_ms, stale_ms,
    );
    std::fs::write(&lock_file, metadata).expect("write lock metadata");

    let timeout = Some(Duration::from_millis(150));
    let interval = Some(Duration::from_millis(25));
    let result = poll_maintenance_until_idle(data_dir.path(), timeout, interval);

    assert!(result.timed_out, "held stale lock is still not idle");
    assert!(
        matches!(result.outcome, MaintenanceCoordinationOutcome::Stale { .. }),
        "expected stale held lock on timeout, got {:?}",
        result.outcome
    );

    let _ = FileExt::unlock(&holder);
}
3522
/// A background thread truncates the lock file and releases the exclusive
/// lock 150 ms after the test starts; a poll with a 2 s timeout and 50 ms
/// interval must return without timing out.
#[test]
fn poll_detects_release_mid_wait() {
    use fs2::FileExt;

    let data_dir = tempfile::tempdir().expect("tempdir");
    let lock_file = data_dir.path().join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();

    let holder = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&lock_file)
        .expect("open owner handle");
    holder.try_lock_exclusive().expect("acquire lock");

    let metadata = format!(
        "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-2\njob_kind=lexical_refresh\nphase=committing\n",
        now_ms - 1_000,
        now_ms,
    );
    std::fs::write(&lock_file, metadata).expect("write lock metadata");

    let poll_dir = data_dir.path().to_path_buf();
    // The closure takes ownership of the handle: it empties the file, unlocks
    // it, and closes it while the main thread is polling.
    let releaser = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(150));
        let _ = holder.set_len(0);
        let _ = FileExt::unlock(&holder);
        drop(holder);
    });

    let timeout = Some(Duration::from_secs(2));
    let interval = Some(Duration::from_millis(50));
    let result = poll_maintenance_until_idle(&poll_dir, timeout, interval);

    assert!(!result.timed_out, "should detect release before timeout");
    releaser.join().expect("release thread");
}
3563
/// With an exclusively held lock and fresh metadata for job `fo-job-1`:
/// passing `true` as the last argument must give `FailOpen` with a reason
/// naming the job and containing "failing open"; passing `false` must give
/// `AttachOrWait`.
#[test]
fn failopen_returns_failopen_when_lexical_available_and_job_active() {
    use fs2::FileExt;

    let data_dir = tempfile::tempdir().expect("tempdir");
    let lock_file = data_dir.path().join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();

    let holder = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&lock_file)
        .expect("open owner handle");
    holder.try_lock_exclusive().expect("acquire lock");

    let metadata = format!(
        "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-job-1\njob_kind=lexical_refresh\nphase=indexing\n",
        now_ms - 1_000,
        now_ms,
    );
    std::fs::write(&lock_file, metadata).expect("write lock metadata");

    // Final argument `true` (the failure messages below call this "lexical").
    let decision = decide_search_failopen(data_dir.path(), now_ms, true);
    match &decision {
        MaintenanceDecision::FailOpen { reason } => {
            assert!(reason.contains("fo-job-1"), "reason={reason}");
            assert!(reason.contains("failing open"), "reason={reason}");
        }
        _ => assert!(
            matches!(decision, MaintenanceDecision::FailOpen { .. }),
            "expected FailOpen, got {decision:?}"
        ),
    }

    // Final argument `false`.
    let decision_no_lexical = decide_search_failopen(data_dir.path(), now_ms, false);
    let attaches = matches!(
        decision_no_lexical,
        MaintenanceDecision::AttachOrWait { .. }
    );
    assert!(
        attaches,
        "without lexical must attach, got {decision_no_lexical:?}"
    );

    let _ = FileExt::unlock(&holder);
}
3611
/// With an exclusively held lock whose metadata for job `fo-stale-1` is 120 s
/// old: passing `true` as the last argument must give `FailOpen` with a
/// reason naming the job, "stale heartbeat" and "failing open"; passing
/// `false` must give `AttachOrWait`.
#[test]
fn failopen_handles_active_stale_heartbeat_without_launching_repair() {
    use fs2::FileExt;

    let data_dir = tempfile::tempdir().expect("tempdir");
    let lock_file = data_dir.path().join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
    // Start time and heartbeat are both two minutes before `now_ms`.
    let stale_ms = now_ms - 120_000;

    let holder = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&lock_file)
        .expect("open owner handle");
    holder.try_lock_exclusive().expect("acquire lock");

    let metadata = format!(
        "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-stale-1\njob_kind=lexical_refresh\nphase=indexing\n",
        stale_ms, stale_ms,
    );
    std::fs::write(&lock_file, metadata).expect("write lock metadata");

    let decision = decide_search_failopen(data_dir.path(), now_ms, true);
    match &decision {
        MaintenanceDecision::FailOpen { reason } => {
            assert!(reason.contains("fo-stale-1"), "reason={reason}");
            assert!(reason.contains("stale heartbeat"), "reason={reason}");
            assert!(reason.contains("failing open"), "reason={reason}");
        }
        _ => assert!(
            matches!(decision, MaintenanceDecision::FailOpen { .. }),
            "expected FailOpen for searchable stale active lock, got {decision:?}"
        ),
    }

    let decision_no_lexical = decide_search_failopen(data_dir.path(), now_ms, false);
    let waits = matches!(
        decision_no_lexical,
        MaintenanceDecision::AttachOrWait { .. }
    );
    assert!(
        waits,
        "without lexical must wait for the held lock, got {decision_no_lexical:?}"
    );

    let _ = FileExt::unlock(&holder);
}
3660
3661    // -----------------------------------------------------------------------
3662    // ibuuh.22: Event log, yield signaling, unified view tests
3663    // -----------------------------------------------------------------------
3664
/// Appending a single `Started` event and reading the log back with no
/// filters must return exactly that event, with its job id, pid and kind.
#[test]
fn event_log_append_and_read() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    let started = MaintenanceEvent {
        timestamp_ms: 1_733_000_000_000,
        job_id: String::from("test-job-1"),
        actor_pid: 42,
        kind: MaintenanceEventKind::Started {
            job_kind: String::from("lexical_refresh"),
            phase: String::from("scanning"),
        },
    };
    append_maintenance_event(data_dir.path(), &started).expect("append");

    let events = read_maintenance_events(data_dir.path(), None, None);
    assert_eq!(events.len(), 1);

    let only = &events[0];
    assert_eq!(only.job_id, "test-job-1");
    assert_eq!(only.actor_pid, 42);
    assert!(matches!(only.kind, MaintenanceEventKind::Started { .. }));
}
3687
/// With five events at timestamps 1000..=1004, reading with a lower bound of
/// 1002 must return only the events at 1003 and 1004 (the bound itself is
/// excluded), in that order.
#[test]
fn event_log_filters_by_timestamp() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    for step in 0..5 {
        let progress = MaintenanceEvent {
            timestamp_ms: 1_000 + step,
            job_id: format!("job-{step}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Progress {
                processed: step as u64,
                total: 5,
            },
        };
        append_maintenance_event(data_dir.path(), &progress).expect("append");
    }

    let events = read_maintenance_events(data_dir.path(), Some(1_002), None);
    let stamps: Vec<_> = events.iter().map(|event| event.timestamp_ms).collect();
    assert_eq!(stamps.len(), 2, "should only get events after ts 1002");
    assert_eq!(stamps, [1_003, 1_004]);
}
3708
/// With ten events at timestamps 1000..=1009, reading with a limit of 3 must
/// return three events, starting at 1007 and ending at 1009.
#[test]
fn event_log_respects_limit() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    for step in 0..10 {
        let resumed = MaintenanceEvent {
            timestamp_ms: 1_000 + step,
            job_id: format!("job-{step}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Resumed,
        };
        append_maintenance_event(data_dir.path(), &resumed).expect("append");
    }

    let events = read_maintenance_events(data_dir.path(), None, Some(3));
    assert_eq!(events.len(), 3);

    // With exactly three entries, first/last are indices 0 and 2.
    let oldest = events.first().map(|event| event.timestamp_ms);
    let newest = events.last().map(|event| event.timestamp_ms);
    assert_eq!(oldest, Some(1_007));
    assert_eq!(newest, Some(1_009));
}
3726
/// Reading events from a data directory where nothing was ever appended must
/// return an empty list.
#[test]
fn event_log_returns_empty_when_missing() {
    let empty_dir = tempfile::tempdir().expect("tempdir");
    assert!(read_maintenance_events(empty_dir.path(), None, None).is_empty());
}
3733
/// After appending 550 events (timestamps 0..550) and truncating, the log
/// must hold `MAX_EVENT_LOG_ENTRIES` events, the first at timestamp 50 and
/// the last at 549.
#[test]
fn event_log_truncation_retains_tail() {
    let data_dir = tempfile::tempdir().expect("tempdir");

    for stamp in 0..550 {
        let resumed = MaintenanceEvent {
            timestamp_ms: stamp,
            job_id: format!("job-{stamp}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Resumed,
        };
        append_maintenance_event(data_dir.path(), &resumed).expect("append");
    }

    // Before truncation, all 550 appended events are readable.
    let before = read_maintenance_events(data_dir.path(), None, Some(600));
    assert_eq!(before.len(), 550);

    truncate_maintenance_event_log(data_dir.path()).expect("truncate");

    let after = read_maintenance_events(data_dir.path(), None, Some(600));
    assert_eq!(after.len(), MAX_EVENT_LOG_ENTRIES);

    let oldest_kept = after.first().map(|event| event.timestamp_ms);
    let newest_kept = after.last().map(|event| event.timestamp_ms);
    assert_eq!(oldest_kept, Some(50));
    assert_eq!(newest_kept, Some(549));
}
3754
3755    #[test]
3756    fn yield_signal_round_trip() {
3757        let temp = tempfile::tempdir().expect("tempdir");
3758        assert!(
3759            check_yield_requested(temp.path()).is_none(),
3760            "no signal initially"
3761        );
3762        request_yield(temp.path(), "foreground search pressure").expect("request yield");
3763        let req = check_yield_requested(temp.path()).expect("yield should be present");
3764        assert_eq!(req.requester_pid, std::process::id());
3765        assert_eq!(req.reason, "foreground search pressure");
3766        assert!(req.requested_at_ms > 0);
3767        clear_yield_signal(temp.path()).expect("clear");
3768        assert!(
3769            check_yield_requested(temp.path()).is_none(),
3770            "signal cleared"
3771        );
3772    }
3773
3774    #[test]
3775    fn clear_yield_signal_is_idempotent() {
3776        let temp = tempfile::tempdir().expect("tempdir");
3777        clear_yield_signal(temp.path()).expect("clear nonexistent");
3778        clear_yield_signal(temp.path()).expect("clear again");
3779    }
3780
3781    #[test]
3782    fn unified_view_idle_no_events() {
3783        let temp = tempfile::tempdir().expect("tempdir");
3784        let view = unified_maintenance_view(temp.path(), true);
3785        assert!(matches!(
3786            view.coordination,
3787            MaintenanceCoordinationOutcome::Idle
3788        ));
3789        assert!(view.yield_pending.is_none());
3790        assert!(view.recent_events.is_empty());
3791        assert_eq!(view.decision, MaintenanceDecision::Launch);
3792    }
3793
3794    #[test]
3795    fn unified_view_active_with_lexical_fails_open() {
3796        use fs2::FileExt;
3797        let temp = tempfile::tempdir().expect("tempdir");
3798        let lock_path = temp.path().join("index-run.lock");
3799        let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3800        let owner = OpenOptions::new()
3801            .create(true)
3802            .read(true)
3803            .write(true)
3804            .truncate(true)
3805            .open(&lock_path)
3806            .expect("open");
3807        owner.try_lock_exclusive().expect("lock");
3808        std::fs::write(
3809            &lock_path,
3810            format!(
3811                "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/t.db\nmode=index\njob_id=uv-1\njob_kind=lexical_refresh\nphase=indexing\n",
3812                now_ms - 1_000,
3813                now_ms,
3814            ),
3815        )
3816        .expect("write metadata");
3817
3818        let event = MaintenanceEvent {
3819            timestamp_ms: now_ms,
3820            job_id: "uv-1".to_string(),
3821            actor_pid: 99999,
3822            kind: MaintenanceEventKind::Started {
3823                job_kind: "lexical_refresh".to_string(),
3824                phase: "indexing".to_string(),
3825            },
3826        };
3827        append_maintenance_event(temp.path(), &event).expect("append");
3828
3829        let view = unified_maintenance_view(temp.path(), true);
3830        assert!(matches!(
3831            view.coordination,
3832            MaintenanceCoordinationOutcome::Active { .. }
3833        ));
3834        assert!(matches!(
3835            view.decision,
3836            MaintenanceDecision::FailOpen { .. }
3837        ));
3838        assert_eq!(view.recent_events.len(), 1);
3839
3840        let _ = FileExt::unlock(&owner);
3841    }
3842
3843    #[test]
3844    fn unified_view_includes_yield_signal() {
3845        let temp = tempfile::tempdir().expect("tempdir");
3846        request_yield(temp.path(), "test yield").expect("yield");
3847        let view = unified_maintenance_view(temp.path(), true);
3848        assert!(view.yield_pending.is_some());
3849        assert_eq!(view.yield_pending.as_ref().unwrap().reason, "test yield");
3850        clear_yield_signal(temp.path()).expect("clear");
3851    }
3852
3853    #[test]
3854    fn event_kinds_serialize_round_trip() {
3855        let temp = tempfile::tempdir().expect("tempdir");
3856        let kinds = vec![
3857            MaintenanceEventKind::Started {
3858                job_kind: "lexical_refresh".to_string(),
3859                phase: "init".to_string(),
3860            },
3861            MaintenanceEventKind::PhaseChanged {
3862                from: "init".to_string(),
3863                to: "scanning".to_string(),
3864            },
3865            MaintenanceEventKind::Progress {
3866                processed: 50,
3867                total: 100,
3868            },
3869            MaintenanceEventKind::YieldRequested {
3870                requester_pid: 42,
3871                reason: "foreground".to_string(),
3872            },
3873            MaintenanceEventKind::Paused {
3874                reason: "yield".to_string(),
3875            },
3876            MaintenanceEventKind::Resumed,
3877            MaintenanceEventKind::Completed {
3878                summary: "done".to_string(),
3879            },
3880            MaintenanceEventKind::Failed {
3881                error: "oops".to_string(),
3882            },
3883            MaintenanceEventKind::Cancelled {
3884                reason: "user".to_string(),
3885            },
3886        ];
3887        for (i, kind) in kinds.into_iter().enumerate() {
3888            let event = MaintenanceEvent {
3889                timestamp_ms: i as i64,
3890                job_id: "rt-test".to_string(),
3891                actor_pid: 1,
3892                kind,
3893            };
3894            append_maintenance_event(temp.path(), &event).expect("append");
3895        }
3896        let events = read_maintenance_events(temp.path(), None, None);
3897        assert_eq!(events.len(), 9);
3898        assert!(matches!(
3899            events[0].kind,
3900            MaintenanceEventKind::Started { .. }
3901        ));
3902        assert!(matches!(events[5].kind, MaintenanceEventKind::Resumed));
3903        assert!(matches!(
3904            events[8].kind,
3905            MaintenanceEventKind::Cancelled { .. }
3906        ));
3907    }
3908}