1use std::fs::OpenOptions;
13use std::io::{Read, Write};
14use std::path::{Path, PathBuf};
15use std::time::{Duration, Instant};
16
17use anyhow::{Context, Result};
18use fs2::FileExt;
19
20use crate::indexer::{
21 LEXICAL_REBUILD_PAGE_SIZE_PUBLIC, LexicalRebuildCheckpoint,
22 lexical_rebuild_page_size_is_compatible, lexical_storage_fingerprint_for_db,
23 load_lexical_rebuild_checkpoint,
24};
25use crate::search::ann_index::hnsw_index_path;
26use crate::search::embedder::Embedder;
27use crate::search::fastembed_embedder::FastEmbedder;
28use crate::search::hash_embedder::HashEmbedder;
29use crate::search::model_manager::{
30 SemanticAvailability, probe_hash_semantic_availability, probe_semantic_availability,
31};
32use crate::search::policy::{
33 CHUNKING_STRATEGY_VERSION, CliSemanticOverrides, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
34};
35use crate::search::semantic_manifest::{
36 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
37 TierKind, semantic_shard_artifact_path_is_safe,
38};
39use crate::search::tantivy::SCHEMA_HASH;
40use crate::search::vector_index::{VECTOR_INDEX_DIR, vector_index_path};
41
/// Kind of maintenance run that owns the index-run lock, as recorded on the lock
/// metadata's `mode=` line.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceMode {
    /// Lock value `index`.
    Index,
    /// Lock value `watch_startup`.
    WatchStartup,
    /// Lock value `watch`.
    Watch,
    /// Lock value `watch_once`.
    WatchOnce,
}
49
50impl SearchMaintenanceMode {
51 pub(crate) fn as_lock_value(self) -> &'static str {
52 match self {
53 Self::Index => "index",
54 Self::WatchStartup => "watch_startup",
55 Self::Watch => "watch",
56 Self::WatchOnce => "watch_once",
57 }
58 }
59
60 pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
61 match raw.trim() {
62 "index" => Some(Self::Index),
63 "watch_startup" => Some(Self::WatchStartup),
64 "watch" => Some(Self::Watch),
65 "watch_once" => Some(Self::WatchOnce),
66 _ => None,
67 }
68 }
69
70 pub(crate) fn watch_active(self) -> bool {
71 matches!(self, Self::WatchStartup | Self::Watch)
72 }
73
74 pub(crate) fn rebuild_active(self) -> bool {
75 !matches!(self, Self::Watch)
76 }
77}
78
/// Kind of maintenance job recorded on the lock metadata's `job_kind=` line.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub(crate) enum SearchMaintenanceJobKind {
    /// Lock value `lexical_refresh`.
    LexicalRefresh,
    /// Lock value `semantic_acquire`.
    SemanticAcquire,
}
84
85impl SearchMaintenanceJobKind {
86 pub(crate) fn as_lock_value(self) -> &'static str {
87 match self {
88 Self::LexicalRefresh => "lexical_refresh",
89 Self::SemanticAcquire => "semantic_acquire",
90 }
91 }
92
93 pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
94 match raw.trim() {
95 "lexical_refresh" => Some(Self::LexicalRefresh),
96 "semantic_acquire" => Some(Self::SemanticAcquire),
97 _ => None,
98 }
99 }
100}
101
/// Point-in-time view of the index-run lock, produced by `read_search_maintenance_snapshot`.
///
/// All `Option` fields come from `key=value` lines of the lock metadata (lock file or its
/// `.meta` sidecar); they are `None` when the key is absent or fails to parse.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub(crate) struct SearchMaintenanceSnapshot {
    /// Whether some process is believed to currently hold the index-run lock.
    pub active: bool,
    pub pid: Option<u32>,
    pub started_at_ms: Option<i64>,
    pub db_path: Option<PathBuf>,
    pub mode: Option<SearchMaintenanceMode>,
    pub job_id: Option<String>,
    pub job_kind: Option<SearchMaintenanceJobKind>,
    pub phase: Option<String>,
    pub updated_at_ms: Option<i64>,
    /// Used by `maintenance_stall_age_ms` to detect stalled rebuilds.
    pub last_progress_at_ms: Option<i64>,
    /// Metadata is present but no owner is active (the writer appears to be gone).
    pub orphaned: bool,
}
116
/// Location of the index-run lock file inside `data_dir`.
pub(crate) fn index_run_lock_path(data_dir: &Path) -> PathBuf {
    let mut lock = data_dir.to_path_buf();
    lock.push("index-run.lock");
    lock
}

/// Location of the metadata sidecar that sits next to `lock_path`.
///
/// The sidecar is always named `index-run.lock.meta` in `lock_path`'s directory,
/// regardless of `lock_path`'s own file name.
pub(crate) fn index_run_lock_metadata_sidecar_path(lock_path: &Path) -> PathBuf {
    let mut sidecar = lock_path.to_path_buf();
    sidecar.set_file_name("index-run.lock.meta");
    sidecar
}
124
125pub(crate) fn write_index_run_lock_metadata_sidecar(
126 lock_path: &Path,
127 contents: &str,
128) -> Result<()> {
129 let sidecar_path = index_run_lock_metadata_sidecar_path(lock_path);
130 let mut sidecar = OpenOptions::new()
131 .create(true)
132 .truncate(true)
133 .write(true)
134 .open(&sidecar_path)
135 .with_context(|| {
136 format!(
137 "opening index-run lock metadata sidecar {}",
138 sidecar_path.display()
139 )
140 })?;
141 sidecar.write_all(contents.as_bytes()).with_context(|| {
142 format!(
143 "writing index-run lock metadata sidecar {}",
144 sidecar_path.display()
145 )
146 })?;
147 sidecar.flush().with_context(|| {
148 format!(
149 "flushing index-run lock metadata sidecar {}",
150 sidecar_path.display()
151 )
152 })?;
153 sidecar.sync_all().with_context(|| {
154 format!(
155 "syncing index-run lock metadata sidecar {}",
156 sidecar_path.display()
157 )
158 })
159}
160
161pub(crate) fn clear_index_run_lock_metadata_sidecar(lock_path: &Path) -> Result<()> {
162 let sidecar_path = index_run_lock_metadata_sidecar_path(lock_path);
163 let sidecar = match OpenOptions::new()
164 .truncate(true)
165 .write(true)
166 .open(&sidecar_path)
167 {
168 Ok(sidecar) => sidecar,
169 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
170 Err(err) => {
171 return Err(err).with_context(|| {
172 format!(
173 "opening index-run lock metadata sidecar {}",
174 sidecar_path.display()
175 )
176 });
177 }
178 };
179 sidecar.sync_all().with_context(|| {
180 format!(
181 "syncing cleared index-run lock metadata sidecar {}",
182 sidecar_path.display()
183 )
184 })
185}
186
/// Reads at most `max_len` bytes of `path` as UTF-8 text.
///
/// # Errors
/// Propagates open/read failures, including invalid UTF-8 in the bytes read.
fn read_capped_metadata_from_path(path: &Path, max_len: u64) -> std::io::Result<String> {
    let mut contents = String::new();
    OpenOptions::new()
        .read(true)
        .open(path)?
        .take(max_len)
        .read_to_string(&mut contents)?;
    Ok(contents)
}
193
/// Returns `true` when `err` carries a Windows sharing violation (raw code 32) or
/// lock violation (raw code 33), i.e. another handle holds the file or a region of it.
///
/// Always `false` on non-Windows targets, where those raw codes mean something else.
pub(crate) fn windows_lock_conflict(err: &std::io::Error) -> bool {
    const ERROR_SHARING_VIOLATION: i32 = 32;
    const ERROR_LOCK_VIOLATION: i32 = 33;
    cfg!(windows)
        && matches!(
            err.raw_os_error(),
            Some(ERROR_SHARING_VIOLATION | ERROR_LOCK_VIOLATION)
        )
}
203
204pub(crate) fn read_search_maintenance_snapshot(data_dir: &Path) -> SearchMaintenanceSnapshot {
205 const MAX_LOCK_FILE_READ: u64 = 64 * 1024;
210
211 let lock_path = index_run_lock_path(data_dir);
212 let sidecar_path = index_run_lock_metadata_sidecar_path(&lock_path);
213 let file = match OpenOptions::new().read(true).write(true).open(&lock_path) {
214 Ok(file) => file,
215 Err(err) if windows_lock_conflict(&err) => {
216 let raw = read_capped_metadata_from_path(&sidecar_path, MAX_LOCK_FILE_READ)
217 .unwrap_or_default();
218 return parse_search_maintenance_snapshot(raw, true);
219 }
220 Err(_) => return SearchMaintenanceSnapshot::default(),
221 };
222
223 let mut raw = String::new();
224 let mut read_blocked_by_lock = false;
225 if let Err(err) = (&file).take(MAX_LOCK_FILE_READ).read_to_string(&mut raw)
226 && windows_lock_conflict(&err)
227 {
228 read_blocked_by_lock = true;
229 }
230 if raw.trim().is_empty() {
231 raw = read_capped_metadata_from_path(&sidecar_path, MAX_LOCK_FILE_READ).unwrap_or_default();
232 }
233
234 let mut snapshot = parse_search_maintenance_snapshot(raw, read_blocked_by_lock);
235 if read_blocked_by_lock {
236 return snapshot;
237 }
238
239 let metadata_present = snapshot.pid.is_some()
240 || snapshot.started_at_ms.is_some()
241 || snapshot.db_path.is_some()
242 || snapshot.mode.is_some()
243 || snapshot.job_id.is_some()
244 || snapshot.job_kind.is_some()
245 || snapshot.phase.is_some()
246 || snapshot.updated_at_ms.is_some()
247 || snapshot.last_progress_at_ms.is_some();
248
249 #[cfg(windows)]
250 let current_process_owns_recorded_lock =
251 metadata_present && snapshot.pid == Some(std::process::id());
252 #[cfg(not(windows))]
253 let current_process_owns_recorded_lock = false;
254
255 snapshot.active = if current_process_owns_recorded_lock {
256 true
257 } else {
258 match file.try_lock_exclusive() {
259 Ok(()) => {
260 if metadata_present {
284 if let Err(err) = file.set_len(0) {
285 tracing::warn!(
286 path = %lock_path.display(),
287 error = %err,
288 "failed to truncate stale index-run lock metadata"
289 );
290 } else {
291 let _ = clear_index_run_lock_metadata_sidecar(&lock_path);
292 let _ = file.sync_all();
293 tracing::info!(
294 path = %lock_path.display(),
295 stale_pid = ?snapshot.pid,
296 "cleared stale index-run lock metadata (previous owner gone)"
297 );
298 let _ = file.unlock();
299 return SearchMaintenanceSnapshot::default();
300 }
301 }
302 let _ = file.unlock();
303 false
304 }
305 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => true,
306 Err(err) if windows_lock_conflict(&err) => true,
307 Err(_) => false,
308 }
309 };
310 snapshot.orphaned = metadata_present && !snapshot.active;
311 snapshot
312}
313
314fn parse_search_maintenance_snapshot(
315 raw: String,
316 active_when_metadata_unreadable: bool,
317) -> SearchMaintenanceSnapshot {
318 let mut pid = None;
319 let mut started_at_ms = None;
320 let mut lock_db_path = None::<PathBuf>;
321 let mut mode = None;
322 let mut job_id = None;
323 let mut job_kind = None;
324 let mut phase = None;
325 let mut updated_at_ms = None;
326 let mut last_progress_at_ms = None;
327 for line in raw.lines() {
328 let Some((key, value)) = line.split_once('=') else {
329 continue;
330 };
331 match key.trim() {
332 "pid" => pid = value.trim().parse::<u32>().ok(),
333 "started_at_ms" => started_at_ms = value.trim().parse::<i64>().ok(),
334 "db_path" => lock_db_path = Some(PathBuf::from(value.trim())),
335 "mode" => mode = SearchMaintenanceMode::parse_lock_value(value),
336 "job_id" => job_id = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
337 "job_kind" => job_kind = SearchMaintenanceJobKind::parse_lock_value(value),
338 "phase" => phase = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
339 "updated_at_ms" => updated_at_ms = value.trim().parse::<i64>().ok(),
340 "last_progress_at_ms" => last_progress_at_ms = value.trim().parse::<i64>().ok(),
341 _ => {}
342 }
343 }
344
345 let metadata_present = pid.is_some()
346 || started_at_ms.is_some()
347 || lock_db_path.is_some()
348 || mode.is_some()
349 || job_id.is_some()
350 || job_kind.is_some()
351 || phase.is_some()
352 || updated_at_ms.is_some()
353 || last_progress_at_ms.is_some();
354
355 SearchMaintenanceSnapshot {
356 active: active_when_metadata_unreadable,
357 pid,
358 started_at_ms,
359 db_path: lock_db_path,
360 mode,
361 job_id,
362 job_kind,
363 phase,
364 updated_at_ms,
365 last_progress_at_ms,
366 orphaned: metadata_present && !active_when_metadata_unreadable,
367 }
368}
369
370pub(crate) const REBUILD_STALL_DETECT_SECS_DEFAULT: u64 = 120;
371
372pub(crate) fn rebuild_stall_detect_threshold_ms() -> Option<i64> {
373 let threshold_secs = dotenvy::var("CASS_REBUILD_STALL_DETECT_SECS")
374 .ok()
375 .and_then(|value| value.trim().parse::<u64>().ok())
376 .unwrap_or(REBUILD_STALL_DETECT_SECS_DEFAULT);
377 if threshold_secs == 0 {
378 return None;
379 }
380 let threshold_ms = threshold_secs.saturating_mul(1000);
381 Some(i64::try_from(threshold_ms).unwrap_or(i64::MAX))
382}
383
384pub(crate) fn maintenance_stall_age_ms(
385 snapshot: &SearchMaintenanceSnapshot,
386 now_ms: i64,
387) -> Option<i64> {
388 if !snapshot.active
389 || !snapshot
390 .mode
391 .is_some_and(SearchMaintenanceMode::rebuild_active)
392 {
393 return None;
394 }
395 let last_progress_at_ms = snapshot.last_progress_at_ms?;
396 let age_ms = now_ms.saturating_sub(last_progress_at_ms);
397 let threshold_ms = rebuild_stall_detect_threshold_ms()?;
398 (age_ms >= threshold_ms).then_some(age_ms)
399}
400
/// Which semantic backend the caller wants probed.
///
/// NOTE(review): `allow(dead_code)` outside tests presumably covers a variant that is
/// only constructed in tests — confirm.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SemanticPreference {
    /// Probe the default model; surfaced as the `fastembed` backend.
    DefaultModel,
    /// Probe the hash embedder; surfaced as the `hash` backend.
    HashFallback,
}

/// Lexical and semantic asset state returned together by `inspect_search_assets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SearchAssetSnapshot {
    pub lexical: LexicalAssetState,
    pub semantic: SemanticAssetState,
}
413
/// Lexical (Tantivy) index health as computed by `inspect_lexical_assets`.
///
/// NOTE(review): the exact derivation of the status flags lives in
/// `inspect_lexical_assets`, which is not visible here — field notes below are hedged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalAssetState {
    pub status: &'static str,
    pub exists: bool,
    pub fresh: bool,
    pub stale: bool,
    pub rebuilding: bool,
    pub stalled: bool,
    // Presumably derived via `maintenance_stall_age_ms` / the maintenance snapshot — confirm.
    pub last_progress_age_ms: Option<i64>,
    pub last_progress_at_ms: Option<i64>,
    pub watch_active: bool,
    pub last_indexed_at_ms: Option<i64>,
    pub age_seconds: Option<u64>,
    pub stale_threshold_seconds: u64,
    pub activity_at_ms: Option<i64>,
    pub pending_sessions: u64,
    pub processed_conversations: Option<u64>,
    pub total_conversations: Option<u64>,
    pub indexed_docs: Option<u64>,
    pub status_reason: Option<String>,
    pub fingerprint: LexicalFingerprintState,
    pub checkpoint: LexicalCheckpointState,
}

/// Storage-fingerprint comparison for the lexical index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalFingerprintState {
    /// Fingerprint of the database as it is now; `inspect_search_assets` also feeds this
    /// to the semantic inspection for tier matching.
    pub current_db_fingerprint: Option<String>,
    pub checkpoint_fingerprint: Option<String>,
    pub matches_current_db_fingerprint: Option<bool>,
}

/// Summary of the lexical rebuild checkpoint, when one exists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalCheckpointState {
    pub present: bool,
    pub completed: Option<bool>,
    pub db_matches: Option<bool>,
    pub schema_matches: Option<bool>,
    pub page_size_matches: Option<bool>,
    pub page_size_compatible: Option<bool>,
}
464
/// User-facing semantic search readiness, as assembled by `semantic_state_from_availability`
/// (or `semantic_state_not_inspected` on the fast path).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SemanticAssetState {
    /// e.g. `"ready"`, `"building"`, `"stale"`, `"not_inspected"`, or an
    /// availability-derived status.
    pub status: &'static str,
    pub availability: &'static str,
    pub summary: String,
    /// Mirrors `can_search` when built by `semantic_state_from_availability`.
    pub available: bool,
    pub can_search: bool,
    /// `Some("lexical")` when searches will fall back to lexical-only.
    pub fallback_mode: Option<&'static str>,
    /// `"fastembed"` or `"hash"`, from the caller's `SemanticPreference`.
    pub preferred_backend: &'static str,
    pub embedder_id: Option<String>,
    pub vector_index_path: Option<PathBuf>,
    pub model_dir: Option<PathBuf>,
    pub hnsw_path: Option<PathBuf>,
    /// `hnsw_path` is set and points at an existing file.
    pub hnsw_ready: bool,
    pub progressive_ready: bool,
    /// The quality tier is queryable under the current availability.
    pub quality_tier_published: bool,
    /// Either the quality or the fast tier is queryable.
    pub semantic_only_search_available: bool,
    pub hint: Option<String>,
    pub fast_tier: SemanticTierAssetState,
    pub quality_tier: SemanticTierAssetState,
    pub backlog: SemanticBacklogProgressState,
    pub checkpoint: SemanticCheckpointProgressState,
}
503
/// Status and asset paths chosen by `semantic_runtime_surface`.
struct SemanticRuntimeSurface {
    status: &'static str,
    availability: &'static str,
    summary: String,
    can_search: bool,
    fallback_mode: Option<&'static str>,
    hint: Option<String>,
    /// `Some` when a queryable tier was selected, or when the base embedder id was passed through.
    embedder_id: Option<String>,
    vector_index_path: Option<PathBuf>,
    model_dir: Option<PathBuf>,
    hnsw_path: Option<PathBuf>,
}

/// Inputs to `semantic_runtime_surface`; the `base_*` values are returned whenever no
/// manifest tier is queryable.
struct SemanticRuntimeInputs<'a> {
    data_dir: &'a Path,
    availability: &'a SemanticAvailability,
    preference: SemanticPreference,
    fast_tier: &'a SemanticTierAssetState,
    quality_tier: &'a SemanticTierAssetState,
    backlog: &'a SemanticBacklogProgressState,
    checkpoint: &'a SemanticCheckpointProgressState,
    base_embedder_id: Option<String>,
    base_vector_index_path: Option<PathBuf>,
    base_model_dir: Option<PathBuf>,
    base_hnsw_path: Option<PathBuf>,
}

/// Backend label and model directory implied by a `SemanticPreference`.
struct SemanticPreferenceSurface {
    preferred_backend: &'static str,
    model_dir: Option<PathBuf>,
}
535
/// One semantic tier's manifest record, as seen by `semantic_tier_asset_state`.
///
/// The `Default` value (all `false`/`None`) means the manifest has no record for the tier.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticTierAssetState {
    pub present: bool,
    pub ready: bool,
    /// `None` when no current database fingerprint was available to compare against.
    pub current_db_matches: Option<bool>,
    pub conversation_count: Option<u64>,
    pub doc_count: Option<u64>,
    pub embedder_id: Option<String>,
    pub model_revision: Option<String>,
    pub completed_at_ms: Option<i64>,
    pub size_bytes: Option<u64>,
    /// Left `None` from the manifest record. NOTE(review): presumably filled in by
    /// `promote_complete_shard_generation_state` — confirm.
    pub index_path: Option<PathBuf>,
}

/// Semantic backfill progress, computed by `semantic_backlog_progress_state`.
///
/// `pending_work` feeds the "backfill active" decision in `semantic_runtime_surface`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticBacklogProgressState {
    pub total_conversations: u64,
    pub fast_tier_processed: u64,
    pub fast_tier_remaining: u64,
    pub quality_tier_processed: u64,
    pub quality_tier_remaining: u64,
    pub pending_work: bool,
    pub current_db_matches: Option<bool>,
    pub computed_at_ms: Option<i64>,
}

/// Semantic build checkpoint summary, computed by `semantic_checkpoint_progress_state`.
///
/// `active` feeds the "backfill active" decision in `semantic_runtime_surface`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticCheckpointProgressState {
    pub active: bool,
    pub tier: Option<&'static str>,
    pub current_db_matches: Option<bool>,
    pub completed: Option<bool>,
    pub conversations_processed: Option<u64>,
    pub total_conversations: Option<u64>,
    pub progress_pct: Option<u8>,
    pub docs_embedded: Option<u64>,
    pub last_offset: Option<i64>,
    pub saved_at_ms: Option<i64>,
}
575
/// Arguments for `inspect_search_assets`.
pub(crate) struct InspectSearchAssetsInput<'a> {
    pub data_dir: &'a Path,
    pub db_path: &'a Path,
    /// Forwarded to the lexical inspection.
    pub stale_threshold: u64,
    pub last_indexed_at_ms: Option<i64>,
    pub now_ms: i64,
    /// Lock snapshot forwarded to the lexical inspection.
    pub maintenance: SearchMaintenanceSnapshot,
    pub semantic_preference: SemanticPreference,
    /// When `false`, semantic availability is reported as `DatabaseUnavailable`
    /// without probing.
    pub db_available: bool,
    /// Forwarded to the lexical inspection.
    pub compute_lexical_fingerprint: bool,
    /// When `false`, semantic probing is skipped and a `"not_inspected"` state is returned.
    pub inspect_semantic: bool,
}
595
/// Maximum mtime drift, in milliseconds, tolerated when comparing storage fingerprints.
const LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS: i64 = 1_000;

/// Decoded `db_len:db_mtime_ms:wal_len:wal_mtime_ms` storage fingerprint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParsedLexicalStorageFingerprint {
    db_len: u64,
    db_mtime_ms: i64,
    wal_len: u64,
    wal_mtime_ms: i64,
}

/// Parses a storage fingerprint; `None` unless there are exactly four `:`-separated
/// fields and each parses as its target integer type.
fn parse_lexical_storage_fingerprint(raw: &str) -> Option<ParsedLexicalStorageFingerprint> {
    let fields: Vec<&str> = raw.split(':').collect();
    let [db_len, db_mtime_ms, wal_len, wal_mtime_ms] = fields.as_slice() else {
        return None;
    };
    Some(ParsedLexicalStorageFingerprint {
        db_len: db_len.parse().ok()?,
        db_mtime_ms: db_mtime_ms.parse().ok()?,
        wal_len: wal_len.parse().ok()?,
        wal_mtime_ms: wal_mtime_ms.parse().ok()?,
    })
}

/// Compares two storage fingerprints.
///
/// Lengths must match exactly, while mtimes may drift by up to
/// `LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS`. If either side does not parse, the
/// raw strings must be identical.
pub(crate) fn lexical_storage_fingerprints_match(current: &str, saved: &str) -> bool {
    let (Some(live), Some(recorded)) = (
        parse_lexical_storage_fingerprint(current),
        parse_lexical_storage_fingerprint(saved),
    ) else {
        return current == saved;
    };
    let tolerance_ms = LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS.unsigned_abs();
    let close_enough = |a: i64, b: i64| a.abs_diff(b) <= tolerance_ms;
    live.db_len == recorded.db_len
        && live.wal_len == recorded.wal_len
        && close_enough(live.db_mtime_ms, recorded.db_mtime_ms)
        && close_enough(live.wal_mtime_ms, recorded.wal_mtime_ms)
}
638
639pub(crate) fn inspect_search_assets(
640 input: InspectSearchAssetsInput<'_>,
641) -> Result<SearchAssetSnapshot> {
642 let InspectSearchAssetsInput {
643 data_dir,
644 db_path,
645 stale_threshold,
646 last_indexed_at_ms,
647 now_ms,
648 maintenance,
649 semantic_preference,
650 db_available,
651 compute_lexical_fingerprint,
652 inspect_semantic,
653 } = input;
654
655 let lexical = inspect_lexical_assets(InspectLexicalAssetsInput {
656 data_dir,
657 db_path,
658 stale_threshold,
659 last_indexed_at_ms,
660 now_ms,
661 maintenance,
662 db_available,
663 compute_lexical_fingerprint,
664 })?;
665 let current_db_fingerprint = lexical.fingerprint.current_db_fingerprint.as_deref();
666 let semantic = if inspect_semantic {
667 inspect_semantic_assets(
668 data_dir,
669 db_path,
670 semantic_preference,
671 current_db_fingerprint,
672 db_available,
673 )
674 } else {
675 semantic_state_not_inspected(data_dir, semantic_preference, current_db_fingerprint)
676 };
677
678 Ok(SearchAssetSnapshot { lexical, semantic })
679}
680
681fn semantic_state_not_inspected(
682 data_dir: &Path,
683 preference: SemanticPreference,
684 current_db_fingerprint: Option<&str>,
685) -> SemanticAssetState {
686 let (fast_tier, quality_tier, backlog, checkpoint) =
687 semantic_manifest_progress(data_dir, current_db_fingerprint);
688 let preference_surface = semantic_preference_surface(data_dir, preference);
689
690 SemanticAssetState {
691 status: "not_inspected",
692 availability: "not_inspected",
693 summary: "semantic assets were not inspected for this fast path".to_string(),
694 available: false,
695 can_search: false,
696 fallback_mode: Some("lexical"),
697 preferred_backend: preference_surface.preferred_backend,
698 embedder_id: None,
699 vector_index_path: None,
700 model_dir: preference_surface.model_dir,
701 hnsw_path: None,
702 hnsw_ready: false,
703 progressive_ready: semantic_progressive_assets_ready(data_dir),
704 quality_tier_published: false,
710 semantic_only_search_available: false,
711 hint: Some(
712 "Use 'cass status --json' or 'cass models status --json' for semantic readiness."
713 .to_string(),
714 ),
715 fast_tier,
716 quality_tier,
717 backlog,
718 checkpoint,
719 }
720}
721
722pub(crate) fn inspect_semantic_assets(
723 data_dir: &Path,
724 db_path: &Path,
725 preference: SemanticPreference,
726 current_db_fingerprint: Option<&str>,
727 db_available: bool,
728) -> SemanticAssetState {
729 if !db_available {
730 let availability = SemanticAvailability::DatabaseUnavailable {
731 db_path: db_path.to_path_buf(),
732 error: "database unavailable during asset inspection".to_string(),
733 };
734 return semantic_state_from_availability(
735 data_dir,
736 &availability,
737 preference,
738 current_db_fingerprint,
739 );
740 }
741
742 let availability = match preference {
743 SemanticPreference::DefaultModel => probe_semantic_availability(data_dir),
744 SemanticPreference::HashFallback => probe_hash_semantic_availability(data_dir),
745 };
746 semantic_state_from_availability(data_dir, &availability, preference, current_db_fingerprint)
747}
748
749pub(crate) fn semantic_state_from_availability(
750 data_dir: &Path,
751 availability: &SemanticAvailability,
752 preference: SemanticPreference,
753 current_db_fingerprint: Option<&str>,
754) -> SemanticAssetState {
755 let (mut fast_tier, mut quality_tier, backlog, checkpoint) =
756 semantic_manifest_progress(data_dir, current_db_fingerprint);
757 let preference_surface = semantic_preference_surface(data_dir, preference);
758 let base_embedder_id = semantic_embedder_id(availability, preference);
759 if let (Some(db_fingerprint), Some(embedder_id)) =
760 (current_db_fingerprint, base_embedder_id.as_deref())
761 {
762 promote_complete_shard_generation_state(
763 data_dir,
764 TierKind::Fast,
765 embedder_id,
766 db_fingerprint,
767 &mut fast_tier,
768 );
769 promote_complete_shard_generation_state(
770 data_dir,
771 TierKind::Quality,
772 embedder_id,
773 db_fingerprint,
774 &mut quality_tier,
775 );
776 }
777 let base_vector_index_path = semantic_vector_index_path(data_dir, availability, preference);
778 let base_model_dir = preference_surface.model_dir;
779 let base_hnsw_path = base_embedder_id
780 .as_deref()
781 .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
782 let runtime = semantic_runtime_surface(SemanticRuntimeInputs {
783 data_dir,
784 availability,
785 preference,
786 fast_tier: &fast_tier,
787 quality_tier: &quality_tier,
788 backlog: &backlog,
789 checkpoint: &checkpoint,
790 base_embedder_id: base_embedder_id.clone(),
791 base_vector_index_path: base_vector_index_path.clone(),
792 base_model_dir: base_model_dir.clone(),
793 base_hnsw_path: base_hnsw_path.clone(),
794 });
795 let use_runtime_paths = runtime.embedder_id.is_some();
796 let embedder_id = runtime.embedder_id.or(base_embedder_id);
797 let vector_index_path = if use_runtime_paths {
798 runtime.vector_index_path
799 } else {
800 runtime.vector_index_path.or(base_vector_index_path)
801 };
802 let model_dir = if use_runtime_paths {
803 runtime.model_dir
804 } else {
805 runtime.model_dir.or(base_model_dir)
806 };
807 let hnsw_path = if use_runtime_paths {
808 runtime.hnsw_path
809 } else {
810 runtime.hnsw_path.or(base_hnsw_path)
811 };
812 let hnsw_ready = hnsw_path.as_ref().is_some_and(|path| path.is_file());
813 let progressive_ready = semantic_progressive_assets_ready(data_dir);
814
815 let quality_tier_published = semantic_tier_queryable(availability, &quality_tier);
820 let fast_tier_queryable = semantic_tier_queryable(availability, &fast_tier);
821 let semantic_only_search_available = quality_tier_published || fast_tier_queryable;
822
823 SemanticAssetState {
824 status: runtime.status,
825 availability: runtime.availability,
826 summary: runtime.summary,
827 available: runtime.can_search,
828 can_search: runtime.can_search,
829 fallback_mode: runtime.fallback_mode,
830 preferred_backend: preference_surface.preferred_backend,
831 embedder_id,
832 vector_index_path,
833 model_dir,
834 hnsw_path,
835 hnsw_ready,
836 progressive_ready,
837 quality_tier_published,
838 semantic_only_search_available,
839 hint: runtime.hint,
840 fast_tier,
841 quality_tier,
842 backlog,
843 checkpoint,
844 }
845}
846
847fn semantic_preference_surface(
848 data_dir: &Path,
849 preference: SemanticPreference,
850) -> SemanticPreferenceSurface {
851 match preference {
852 SemanticPreference::DefaultModel => SemanticPreferenceSurface {
853 preferred_backend: "fastembed",
854 model_dir: active_policy_model_dir(data_dir),
855 },
856 SemanticPreference::HashFallback => SemanticPreferenceSurface {
857 preferred_backend: "hash",
858 model_dir: None,
859 },
860 }
861}
862
863fn semantic_runtime_surface(inputs: SemanticRuntimeInputs<'_>) -> SemanticRuntimeSurface {
864 let SemanticRuntimeInputs {
865 data_dir,
866 availability,
867 preference,
868 fast_tier,
869 quality_tier,
870 backlog,
871 checkpoint,
872 base_embedder_id,
873 base_vector_index_path,
874 base_model_dir,
875 base_hnsw_path,
876 } = inputs;
877 let base_status = semantic_status_from_availability(availability);
878 let base_availability = semantic_availability_code(availability);
879 let base_summary = availability.summary();
880 let base_can_search = availability.can_search();
881 let base_hint = semantic_hint(availability, preference);
882
883 if matches!(
884 availability,
885 SemanticAvailability::Disabled { .. }
886 | SemanticAvailability::DatabaseUnavailable { .. }
887 | SemanticAvailability::LoadFailed { .. }
888 ) {
889 return SemanticRuntimeSurface {
890 status: base_status,
891 availability: base_availability,
892 summary: base_summary,
893 can_search: base_can_search,
894 fallback_mode: (!base_can_search).then_some("lexical"),
895 hint: base_hint,
896 embedder_id: base_embedder_id,
897 vector_index_path: base_vector_index_path,
898 model_dir: base_model_dir,
899 hnsw_path: base_hnsw_path,
900 };
901 }
902
903 let quality_queryable = semantic_tier_queryable(availability, quality_tier);
904 let fast_queryable = semantic_tier_queryable(availability, fast_tier);
905 let checkpoint_active = checkpoint.active;
906 let backlog_pending = backlog.pending_work;
907 let manifest_assets_present = fast_tier.present || quality_tier.present;
908 let backfill_active = checkpoint_active || backlog_pending;
909
910 let effective_embedder_id = if quality_queryable {
911 quality_tier.embedder_id.clone()
912 } else if fast_queryable {
913 fast_tier.embedder_id.clone()
914 } else {
915 None
916 };
917 let effective_vector_index_path = if quality_queryable {
918 quality_tier.index_path.clone()
919 } else if fast_queryable {
920 fast_tier.index_path.clone()
921 } else {
922 None
923 }
924 .or_else(|| {
925 effective_embedder_id
926 .as_deref()
927 .map(|embedder_id| vector_index_path(data_dir, embedder_id))
928 });
929 let effective_model_dir = effective_embedder_id.as_deref().and_then(|embedder_id| {
930 (!semantic_embedder_is_hash(embedder_id))
931 .then(|| model_dir_for_embedder_id(data_dir, embedder_id))
932 .flatten()
933 });
934 let effective_hnsw_path = effective_embedder_id
935 .as_deref()
936 .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
937
938 if quality_queryable || fast_queryable {
939 let fully_ready = (quality_queryable || fast_queryable) && !backfill_active;
940 let summary = if quality_queryable && backfill_active {
941 "semantic quality tier is usable; residual semantic backfill is still finishing"
942 .to_string()
943 } else if quality_queryable {
944 "semantic quality tier ready".to_string()
945 } else if backfill_active {
946 "semantic fast tier is usable; higher-quality semantic backfill is still in progress"
947 .to_string()
948 } else {
949 "semantic fast tier ready".to_string()
950 };
951 let hint = if backfill_active {
952 Some(
953 "Semantic refinement is already usable; continue searching while higher-quality backfill finishes."
954 .to_string(),
955 )
956 } else {
957 None
958 };
959 return SemanticRuntimeSurface {
960 status: if fully_ready { "ready" } else { "building" },
961 availability: if fully_ready {
962 "ready"
963 } else {
964 "index_building"
965 },
966 summary,
967 can_search: true,
968 fallback_mode: None,
969 hint,
970 embedder_id: effective_embedder_id,
971 vector_index_path: effective_vector_index_path,
972 model_dir: effective_model_dir,
973 hnsw_path: effective_hnsw_path,
974 };
975 }
976
977 if backfill_active {
978 return SemanticRuntimeSurface {
979 status: "building",
980 availability: "index_building",
981 summary: "semantic backfill is in progress for the current database".to_string(),
982 can_search: false,
983 fallback_mode: Some("lexical"),
984 hint: Some(
985 "Run 'cass index --semantic' to finish backfilling current semantic assets; search will use lexical fallback until then."
986 .to_string(),
987 ),
988 embedder_id: base_embedder_id,
989 vector_index_path: base_vector_index_path,
990 model_dir: base_model_dir,
991 hnsw_path: base_hnsw_path,
992 };
993 }
994
995 if manifest_assets_present {
996 return SemanticRuntimeSurface {
997 status: "stale",
998 availability: "update_available",
999 summary: "semantic artifacts exist but do not match the current database".to_string(),
1000 can_search: false,
1001 fallback_mode: Some("lexical"),
1002 hint: Some(
1003 "Run 'cass index --semantic' to refresh semantic assets for the current database; search will use lexical fallback until then."
1004 .to_string(),
1005 ),
1006 embedder_id: base_embedder_id,
1007 vector_index_path: base_vector_index_path,
1008 model_dir: base_model_dir,
1009 hnsw_path: base_hnsw_path,
1010 };
1011 }
1012
1013 SemanticRuntimeSurface {
1014 status: base_status,
1015 availability: base_availability,
1016 summary: base_summary,
1017 can_search: base_can_search,
1018 fallback_mode: (!base_can_search).then_some("lexical"),
1019 hint: base_hint,
1020 embedder_id: base_embedder_id,
1021 vector_index_path: base_vector_index_path,
1022 model_dir: base_model_dir,
1023 hnsw_path: base_hnsw_path,
1024 }
1025}
1026
1027fn active_policy_model_dir(data_dir: &Path) -> Option<PathBuf> {
1028 let policy = SemanticPolicy::resolve(&CliSemanticOverrides::default());
1029 let embedder_name = FastEmbedder::canonical_name(&policy.quality_tier_embedder)?;
1030 FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
1031}
1032
1033fn model_dir_for_embedder_id(data_dir: &Path, embedder_id: &str) -> Option<PathBuf> {
1034 let embedder_name = FastEmbedder::canonical_name(embedder_id)?;
1035 FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
1036}
1037
1038fn semantic_tier_queryable(
1039 availability: &SemanticAvailability,
1040 tier: &SemanticTierAssetState,
1041) -> bool {
1042 if !tier.ready || tier.current_db_matches == Some(false) {
1043 return false;
1044 }
1045 let Some(embedder_id) = tier.embedder_id.as_deref() else {
1046 return false;
1047 };
1048 if semantic_embedder_is_hash(embedder_id) {
1049 !matches!(
1050 availability,
1051 SemanticAvailability::Disabled { .. }
1052 | SemanticAvailability::DatabaseUnavailable { .. }
1053 | SemanticAvailability::LoadFailed { .. }
1054 )
1055 } else {
1056 matches!(
1057 availability,
1058 SemanticAvailability::Ready { .. }
1059 | SemanticAvailability::UpdateAvailable { .. }
1060 | SemanticAvailability::IndexBuilding { .. }
1061 | SemanticAvailability::IndexMissing { .. }
1062 )
1063 }
1064}
1065
1066fn semantic_embedder_is_hash(embedder_id: &str) -> bool {
1067 embedder_id == HashEmbedder::default().id()
1068}
1069
1070fn semantic_manifest_progress(
1071 data_dir: &Path,
1072 current_db_fingerprint: Option<&str>,
1073) -> (
1074 SemanticTierAssetState,
1075 SemanticTierAssetState,
1076 SemanticBacklogProgressState,
1077 SemanticCheckpointProgressState,
1078) {
1079 let manifest = SemanticManifest::load_or_default(data_dir).unwrap_or_default();
1080 let fast_tier = semantic_tier_asset_state(manifest.fast_tier.as_ref(), current_db_fingerprint);
1081 let quality_tier =
1082 semantic_tier_asset_state(manifest.quality_tier.as_ref(), current_db_fingerprint);
1083 let backlog = semantic_backlog_progress_state(&manifest, current_db_fingerprint);
1084 let checkpoint =
1085 semantic_checkpoint_progress_state(manifest.checkpoint.as_ref(), current_db_fingerprint);
1086 (fast_tier, quality_tier, backlog, checkpoint)
1087}
1088
1089fn semantic_tier_asset_state(
1090 artifact: Option<&ArtifactRecord>,
1091 current_db_fingerprint: Option<&str>,
1092) -> SemanticTierAssetState {
1093 let Some(artifact) = artifact else {
1094 return SemanticTierAssetState::default();
1095 };
1096
1097 SemanticTierAssetState {
1098 present: true,
1099 ready: artifact.ready,
1100 current_db_matches: current_db_fingerprint.map(|fp| artifact.db_fingerprint == fp),
1101 conversation_count: Some(artifact.conversation_count),
1102 doc_count: Some(artifact.doc_count),
1103 embedder_id: Some(artifact.embedder_id.clone()),
1104 model_revision: Some(artifact.model_revision.clone()),
1105 completed_at_ms: Some(artifact.completed_at_ms),
1106 size_bytes: Some(artifact.size_bytes),
1107 index_path: None,
1108 }
1109}
1110
1111fn resolve_semantic_artifact_path(data_dir: &Path, recorded_path: &str) -> Option<PathBuf> {
1112 semantic_shard_artifact_path_is_safe(recorded_path).then(|| data_dir.join(recorded_path))
1113}
1114
1115fn complete_shard_records_for_state(
1116 data_dir: &Path,
1117 tier: TierKind,
1118 embedder_id: &str,
1119 db_fingerprint: &str,
1120) -> Option<Vec<SemanticShardRecord>> {
1121 let manifest = SemanticShardManifest::load(data_dir).ok().flatten()?;
1122 let summary = manifest.summary(tier, embedder_id, db_fingerprint);
1123 if !summary.complete {
1124 return None;
1125 }
1126 let mut records = manifest
1127 .shards
1128 .into_iter()
1129 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
1130 .collect::<Vec<_>>();
1131 records.sort_by_key(|shard| shard.shard_index);
1132 if records.len() != usize::try_from(summary.shard_count).unwrap_or(usize::MAX) {
1133 return None;
1134 }
1135 let first = records.first()?;
1136 for (expected_index, shard) in records.iter().enumerate() {
1137 if shard.shard_index != u32::try_from(expected_index).unwrap_or(u32::MAX)
1138 || !shard.ready
1139 || !shard.mmap_ready
1140 || shard.model_revision != first.model_revision
1141 || shard.schema_version != SEMANTIC_SCHEMA_VERSION
1142 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
1143 || shard.dimension == 0
1144 || shard.dimension != first.dimension
1145 || shard.total_conversations != first.total_conversations
1146 {
1147 return None;
1148 }
1149 let artifact_path = resolve_semantic_artifact_path(data_dir, &shard.index_path)?;
1150 if !artifact_path.is_file() {
1151 return None;
1152 }
1153 }
1154 Some(records)
1155}
1156
1157fn promote_complete_shard_generation_state(
1158 data_dir: &Path,
1159 tier: TierKind,
1160 embedder_id: &str,
1161 db_fingerprint: &str,
1162 state: &mut SemanticTierAssetState,
1163) {
1164 if state.ready && state.current_db_matches == Some(true) {
1165 return;
1166 }
1167 let Some(records) =
1168 complete_shard_records_for_state(data_dir, tier, embedder_id, db_fingerprint)
1169 else {
1170 return;
1171 };
1172 let doc_count = records
1173 .iter()
1174 .map(|shard| shard.doc_count)
1175 .fold(0, u64::saturating_add);
1176 let size_bytes = records
1177 .iter()
1178 .map(|shard| shard.size_bytes)
1179 .fold(0, u64::saturating_add);
1180 let completed_at_ms = records
1181 .iter()
1182 .map(|shard| shard.completed_at_ms)
1183 .max()
1184 .unwrap_or(0);
1185 let first = &records[0];
1186 let Some(first_index_path) = resolve_semantic_artifact_path(data_dir, &first.index_path) else {
1187 return;
1188 };
1189 *state = SemanticTierAssetState {
1190 present: true,
1191 ready: true,
1192 current_db_matches: Some(true),
1193 conversation_count: Some(first.total_conversations),
1194 doc_count: Some(doc_count),
1195 embedder_id: Some(first.embedder_id.clone()),
1196 model_revision: Some(first.model_revision.clone()),
1197 completed_at_ms: Some(completed_at_ms),
1198 size_bytes: Some(size_bytes),
1199 index_path: Some(first_index_path),
1200 };
1201}
1202
1203fn semantic_backlog_progress_state(
1204 manifest: &SemanticManifest,
1205 current_db_fingerprint: Option<&str>,
1206) -> SemanticBacklogProgressState {
1207 let backlog = &manifest.backlog;
1208 let current_db_matches = current_db_fingerprint.and_then(|fp| {
1209 (backlog.computed_at_ms > 0 || !backlog.db_fingerprint.is_empty())
1210 .then(|| backlog.is_current(fp))
1211 });
1212
1213 SemanticBacklogProgressState {
1214 total_conversations: backlog.total_conversations,
1215 fast_tier_processed: backlog.fast_tier_processed,
1216 fast_tier_remaining: backlog.fast_tier_remaining(),
1217 quality_tier_processed: backlog.quality_tier_processed,
1218 quality_tier_remaining: backlog.quality_tier_remaining(),
1219 pending_work: backlog.has_pending_work() || manifest.checkpoint.is_some(),
1220 current_db_matches,
1221 computed_at_ms: (backlog.computed_at_ms > 0).then_some(backlog.computed_at_ms),
1222 }
1223}
1224
1225fn semantic_checkpoint_progress_state(
1226 checkpoint: Option<&BuildCheckpoint>,
1227 current_db_fingerprint: Option<&str>,
1228) -> SemanticCheckpointProgressState {
1229 let Some(checkpoint) = checkpoint else {
1230 return SemanticCheckpointProgressState::default();
1231 };
1232
1233 SemanticCheckpointProgressState {
1234 active: true,
1235 tier: Some(checkpoint.tier.as_str()),
1236 current_db_matches: current_db_fingerprint.map(|fp| checkpoint.is_valid(fp)),
1237 completed: Some(checkpoint.is_complete()),
1238 conversations_processed: Some(checkpoint.conversations_processed),
1239 total_conversations: Some(checkpoint.total_conversations),
1240 progress_pct: Some(checkpoint.progress_pct()),
1241 docs_embedded: Some(checkpoint.docs_embedded),
1242 last_offset: Some(checkpoint.last_offset),
1243 saved_at_ms: Some(checkpoint.saved_at_ms),
1244 }
1245}
1246
/// Arguments for `inspect_lexical_assets`, bundled into one struct.
struct InspectLexicalAssetsInput<'a> {
    /// Data directory; the lexical index directory is derived from it via
    /// `crate::search::tantivy::expected_index_dir`.
    data_dir: &'a Path,
    /// Archive database path; fingerprinted when fingerprinting is enabled.
    db_path: &'a Path,
    /// Staleness threshold in seconds, compared against the index age.
    stale_threshold: u64,
    /// Millisecond timestamp of the last lexical index pass; non-positive values
    /// are treated as "unknown".
    last_indexed_at_ms: Option<i64>,
    /// Reference time in milliseconds used for age calculations.
    now_ms: i64,
    /// Snapshot of search-maintenance state.
    maintenance: SearchMaintenanceSnapshot,
    /// When `false`, the database fingerprint is not computed.
    db_available: bool,
    /// Opt-in switch for computing the database fingerprint; it is only computed
    /// when both this and `db_available` are `true`.
    compute_lexical_fingerprint: bool,
}
1261
1262fn inspect_lexical_assets(input: InspectLexicalAssetsInput<'_>) -> Result<LexicalAssetState> {
1263 let InspectLexicalAssetsInput {
1264 data_dir,
1265 db_path,
1266 stale_threshold,
1267 last_indexed_at_ms,
1268 now_ms,
1269 maintenance,
1270 db_available,
1271 compute_lexical_fingerprint,
1272 } = input;
1273 let index_path = crate::search::tantivy::expected_index_dir(data_dir);
1274 let checkpoint = load_lexical_rebuild_checkpoint(&index_path)
1275 .with_context(|| format!("loading lexical checkpoint from {}", index_path.display()))?;
1276 let current_db_fingerprint = if db_available && compute_lexical_fingerprint {
1277 Some(
1278 lexical_storage_fingerprint_for_db(db_path).with_context(|| {
1279 format!(
1280 "computing lexical storage fingerprint for {}",
1281 db_path.display()
1282 )
1283 })?,
1284 )
1285 } else {
1286 None
1287 };
1288
1289 Ok(lexical_state_from_observations(LexicalObservationInput {
1290 index_path: &index_path,
1291 db_path,
1292 stale_threshold,
1293 last_indexed_at_ms,
1294 now_ms,
1295 maintenance,
1296 checkpoint: checkpoint.as_ref(),
1297 current_db_fingerprint: current_db_fingerprint.as_deref(),
1298 }))
1299}
1300
/// Already-gathered lexical observations consumed by
/// `lexical_state_from_observations`.
struct LexicalObservationInput<'a> {
    /// Lexical (Tantivy) index directory.
    index_path: &'a Path,
    /// Archive database path the state is being computed for.
    db_path: &'a Path,
    /// Staleness threshold in seconds.
    stale_threshold: u64,
    /// Millisecond timestamp of the last lexical index pass; non-positive values
    /// are treated as "unknown".
    last_indexed_at_ms: Option<i64>,
    /// Reference time in milliseconds.
    now_ms: i64,
    /// Snapshot of search-maintenance state.
    maintenance: SearchMaintenanceSnapshot,
    /// Lexical rebuild checkpoint, if one was found.
    checkpoint: Option<&'a LexicalRebuildCheckpoint>,
    /// Current database storage fingerprint, if it was computed.
    current_db_fingerprint: Option<&'a str>,
}
1312
/// Folds raw lexical observations into the reported `LexicalAssetState`.
///
/// The observations are:
/// - index existence;
/// - the optional rebuild checkpoint;
/// - the optional current database fingerprint;
/// - the last-indexed timestamp;
/// - the maintenance snapshot.
///
/// Apart from the `searchable_index_exists` probe and whatever
/// `maintenance_stall_age_ms` consults, it only combines the values passed in.
///
/// Status precedence is `stalled` > `building` > `missing` > `stale` > `ready`.
fn lexical_state_from_observations(input: LexicalObservationInput<'_>) -> LexicalAssetState {
    let LexicalObservationInput {
        index_path,
        db_path,
        stale_threshold,
        last_indexed_at_ms,
        now_ms,
        maintenance,
        checkpoint,
        current_db_fingerprint,
    } = input;
    let exists = crate::search::tantivy::searchable_index_exists(index_path);
    // Each checkpoint comparison below is `None` when there is no checkpoint.
    let checkpoint_db_matches =
        checkpoint.map(|state| crate::stored_path_identity_matches(&state.db_path, db_path));
    let schema_matches = checkpoint.map(|state| state.schema_hash == SCHEMA_HASH);
    // Reported only; the staleness decision uses `page_size_compatible`.
    let page_size_matches =
        checkpoint.map(|state| state.page_size == LEXICAL_REBUILD_PAGE_SIZE_PUBLIC);
    let page_size_compatible =
        checkpoint.map(|state| lexical_rebuild_page_size_is_compatible(state.page_size));
    let checkpoint_fingerprint = checkpoint.map(|state| state.storage_fingerprint.as_str());
    // Only known when both the current and the checkpointed fingerprint exist.
    let fingerprint_matches = match (current_db_fingerprint, checkpoint_fingerprint) {
        (Some(current), Some(saved)) => Some(lexical_storage_fingerprints_match(current, saved)),
        _ => None,
    };
    let checkpoint_incomplete = checkpoint.is_some_and(|state| !state.completed);
    let checkpoint_db_mismatch = checkpoint_db_matches == Some(false);
    let contract_mismatch = schema_matches == Some(false) || page_size_compatible == Some(false);
    let fingerprint_mismatch = fingerprint_matches == Some(false);
    // Negative clocks are clamped to 0 before the unsigned conversion.
    let now_secs: u64 = now_ms.div_euclid(1000).max(0) as u64;
    // `None` when there is no positive last-indexed timestamp.
    let age_seconds = last_indexed_at_ms
        .and_then(|ts| (ts > 0).then(|| now_secs.saturating_sub((ts / 1000) as u64)));
    // An unknown age is treated as stale.
    let age_stale = match age_seconds {
        Some(age) => age > stale_threshold,
        None => true,
    };
    // A maintenance snapshot without a db_path is assumed to target this database.
    let maintenance_targets_current_db = maintenance
        .db_path
        .as_ref()
        .is_none_or(|lock_db_path| crate::path_identities_match(lock_db_path, db_path));
    let watch_active = maintenance.active
        && maintenance_targets_current_db
        && maintenance
            .mode
            .is_some_and(SearchMaintenanceMode::watch_active);
    let rebuilding = maintenance.active
        && maintenance_targets_current_db
        && maintenance
            .mode
            .is_some_and(SearchMaintenanceMode::rebuild_active);
    let active_rebuild_progress = rebuilding;
    // Stall detection only applies while rebuilding (which already implies
    // `maintenance_targets_current_db`).
    let stall_age_ms = if rebuilding && maintenance_targets_current_db {
        maintenance_stall_age_ms(&maintenance, now_ms)
    } else {
        None
    };
    let stalled = stall_age_ms.is_some();
    let last_progress_at_ms = maintenance
        .last_progress_at_ms
        .filter(|_| maintenance_targets_current_db);
    // The progress age is reported only during a rebuild.
    let last_progress_age_ms = last_progress_at_ms
        .filter(|_| rebuilding)
        .map(|ts| now_ms.saturating_sub(ts));
    // While rebuilding, only a missing index or a contract mismatch counts as stale.
    // Outside a rebuild, a missing index is reported as "missing" rather than stale.
    let stale = if rebuilding {
        !exists || contract_mismatch
    } else {
        exists
            && (age_stale
                || checkpoint_db_mismatch
                || checkpoint_incomplete
                || contract_mismatch
                || fingerprint_mismatch)
    };
    let fresh = exists && !stale && !rebuilding;
    let status = if stalled {
        "stalled"
    } else if rebuilding {
        "building"
    } else if !exists {
        "missing"
    } else if stale {
        "stale"
    } else {
        "ready"
    };
    // Same precedence as `status`. Within "stale", the most specific cause wins:
    // db mismatch > contract > fingerprint > incomplete > age.
    let status_reason = if stalled {
        let secs = stall_age_ms.unwrap_or(0) / 1000;
        Some(format!(
            "indexing thread has not posted forward progress for {secs}s while the lock heartbeat keeps refreshing — see issue #258 for diagnostics (run `cass doctor check --json` and capture a stack trace)"
        ))
    } else if rebuilding {
        Some("lexical rebuild is in progress".to_string())
    } else if !exists {
        Some("lexical Tantivy metadata missing".to_string())
    } else if checkpoint_db_mismatch {
        Some("lexical rebuild checkpoint points at a different database".to_string())
    } else if contract_mismatch {
        Some("lexical rebuild checkpoint no longer matches the active lexical contract".to_string())
    } else if fingerprint_mismatch {
        Some("database fingerprint changed since the last lexical checkpoint".to_string())
    } else if checkpoint_incomplete {
        Some("lexical rebuild checkpoint is incomplete".to_string())
    } else if age_stale {
        Some("lexical index is older than the stale threshold".to_string())
    } else {
        None
    };
    // Checkpoint counters are surfaced only when the checkpoint targets this DB and
    // contract. During an active rebuild the fingerprint check is skipped; otherwise
    // a computed, matching fingerprint is required.
    let checkpoint_progress_usable = checkpoint.is_some()
        && checkpoint_db_matches == Some(true)
        && schema_matches == Some(true)
        && page_size_compatible == Some(true)
        && if active_rebuild_progress {
            true
        } else {
            current_db_fingerprint.is_some() && fingerprint_matches == Some(true)
        };
    // Remaining conversations per the checkpoint; 0 when the checkpoint is unusable.
    let pending_sessions = checkpoint
        .filter(|_| checkpoint_progress_usable)
        .map(|state| {
            state
                .total_conversations
                .saturating_sub(state.processed_conversations) as u64
        })
        .unwrap_or(0);
    // Maintenance activity: last update, falling back to start time, for this DB only.
    let maintenance_activity_at_ms = maintenance_targets_current_db
        .then_some(())
        .and(maintenance.updated_at_ms.or(maintenance.started_at_ms));
    let checkpoint_activity_at_ms = checkpoint
        .filter(|_| checkpoint_progress_usable)
        .and_then(|state| (state.updated_at_ms > 0).then_some(state.updated_at_ms));
    // Latest of the two activity timestamps, whichever are known.
    let activity_at_ms = match (checkpoint_activity_at_ms, maintenance_activity_at_ms) {
        (Some(checkpoint_ts), Some(maintenance_ts)) => Some(checkpoint_ts.max(maintenance_ts)),
        (Some(checkpoint_ts), None) => Some(checkpoint_ts),
        (None, Some(maintenance_ts)) => Some(maintenance_ts),
        (None, None) => None,
    };

    LexicalAssetState {
        status,
        exists,
        fresh,
        stale,
        rebuilding,
        stalled,
        last_progress_age_ms,
        last_progress_at_ms,
        watch_active,
        last_indexed_at_ms,
        age_seconds,
        stale_threshold_seconds: stale_threshold,
        activity_at_ms,
        pending_sessions,
        processed_conversations: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.processed_conversations as u64),
        total_conversations: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.total_conversations as u64),
        indexed_docs: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.indexed_docs as u64),
        status_reason,
        fingerprint: LexicalFingerprintState {
            current_db_fingerprint: current_db_fingerprint.map(ToOwned::to_owned),
            checkpoint_fingerprint: checkpoint.map(|state| state.storage_fingerprint.clone()),
            matches_current_db_fingerprint: fingerprint_matches,
        },
        checkpoint: LexicalCheckpointState {
            present: checkpoint.is_some(),
            completed: checkpoint.map(|state| state.completed),
            db_matches: checkpoint_db_matches,
            schema_matches,
            page_size_matches,
            page_size_compatible,
        },
    }
}
1507
1508fn semantic_embedder_id(
1509 availability: &SemanticAvailability,
1510 preference: SemanticPreference,
1511) -> Option<String> {
1512 match availability {
1513 SemanticAvailability::Ready { embedder_id }
1514 | SemanticAvailability::UpdateAvailable { embedder_id, .. }
1515 | SemanticAvailability::IndexBuilding { embedder_id, .. } => Some(embedder_id.clone()),
1516 SemanticAvailability::HashFallback => Some(HashEmbedder::default().id().to_string()),
1517 _ => match preference {
1518 SemanticPreference::DefaultModel => {
1519 Some(FastEmbedder::embedder_id_static().to_string())
1520 }
1521 SemanticPreference::HashFallback => Some(HashEmbedder::default().id().to_string()),
1522 },
1523 }
1524}
1525
1526fn semantic_vector_index_path(
1527 data_dir: &Path,
1528 availability: &SemanticAvailability,
1529 preference: SemanticPreference,
1530) -> Option<PathBuf> {
1531 match availability {
1532 SemanticAvailability::IndexMissing { index_path } => Some(index_path.clone()),
1533 _ => semantic_embedder_id(availability, preference)
1534 .map(|embedder_id| vector_index_path(data_dir, &embedder_id)),
1535 }
1536}
1537
1538fn semantic_progressive_assets_ready(data_dir: &Path) -> bool {
1539 let vector_dir = data_dir.join(VECTOR_INDEX_DIR);
1540 vector_dir.join("vector.fast.idx").is_file() && vector_dir.join("vector.quality.idx").is_file()
1541}
1542
1543fn semantic_availability_code(availability: &SemanticAvailability) -> &'static str {
1544 match availability {
1545 SemanticAvailability::Ready { .. } => "ready",
1546 SemanticAvailability::NotInstalled => "not_installed",
1547 SemanticAvailability::NeedsConsent => "needs_consent",
1548 SemanticAvailability::Downloading { .. } => "downloading",
1549 SemanticAvailability::Verifying => "verifying",
1550 SemanticAvailability::IndexBuilding { .. } => "index_building",
1551 SemanticAvailability::HashFallback => "hash_fallback",
1552 SemanticAvailability::Disabled { .. } => "disabled",
1553 SemanticAvailability::ModelMissing { .. } => "model_missing",
1554 SemanticAvailability::IndexMissing { .. } => "index_missing",
1555 SemanticAvailability::DatabaseUnavailable { .. } => "database_unavailable",
1556 SemanticAvailability::LoadFailed { .. } => "load_failed",
1557 SemanticAvailability::UpdateAvailable { .. } => "update_available",
1558 }
1559}
1560
1561fn semantic_status_from_availability(availability: &SemanticAvailability) -> &'static str {
1562 match availability {
1563 SemanticAvailability::Ready { .. } => "ready",
1564 SemanticAvailability::HashFallback => "hash_fallback",
1565 SemanticAvailability::Downloading { .. }
1566 | SemanticAvailability::Verifying
1567 | SemanticAvailability::IndexBuilding { .. } => "building",
1568 SemanticAvailability::Disabled { .. } => "disabled",
1569 SemanticAvailability::UpdateAvailable { .. } => "stale",
1570 SemanticAvailability::NotInstalled
1571 | SemanticAvailability::NeedsConsent
1572 | SemanticAvailability::ModelMissing { .. }
1573 | SemanticAvailability::IndexMissing { .. } => "missing",
1574 SemanticAvailability::DatabaseUnavailable { .. }
1575 | SemanticAvailability::LoadFailed { .. } => "error",
1576 }
1577}
1578
1579fn semantic_hint(
1580 availability: &SemanticAvailability,
1581 preference: SemanticPreference,
1582) -> Option<String> {
1583 let hint = match (preference, availability) {
1584 (SemanticPreference::HashFallback, SemanticAvailability::IndexMissing { .. }) => {
1585 "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available without semantic assets"
1586 }
1587 (SemanticPreference::HashFallback, SemanticAvailability::LoadFailed { .. })
1588 | (SemanticPreference::HashFallback, SemanticAvailability::DatabaseUnavailable { .. }) => {
1589 "Run 'cass index --semantic --embedder hash' after the database is healthy; lexical search remains available"
1590 }
1591 (SemanticPreference::HashFallback, _) => {
1592 "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available"
1593 }
1594 (_, SemanticAvailability::NotInstalled)
1595 | (_, SemanticAvailability::NeedsConsent)
1596 | (_, SemanticAvailability::ModelMissing { .. }) => {
1597 "Run 'cass models install' and then 'cass index --semantic'; lexical search remains available without the model"
1598 }
1599 (_, SemanticAvailability::IndexMissing { .. })
1600 | (_, SemanticAvailability::UpdateAvailable { .. })
1601 | (_, SemanticAvailability::IndexBuilding { .. }) => {
1602 "Run 'cass index --semantic' to build or refresh vector assets; lexical search remains available"
1603 }
1604 (_, SemanticAvailability::Downloading { .. }) | (_, SemanticAvailability::Verifying) => {
1605 "Wait for the semantic model installation to finish; lexical search remains available"
1606 }
1607 (_, SemanticAvailability::Disabled { .. }) => {
1608 "Semantic search is disabled by policy; lexical search remains available, or re-enable semantic search"
1609 }
1610 (_, SemanticAvailability::DatabaseUnavailable { .. })
1611 | (_, SemanticAvailability::LoadFailed { .. }) => {
1612 "Restore the semantic assets and database; lexical search remains available when the archive database is healthy"
1613 }
1614 (_, SemanticAvailability::Ready { .. }) | (_, SemanticAvailability::HashFallback) => {
1615 return None;
1616 }
1617 };
1618 Some(hint.to_string())
1619}
1620
/// Heartbeat age in milliseconds beyond which
/// `evaluate_maintenance_coordination_from_snapshot` classifies an active
/// maintenance job as `Stale`.
#[cfg_attr(not(test), allow(dead_code))]
const HEARTBEAT_STALE_THRESHOLD_MS: i64 = 30_000;
/// Overall wait used by `poll_maintenance_until_idle` when no timeout is given.
#[cfg_attr(not(test), allow(dead_code))]
const BOUNDED_WAIT_DEFAULT: Duration = Duration::from_secs(5);
/// Delay between polls used by `poll_maintenance_until_idle` when no interval is
/// given.
#[cfg_attr(not(test), allow(dead_code))]
const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);
1631
/// Classification of the search-maintenance snapshot produced by
/// `evaluate_maintenance_coordination_from_snapshot`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceCoordinationOutcome {
    /// The snapshot reports no active maintenance.
    Idle,
    /// Maintenance is active, its heartbeat is fresh and no stall was detected.
    Active {
        job_id: String,
        job_kind: SearchMaintenanceJobKind,
        phase: Option<String>,
        /// Start time in milliseconds; `0` when the snapshot records none.
        started_at_ms: i64,
        /// Last heartbeat in milliseconds; the evaluation time when the snapshot
        /// records none.
        updated_at_ms: i64,
    },
    /// Maintenance is marked active, but its heartbeat is too old or its progress
    /// has stalled. `reason` is a human-readable explanation.
    Stale {
        job_id: String,
        reason: String,
    },
}
1648
/// What a caller should do about in-flight search maintenance. Produced by
/// `decide_maintenance_action_from_snapshot` and `decide_search_failopen`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceDecision {
    /// Returned when no maintenance is active.
    Launch,
    /// Describes the active (or stale) maintenance job the caller should attach to
    /// or wait for.
    AttachOrWait {
        job_id: String,
        job_kind: SearchMaintenanceJobKind,
        phase: Option<String>,
        /// Milliseconds since the job's recorded start time.
        elapsed_ms: u64,
    },
    /// Proceed without waiting on the maintenance job; `reason` explains why.
    FailOpen {
        reason: String,
    },
}
1663
1664#[cfg_attr(not(test), allow(dead_code))]
1665pub(crate) fn evaluate_maintenance_coordination(
1666 data_dir: &Path,
1667 now_ms: i64,
1668) -> MaintenanceCoordinationOutcome {
1669 evaluate_maintenance_coordination_from_snapshot(
1670 &read_search_maintenance_snapshot(data_dir),
1671 now_ms,
1672 )
1673}
1674
1675#[cfg_attr(not(test), allow(dead_code))]
1676pub(crate) fn evaluate_maintenance_coordination_from_snapshot(
1677 snapshot: &SearchMaintenanceSnapshot,
1678 now_ms: i64,
1679) -> MaintenanceCoordinationOutcome {
1680 if !snapshot.active {
1681 return MaintenanceCoordinationOutcome::Idle;
1682 }
1683 let job_id = maintenance_snapshot_job_id(snapshot);
1684 if let Some(updated_at_ms) = snapshot.updated_at_ms {
1685 let heartbeat_age_ms = now_ms.saturating_sub(updated_at_ms);
1686 if heartbeat_age_ms > HEARTBEAT_STALE_THRESHOLD_MS {
1687 return MaintenanceCoordinationOutcome::Stale {
1688 job_id,
1689 reason: format!(
1690 "heartbeat is {heartbeat_age_ms}ms old (threshold {HEARTBEAT_STALE_THRESHOLD_MS}ms)"
1691 ),
1692 };
1693 }
1694 }
1695 if let Some(stall_age_ms) = maintenance_stall_age_ms(snapshot, now_ms) {
1702 let threshold_ms = rebuild_stall_detect_threshold_ms().unwrap_or(0);
1703 return MaintenanceCoordinationOutcome::Stale {
1704 job_id,
1705 reason: format!(
1706 "indexing thread has not posted forward progress for {stall_age_ms}ms while the heartbeat keeps refreshing (stall threshold {threshold_ms}ms) — see issue #258"
1707 ),
1708 };
1709 }
1710 MaintenanceCoordinationOutcome::Active {
1711 job_id,
1712 job_kind: snapshot
1713 .job_kind
1714 .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh),
1715 phase: snapshot.phase.clone(),
1716 started_at_ms: snapshot.started_at_ms.unwrap_or(0),
1717 updated_at_ms: snapshot.updated_at_ms.unwrap_or(now_ms),
1718 }
1719}
1720
1721fn maintenance_snapshot_job_id(snapshot: &SearchMaintenanceSnapshot) -> String {
1722 snapshot
1723 .job_id
1724 .as_ref()
1725 .filter(|job_id| !job_id.is_empty())
1726 .cloned()
1727 .unwrap_or_else(|| {
1728 let mode = snapshot
1729 .mode
1730 .map(|mode| mode.as_lock_value())
1731 .unwrap_or("unknown");
1732 let owner = snapshot
1733 .pid
1734 .map(|pid| pid.to_string())
1735 .unwrap_or_else(|| "unknown-owner".to_string());
1736 format!("{mode}-active-lock-{owner}")
1737 })
1738}
1739
1740fn maintenance_snapshot_job_kind(snapshot: &SearchMaintenanceSnapshot) -> SearchMaintenanceJobKind {
1741 snapshot
1742 .job_kind
1743 .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh)
1744}
1745
1746fn maintenance_elapsed_ms(snapshot: &SearchMaintenanceSnapshot, now_ms: i64) -> u64 {
1747 snapshot
1748 .started_at_ms
1749 .map(|started_at_ms| u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0))
1750 .unwrap_or(0)
1751}
1752
1753fn stale_heartbeat_phase(snapshot: &SearchMaintenanceSnapshot) -> Option<String> {
1754 snapshot
1755 .phase
1756 .clone()
1757 .or_else(|| Some("stale-heartbeat".to_string()))
1758}
1759
1760#[cfg_attr(not(test), allow(dead_code))]
1761pub(crate) fn decide_maintenance_action(data_dir: &Path, now_ms: i64) -> MaintenanceDecision {
1762 decide_maintenance_action_from_snapshot(&read_search_maintenance_snapshot(data_dir), now_ms)
1763}
1764
1765#[cfg_attr(not(test), allow(dead_code))]
1766pub(crate) fn decide_maintenance_action_from_snapshot(
1767 snapshot: &SearchMaintenanceSnapshot,
1768 now_ms: i64,
1769) -> MaintenanceDecision {
1770 match evaluate_maintenance_coordination_from_snapshot(snapshot, now_ms) {
1771 MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1772 MaintenanceCoordinationOutcome::Stale { job_id, .. } => MaintenanceDecision::AttachOrWait {
1773 job_id,
1774 job_kind: maintenance_snapshot_job_kind(snapshot),
1775 phase: stale_heartbeat_phase(snapshot),
1776 elapsed_ms: maintenance_elapsed_ms(snapshot, now_ms),
1777 },
1778 MaintenanceCoordinationOutcome::Active {
1779 job_id,
1780 job_kind,
1781 phase,
1782 started_at_ms,
1783 ..
1784 } => MaintenanceDecision::AttachOrWait {
1785 job_id,
1786 job_kind,
1787 phase,
1788 elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1789 },
1790 }
1791}
1792
1793#[cfg_attr(not(test), allow(dead_code))]
1794pub(crate) fn decide_search_failopen(
1795 data_dir: &Path,
1796 now_ms: i64,
1797 lexical_available: bool,
1798) -> MaintenanceDecision {
1799 let snapshot = read_search_maintenance_snapshot(data_dir);
1800 match evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms) {
1801 MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1802 MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1803 if lexical_available {
1804 MaintenanceDecision::FailOpen {
1805 reason: format!(
1806 "maintenance job {job_id} has a stale heartbeat ({reason}); lexical index is available, failing open"
1807 ),
1808 }
1809 } else {
1810 MaintenanceDecision::AttachOrWait {
1811 job_id,
1812 job_kind: maintenance_snapshot_job_kind(&snapshot),
1813 phase: stale_heartbeat_phase(&snapshot),
1814 elapsed_ms: maintenance_elapsed_ms(&snapshot, now_ms),
1815 }
1816 }
1817 }
1818 MaintenanceCoordinationOutcome::Active {
1819 job_id,
1820 job_kind,
1821 phase,
1822 started_at_ms,
1823 ..
1824 } => {
1825 if lexical_available {
1826 MaintenanceDecision::FailOpen {
1827 reason: format!(
1828 "maintenance job {job_id} is active; lexical index is available, failing open"
1829 ),
1830 }
1831 } else {
1832 MaintenanceDecision::AttachOrWait {
1833 job_id,
1834 job_kind,
1835 phase,
1836 elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1837 }
1838 }
1839 }
1840 }
1841}
1842
/// Result of `poll_maintenance_until_idle`.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) struct PollResult {
    /// Coordination outcome from the final poll (`Idle` unless the wait timed out).
    pub outcome: MaintenanceCoordinationOutcome,
    /// Number of times the maintenance state was evaluated.
    pub polls: u32,
    /// Time spent from the start of polling until returning.
    pub elapsed: Duration,
    /// `true` when the deadline passed before maintenance became idle.
    pub timed_out: bool,
}
1850
1851#[cfg_attr(not(test), allow(dead_code))]
1852pub(crate) fn poll_maintenance_until_idle(
1853 data_dir: &Path,
1854 timeout: Option<Duration>,
1855 poll_interval: Option<Duration>,
1856) -> PollResult {
1857 let timeout = timeout.unwrap_or(BOUNDED_WAIT_DEFAULT);
1858 let interval = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
1859 let start = Instant::now();
1860 let deadline = start + timeout;
1861 let mut polls = 0u32;
1862 loop {
1863 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1864 let outcome = evaluate_maintenance_coordination(data_dir, now_ms);
1865 polls += 1;
1866 if matches!(outcome, MaintenanceCoordinationOutcome::Idle) {
1867 return PollResult {
1868 outcome,
1869 polls,
1870 elapsed: start.elapsed(),
1871 timed_out: false,
1872 };
1873 }
1874
1875 let now = Instant::now();
1876 if now >= deadline {
1877 return PollResult {
1878 outcome,
1879 polls,
1880 elapsed: start.elapsed(),
1881 timed_out: true,
1882 };
1883 }
1884 let remaining = deadline - now;
1885 std::thread::sleep(interval.min(remaining));
1886 }
1887}
1888
/// File name, relative to the data directory, of the JSON-lines maintenance event
/// log (one serialized `MaintenanceEvent` per line).
#[cfg_attr(not(test), allow(dead_code))]
const MAINTENANCE_EVENTS_FILE: &str = ".maintenance-events.jsonl";
/// File name, relative to the data directory, of the yield signal holding a
/// JSON-serialized `YieldRequest`.
#[cfg_attr(not(test), allow(dead_code))]
const YIELD_SIGNAL_FILE: &str = "maintenance-yield.signal";
/// Default read cap for `read_maintenance_events`, and the number of lines
/// `truncate_maintenance_event_log` keeps.
#[cfg_attr(not(test), allow(dead_code))]
const MAX_EVENT_LOG_ENTRIES: usize = 500;
1899
/// One record of the maintenance event log. `append_maintenance_event` writes it
/// as a single JSON line, and `read_maintenance_events` parses it back.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct MaintenanceEvent {
    /// Event time in milliseconds; compared against `after_ms` when reading.
    pub timestamp_ms: i64,
    /// Identifier of the maintenance job the event belongs to.
    pub job_id: String,
    /// Presumably the pid of the process that emitted the event (not enforced here).
    pub actor_pid: u32,
    /// What happened.
    pub kind: MaintenanceEventKind,
}
1908
/// Kinds of maintenance events recorded in the event log.
///
/// No `#[serde(tag = ...)]` attribute is present, so this uses serde's default
/// externally tagged representation (e.g. `{"Progress":{"processed":1,"total":2}}`,
/// and `"Resumed"` for the unit variant). Renaming a variant changes the on-disk
/// format.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum MaintenanceEventKind {
    Started { job_kind: String, phase: String },
    PhaseChanged { from: String, to: String },
    Progress { processed: u64, total: u64 },
    YieldRequested { requester_pid: u32, reason: String },
    Paused { reason: String },
    Resumed,
    Completed { summary: String },
    Failed { error: String },
    Cancelled { reason: String },
}
1922
1923#[cfg_attr(not(test), allow(dead_code))]
1924pub(crate) fn append_maintenance_event(data_dir: &Path, event: &MaintenanceEvent) -> Result<()> {
1925 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1926 let line = serde_json::to_string(event).with_context(|| "serializing maintenance event")?;
1927 let mut file = OpenOptions::new()
1928 .create(true)
1929 .append(true)
1930 .open(&path)
1931 .with_context(|| format!("opening maintenance event log at {}", path.display()))?;
1932 use std::io::Write;
1933 writeln!(file, "{line}")
1934 .with_context(|| format!("appending to maintenance event log at {}", path.display()))?;
1935 Ok(())
1936}
1937
1938#[cfg_attr(not(test), allow(dead_code))]
1939pub(crate) fn read_maintenance_events(
1940 data_dir: &Path,
1941 after_ms: Option<i64>,
1942 limit: Option<usize>,
1943) -> Vec<MaintenanceEvent> {
1944 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1945 let contents = match std::fs::read_to_string(&path) {
1946 Ok(c) => c,
1947 Err(_) => return Vec::new(),
1948 };
1949 let cap = limit.unwrap_or(MAX_EVENT_LOG_ENTRIES);
1950 contents
1951 .lines()
1952 .filter_map(|line| serde_json::from_str::<MaintenanceEvent>(line).ok())
1953 .filter(|e| after_ms.is_none_or(|threshold| e.timestamp_ms > threshold))
1954 .rev()
1955 .take(cap)
1956 .collect::<Vec<_>>()
1957 .into_iter()
1958 .rev()
1959 .collect()
1960}
1961
1962#[cfg_attr(not(test), allow(dead_code))]
1963pub(crate) fn truncate_maintenance_event_log(data_dir: &Path) -> Result<()> {
1964 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1965 let contents = match std::fs::read_to_string(&path) {
1966 Ok(c) => c,
1967 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1968 Err(e) => {
1969 return Err(e).with_context(|| {
1970 format!("reading event log for truncation at {}", path.display())
1971 });
1972 }
1973 };
1974 let lines: Vec<&str> = contents.lines().collect();
1975 if lines.len() <= MAX_EVENT_LOG_ENTRIES {
1976 return Ok(());
1977 }
1978 let keep = &lines[lines.len() - MAX_EVENT_LOG_ENTRIES..];
1979 let mut output = keep.join("\n");
1980 output.push('\n');
1981 std::fs::write(&path, output)
1982 .with_context(|| format!("truncating event log at {}", path.display()))
1983}
1984
/// Request for the running maintenance job to yield. `request_yield` stores it as
/// JSON in the yield signal file, and `check_yield_requested` reads it back.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct YieldRequest {
    /// Process id of the requester (`std::process::id()` in `request_yield`).
    pub requester_pid: u32,
    /// Time the request was written, in milliseconds.
    pub requested_at_ms: i64,
    /// Free-form explanation supplied by the requester.
    pub reason: String,
}
1996
1997#[cfg_attr(not(test), allow(dead_code))]
1998pub(crate) fn request_yield(data_dir: &Path, reason: &str) -> Result<()> {
1999 let path = data_dir.join(YIELD_SIGNAL_FILE);
2000 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
2001 let req = YieldRequest {
2002 requester_pid: std::process::id(),
2003 requested_at_ms: now_ms,
2004 reason: reason.to_string(),
2005 };
2006 let payload = serde_json::to_string(&req).with_context(|| "serializing yield request")?;
2007 std::fs::write(&path, payload)
2008 .with_context(|| format!("writing yield signal to {}", path.display()))
2009}
2010
2011#[cfg_attr(not(test), allow(dead_code))]
2012pub(crate) fn check_yield_requested(data_dir: &Path) -> Option<YieldRequest> {
2013 let path = data_dir.join(YIELD_SIGNAL_FILE);
2014 let contents = std::fs::read_to_string(&path).ok()?;
2015 serde_json::from_str::<YieldRequest>(&contents).ok()
2016}
2017
2018#[cfg_attr(not(test), allow(dead_code))]
2019pub(crate) fn clear_yield_signal(data_dir: &Path) -> Result<()> {
2020 let path = data_dir.join(YIELD_SIGNAL_FILE);
2021 match std::fs::remove_file(&path) {
2022 Ok(()) => Ok(()),
2023 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
2024 Err(e) => Err(e).with_context(|| format!("clearing yield signal at {}", path.display())),
2025 }
2026}
2027
#[cfg_attr(not(test), allow(dead_code))]
/// Single serializable snapshot of search-maintenance state, as assembled by
/// `unified_maintenance_view`.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct UnifiedMaintenanceView {
    /// Result of `evaluate_maintenance_coordination_from_snapshot` for `snapshot`.
    pub coordination: MaintenanceCoordinationOutcome,
    /// Lock-derived maintenance state from `read_search_maintenance_snapshot`.
    pub snapshot: SearchMaintenanceSnapshot,
    /// Pending yield request, if one is present and parseable (`check_yield_requested`).
    pub yield_pending: Option<YieldRequest>,
    /// Recent entries returned by `read_maintenance_events`.
    pub recent_events: Vec<MaintenanceEvent>,
    /// Action callers should take. It is `FailOpen` when lexical search is available and a
    /// job is active or stale; otherwise it comes from `decide_maintenance_action_from_snapshot`.
    pub decision: MaintenanceDecision,
}
2041
2042#[cfg_attr(not(test), allow(dead_code))]
2043pub(crate) fn unified_maintenance_view(
2044 data_dir: &Path,
2045 lexical_available: bool,
2046) -> UnifiedMaintenanceView {
2047 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
2048 let snapshot = read_search_maintenance_snapshot(data_dir);
2049 let coordination = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2050 let yield_pending = check_yield_requested(data_dir);
2051 let recent_events = read_maintenance_events(data_dir, None, Some(20));
2052 let decision = if lexical_available {
2053 match &coordination {
2054 MaintenanceCoordinationOutcome::Active {
2055 job_id,
2056 job_kind,
2057 phase,
2058 ..
2059 } => MaintenanceDecision::FailOpen {
2060 reason: format!(
2061 "maintenance job {} ({:?}) is active (phase: {}); lexical available, failing open",
2062 job_id,
2063 job_kind,
2064 phase.as_deref().unwrap_or("unknown")
2065 ),
2066 },
2067 MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
2068 MaintenanceDecision::FailOpen {
2069 reason: format!(
2070 "maintenance job {job_id} has a stale heartbeat ({reason}); lexical available, failing open"
2071 ),
2072 }
2073 }
2074 _ => decide_maintenance_action_from_snapshot(&snapshot, now_ms),
2075 }
2076 } else {
2077 decide_maintenance_action_from_snapshot(&snapshot, now_ms)
2078 };
2079
2080 UnifiedMaintenanceView {
2081 coordination,
2082 snapshot,
2083 yield_pending,
2084 recent_events,
2085 decision,
2086 }
2087}
2088
2089#[cfg(test)]
2090mod tests {
2091 use super::*;
2092
    /// PID to record in "live owner" lock fixtures on Windows.
    ///
    /// The argument is ignored and the current process id is returned.
    #[cfg(windows)]
    fn active_lock_fixture_pid(_non_windows_pid: u32) -> u32 {
        std::process::id()
    }
2097
    /// PID to record in "live owner" lock fixtures on non-Windows targets.
    ///
    /// The supplied fixture pid is returned unchanged.
    #[cfg(not(windows))]
    fn active_lock_fixture_pid(non_windows_pid: u32) -> u32 {
        non_windows_pid
    }
2102
    /// Replaces the contents of the already-open lock file `owner` with `contents`.
    ///
    /// The file is truncated and rewritten from offset 0, then flushed and fsynced. The
    /// same text is then mirrored into the lock-metadata sidecar for `lock_path`. Sidecar
    /// errors are mapped into `std::io::Error` so the helper keeps a single
    /// `io::Result` return type.
    fn write_locked_metadata(
        lock_path: &Path,
        owner: &mut std::fs::File,
        contents: &str,
    ) -> std::io::Result<()> {
        use std::io::{Seek, SeekFrom, Write};

        owner.set_len(0)?;
        owner.seek(SeekFrom::Start(0))?;
        owner.write_all(contents.as_bytes())?;
        owner.flush()?;
        owner.sync_all()?;
        write_index_run_lock_metadata_sidecar(lock_path, contents)
            .map_err(std::io::Error::other)?;
        Ok(())
    }
2119
2120 #[test]
2121 fn maintenance_mode_round_trips_lock_values() {
2122 for mode in [
2123 SearchMaintenanceMode::Index,
2124 SearchMaintenanceMode::WatchStartup,
2125 SearchMaintenanceMode::Watch,
2126 SearchMaintenanceMode::WatchOnce,
2127 ] {
2128 assert_eq!(
2129 SearchMaintenanceMode::parse_lock_value(mode.as_lock_value()),
2130 Some(mode)
2131 );
2132 }
2133 }
2134
2135 #[test]
2136 fn maintenance_job_kind_round_trips_lock_values() {
2137 for kind in [
2138 SearchMaintenanceJobKind::LexicalRefresh,
2139 SearchMaintenanceJobKind::SemanticAcquire,
2140 ] {
2141 assert_eq!(
2142 SearchMaintenanceJobKind::parse_lock_value(kind.as_lock_value()),
2143 Some(kind)
2144 );
2145 }
2146 }
2147
    /// Lock metadata left on disk while no process holds the lock is reaped on read.
    ///
    /// `read_search_maintenance_snapshot` must report it neither active nor orphaned and
    /// clear the parsed owner fields. It must also truncate `index-run.lock` to zero bytes in
    /// place, and a second read must stay inactive.
    #[test]
    fn stale_lock_metadata_from_dead_owner_is_reaped_on_read() {
        let temp = tempfile::tempdir().expect("tempdir");
        let lock_path = temp.path().join("index-run.lock");
        // Metadata is written without taking the lock, so nothing "owns" it.
        std::fs::write(
            &lock_path,
            concat!(
                "pid=4242\n",
                "started_at_ms=1733000111000\n",
                "updated_at_ms=1733000112000\n",
                "db_path=/tmp/cass/agent_search.db\n",
                "mode=index\n",
                "job_id=lexical-refresh-1733000111000-4242\n",
                "job_kind=lexical_refresh\n",
                "phase=rebuilding\n"
            ),
        )
        .expect("write lock metadata");

        let snapshot = read_search_maintenance_snapshot(temp.path());
        assert!(!snapshot.active, "no owner, must not be reported active");
        assert!(
            !snapshot.orphaned,
            "stale metadata must be reaped, not reported as orphaned"
        );
        assert!(snapshot.pid.is_none(), "pid must be cleared after reap");
        assert!(
            snapshot.job_id.is_none(),
            "job_id must be cleared after reap"
        );
        assert!(snapshot.phase.is_none(), "phase must be cleared after reap");

        // The file itself is kept; only its contents are dropped.
        let post = std::fs::metadata(&lock_path).expect("lock file still present");
        assert_eq!(post.len(), 0, "stale metadata must be truncated in place");

        // Reading the now-empty lock file again must remain a clean, inactive state.
        let snapshot2 = read_search_maintenance_snapshot(temp.path());
        assert!(!snapshot2.active);
        assert!(!snapshot2.orphaned);
    }
2200
    /// While a live handle holds the exclusive lock on `index-run.lock`, its metadata is
    /// reported as active and parsed field-by-field, and the lock file is not truncated.
    #[test]
    fn live_owner_metadata_is_preserved_when_flock_is_held() -> std::io::Result<()> {
        use fs2::FileExt;
        let temp = tempfile::tempdir()?;
        let lock_path = temp.path().join("index-run.lock");
        let owner_pid = active_lock_fixture_pid(4242);
        let mut owner = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(true)
            .open(&lock_path)?;
        // Hold the lock for the rest of the test so the reader sees a live owner.
        owner.try_lock_exclusive()?;
        write_locked_metadata(
            &lock_path,
            &mut owner,
            format!(
                concat!(
                    "pid={}\n",
                    "started_at_ms=1733000111000\n",
                    "updated_at_ms=1733000112000\n",
                    "db_path=/tmp/cass/agent_search.db\n",
                    "mode=index\n",
                    "job_id=lexical-refresh-1733000111000-4242\n",
                    "job_kind=lexical_refresh\n",
                    "phase=rebuilding\n"
                ),
                owner_pid
            )
            .as_str(),
        )?;

        let snapshot = read_search_maintenance_snapshot(temp.path());
        assert!(snapshot.active, "live owner must be reported active");
        assert!(!snapshot.orphaned);
        assert_eq!(snapshot.pid, Some(owner_pid));
        assert_eq!(
            snapshot.job_id.as_deref(),
            Some("lexical-refresh-1733000111000-4242")
        );
        assert_eq!(
            snapshot.job_kind,
            Some(SearchMaintenanceJobKind::LexicalRefresh)
        );
        assert_eq!(snapshot.phase.as_deref(), Some("rebuilding"));
        assert_eq!(snapshot.updated_at_ms, Some(1_733_000_112_000));

        let post = std::fs::metadata(&lock_path)?;
        assert!(post.len() > 0, "live-owner metadata must not be truncated");

        // NOTE(review): called through the trait path, presumably to avoid ambiguity with an
        // inherent `File::unlock` on newer toolchains — confirm.
        FileExt::unlock(&owner)?;
        Ok(())
    }
2258
    /// `lexical_storage_fingerprints_match` tolerates a small difference in one timestamp
    /// component but rejects a change in a size component.
    ///
    /// NOTE(review): the fingerprints appear to be colon-separated `size:mtime` pairs (db,
    /// then WAL), inferred from the case labels — confirm against
    /// `lexical_storage_fingerprint_for_db`.
    #[test]
    fn lexical_storage_fingerprint_matching_handles_jitter_and_size_drift() {
        let cases = [
            (
                "small mtime settle jitter",
                "323584:1776310228000:329632:1776310227824",
                "323584:1776310227832:329632:1776310227824",
                true,
            ),
            (
                "wal size drift",
                "323584:1776310228000:329632:1776310227824",
                "323584:1776310227832:400000:1776310227824",
                false,
            ),
        ];

        for (label, current, saved, expected) in cases {
            assert_eq!(
                lexical_storage_fingerprints_match(current, saved),
                expected,
                "{label}"
            );
        }
    }
2284
    /// A completed checkpoint for the same DB whose storage fingerprint differs from the
    /// current one yields `stale`, with a fingerprint reason and all progress fields hidden.
    #[test]
    fn lexical_state_marks_fingerprint_mismatch_stale() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let checkpoint = LexicalRebuildCheckpoint {
            db_path: db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "before".to_string(),
            committed_offset: 10,
            committed_conversation_id: Some(10),
            processed_conversations: 10,
            indexed_docs: 100,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
            completed: true,
            updated_at_ms: 1_733_000_000_000,
        };

        // Checkpoint says "before", current DB probe says "after".
        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: Some("after"),
        });

        assert_eq!(state.status, "stale");
        assert_eq!(
            state.fingerprint.matches_current_db_fingerprint,
            Some(false)
        );
        assert!(
            state
                .status_reason
                .as_deref()
                .is_some_and(|reason| reason.contains("fingerprint"))
        );
        assert_eq!(state.pending_sessions, 0);
        assert_eq!(state.processed_conversations, None);
        assert_eq!(state.total_conversations, None);
        assert_eq!(state.indexed_docs, None);
    }
2338
    /// A checkpoint recorded for a different database path is `stale` even when no current
    /// fingerprint was probed.
    ///
    /// `db_matches` is `Some(false)`, the fingerprint comparison stays `None`, and the reason
    /// mentions the "different database".
    #[test]
    fn lexical_state_marks_checkpoint_db_mismatch_stale_without_fingerprint_probe() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        let other_db_path = temp.path().join("other_agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");
        std::fs::write(&other_db_path, b"other db").expect("write other db file");

        // Checkpoint points at `other_db_path`, not the DB being inspected.
        let checkpoint = LexicalRebuildCheckpoint {
            db_path: other_db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "old-db-fingerprint".to_string(),
            committed_offset: 10,
            committed_conversation_id: Some(10),
            processed_conversations: 10,
            indexed_docs: 100,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
            completed: true,
            updated_at_ms: 1_733_000_000_000,
        };

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: None,
        });

        assert_eq!(state.status, "stale");
        assert!(state.stale);
        assert!(!state.fresh);
        assert_eq!(state.checkpoint.db_matches, Some(false));
        assert_eq!(state.fingerprint.matches_current_db_fingerprint, None);
        assert_eq!(state.pending_sessions, 0);
        assert_eq!(state.processed_conversations, None);
        assert_eq!(state.total_conversations, None);
        assert_eq!(state.indexed_docs, None);
        assert!(
            state
                .status_reason
                .as_deref()
                .is_some_and(|reason| reason.contains("different database"))
        );
    }
2391
    /// An index directory without `meta.json` (and no prior index timestamp) reports
    /// `missing`. It is explicitly neither stale nor fresh.
    #[test]
    fn lexical_state_missing_index_is_not_marked_stale_until_initialized() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        // Directory only; `meta.json` is intentionally not written.
        std::fs::create_dir_all(&index_path).expect("create index dir");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: None,
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            checkpoint: None,
            current_db_fingerprint: None,
        });

        assert_eq!(state.status, "missing");
        assert!(!state.exists);
        assert!(!state.stale);
        assert!(!state.fresh);
        assert_eq!(
            state.status_reason.as_deref(),
            Some("lexical Tantivy metadata missing")
        );
    }
2420
    /// An active rebuild lock for the same DB keeps the state `building` and exposes
    /// checkpoint progress, even though the fingerprint has drifted.
    ///
    /// It also pins that `page_size: 200` differs from `LEXICAL_REBUILD_PAGE_SIZE_PUBLIC` but
    /// is still considered compatible.
    #[test]
    fn lexical_state_keeps_progress_visible_during_active_rebuild_despite_fingerprint_drift() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        // Partially complete rebuild: 4 of 10 conversations processed.
        let checkpoint = LexicalRebuildCheckpoint {
            db_path: db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "before".to_string(),
            committed_offset: 4,
            committed_conversation_id: Some(4),
            processed_conversations: 4,
            indexed_docs: 20,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: 200,
            completed: false,
            updated_at_ms: 1_733_000_123_000,
        };

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(1_733_000_111_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::Index),
                job_id: None,
                job_kind: None,
                phase: None,
                updated_at_ms: None,
                last_progress_at_ms: None,
                orphaned: false,
            },
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: Some("after"),
        });

        assert_eq!(state.status, "building");
        assert!(!state.stale);
        assert!(!state.fresh);
        // 10 total - 4 processed.
        assert_eq!(state.pending_sessions, 6);
        assert_eq!(state.processed_conversations, Some(4));
        assert_eq!(state.total_conversations, Some(10));
        assert_eq!(state.indexed_docs, Some(20));
        assert_eq!(state.checkpoint.page_size_matches, Some(false));
        assert_eq!(state.checkpoint.page_size_compatible, Some(true));
        assert_eq!(
            state.status_reason.as_deref(),
            Some("lexical rebuild is in progress")
        );
    }
2481
    /// An incomplete checkpoint with an incompatible page size (13) is treated as `stale`.
    ///
    /// Its progress is not surfaced, and the reason mentions the "contract", even though the
    /// fingerprint itself still matches.
    #[test]
    fn lexical_state_hides_progress_for_incompatible_page_size_checkpoint() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let checkpoint = LexicalRebuildCheckpoint {
            db_path: db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "before".to_string(),
            committed_offset: 4,
            committed_conversation_id: Some(4),
            processed_conversations: 4,
            indexed_docs: 20,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: 13,
            completed: false,
            updated_at_ms: 1_733_000_123_000,
        };

        // Fingerprint matches ("before" == "before"); only the page size is at fault.
        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: Some("before"),
        });

        assert_eq!(state.status, "stale");
        assert!(state.stale);
        assert_eq!(state.pending_sessions, 0);
        assert_eq!(state.processed_conversations, None);
        assert_eq!(state.total_conversations, None);
        assert_eq!(state.indexed_docs, None);
        assert_eq!(state.checkpoint.page_size_matches, Some(false));
        assert_eq!(state.checkpoint.page_size_compatible, Some(false));
        assert!(
            state
                .status_reason
                .as_deref()
                .is_some_and(|reason| reason.contains("contract"))
        );
    }
2531
    /// During an active rebuild, `activity_at_ms` reports the lock heartbeat
    /// (`updated_at_ms`) when it is newer than the checkpoint's own `updated_at_ms`.
    #[test]
    fn lexical_state_prefers_newer_maintenance_heartbeat_over_stale_checkpoint_timestamp() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        // Checkpoint timestamp: ...123_000.
        let checkpoint = LexicalRebuildCheckpoint {
            db_path: db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "before".to_string(),
            committed_offset: 4,
            committed_conversation_id: Some(4),
            processed_conversations: 4,
            indexed_docs: 20,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
            completed: false,
            updated_at_ms: 1_733_000_123_000,
        };

        // Heartbeat timestamp: ...456_000 (newer than the checkpoint).
        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(1_733_000_111_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::Index),
                job_id: None,
                job_kind: None,
                phase: None,
                updated_at_ms: Some(1_733_000_456_000),
                last_progress_at_ms: None,
                orphaned: false,
            },
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: Some("after"),
        });

        assert_eq!(state.status, "building");
        assert_eq!(state.activity_at_ms, Some(1_733_000_456_000));
    }
2580
    /// An active `Index`-mode lock owned by a different database does not count as a
    /// rebuild of this one.
    ///
    /// The state falls back to `stale` on the fingerprint mismatch, with no rebuild/watch
    /// activity and no progress.
    #[test]
    fn lexical_state_ignores_rebuild_lock_for_different_database() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");
        let other_db_path = temp.path().join("other.db");
        std::fs::write(&other_db_path, b"other").expect("write other db file");

        let checkpoint = LexicalRebuildCheckpoint {
            db_path: db_path.display().to_string(),
            total_conversations: 10,
            storage_fingerprint: "before".to_string(),
            committed_offset: 4,
            committed_conversation_id: Some(4),
            processed_conversations: 4,
            indexed_docs: 20,
            schema_hash: SCHEMA_HASH.to_string(),
            page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
            completed: false,
            updated_at_ms: 1_733_000_123_000,
        };

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(1_733_000_111_000),
                // Lock belongs to another database.
                db_path: Some(other_db_path),
                mode: Some(SearchMaintenanceMode::Index),
                job_id: None,
                job_kind: None,
                phase: None,
                updated_at_ms: None,
                last_progress_at_ms: None,
                orphaned: false,
            },
            checkpoint: Some(&checkpoint),
            current_db_fingerprint: Some("after"),
        });

        assert_eq!(state.status, "stale");
        assert!(state.stale);
        assert!(!state.fresh);
        assert!(!state.rebuilding);
        assert!(!state.watch_active);
        assert_eq!(state.activity_at_ms, None);
        assert_eq!(state.pending_sessions, 0);
        assert_eq!(state.processed_conversations, None);
        assert_eq!(state.total_conversations, None);
        assert_eq!(state.indexed_docs, None);
        assert!(
            state
                .status_reason
                .as_deref()
                .is_some_and(|reason| reason.contains("fingerprint"))
        );
    }
2646
    /// An active `Watch`-mode lock for a different database is ignored.
    ///
    /// With no checkpoint and a recent `last_indexed_at_ms` (20 000 ms before `now_ms`), the
    /// state is `ready`/fresh and reports no watch or rebuild activity.
    #[test]
    fn lexical_state_ignores_watch_lock_for_different_database() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");
        let other_db_path = temp.path().join("other.db");
        std::fs::write(&other_db_path, b"other").expect("write other db file");

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_020_000,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(1_733_000_111_000),
                // Watcher belongs to another database.
                db_path: Some(other_db_path),
                mode: Some(SearchMaintenanceMode::Watch),
                job_id: None,
                job_kind: None,
                phase: None,
                updated_at_ms: None,
                last_progress_at_ms: None,
                orphaned: false,
            },
            checkpoint: None,
            current_db_fingerprint: None,
        });

        assert_eq!(state.status, "ready");
        assert!(state.fresh);
        assert!(!state.stale);
        assert!(!state.rebuilding);
        assert!(!state.watch_active);
        assert_eq!(state.activity_at_ms, None);
    }
2688
    /// A fresh lock heartbeat (500 ms old) does not hide a lack of forward progress.
    ///
    /// With `last_progress_at_ms` 300 s in the past, the state must be `stalled`. It must
    /// surface the progress timestamp and age, and give a reason referencing issue #258.
    #[test]
    fn lexical_state_reports_stalled_when_progress_is_stale_despite_fresh_heartbeat() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let now_ms: i64 = 1_733_000_300_000;

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(now_ms - 600_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::WatchStartup),
                job_id: Some("lexical_refresh-1-1".to_string()),
                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
                phase: Some("watch_startup".to_string()),
                // Heartbeat is fresh...
                updated_at_ms: Some(now_ms - 500),
                // ...but no forward progress for 300 s.
                last_progress_at_ms: Some(now_ms - 300_000),
                orphaned: false,
            },
            checkpoint: None,
            current_db_fingerprint: None,
        });

        assert!(state.rebuilding, "active rebuild lock must still register");
        assert!(
            state.stalled,
            "stale forward-progress timestamp must flip stalled=true",
        );
        assert_eq!(state.status, "stalled");
        assert_eq!(
            state.last_progress_at_ms,
            Some(now_ms - 300_000),
            "last_progress_at_ms must be surfaced to status callers",
        );
        let age = state
            .last_progress_age_ms
            .expect("last_progress_age_ms must be computed");
        assert!(
            (299_900..=300_100).contains(&age),
            "computed last_progress_age_ms ({age}ms) should equal now - last_progress_at_ms (300_000ms)",
        );
        let reason = state
            .status_reason
            .as_deref()
            .expect("stalled state should populate status_reason");
        assert!(
            reason.contains("forward progress")
                && (reason.contains("#258") || reason.contains("issue #258")),
            "status_reason should mention forward progress and reference #258 ({reason})",
        );
    }
2764
    /// Counterpart to the stalled case: forward progress recorded 1 s ago keeps an active
    /// rebuild in `building` with `stalled == false`.
    #[test]
    fn lexical_state_stays_building_when_progress_is_recent() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let now_ms: i64 = 1_733_000_300_000;

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(now_ms - 30_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::Index),
                job_id: Some("lexical_refresh-1-1".to_string()),
                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
                phase: Some("scanning".to_string()),
                updated_at_ms: Some(now_ms - 500),
                last_progress_at_ms: Some(now_ms - 1_000),
                orphaned: false,
            },
            checkpoint: None,
            current_db_fingerprint: None,
        });

        assert!(state.rebuilding);
        assert!(!state.stalled, "fresh progress must not flip stalled");
        assert_eq!(state.status, "building");
    }
2809
    /// Locks written without `last_progress_at_ms` (`None` here, labelled "legacy") must not
    /// be classified as stalled, and no progress age is reported for them.
    #[test]
    fn lexical_state_does_not_stall_when_legacy_lock_omits_progress_field() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        let now_ms: i64 = 1_733_000_300_000;

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(now_ms - 30_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::Index),
                job_id: None,
                job_kind: None,
                phase: None,
                updated_at_ms: Some(now_ms - 500),
                last_progress_at_ms: None,
                orphaned: false,
            },
            checkpoint: None,
            current_db_fingerprint: None,
        });

        assert!(state.rebuilding);
        assert!(
            !state.stalled,
            "legacy lock without last_progress_at_ms must NOT be misreported as stalled",
        );
        assert_eq!(state.status, "building");
        assert!(state.last_progress_age_ms.is_none());
    }
2859
    /// `last_progress_age_ms` is the exact millisecond difference (119 500 ms here) rather
    /// than a whole-second approximation.
    ///
    /// A 119.5 s lag must still be reported as `building`. Together with the 300 s stalled
    /// case, this brackets the stall threshold between those two values.
    #[test]
    fn lexical_state_progress_age_is_ms_precision_not_seconds_quantised() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
        let db_path = temp.path().join("agent_search.db");
        std::fs::write(&db_path, b"db").expect("write db file");

        // Non-round `now_ms` and lag so second-quantised arithmetic would produce a different age.
        let now_ms: i64 = 1_733_000_300_700;
        let last_progress_at_ms = now_ms - 119_500;

        let state = lexical_state_from_observations(LexicalObservationInput {
            index_path: &index_path,
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms,
            maintenance: SearchMaintenanceSnapshot {
                active: true,
                pid: Some(std::process::id()),
                started_at_ms: Some(now_ms - 600_000),
                db_path: Some(db_path.clone()),
                mode: Some(SearchMaintenanceMode::WatchStartup),
                job_id: Some("lexical_refresh-1-1".to_string()),
                job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
                phase: Some("watch_startup".to_string()),
                updated_at_ms: Some(now_ms - 500),
                last_progress_at_ms: Some(last_progress_at_ms),
                orphaned: false,
            },
            checkpoint: None,
            current_db_fingerprint: None,
        });

        let age = state
            .last_progress_age_ms
            .expect("forward-progress age must be computed");
        assert_eq!(
            age, 119_500,
            "ms-precision plumbing must surface the exact diff (no second-quantisation)"
        );
        assert!(
            !state.stalled,
            "119.5 s lag must remain `building`, not flip to `stalled`",
        );
        assert_eq!(state.status, "building");
    }
2924
2925 #[test]
2929 fn coordination_reports_stale_when_forward_progress_is_stuck() {
2930 let now_ms: i64 = 1_733_000_300_000;
2931 let snapshot = SearchMaintenanceSnapshot {
2932 active: true,
2933 pid: Some(12345),
2934 started_at_ms: Some(now_ms - 600_000),
2935 db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
2936 mode: Some(SearchMaintenanceMode::WatchStartup),
2937 job_id: Some("lexical_refresh-1-12345".to_string()),
2938 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2939 phase: Some("watch_startup".to_string()),
2940 updated_at_ms: Some(now_ms - 500),
2941 last_progress_at_ms: Some(now_ms - 300_000),
2942 orphaned: false,
2943 };
2944
2945 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2946 match outcome {
2947 MaintenanceCoordinationOutcome::Stale { ref reason, .. } => {
2948 assert!(
2949 reason.contains("forward progress")
2950 && (reason.contains("#258") || reason.contains("issue #258")),
2951 "stalled coordination reason should mention forward progress and #258: {reason}",
2952 );
2953 }
2954 other => assert!(
2955 matches!(other, MaintenanceCoordinationOutcome::Stale { .. }),
2956 "stalled forward-progress snapshot must coordinate as Stale, got {other:?}",
2957 ),
2958 }
2959 }
2960
    /// When the caller already knows the DB is unavailable (`db_available: false`) and
    /// semantic inspection is requested, `inspect_search_assets` still succeeds.
    ///
    /// Semantic reports `error` / `database_unavailable` with a lexical fallback, while the
    /// lexical side is not marked as an error.
    #[test]
    fn inspect_search_assets_preserves_semantic_database_unavailable_signal() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");

        // A directory at the DB path makes it impossible to open as a database file.
        let db_path = temp.path().join("agent_search.db");
        std::fs::create_dir_all(&db_path).expect("create unopenable db path");

        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
            .expect("create vector dir");
        std::fs::write(&vector_path, b"index").expect("write vector index");

        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
            data_dir: temp.path(),
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            semantic_preference: SemanticPreference::HashFallback,
            db_available: false,
            compute_lexical_fingerprint: false,
            inspect_semantic: true,
        })
        .expect("asset inspection should not fail when db availability is already known");

        assert_ne!(snapshot.lexical.status, "error");
        assert_eq!(snapshot.semantic.status, "error");
        assert_eq!(snapshot.semantic.availability, "database_unavailable");
        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
        assert!(snapshot.semantic.summary.contains("db unavailable"));
    }
2996
    /// With `inspect_semantic: false`, `inspect_search_assets` succeeds even though the DB
    /// path is unopenable.
    ///
    /// Semantic is reported as `not_inspected` (status and availability), with a lexical
    /// fallback.
    #[test]
    fn inspect_search_assets_can_skip_semantic_db_open_for_fast_paths() {
        let temp = tempfile::tempdir().expect("tempdir");
        let index_path = temp.path().join("index").join("v4");
        std::fs::create_dir_all(&index_path).expect("create index dir");
        std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");

        // A directory at the DB path makes it impossible to open as a database file.
        let db_path = temp.path().join("agent_search.db");
        std::fs::create_dir_all(&db_path).expect("create unopenable db path");

        let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
        std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
            .expect("create vector dir");
        std::fs::write(&vector_path, b"index").expect("write vector index");

        let snapshot = inspect_search_assets(InspectSearchAssetsInput {
            data_dir: temp.path(),
            db_path: &db_path,
            stale_threshold: 60,
            last_indexed_at_ms: Some(1_733_000_000_000),
            now_ms: 1_733_000_001_000,
            maintenance: SearchMaintenanceSnapshot::default(),
            semantic_preference: SemanticPreference::HashFallback,
            db_available: false,
            compute_lexical_fingerprint: false,
            inspect_semantic: false,
        })
        .expect("asset inspection should not open semantic DB when semantic inspection is skipped");

        assert_ne!(snapshot.lexical.status, "error");
        assert_eq!(snapshot.semantic.status, "not_inspected");
        assert_eq!(snapshot.semantic.availability, "not_inspected");
        assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
    }
3031
3032 #[test]
3033 fn inspect_search_assets_trusts_db_probe_for_semantic_metadata_probe() {
3034 let temp = tempfile::tempdir().expect("tempdir");
3035 let index_path = temp.path().join("index").join("v4");
3036 std::fs::create_dir_all(&index_path).expect("create index dir");
3037 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
3038
3039 let db_path = temp.path().join("agent_search.db");
3040 std::fs::create_dir_all(&db_path).expect("create unopenable db path");
3041
3042 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3043 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3044 .expect("create vector dir");
3045 std::fs::write(&vector_path, b"index").expect("write vector index");
3046
3047 let snapshot = inspect_search_assets(InspectSearchAssetsInput {
3048 data_dir: temp.path(),
3049 db_path: &db_path,
3050 stale_threshold: 60,
3051 last_indexed_at_ms: Some(1_733_000_000_000),
3052 now_ms: 1_733_000_001_000,
3053 maintenance: SearchMaintenanceSnapshot::default(),
3054 semantic_preference: SemanticPreference::HashFallback,
3055 db_available: true,
3056 compute_lexical_fingerprint: false,
3057 inspect_semantic: true,
3058 })
3059 .expect("semantic metadata probe should trust the existing DB availability signal");
3060
3061 assert_eq!(snapshot.semantic.status, "hash_fallback");
3062 assert_eq!(snapshot.semantic.availability, "hash_fallback");
3063 assert!(snapshot.semantic.can_search);
3064 }
3065
3066 #[test]
3067 fn semantic_state_reports_hash_fallback_as_searchable() {
3068 let state = semantic_state_from_availability(
3069 Path::new("/tmp/cass"),
3070 &SemanticAvailability::HashFallback,
3071 SemanticPreference::HashFallback,
3072 None,
3073 );
3074
3075 assert_eq!(state.status, "hash_fallback");
3076 assert_eq!(state.availability, "hash_fallback");
3077 assert!(state.available);
3078 assert!(state.can_search);
3079 assert_eq!(state.fallback_mode, None);
3080 }
3081
3082 #[test]
3083 fn semantic_preference_surface_preserves_backend_and_model_dir_projection() {
3084 let data_dir = Path::new("/tmp/cass");
3085 let cases = [
3086 (
3087 SemanticPreference::DefaultModel,
3088 "fastembed",
3089 Some(FastEmbedder::default_model_dir(data_dir)),
3090 ),
3091 (SemanticPreference::HashFallback, "hash", None),
3092 ];
3093
3094 for (preference, expected_backend, expected_model_dir) in cases {
3095 let surface = semantic_preference_surface(data_dir, preference);
3096
3097 assert_eq!(surface.preferred_backend, expected_backend);
3098 assert_eq!(surface.model_dir, expected_model_dir);
3099 }
3100 }
3101
3102 #[test]
3103 fn semantic_state_detects_progressive_and_hnsw_assets() {
3104 let temp = tempfile::tempdir().expect("tempdir");
3105 let vector_dir = temp.path().join(VECTOR_INDEX_DIR);
3106 std::fs::create_dir_all(&vector_dir).expect("create vector dir");
3107 std::fs::write(vector_dir.join("vector.fast.idx"), b"fast").expect("write fast tier");
3108 std::fs::write(vector_dir.join("vector.quality.idx"), b"quality")
3109 .expect("write quality tier");
3110 let hnsw_path = hnsw_index_path(temp.path(), FastEmbedder::embedder_id_static());
3111 std::fs::write(&hnsw_path, b"hnsw").expect("write hnsw");
3112
3113 let state = semantic_state_from_availability(
3114 temp.path(),
3115 &SemanticAvailability::Ready {
3116 embedder_id: FastEmbedder::embedder_id_static().to_string(),
3117 },
3118 SemanticPreference::DefaultModel,
3119 None,
3120 );
3121
3122 assert_eq!(state.status, "ready");
3123 assert!(state.progressive_ready);
3124 assert!(state.hnsw_ready);
3125 assert_eq!(
3126 state.embedder_id.as_deref(),
3127 Some(FastEmbedder::embedder_id_static())
3128 );
3129 }
3130
3131 #[test]
3132 fn semantic_state_reports_backfill_when_manifest_only_has_stale_assets() {
3133 let temp = tempfile::tempdir().expect("tempdir");
3134 let mut manifest = SemanticManifest {
3135 fast_tier: Some(ArtifactRecord {
3136 tier: crate::search::semantic_manifest::TierKind::Fast,
3137 embedder_id: HashEmbedder::default().id().to_string(),
3138 model_revision: "hash".to_string(),
3139 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3140 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3141 dimension: 256,
3142 doc_count: 12,
3143 conversation_count: 3,
3144 db_fingerprint: "stale-db".to_string(),
3145 index_path: "vector_index/vector.fast.idx".to_string(),
3146 size_bytes: 4096,
3147 started_at_ms: 1_733_100_000_000,
3148 completed_at_ms: 1_733_100_100_000,
3149 ready: true,
3150 }),
3151 backlog: crate::search::semantic_manifest::BacklogLedger {
3152 total_conversations: 20,
3153 fast_tier_processed: 3,
3154 quality_tier_processed: 0,
3155 db_fingerprint: "current-db".to_string(),
3156 computed_at_ms: 1_733_100_200_000,
3157 },
3158 checkpoint: Some(BuildCheckpoint {
3159 tier: crate::search::semantic_manifest::TierKind::Fast,
3160 embedder_id: HashEmbedder::default().id().to_string(),
3161 last_offset: 77,
3162 docs_embedded: 66,
3163 conversations_processed: 3,
3164 total_conversations: 20,
3165 db_fingerprint: "current-db".to_string(),
3166 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3167 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3168 saved_at_ms: 1_733_100_300_000,
3169 last_message_id: None,
3170 cursor_exhausted: false,
3171 }),
3172 ..Default::default()
3173 };
3174 manifest.save(temp.path()).expect("save semantic manifest");
3175
3176 let state = semantic_state_from_availability(
3177 temp.path(),
3178 &SemanticAvailability::NeedsConsent,
3179 SemanticPreference::DefaultModel,
3180 Some("current-db"),
3181 );
3182
3183 assert_eq!(state.status, "building");
3184 assert_eq!(state.availability, "index_building");
3185 assert!(!state.can_search);
3186 assert_eq!(state.fallback_mode, Some("lexical"));
3187 assert!(state.summary.contains("backfill"));
3188 assert!(
3189 state
3190 .hint
3191 .as_deref()
3192 .is_some_and(|hint| hint.contains("finish backfilling"))
3193 );
3194 }
3195
3196 #[test]
3197 fn semantic_state_prefers_current_hash_tier_over_missing_model() {
3198 let temp = tempfile::tempdir().expect("tempdir");
3199 let mut manifest = SemanticManifest {
3200 fast_tier: Some(ArtifactRecord {
3201 tier: crate::search::semantic_manifest::TierKind::Fast,
3202 embedder_id: HashEmbedder::default().id().to_string(),
3203 model_revision: "hash".to_string(),
3204 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3205 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3206 dimension: 256,
3207 doc_count: 12,
3208 conversation_count: 3,
3209 db_fingerprint: "current-db".to_string(),
3210 index_path: "vector_index/vector.fast.idx".to_string(),
3211 size_bytes: 4096,
3212 started_at_ms: 1_733_100_000_000,
3213 completed_at_ms: 1_733_100_100_000,
3214 ready: true,
3215 }),
3216 ..Default::default()
3217 };
3218 manifest.save(temp.path()).expect("save semantic manifest");
3219 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3220 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3221 .expect("create vector dir");
3222 std::fs::write(&vector_path, b"fast").expect("write fast vector index");
3223
3224 let state = semantic_state_from_availability(
3225 temp.path(),
3226 &SemanticAvailability::NeedsConsent,
3227 SemanticPreference::DefaultModel,
3228 Some("current-db"),
3229 );
3230
3231 assert_eq!(state.status, "ready");
3232 assert_eq!(state.availability, "ready");
3233 assert!(state.can_search);
3234 assert_eq!(state.fallback_mode, None);
3235 assert_eq!(
3236 state.embedder_id.as_deref(),
3237 Some(HashEmbedder::default().id())
3238 );
3239 assert_eq!(state.model_dir, None);
3240 assert_eq!(
3241 state.vector_index_path.as_deref(),
3242 Some(vector_path.as_path())
3243 );
3244 assert_eq!(state.hint, None);
3245 }
3246
3247 #[test]
3248 fn semantic_state_treats_ready_quality_tier_with_unknown_db_match_as_queryable() {
3249 let temp = tempfile::tempdir().expect("tempdir");
3250 let mut manifest = SemanticManifest {
3251 quality_tier: Some(ArtifactRecord {
3252 tier: crate::search::semantic_manifest::TierKind::Quality,
3253 embedder_id: HashEmbedder::default().id().to_string(),
3254 model_revision: "hash".to_string(),
3255 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3256 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3257 dimension: 256,
3258 doc_count: 249,
3259 conversation_count: 21,
3260 db_fingerprint: "boxed-db".to_string(),
3261 index_path: "vector_index/vector.quality.idx".to_string(),
3262 size_bytes: 221_824,
3263 started_at_ms: 1_733_100_000_000,
3264 completed_at_ms: 1_733_100_100_000,
3265 ready: true,
3266 }),
3267 ..Default::default()
3268 };
3269 manifest.save(temp.path()).expect("save semantic manifest");
3270
3271 let state = semantic_state_from_availability(
3272 temp.path(),
3273 &SemanticAvailability::NeedsConsent,
3274 SemanticPreference::DefaultModel,
3275 None,
3276 );
3277
3278 assert_eq!(state.quality_tier.current_db_matches, None);
3279 assert!(
3280 state.quality_tier_published,
3281 "a ready quality tier with unknown DB match should remain visible as published in boxed data-dir status"
3282 );
3283 assert!(
3284 state.semantic_only_search_available,
3285 "semantic-only search can still run when the quality tier is ready and DB match is unknown"
3286 );
3287 assert!(state.can_search);
3288 assert_eq!(state.fallback_mode, None);
3289 }
3290
3291 #[test]
3292 fn semantic_state_promotes_complete_current_shard_generation() {
3293 let temp = tempfile::tempdir().expect("tempdir");
3294 let embedder_id = HashEmbedder::default().id().to_string();
3295 let mut records = Vec::new();
3296 for shard_index in 0..2_u32 {
3297 let relative_path = format!("vector_index/shards/fast-hash/shard-{shard_index}.fsvi");
3298 let path = temp.path().join(&relative_path);
3299 std::fs::create_dir_all(path.parent().expect("shard parent"))
3300 .expect("create shard parent");
3301 std::fs::write(&path, b"fsvi").expect("write shard placeholder");
3302 records.push(SemanticShardRecord {
3303 tier: TierKind::Fast,
3304 embedder_id: embedder_id.clone(),
3305 model_revision: "hash".to_string(),
3306 schema_version: SEMANTIC_SCHEMA_VERSION,
3307 chunking_version: CHUNKING_STRATEGY_VERSION,
3308 dimension: HashEmbedder::default().dimension(),
3309 shard_index,
3310 shard_count: 2,
3311 doc_count: 10 + u64::from(shard_index),
3312 total_conversations: 7,
3313 db_fingerprint: "current-db".to_string(),
3314 index_path: relative_path,
3315 quantization: "f16".to_string(),
3316 mmap_ready: true,
3317 ann_index_path: None,
3318 ann_size_bytes: 0,
3319 ann_ready: false,
3320 size_bytes: 100 + u64::from(shard_index),
3321 started_at_ms: 1_733_100_000_000,
3322 completed_at_ms: 1_733_100_000_000 + i64::from(shard_index),
3323 ready: true,
3324 });
3325 }
3326 let mut shards = SemanticShardManifest {
3327 shards: records,
3328 ..Default::default()
3329 };
3330 shards.save(temp.path()).expect("save shard manifest");
3331
3332 let state = semantic_state_from_availability(
3333 temp.path(),
3334 &SemanticAvailability::IndexMissing {
3335 index_path: vector_index_path(temp.path(), &embedder_id),
3336 },
3337 SemanticPreference::HashFallback,
3338 Some("current-db"),
3339 );
3340
3341 assert_eq!(state.status, "ready");
3342 assert_eq!(state.availability, "ready");
3343 assert!(state.can_search);
3344 assert_eq!(state.fallback_mode, None);
3345 assert_eq!(state.fast_tier.doc_count, Some(21));
3346 let expected_path = temp
3347 .path()
3348 .join("vector_index/shards/fast-hash/shard-0.fsvi");
3349 assert_eq!(
3350 state.vector_index_path.as_deref(),
3351 Some(expected_path.as_path())
3352 );
3353 }
3354
3355 #[test]
3356 fn semantic_state_rejects_complete_shard_generation_with_unsafe_path() {
3357 let temp = tempfile::tempdir().expect("tempdir");
3358 let outside = tempfile::tempdir().expect("outside tempdir");
3359 let outside_path = outside.path().join("outside.fsvi");
3360 std::fs::write(&outside_path, b"fsvi").expect("write outside placeholder");
3361 let embedder_id = HashEmbedder::default().id().to_string();
3362 let mut shards = SemanticShardManifest {
3363 shards: vec![SemanticShardRecord {
3364 tier: TierKind::Fast,
3365 embedder_id: embedder_id.clone(),
3366 model_revision: "hash".to_string(),
3367 schema_version: SEMANTIC_SCHEMA_VERSION,
3368 chunking_version: CHUNKING_STRATEGY_VERSION,
3369 dimension: HashEmbedder::default().dimension(),
3370 shard_index: 0,
3371 shard_count: 1,
3372 doc_count: 10,
3373 total_conversations: 7,
3374 db_fingerprint: "current-db".to_string(),
3375 index_path: outside_path.to_string_lossy().to_string(),
3376 quantization: "f16".to_string(),
3377 mmap_ready: true,
3378 ann_index_path: None,
3379 ann_size_bytes: 0,
3380 ann_ready: false,
3381 size_bytes: 100,
3382 started_at_ms: 1_733_100_000_000,
3383 completed_at_ms: 1_733_100_000_001,
3384 ready: true,
3385 }],
3386 ..Default::default()
3387 };
3388 shards.save(temp.path()).expect("save shard manifest");
3389
3390 let base_vector_path = vector_index_path(temp.path(), &embedder_id);
3391 let state = semantic_state_from_availability(
3392 temp.path(),
3393 &SemanticAvailability::IndexMissing {
3394 index_path: base_vector_path.clone(),
3395 },
3396 SemanticPreference::HashFallback,
3397 Some("current-db"),
3398 );
3399
3400 assert_ne!(state.status, "ready");
3401 assert!(!state.can_search);
3402 assert_eq!(
3403 state.vector_index_path.as_deref(),
3404 Some(base_vector_path.as_path())
3405 );
3406 assert_ne!(
3407 state.vector_index_path.as_deref(),
3408 Some(outside_path.as_path())
3409 );
3410 }
3411
3412 fn make_active_snapshot(now_ms: i64) -> SearchMaintenanceSnapshot {
3417 SearchMaintenanceSnapshot {
3418 active: true,
3419 pid: Some(12345),
3420 started_at_ms: Some(now_ms - 5_000),
3421 db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
3422 mode: Some(SearchMaintenanceMode::Index),
3423 job_id: Some("lexical_refresh-1000-12345".to_string()),
3424 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
3425 phase: Some("scanning".to_string()),
3426 updated_at_ms: Some(now_ms - 500),
3427 last_progress_at_ms: Some(now_ms - 500),
3428 orphaned: false,
3429 }
3430 }
3431
3432 #[test]
3433 fn coordination_no_active_job_when_snapshot_inactive() {
3434 let snapshot = SearchMaintenanceSnapshot::default();
3435 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3436 assert_eq!(outcome, MaintenanceCoordinationOutcome::Idle);
3437 }
3438
3439 #[test]
3440 fn coordination_tracks_active_legacy_lock_without_job_id() {
3441 let snapshot = SearchMaintenanceSnapshot {
3442 active: true,
3443 pid: Some(12345),
3444 job_id: None,
3445 mode: Some(SearchMaintenanceMode::Index),
3446 ..Default::default()
3447 };
3448 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3449 if let MaintenanceCoordinationOutcome::Active {
3450 ref job_id,
3451 job_kind,
3452 ..
3453 } = outcome
3454 {
3455 assert_eq!(job_id, "index-active-lock-12345");
3456 assert_eq!(job_kind, SearchMaintenanceJobKind::LexicalRefresh);
3457 } else {
3458 assert!(
3459 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3460 "legacy active lock must remain active, got {outcome:?}"
3461 );
3462 }
3463 }
3464
3465 #[test]
3466 fn coordination_active_job_with_fresh_heartbeat() {
3467 let now_ms = 1_733_000_000_000i64;
3468 let snapshot = make_active_snapshot(now_ms);
3469 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3470 if let MaintenanceCoordinationOutcome::Active {
3471 ref job_id,
3472 ref phase,
3473 ..
3474 } = outcome
3475 {
3476 assert_eq!(job_id, "lexical_refresh-1000-12345");
3477 assert_eq!(phase.as_deref(), Some("scanning"));
3478 } else {
3479 assert!(
3480 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3481 "expected ActiveJob, got {outcome:?}"
3482 );
3483 }
3484 }
3485
3486 #[test]
3487 fn coordination_stale_job_with_old_heartbeat() {
3488 let now_ms = 1_733_000_000_000i64;
3489 let snapshot = SearchMaintenanceSnapshot {
3490 updated_at_ms: Some(now_ms - 60_000),
3491 ..make_active_snapshot(now_ms)
3492 };
3493 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3494 if let MaintenanceCoordinationOutcome::Stale {
3495 ref job_id,
3496 ref reason,
3497 } = outcome
3498 {
3499 assert_eq!(job_id, "lexical_refresh-1000-12345");
3500 assert!(reason.contains("60000ms"), "reason={reason}");
3501 } else {
3502 assert!(
3503 matches!(outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3504 "expected StaleJob, got {outcome:?}"
3505 );
3506 }
3507 }
3508
3509 #[test]
3510 fn coordination_missing_heartbeat_timestamp_still_respects_active_flock() {
3511 let now_ms = 1_733_000_000_000i64;
3512 let snapshot = SearchMaintenanceSnapshot {
3513 updated_at_ms: None,
3514 ..make_active_snapshot(now_ms)
3515 };
3516 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3517 if let MaintenanceCoordinationOutcome::Active { updated_at_ms, .. } = outcome {
3518 assert_eq!(updated_at_ms, now_ms);
3519 } else {
3520 assert!(
3521 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3522 "missing heartbeat metadata must not hide an active flock, got {outcome:?}"
3523 );
3524 }
3525 }
3526
3527 #[test]
3528 fn decision_launch_when_no_job() {
3529 let snapshot = SearchMaintenanceSnapshot::default();
3530 let decision = decide_maintenance_action_from_snapshot(&snapshot, 1_733_000_000_000);
3531 assert_eq!(decision, MaintenanceDecision::Launch);
3532 }
3533
3534 #[test]
3535 fn decision_launch_when_no_lock_file() {
3536 let temp = tempfile::tempdir().expect("tempdir");
3537 let decision = decide_maintenance_action(temp.path(), 1_733_000_000_000);
3538 assert_eq!(decision, MaintenanceDecision::Launch);
3539 }
3540
3541 #[test]
3542 fn decision_attaches_when_active_lock_has_stale_heartbeat() {
3543 let now_ms = 1_733_000_000_000i64;
3544 let snapshot = SearchMaintenanceSnapshot {
3545 updated_at_ms: Some(now_ms - 60_000),
3546 ..make_active_snapshot(now_ms)
3547 };
3548 let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3549 if let MaintenanceDecision::AttachOrWait {
3550 ref job_id,
3551 ref phase,
3552 elapsed_ms,
3553 ..
3554 } = decision
3555 {
3556 assert_eq!(job_id, "lexical_refresh-1000-12345");
3557 assert_eq!(phase.as_deref(), Some("scanning"));
3558 assert_eq!(elapsed_ms, 5_000);
3559 } else {
3560 assert!(
3561 matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3562 "stale heartbeat still has an active lock, got {decision:?}"
3563 );
3564 }
3565 }
3566
3567 #[test]
3568 fn decision_attach_when_active_fresh_job() {
3569 let now_ms = 1_733_000_000_000i64;
3570 let snapshot = make_active_snapshot(now_ms);
3571 let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3572 if let MaintenanceDecision::AttachOrWait {
3573 ref job_id,
3574 elapsed_ms,
3575 ..
3576 } = decision
3577 {
3578 assert_eq!(job_id, "lexical_refresh-1000-12345");
3579 assert_eq!(elapsed_ms, 5_000);
3580 } else {
3581 assert!(
3582 matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3583 "expected AttachOrWait, got {decision:?}"
3584 );
3585 }
3586 }
3587
3588 #[test]
3589 fn poll_returns_immediately_when_no_active_job() {
3590 let temp = tempfile::tempdir().expect("tempdir");
3591 let result = poll_maintenance_until_idle(
3592 temp.path(),
3593 Some(Duration::from_millis(500)),
3594 Some(Duration::from_millis(50)),
3595 );
3596 assert!(!result.timed_out);
3597 assert_eq!(result.polls, 1);
3598 assert!(
3599 matches!(result.outcome, MaintenanceCoordinationOutcome::Idle),
3600 "expected NoActiveJob"
3601 );
3602 assert!(
3603 result.elapsed <= Duration::from_millis(500),
3604 "immediate idle poll should finish before timeout, elapsed={:?}",
3605 result.elapsed
3606 );
3607 }
3608
3609 #[test]
3610 fn poll_returns_active_on_timeout_when_lock_held() {
3611 use fs2::FileExt;
3612 let temp = tempfile::tempdir().expect("tempdir");
3613 let lock_path = temp.path().join("index-run.lock");
3614 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3615 let pid = active_lock_fixture_pid(99999);
3616 let mut owner = OpenOptions::new()
3617 .create(true)
3618 .read(true)
3619 .write(true)
3620 .truncate(true)
3621 .open(&lock_path)
3622 .expect("open owner handle");
3623 owner.try_lock_exclusive().expect("acquire lock");
3624 write_locked_metadata(
3625 &lock_path,
3626 &mut owner,
3627 format!(
3628 "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-1\njob_kind=lexical_refresh\nphase=scanning\n",
3629 now_ms - 1_000,
3630 now_ms,
3631 )
3632 .as_str(),
3633 )
3634 .expect("write lock metadata");
3635
3636 let result = poll_maintenance_until_idle(
3637 temp.path(),
3638 Some(Duration::from_millis(300)),
3639 Some(Duration::from_millis(50)),
3640 );
3641 assert!(result.timed_out, "should time out when lock is held");
3642 assert!(result.polls >= 2, "should have polled multiple times");
3643 assert!(
3644 matches!(
3645 result.outcome,
3646 MaintenanceCoordinationOutcome::Active { .. }
3647 ),
3648 "expected ActiveJob on timeout"
3649 );
3650
3651 let _ = FileExt::unlock(&owner);
3652 }
3653
3654 #[test]
3655 fn poll_times_out_instead_of_declaring_stale_held_lock_idle() {
3656 use fs2::FileExt;
3657 let temp = tempfile::tempdir().expect("tempdir");
3658 let lock_path = temp.path().join("index-run.lock");
3659 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3660 let pid = active_lock_fixture_pid(99999);
3661 let mut owner = OpenOptions::new()
3662 .create(true)
3663 .read(true)
3664 .write(true)
3665 .truncate(true)
3666 .open(&lock_path)
3667 .expect("open owner handle");
3668 owner.try_lock_exclusive().expect("acquire lock");
3669 write_locked_metadata(
3670 &lock_path,
3671 &mut owner,
3672 format!(
3673 "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-stale\njob_kind=lexical_refresh\nphase=scanning\n",
3674 now_ms - 120_000,
3675 now_ms - 120_000,
3676 )
3677 .as_str(),
3678 )
3679 .expect("write lock metadata");
3680
3681 let result = poll_maintenance_until_idle(
3682 temp.path(),
3683 Some(Duration::from_millis(150)),
3684 Some(Duration::from_millis(25)),
3685 );
3686 assert!(result.timed_out, "held stale lock is still not idle");
3687 assert!(
3688 matches!(result.outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3689 "expected stale held lock on timeout, got {:?}",
3690 result.outcome
3691 );
3692
3693 let _ = FileExt::unlock(&owner);
3694 }
3695
3696 #[test]
3697 fn poll_detects_release_mid_wait() {
3698 use fs2::FileExt;
3699 let temp = tempfile::tempdir().expect("tempdir");
3700 let lock_path = temp.path().join("index-run.lock");
3701 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3702 let pid = active_lock_fixture_pid(99999);
3703 let mut owner = OpenOptions::new()
3704 .create(true)
3705 .read(true)
3706 .write(true)
3707 .truncate(true)
3708 .open(&lock_path)
3709 .expect("open owner handle");
3710 owner.try_lock_exclusive().expect("acquire lock");
3711 write_locked_metadata(
3712 &lock_path,
3713 &mut owner,
3714 format!(
3715 "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-2\njob_kind=lexical_refresh\nphase=committing\n",
3716 now_ms - 1_000,
3717 now_ms,
3718 )
3719 .as_str(),
3720 )
3721 .expect("write lock metadata");
3722
3723 let temp_path = temp.path().to_path_buf();
3724 let lock_path_for_release = lock_path.clone();
3725 let release_thread = std::thread::spawn(move || {
3726 std::thread::sleep(Duration::from_millis(150));
3727 let _ = owner.set_len(0);
3728 let _ = clear_index_run_lock_metadata_sidecar(&lock_path_for_release);
3729 let _ = FileExt::unlock(&owner);
3730 drop(owner);
3731 });
3732
3733 let result = poll_maintenance_until_idle(
3734 &temp_path,
3735 Some(Duration::from_secs(2)),
3736 Some(Duration::from_millis(50)),
3737 );
3738 assert!(!result.timed_out, "should detect release before timeout");
3739 release_thread.join().expect("release thread");
3740 }
3741
3742 #[test]
3743 fn failopen_returns_failopen_when_lexical_available_and_job_active() {
3744 let temp = tempfile::tempdir().expect("tempdir");
3745 let lock_path = temp.path().join("index-run.lock");
3746 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3747 let pid = active_lock_fixture_pid(99999);
3748
3749 use fs2::FileExt;
3750 let mut owner = OpenOptions::new()
3751 .create(true)
3752 .read(true)
3753 .write(true)
3754 .truncate(true)
3755 .open(&lock_path)
3756 .expect("open owner handle");
3757 owner.try_lock_exclusive().expect("acquire lock");
3758 write_locked_metadata(
3759 &lock_path,
3760 &mut owner,
3761 format!(
3762 "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-job-1\njob_kind=lexical_refresh\nphase=indexing\n",
3763 now_ms - 1_000,
3764 now_ms,
3765 )
3766 .as_str(),
3767 )
3768 .expect("write lock metadata");
3769
3770 let decision = decide_search_failopen(temp.path(), now_ms, true);
3771 if let MaintenanceDecision::FailOpen { ref reason } = decision {
3772 assert!(reason.contains("fo-job-1"), "reason={reason}");
3773 assert!(reason.contains("failing open"), "reason={reason}");
3774 } else {
3775 assert!(
3776 matches!(decision, MaintenanceDecision::FailOpen { .. }),
3777 "expected FailOpen, got {decision:?}"
3778 );
3779 }
3780
3781 let decision_no_lexical = decide_search_failopen(temp.path(), now_ms, false);
3782 assert!(
3783 matches!(
3784 decision_no_lexical,
3785 MaintenanceDecision::AttachOrWait { .. }
3786 ),
3787 "without lexical must attach, got {decision_no_lexical:?}"
3788 );
3789
3790 let _ = FileExt::unlock(&owner);
3791 }
3792
3793 #[test]
3794 fn failopen_handles_active_stale_heartbeat_without_launching_repair() {
3795 let temp = tempfile::tempdir().expect("tempdir");
3796 let lock_path = temp.path().join("index-run.lock");
3797 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3798 let pid = active_lock_fixture_pid(99999);
3799
3800 use fs2::FileExt;
3801 let mut owner = OpenOptions::new()
3802 .create(true)
3803 .read(true)
3804 .write(true)
3805 .truncate(true)
3806 .open(&lock_path)
3807 .expect("open owner handle");
3808 owner.try_lock_exclusive().expect("acquire lock");
3809 write_locked_metadata(
3810 &lock_path,
3811 &mut owner,
3812 format!(
3813 "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-stale-1\njob_kind=lexical_refresh\nphase=indexing\n",
3814 now_ms - 120_000,
3815 now_ms - 120_000,
3816 )
3817 .as_str(),
3818 )
3819 .expect("write lock metadata");
3820
3821 let decision = decide_search_failopen(temp.path(), now_ms, true);
3822 if let MaintenanceDecision::FailOpen { ref reason } = decision {
3823 assert!(reason.contains("fo-stale-1"), "reason={reason}");
3824 assert!(reason.contains("stale heartbeat"), "reason={reason}");
3825 assert!(reason.contains("failing open"), "reason={reason}");
3826 } else {
3827 assert!(
3828 matches!(decision, MaintenanceDecision::FailOpen { .. }),
3829 "expected FailOpen for searchable stale active lock, got {decision:?}"
3830 );
3831 }
3832
3833 let decision_no_lexical = decide_search_failopen(temp.path(), now_ms, false);
3834 assert!(
3835 matches!(
3836 decision_no_lexical,
3837 MaintenanceDecision::AttachOrWait { .. }
3838 ),
3839 "without lexical must wait for the held lock, got {decision_no_lexical:?}"
3840 );
3841
3842 let _ = FileExt::unlock(&owner);
3843 }
3844
3845 #[test]
3850 fn event_log_append_and_read() {
3851 let temp = tempfile::tempdir().expect("tempdir");
3852 let event = MaintenanceEvent {
3853 timestamp_ms: 1_733_000_000_000,
3854 job_id: "test-job-1".to_string(),
3855 actor_pid: 42,
3856 kind: MaintenanceEventKind::Started {
3857 job_kind: "lexical_refresh".to_string(),
3858 phase: "scanning".to_string(),
3859 },
3860 };
3861 append_maintenance_event(temp.path(), &event).expect("append");
3862 let events = read_maintenance_events(temp.path(), None, None);
3863 assert_eq!(events.len(), 1);
3864 assert_eq!(events[0].job_id, "test-job-1");
3865 assert_eq!(events[0].actor_pid, 42);
3866 assert!(matches!(
3867 events[0].kind,
3868 MaintenanceEventKind::Started { .. }
3869 ));
3870 }
3871
3872 #[test]
3873 fn event_log_filters_by_timestamp() {
3874 let temp = tempfile::tempdir().expect("tempdir");
3875 for i in 0..5 {
3876 let event = MaintenanceEvent {
3877 timestamp_ms: 1_000 + i,
3878 job_id: format!("job-{i}"),
3879 actor_pid: 1,
3880 kind: MaintenanceEventKind::Progress {
3881 processed: i as u64,
3882 total: 5,
3883 },
3884 };
3885 append_maintenance_event(temp.path(), &event).expect("append");
3886 }
3887 let events = read_maintenance_events(temp.path(), Some(1_002), None);
3888 assert_eq!(events.len(), 2, "should only get events after ts 1002");
3889 assert_eq!(events[0].timestamp_ms, 1_003);
3890 assert_eq!(events[1].timestamp_ms, 1_004);
3891 }
3892
3893 #[test]
3894 fn event_log_respects_limit() {
3895 let temp = tempfile::tempdir().expect("tempdir");
3896 for i in 0..10 {
3897 let event = MaintenanceEvent {
3898 timestamp_ms: 1_000 + i,
3899 job_id: format!("job-{i}"),
3900 actor_pid: 1,
3901 kind: MaintenanceEventKind::Resumed,
3902 };
3903 append_maintenance_event(temp.path(), &event).expect("append");
3904 }
3905 let events = read_maintenance_events(temp.path(), None, Some(3));
3906 assert_eq!(events.len(), 3);
3907 assert_eq!(events[0].timestamp_ms, 1_007);
3908 assert_eq!(events[2].timestamp_ms, 1_009);
3909 }
3910
3911 #[test]
3912 fn event_log_returns_empty_when_missing() {
3913 let temp = tempfile::tempdir().expect("tempdir");
3914 let events = read_maintenance_events(temp.path(), None, None);
3915 assert!(events.is_empty());
3916 }
3917
/// `truncate_maintenance_event_log` keeps exactly the newest
/// `MAX_EVENT_LOG_ENTRIES` events, in their original order.
///
/// Expected boundaries are derived from `MAX_EVENT_LOG_ENTRIES` rather than
/// hard-coded, so the test keeps checking the right thing if the cap changes.
#[test]
fn event_log_truncation_retains_tail() {
    // Must exceed the cap, otherwise truncation has nothing to drop and the
    // test would pass vacuously.
    let total_events: usize = 550;
    assert!(
        total_events > MAX_EVENT_LOG_ENTRIES,
        "fixture must write more than MAX_EVENT_LOG_ENTRIES events to exercise truncation"
    );

    let temp = tempfile::tempdir().expect("tempdir");
    for i in 0..total_events {
        let event = MaintenanceEvent {
            timestamp_ms: i as i64,
            job_id: format!("job-{i}"),
            actor_pid: 1,
            kind: MaintenanceEventKind::Resumed,
        };
        append_maintenance_event(temp.path(), &event).expect("append");
    }

    // The read limit must be at least `total_events` so the pre-truncation
    // read sees every appended event.
    let before = read_maintenance_events(temp.path(), None, Some(600));
    assert_eq!(before.len(), total_events);

    truncate_maintenance_event_log(temp.path()).expect("truncate");

    let after = read_maintenance_events(temp.path(), None, Some(600));
    assert_eq!(after.len(), MAX_EVENT_LOG_ENTRIES);

    // The oldest `total_events - MAX_EVENT_LOG_ENTRIES` events are dropped;
    // timestamps equal the append index, so they pin the retained window.
    let first_kept = (total_events - MAX_EVENT_LOG_ENTRIES) as i64;
    let last_kept = (total_events - 1) as i64;
    assert_eq!(after[0].timestamp_ms, first_kept);
    assert_eq!(after[MAX_EVENT_LOG_ENTRIES - 1].timestamp_ms, last_kept);
    assert!(
        after
            .windows(2)
            .all(|pair| pair[1].timestamp_ms == pair[0].timestamp_ms + 1),
        "retained events should be the contiguous, ordered tail of the log"
    );
}
3938
/// A yield request is reported by `check_yield_requested` with the requesting
/// process id, the given reason, and a positive timestamp, and is no longer
/// reported once `clear_yield_signal` succeeds.
#[test]
fn yield_signal_round_trip() {
    let temp = tempfile::tempdir().expect("tempdir");
    let root = temp.path();

    assert!(check_yield_requested(root).is_none(), "no signal initially");

    request_yield(root, "foreground search pressure").expect("request yield");
    let request = check_yield_requested(root).expect("yield should be present");
    assert_eq!(request.requester_pid, std::process::id());
    assert_eq!(request.reason, "foreground search pressure");
    assert!(request.requested_at_ms > 0);

    clear_yield_signal(root).expect("clear");
    assert!(check_yield_requested(root).is_none(), "signal cleared");
}
3957
/// Clearing the yield signal succeeds when none exists, and succeeds again on
/// a second call.
#[test]
fn clear_yield_signal_is_idempotent() {
    let temp = tempfile::tempdir().expect("tempdir");
    for attempt in ["clear nonexistent", "clear again"] {
        clear_yield_signal(temp.path()).expect(attempt);
    }
}
3964
/// For an empty data directory the unified view reports `Idle` coordination,
/// no pending yield, no recent events, and a `Launch` decision.
#[test]
fn unified_view_idle_no_events() {
    let temp = tempfile::tempdir().expect("tempdir");
    let view = unified_maintenance_view(temp.path(), true);

    let idle = matches!(view.coordination, MaintenanceCoordinationOutcome::Idle);
    assert!(idle);
    assert!(view.yield_pending.is_none());
    assert!(view.recent_events.is_empty());
    assert_eq!(view.decision, MaintenanceDecision::Launch);
}
3977
/// With `index-run.lock` held exclusively and stamped with metadata for a
/// `lexical_refresh` job (`mode=index`), the unified view reports `Active`
/// coordination, a `FailOpen` decision, and the single `Started` event
/// appended for that job.
#[test]
fn unified_view_active_with_lexical_fails_open() {
    use fs2::FileExt;

    let temp = tempfile::tempdir().expect("tempdir");
    let lock_path = temp.path().join("index-run.lock");
    let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
    let pid = active_lock_fixture_pid(99999);

    // Hold the run lock for the rest of the test so it is held while the
    // view is computed.
    let mut owner = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&lock_path)
        .expect("open");
    owner.try_lock_exclusive().expect("lock");

    let started_at_ms = now_ms - 1_000;
    let metadata = format!(
        "pid={pid}\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/t.db\nmode=index\njob_id=uv-1\njob_kind=lexical_refresh\nphase=indexing\n",
        started_at_ms,
        now_ms,
    );
    write_locked_metadata(&lock_path, &mut owner, metadata.as_str()).expect("write metadata");

    let started = MaintenanceEvent {
        timestamp_ms: now_ms,
        job_id: "uv-1".to_string(),
        actor_pid: pid,
        kind: MaintenanceEventKind::Started {
            job_kind: "lexical_refresh".to_string(),
            phase: "indexing".to_string(),
        },
    };
    append_maintenance_event(temp.path(), &started).expect("append");

    let view = unified_maintenance_view(temp.path(), true);
    assert!(matches!(
        view.coordination,
        MaintenanceCoordinationOutcome::Active { .. }
    ));
    assert!(matches!(view.decision, MaintenanceDecision::FailOpen { .. }));
    assert_eq!(view.recent_events.len(), 1);

    // Explicit trait path: std's inherent `File::unlock` would otherwise be
    // picked. The result is intentionally ignored (best-effort release).
    let _ = FileExt::unlock(&owner);
}
4029
/// A pending yield request is surfaced in the unified view with its reason.
#[test]
fn unified_view_includes_yield_signal() {
    let temp = tempfile::tempdir().expect("tempdir");
    request_yield(temp.path(), "test yield").expect("yield");

    let view = unified_maintenance_view(temp.path(), true);
    // A single `expect` replaces `assert!(is_some())` + `unwrap()`: one
    // lookup, and a failure message that says what was missing.
    let pending = view
        .yield_pending
        .as_ref()
        .expect("unified view should surface the pending yield request");
    assert_eq!(pending.reason, "test yield");

    clear_yield_signal(temp.path()).expect("clear");
}
4039
/// Every `MaintenanceEventKind` variant, including its payload, survives an
/// `append_maintenance_event` / `read_maintenance_events` round trip. Events
/// come back in append order with their envelope fields intact.
#[test]
fn event_kinds_serialize_round_trip() {
    let temp = tempfile::tempdir().expect("tempdir");
    let kinds = [
        MaintenanceEventKind::Started {
            job_kind: "lexical_refresh".to_string(),
            phase: "init".to_string(),
        },
        MaintenanceEventKind::PhaseChanged {
            from: "init".to_string(),
            to: "scanning".to_string(),
        },
        MaintenanceEventKind::Progress {
            processed: 50,
            total: 100,
        },
        MaintenanceEventKind::YieldRequested {
            requester_pid: 42,
            reason: "foreground".to_string(),
        },
        MaintenanceEventKind::Paused {
            reason: "yield".to_string(),
        },
        MaintenanceEventKind::Resumed,
        MaintenanceEventKind::Completed {
            summary: "done".to_string(),
        },
        MaintenanceEventKind::Failed {
            error: "oops".to_string(),
        },
        MaintenanceEventKind::Cancelled {
            reason: "user".to_string(),
        },
    ];
    let expected_len = kinds.len();

    // Timestamps equal the append index so ordering can be checked below.
    for (timestamp_ms, kind) in (0_i64..).zip(kinds) {
        let event = MaintenanceEvent {
            timestamp_ms,
            job_id: "rt-test".to_string(),
            actor_pid: 1,
            kind,
        };
        append_maintenance_event(temp.path(), &event).expect("append");
    }

    let events = read_maintenance_events(temp.path(), None, None);
    assert_eq!(events.len(), expected_len);

    // Envelope fields round-trip and events are returned in append order.
    for (expected_ts, event) in (0_i64..).zip(&events) {
        assert_eq!(event.timestamp_ms, expected_ts);
        assert_eq!(event.job_id, "rt-test");
        assert_eq!(event.actor_pid, 1);
    }

    // Each variant decodes back to itself with its payload unchanged.
    assert!(matches!(
        &events[0].kind,
        MaintenanceEventKind::Started { job_kind, phase }
            if job_kind == "lexical_refresh" && phase == "init"
    ));
    assert!(matches!(
        &events[1].kind,
        MaintenanceEventKind::PhaseChanged { from, to }
            if from == "init" && to == "scanning"
    ));
    assert!(matches!(
        &events[2].kind,
        MaintenanceEventKind::Progress { processed, total }
            if *processed == 50 && *total == 100
    ));
    assert!(matches!(
        &events[3].kind,
        MaintenanceEventKind::YieldRequested { requester_pid, reason }
            if *requester_pid == 42 && reason == "foreground"
    ));
    assert!(matches!(
        &events[4].kind,
        MaintenanceEventKind::Paused { reason } if reason == "yield"
    ));
    assert!(matches!(&events[5].kind, MaintenanceEventKind::Resumed));
    assert!(matches!(
        &events[6].kind,
        MaintenanceEventKind::Completed { summary } if summary == "done"
    ));
    assert!(matches!(
        &events[7].kind,
        MaintenanceEventKind::Failed { error } if error == "oops"
    ));
    assert!(matches!(
        &events[8].kind,
        MaintenanceEventKind::Cancelled { reason } if reason == "user"
    ));
}
4095}