1use std::fs::OpenOptions;
13use std::io::Read;
14use std::path::{Path, PathBuf};
15use std::time::{Duration, Instant};
16
17use anyhow::{Context, Result};
18use fs2::FileExt;
19
20use crate::indexer::{
21 LEXICAL_REBUILD_PAGE_SIZE_PUBLIC, LexicalRebuildCheckpoint,
22 lexical_rebuild_page_size_is_compatible, lexical_storage_fingerprint_for_db,
23 load_lexical_rebuild_checkpoint,
24};
25use crate::search::ann_index::hnsw_index_path;
26use crate::search::embedder::Embedder;
27use crate::search::fastembed_embedder::FastEmbedder;
28use crate::search::hash_embedder::HashEmbedder;
29use crate::search::model_manager::{
30 SemanticAvailability, probe_hash_semantic_availability, probe_semantic_availability,
31};
32use crate::search::policy::{
33 CHUNKING_STRATEGY_VERSION, CliSemanticOverrides, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
34};
35use crate::search::semantic_manifest::{
36 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
37 TierKind, semantic_shard_artifact_path_is_safe,
38};
39use crate::search::tantivy::SCHEMA_HASH;
40use crate::search::vector_index::{VECTOR_INDEX_DIR, vector_index_path};
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
43pub(crate) enum SearchMaintenanceMode {
44 Index,
45 WatchStartup,
46 Watch,
47 WatchOnce,
48}
49
50impl SearchMaintenanceMode {
51 pub(crate) fn as_lock_value(self) -> &'static str {
52 match self {
53 Self::Index => "index",
54 Self::WatchStartup => "watch_startup",
55 Self::Watch => "watch",
56 Self::WatchOnce => "watch_once",
57 }
58 }
59
60 pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
61 match raw.trim() {
62 "index" => Some(Self::Index),
63 "watch_startup" => Some(Self::WatchStartup),
64 "watch" => Some(Self::Watch),
65 "watch_once" => Some(Self::WatchOnce),
66 _ => None,
67 }
68 }
69
70 pub(crate) fn watch_active(self) -> bool {
71 matches!(self, Self::WatchStartup | Self::Watch)
72 }
73
74 pub(crate) fn rebuild_active(self) -> bool {
75 !matches!(self, Self::Watch)
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
80pub(crate) enum SearchMaintenanceJobKind {
81 LexicalRefresh,
82 SemanticAcquire,
83}
84
85impl SearchMaintenanceJobKind {
86 pub(crate) fn as_lock_value(self) -> &'static str {
87 match self {
88 Self::LexicalRefresh => "lexical_refresh",
89 Self::SemanticAcquire => "semantic_acquire",
90 }
91 }
92
93 pub(crate) fn parse_lock_value(raw: &str) -> Option<Self> {
94 match raw.trim() {
95 "lexical_refresh" => Some(Self::LexicalRefresh),
96 "semantic_acquire" => Some(Self::SemanticAcquire),
97 _ => None,
98 }
99 }
100}
101
/// Parsed `index-run.lock` metadata plus the result of probing the lock itself
/// (see `read_search_maintenance_snapshot`).
///
/// Each metadata field is `None` when its `key=value` line is missing or does
/// not parse.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub(crate) struct SearchMaintenanceSnapshot {
    /// True when a non-blocking exclusive lock attempt found the lock held.
    pub active: bool,
    /// `pid=` line.
    pub pid: Option<u32>,
    /// `started_at_ms=` line (presumably epoch milliseconds — confirm with the writer).
    pub started_at_ms: Option<i64>,
    /// `db_path=` line (presumably the database the owning run targets).
    pub db_path: Option<PathBuf>,
    /// `mode=` line, parsed with `SearchMaintenanceMode::parse_lock_value`.
    pub mode: Option<SearchMaintenanceMode>,
    /// `job_id=` line; an empty value is treated as absent.
    pub job_id: Option<String>,
    /// `job_kind=` line, parsed with `SearchMaintenanceJobKind::parse_lock_value`.
    pub job_kind: Option<SearchMaintenanceJobKind>,
    /// `phase=` line; an empty value is treated as absent.
    pub phase: Option<String>,
    /// `updated_at_ms=` line (presumably epoch milliseconds).
    pub updated_at_ms: Option<i64>,
    /// `last_progress_at_ms=` line; `maintenance_stall_age_ms` subtracts it from `now_ms`.
    pub last_progress_at_ms: Option<i64>,
    /// Metadata was present but the lock was not held by anyone.
    pub orphaned: bool,
}
116
117pub(crate) fn read_search_maintenance_snapshot(data_dir: &Path) -> SearchMaintenanceSnapshot {
118 const MAX_LOCK_FILE_READ: u64 = 64 * 1024;
123
124 let lock_path = data_dir.join("index-run.lock");
125 let file = match OpenOptions::new().read(true).write(true).open(&lock_path) {
126 Ok(file) => file,
127 Err(_) => return SearchMaintenanceSnapshot::default(),
128 };
129
130 let mut raw = String::new();
131 let _ = (&file).take(MAX_LOCK_FILE_READ).read_to_string(&mut raw);
132
133 let mut pid = None;
134 let mut started_at_ms = None;
135 let mut lock_db_path = None::<PathBuf>;
136 let mut mode = None;
137 let mut job_id = None;
138 let mut job_kind = None;
139 let mut phase = None;
140 let mut updated_at_ms = None;
141 let mut last_progress_at_ms = None;
142 for line in raw.lines() {
143 let Some((key, value)) = line.split_once('=') else {
144 continue;
145 };
146 match key.trim() {
147 "pid" => pid = value.trim().parse::<u32>().ok(),
148 "started_at_ms" => started_at_ms = value.trim().parse::<i64>().ok(),
149 "db_path" => lock_db_path = Some(PathBuf::from(value.trim())),
150 "mode" => mode = SearchMaintenanceMode::parse_lock_value(value),
151 "job_id" => job_id = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
152 "job_kind" => job_kind = SearchMaintenanceJobKind::parse_lock_value(value),
153 "phase" => phase = Some(value.trim().to_string()).filter(|value| !value.is_empty()),
154 "updated_at_ms" => updated_at_ms = value.trim().parse::<i64>().ok(),
155 "last_progress_at_ms" => last_progress_at_ms = value.trim().parse::<i64>().ok(),
156 _ => {}
157 }
158 }
159
160 let metadata_present = pid.is_some()
161 || started_at_ms.is_some()
162 || lock_db_path.is_some()
163 || mode.is_some()
164 || job_id.is_some()
165 || job_kind.is_some()
166 || phase.is_some()
167 || updated_at_ms.is_some()
168 || last_progress_at_ms.is_some();
169
170 let active = match file.try_lock_exclusive() {
171 Ok(()) => {
172 if metadata_present {
196 if let Err(err) = file.set_len(0) {
197 tracing::warn!(
198 path = %lock_path.display(),
199 error = %err,
200 "failed to truncate stale index-run lock metadata"
201 );
202 } else {
203 let _ = file.sync_all();
204 tracing::info!(
205 path = %lock_path.display(),
206 stale_pid = ?pid,
207 "cleared stale index-run lock metadata (previous owner gone)"
208 );
209 let _ = file.unlock();
210 return SearchMaintenanceSnapshot::default();
211 }
212 }
213 let _ = file.unlock();
214 false
215 }
216 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => true,
217 Err(_) => false,
218 };
219
220 SearchMaintenanceSnapshot {
221 active,
222 pid,
223 started_at_ms,
224 db_path: lock_db_path,
225 mode,
226 job_id,
227 job_kind,
228 phase,
229 updated_at_ms,
230 last_progress_at_ms,
231 orphaned: metadata_present && !active,
232 }
233}
234
235pub(crate) const REBUILD_STALL_DETECT_SECS_DEFAULT: u64 = 120;
236
237pub(crate) fn rebuild_stall_detect_threshold_ms() -> Option<i64> {
238 let threshold_secs = dotenvy::var("CASS_REBUILD_STALL_DETECT_SECS")
239 .ok()
240 .and_then(|value| value.trim().parse::<u64>().ok())
241 .unwrap_or(REBUILD_STALL_DETECT_SECS_DEFAULT);
242 if threshold_secs == 0 {
243 return None;
244 }
245 let threshold_ms = threshold_secs.saturating_mul(1000);
246 Some(i64::try_from(threshold_ms).unwrap_or(i64::MAX))
247}
248
249pub(crate) fn maintenance_stall_age_ms(
250 snapshot: &SearchMaintenanceSnapshot,
251 now_ms: i64,
252) -> Option<i64> {
253 if !snapshot.active
254 || !snapshot
255 .mode
256 .is_some_and(SearchMaintenanceMode::rebuild_active)
257 {
258 return None;
259 }
260 let last_progress_at_ms = snapshot.last_progress_at_ms?;
261 let age_ms = now_ms.saturating_sub(last_progress_at_ms);
262 let threshold_ms = rebuild_stall_detect_threshold_ms()?;
263 (age_ms >= threshold_ms).then_some(age_ms)
264}
265
/// Which semantic backend asset inspection probes and reports as preferred.
///
/// `DefaultModel` probes with `probe_semantic_availability` and reports the
/// `"fastembed"` backend. `HashFallback` probes with
/// `probe_hash_semantic_availability` and reports `"hash"`.
// NOTE(review): dead_code is allowed outside tests — presumably a variant is
// only constructed from test code; confirm before dropping the attribute.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SemanticPreference {
    DefaultModel,
    HashFallback,
}
272
/// Combined lexical and semantic asset state returned by `inspect_search_assets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SearchAssetSnapshot {
    pub lexical: LexicalAssetState,
    pub semantic: SemanticAssetState,
}
278
/// Lexical index state reported by search-asset inspection.
///
/// NOTE(review): populated by `inspect_lexical_assets`. Field meanings noted as
/// "presumably" are inferred from names — confirm against that function.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalAssetState {
    /// Short machine-readable status code.
    pub status: &'static str,
    pub exists: bool,
    pub fresh: bool,
    pub stale: bool,
    /// Presumably true while a maintenance run is rebuilding the index.
    pub rebuilding: bool,
    /// Presumably set when the rebuild's progress age reaches the stall threshold
    /// (see `maintenance_stall_age_ms`).
    pub stalled: bool,
    pub last_progress_age_ms: Option<i64>,
    pub last_progress_at_ms: Option<i64>,
    pub watch_active: bool,
    pub last_indexed_at_ms: Option<i64>,
    pub age_seconds: Option<u64>,
    pub stale_threshold_seconds: u64,
    pub activity_at_ms: Option<i64>,
    pub pending_sessions: u64,
    pub processed_conversations: Option<u64>,
    pub total_conversations: Option<u64>,
    pub indexed_docs: Option<u64>,
    /// Optional human-readable explanation accompanying `status`.
    pub status_reason: Option<String>,
    pub fingerprint: LexicalFingerprintState,
    pub checkpoint: LexicalCheckpointState,
}
312
/// Database fingerprint comparison for the lexical index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalFingerprintState {
    /// Fingerprint of the current database. `inspect_search_assets` also uses it
    /// to match semantic tiers against the current database.
    pub current_db_fingerprint: Option<String>,
    /// Presumably the fingerprint recorded in the lexical rebuild checkpoint.
    pub checkpoint_fingerprint: Option<String>,
    /// `None` when either side is unknown.
    pub matches_current_db_fingerprint: Option<bool>,
}

/// Summary of the lexical rebuild checkpoint, if any. Each `Option` is
/// presumably `None` when no checkpoint is present.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LexicalCheckpointState {
    pub present: bool,
    pub completed: Option<bool>,
    pub db_matches: Option<bool>,
    pub schema_matches: Option<bool>,
    pub page_size_matches: Option<bool>,
    pub page_size_compatible: Option<bool>,
}
329
/// Semantic search asset state, built by `semantic_state_from_availability`
/// (or `semantic_state_not_inspected` on the fast path).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SemanticAssetState {
    /// Machine-readable status (e.g. `"ready"`, `"building"`, `"stale"`, `"not_inspected"`).
    pub status: &'static str,
    /// Machine-readable availability code.
    pub availability: &'static str,
    pub summary: String,
    /// Always mirrors `can_search`.
    pub available: bool,
    pub can_search: bool,
    /// `Some("lexical")` when search falls back to lexical only.
    pub fallback_mode: Option<&'static str>,
    /// `"fastembed"` or `"hash"`, from the requested `SemanticPreference`.
    pub preferred_backend: &'static str,
    pub embedder_id: Option<String>,
    pub vector_index_path: Option<PathBuf>,
    pub model_dir: Option<PathBuf>,
    pub hnsw_path: Option<PathBuf>,
    /// True when `hnsw_path` exists as a regular file.
    pub hnsw_ready: bool,
    pub progressive_ready: bool,
    /// True when the quality tier is queryable under the current availability.
    pub quality_tier_published: bool,
    /// True when the quality or the fast tier is queryable.
    pub semantic_only_search_available: bool,
    pub hint: Option<String>,
    pub fast_tier: SemanticTierAssetState,
    pub quality_tier: SemanticTierAssetState,
    pub backlog: SemanticBacklogProgressState,
    pub checkpoint: SemanticCheckpointProgressState,
}
368
/// Output of `semantic_runtime_surface`: the status/summary/hint to report and
/// the embedder id and paths search would use.
struct SemanticRuntimeSurface {
    status: &'static str,
    availability: &'static str,
    summary: String,
    can_search: bool,
    fallback_mode: Option<&'static str>,
    hint: Option<String>,
    embedder_id: Option<String>,
    vector_index_path: Option<PathBuf>,
    model_dir: Option<PathBuf>,
    hnsw_path: Option<PathBuf>,
}

/// Inputs to `semantic_runtime_surface`. The `base_*` values are derived from
/// the availability/preference alone and are reported whenever no manifest tier
/// is queryable.
struct SemanticRuntimeInputs<'a> {
    data_dir: &'a Path,
    availability: &'a SemanticAvailability,
    preference: SemanticPreference,
    fast_tier: &'a SemanticTierAssetState,
    quality_tier: &'a SemanticTierAssetState,
    backlog: &'a SemanticBacklogProgressState,
    checkpoint: &'a SemanticCheckpointProgressState,
    base_embedder_id: Option<String>,
    base_vector_index_path: Option<PathBuf>,
    base_model_dir: Option<PathBuf>,
    base_hnsw_path: Option<PathBuf>,
}

/// Backend name and model directory implied by a `SemanticPreference`.
struct SemanticPreferenceSurface {
    preferred_backend: &'static str,
    model_dir: Option<PathBuf>,
}
400
/// State of one semantic tier (fast or quality). Built from the manifest's
/// artifact record, or promoted from a complete sharded generation.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticTierAssetState {
    /// A record for this tier exists.
    pub present: bool,
    pub ready: bool,
    /// `None` when the current database fingerprint is unknown.
    pub current_db_matches: Option<bool>,
    pub conversation_count: Option<u64>,
    pub doc_count: Option<u64>,
    pub embedder_id: Option<String>,
    pub model_revision: Option<String>,
    pub completed_at_ms: Option<i64>,
    pub size_bytes: Option<u64>,
    /// Only set when the state was promoted from shard records (first shard's artifact).
    pub index_path: Option<PathBuf>,
}

/// Semantic backfill backlog counters taken from the manifest.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticBacklogProgressState {
    pub total_conversations: u64,
    pub fast_tier_processed: u64,
    pub fast_tier_remaining: u64,
    pub quality_tier_processed: u64,
    pub quality_tier_remaining: u64,
    /// True when the backlog reports pending work or a build checkpoint exists.
    pub pending_work: bool,
    /// `None` when the fingerprint is unknown or no backlog was ever recorded.
    pub current_db_matches: Option<bool>,
    /// `None` when the backlog was never computed.
    pub computed_at_ms: Option<i64>,
}

/// Progress of the in-flight semantic build checkpoint. All fields are
/// default/`None` when no checkpoint is recorded.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SemanticCheckpointProgressState {
    /// A checkpoint is recorded in the manifest.
    pub active: bool,
    pub tier: Option<&'static str>,
    pub current_db_matches: Option<bool>,
    pub completed: Option<bool>,
    pub conversations_processed: Option<u64>,
    pub total_conversations: Option<u64>,
    pub progress_pct: Option<u8>,
    pub docs_embedded: Option<u64>,
    pub last_offset: Option<i64>,
    pub saved_at_ms: Option<i64>,
}
440
/// Arguments for `inspect_search_assets`.
pub(crate) struct InspectSearchAssetsInput<'a> {
    pub data_dir: &'a Path,
    pub db_path: &'a Path,
    /// Presumably seconds (cf. `LexicalAssetState::stale_threshold_seconds`) — confirm.
    pub stale_threshold: u64,
    pub last_indexed_at_ms: Option<i64>,
    /// Reference "now" used for age computations.
    pub now_ms: i64,
    /// Lock-file snapshot, typically from `read_search_maintenance_snapshot`.
    pub maintenance: SearchMaintenanceSnapshot,
    pub semantic_preference: SemanticPreference,
    /// When false, semantic inspection reports `DatabaseUnavailable`.
    pub db_available: bool,
    pub compute_lexical_fingerprint: bool,
    /// When false, the cheap `not_inspected` semantic state is returned instead of probing.
    pub inspect_semantic: bool,
}
460
/// Maximum mtime drift (ms) still considered the same storage snapshot.
const LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS: i64 = 1_000;

/// Structured form of a `db_len:db_mtime_ms:wal_len:wal_mtime_ms` fingerprint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParsedLexicalStorageFingerprint {
    db_len: u64,
    db_mtime_ms: i64,
    wal_len: u64,
    wal_mtime_ms: i64,
}

/// Parses exactly four `:`-separated fields; any other shape yields `None`.
fn parse_lexical_storage_fingerprint(raw: &str) -> Option<ParsedLexicalStorageFingerprint> {
    let fields: Vec<&str> = raw.split(':').collect();
    let [db_len, db_mtime_ms, wal_len, wal_mtime_ms] = fields.as_slice() else {
        return None;
    };
    Some(ParsedLexicalStorageFingerprint {
        db_len: db_len.parse().ok()?,
        db_mtime_ms: db_mtime_ms.parse().ok()?,
        wal_len: wal_len.parse().ok()?,
        wal_mtime_ms: wal_mtime_ms.parse().ok()?,
    })
}

/// Compares two storage fingerprints.
///
/// Lengths must match exactly, while each mtime may differ by up to
/// `LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS`. If either fingerprint is
/// not in the structured format, the raw strings are compared exactly.
pub(crate) fn lexical_storage_fingerprints_match(current: &str, saved: &str) -> bool {
    const TOLERANCE_MS: u64 = LEXICAL_STORAGE_FINGERPRINT_MTIME_TOLERANCE_MS.unsigned_abs();

    let parsed = parse_lexical_storage_fingerprint(current)
        .zip(parse_lexical_storage_fingerprint(saved));
    let Some((now, then)) = parsed else {
        return current == saved;
    };
    let close = |a: i64, b: i64| a.abs_diff(b) <= TOLERANCE_MS;
    now.db_len == then.db_len
        && now.wal_len == then.wal_len
        && close(now.db_mtime_ms, then.db_mtime_ms)
        && close(now.wal_mtime_ms, then.wal_mtime_ms)
}
503
504pub(crate) fn inspect_search_assets(
505 input: InspectSearchAssetsInput<'_>,
506) -> Result<SearchAssetSnapshot> {
507 let InspectSearchAssetsInput {
508 data_dir,
509 db_path,
510 stale_threshold,
511 last_indexed_at_ms,
512 now_ms,
513 maintenance,
514 semantic_preference,
515 db_available,
516 compute_lexical_fingerprint,
517 inspect_semantic,
518 } = input;
519
520 let lexical = inspect_lexical_assets(InspectLexicalAssetsInput {
521 data_dir,
522 db_path,
523 stale_threshold,
524 last_indexed_at_ms,
525 now_ms,
526 maintenance,
527 db_available,
528 compute_lexical_fingerprint,
529 })?;
530 let current_db_fingerprint = lexical.fingerprint.current_db_fingerprint.as_deref();
531 let semantic = if inspect_semantic {
532 inspect_semantic_assets(
533 data_dir,
534 db_path,
535 semantic_preference,
536 current_db_fingerprint,
537 db_available,
538 )
539 } else {
540 semantic_state_not_inspected(data_dir, semantic_preference, current_db_fingerprint)
541 };
542
543 Ok(SearchAssetSnapshot { lexical, semantic })
544}
545
546fn semantic_state_not_inspected(
547 data_dir: &Path,
548 preference: SemanticPreference,
549 current_db_fingerprint: Option<&str>,
550) -> SemanticAssetState {
551 let (fast_tier, quality_tier, backlog, checkpoint) =
552 semantic_manifest_progress(data_dir, current_db_fingerprint);
553 let preference_surface = semantic_preference_surface(data_dir, preference);
554
555 SemanticAssetState {
556 status: "not_inspected",
557 availability: "not_inspected",
558 summary: "semantic assets were not inspected for this fast path".to_string(),
559 available: false,
560 can_search: false,
561 fallback_mode: Some("lexical"),
562 preferred_backend: preference_surface.preferred_backend,
563 embedder_id: None,
564 vector_index_path: None,
565 model_dir: preference_surface.model_dir,
566 hnsw_path: None,
567 hnsw_ready: false,
568 progressive_ready: semantic_progressive_assets_ready(data_dir),
569 quality_tier_published: false,
575 semantic_only_search_available: false,
576 hint: Some(
577 "Use 'cass status --json' or 'cass models status --json' for semantic readiness."
578 .to_string(),
579 ),
580 fast_tier,
581 quality_tier,
582 backlog,
583 checkpoint,
584 }
585}
586
587pub(crate) fn inspect_semantic_assets(
588 data_dir: &Path,
589 db_path: &Path,
590 preference: SemanticPreference,
591 current_db_fingerprint: Option<&str>,
592 db_available: bool,
593) -> SemanticAssetState {
594 if !db_available {
595 let availability = SemanticAvailability::DatabaseUnavailable {
596 db_path: db_path.to_path_buf(),
597 error: "database unavailable during asset inspection".to_string(),
598 };
599 return semantic_state_from_availability(
600 data_dir,
601 &availability,
602 preference,
603 current_db_fingerprint,
604 );
605 }
606
607 let availability = match preference {
608 SemanticPreference::DefaultModel => probe_semantic_availability(data_dir),
609 SemanticPreference::HashFallback => probe_hash_semantic_availability(data_dir),
610 };
611 semantic_state_from_availability(data_dir, &availability, preference, current_db_fingerprint)
612}
613
/// Builds the full semantic asset state for an already-probed `availability`.
///
/// Loads manifest progress for `data_dir`. When both the current DB fingerprint
/// and a base embedder id are known, complete sharded generations may promote
/// the fast/quality tier states to ready. `semantic_runtime_surface` then
/// decides status, summary, hint, and the embedder id/paths to report.
pub(crate) fn semantic_state_from_availability(
    data_dir: &Path,
    availability: &SemanticAvailability,
    preference: SemanticPreference,
    current_db_fingerprint: Option<&str>,
) -> SemanticAssetState {
    let (mut fast_tier, mut quality_tier, backlog, checkpoint) =
        semantic_manifest_progress(data_dir, current_db_fingerprint);
    let preference_surface = semantic_preference_surface(data_dir, preference);
    let base_embedder_id = semantic_embedder_id(availability, preference);
    // Shard promotion needs both the DB fingerprint and an embedder id to
    // identify which generation to look for.
    if let (Some(db_fingerprint), Some(embedder_id)) =
        (current_db_fingerprint, base_embedder_id.as_deref())
    {
        promote_complete_shard_generation_state(
            data_dir,
            TierKind::Fast,
            embedder_id,
            db_fingerprint,
            &mut fast_tier,
        );
        promote_complete_shard_generation_state(
            data_dir,
            TierKind::Quality,
            embedder_id,
            db_fingerprint,
            &mut quality_tier,
        );
    }
    let base_vector_index_path = semantic_vector_index_path(data_dir, availability, preference);
    let base_model_dir = preference_surface.model_dir;
    let base_hnsw_path = base_embedder_id
        .as_deref()
        .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
    let runtime = semantic_runtime_surface(SemanticRuntimeInputs {
        data_dir,
        availability,
        preference,
        fast_tier: &fast_tier,
        quality_tier: &quality_tier,
        backlog: &backlog,
        checkpoint: &checkpoint,
        base_embedder_id: base_embedder_id.clone(),
        base_vector_index_path: base_vector_index_path.clone(),
        base_model_dir: base_model_dir.clone(),
        base_hnsw_path: base_hnsw_path.clone(),
    });
    // When the runtime surface names an embedder, its paths are taken as-is
    // (even `None`), presumably so ids and paths describe the same embedder.
    // Otherwise each path falls back to its base value.
    let use_runtime_paths = runtime.embedder_id.is_some();
    let embedder_id = runtime.embedder_id.or(base_embedder_id);
    let vector_index_path = if use_runtime_paths {
        runtime.vector_index_path
    } else {
        runtime.vector_index_path.or(base_vector_index_path)
    };
    let model_dir = if use_runtime_paths {
        runtime.model_dir
    } else {
        runtime.model_dir.or(base_model_dir)
    };
    let hnsw_path = if use_runtime_paths {
        runtime.hnsw_path
    } else {
        runtime.hnsw_path.or(base_hnsw_path)
    };
    // Filesystem check: the HNSW index counts as ready only if the file exists.
    let hnsw_ready = hnsw_path.as_ref().is_some_and(|path| path.is_file());
    let progressive_ready = semantic_progressive_assets_ready(data_dir);

    let quality_tier_published = semantic_tier_queryable(availability, &quality_tier);
    let fast_tier_queryable = semantic_tier_queryable(availability, &fast_tier);
    let semantic_only_search_available = quality_tier_published || fast_tier_queryable;

    SemanticAssetState {
        status: runtime.status,
        availability: runtime.availability,
        summary: runtime.summary,
        available: runtime.can_search,
        can_search: runtime.can_search,
        fallback_mode: runtime.fallback_mode,
        preferred_backend: preference_surface.preferred_backend,
        embedder_id,
        vector_index_path,
        model_dir,
        hnsw_path,
        hnsw_ready,
        progressive_ready,
        quality_tier_published,
        semantic_only_search_available,
        hint: runtime.hint,
        fast_tier,
        quality_tier,
        backlog,
        checkpoint,
    }
}
711
712fn semantic_preference_surface(
713 data_dir: &Path,
714 preference: SemanticPreference,
715) -> SemanticPreferenceSurface {
716 match preference {
717 SemanticPreference::DefaultModel => SemanticPreferenceSurface {
718 preferred_backend: "fastembed",
719 model_dir: active_policy_model_dir(data_dir),
720 },
721 SemanticPreference::HashFallback => SemanticPreferenceSurface {
722 preferred_backend: "hash",
723 model_dir: None,
724 },
725 }
726}
727
728fn semantic_runtime_surface(inputs: SemanticRuntimeInputs<'_>) -> SemanticRuntimeSurface {
729 let SemanticRuntimeInputs {
730 data_dir,
731 availability,
732 preference,
733 fast_tier,
734 quality_tier,
735 backlog,
736 checkpoint,
737 base_embedder_id,
738 base_vector_index_path,
739 base_model_dir,
740 base_hnsw_path,
741 } = inputs;
742 let base_status = semantic_status_from_availability(availability);
743 let base_availability = semantic_availability_code(availability);
744 let base_summary = availability.summary();
745 let base_can_search = availability.can_search();
746 let base_hint = semantic_hint(availability, preference);
747
748 if matches!(
749 availability,
750 SemanticAvailability::Disabled { .. }
751 | SemanticAvailability::DatabaseUnavailable { .. }
752 | SemanticAvailability::LoadFailed { .. }
753 ) {
754 return SemanticRuntimeSurface {
755 status: base_status,
756 availability: base_availability,
757 summary: base_summary,
758 can_search: base_can_search,
759 fallback_mode: (!base_can_search).then_some("lexical"),
760 hint: base_hint,
761 embedder_id: base_embedder_id,
762 vector_index_path: base_vector_index_path,
763 model_dir: base_model_dir,
764 hnsw_path: base_hnsw_path,
765 };
766 }
767
768 let quality_queryable = semantic_tier_queryable(availability, quality_tier);
769 let fast_queryable = semantic_tier_queryable(availability, fast_tier);
770 let checkpoint_active = checkpoint.active;
771 let backlog_pending = backlog.pending_work;
772 let manifest_assets_present = fast_tier.present || quality_tier.present;
773 let backfill_active = checkpoint_active || backlog_pending;
774
775 let effective_embedder_id = if quality_queryable {
776 quality_tier.embedder_id.clone()
777 } else if fast_queryable {
778 fast_tier.embedder_id.clone()
779 } else {
780 None
781 };
782 let effective_vector_index_path = if quality_queryable {
783 quality_tier.index_path.clone()
784 } else if fast_queryable {
785 fast_tier.index_path.clone()
786 } else {
787 None
788 }
789 .or_else(|| {
790 effective_embedder_id
791 .as_deref()
792 .map(|embedder_id| vector_index_path(data_dir, embedder_id))
793 });
794 let effective_model_dir = effective_embedder_id.as_deref().and_then(|embedder_id| {
795 (!semantic_embedder_is_hash(embedder_id))
796 .then(|| model_dir_for_embedder_id(data_dir, embedder_id))
797 .flatten()
798 });
799 let effective_hnsw_path = effective_embedder_id
800 .as_deref()
801 .map(|embedder_id| hnsw_index_path(data_dir, embedder_id));
802
803 if quality_queryable || fast_queryable {
804 let fully_ready = (quality_queryable || fast_queryable) && !backfill_active;
805 let summary = if quality_queryable && backfill_active {
806 "semantic quality tier is usable; residual semantic backfill is still finishing"
807 .to_string()
808 } else if quality_queryable {
809 "semantic quality tier ready".to_string()
810 } else if backfill_active {
811 "semantic fast tier is usable; higher-quality semantic backfill is still in progress"
812 .to_string()
813 } else {
814 "semantic fast tier ready".to_string()
815 };
816 let hint = if backfill_active {
817 Some(
818 "Semantic refinement is already usable; continue searching while higher-quality backfill finishes."
819 .to_string(),
820 )
821 } else {
822 None
823 };
824 return SemanticRuntimeSurface {
825 status: if fully_ready { "ready" } else { "building" },
826 availability: if fully_ready {
827 "ready"
828 } else {
829 "index_building"
830 },
831 summary,
832 can_search: true,
833 fallback_mode: None,
834 hint,
835 embedder_id: effective_embedder_id,
836 vector_index_path: effective_vector_index_path,
837 model_dir: effective_model_dir,
838 hnsw_path: effective_hnsw_path,
839 };
840 }
841
842 if backfill_active {
843 return SemanticRuntimeSurface {
844 status: "building",
845 availability: "index_building",
846 summary: "semantic backfill is in progress for the current database".to_string(),
847 can_search: false,
848 fallback_mode: Some("lexical"),
849 hint: Some(
850 "Run 'cass index --semantic' to finish backfilling current semantic assets; search will use lexical fallback until then."
851 .to_string(),
852 ),
853 embedder_id: base_embedder_id,
854 vector_index_path: base_vector_index_path,
855 model_dir: base_model_dir,
856 hnsw_path: base_hnsw_path,
857 };
858 }
859
860 if manifest_assets_present {
861 return SemanticRuntimeSurface {
862 status: "stale",
863 availability: "update_available",
864 summary: "semantic artifacts exist but do not match the current database".to_string(),
865 can_search: false,
866 fallback_mode: Some("lexical"),
867 hint: Some(
868 "Run 'cass index --semantic' to refresh semantic assets for the current database; search will use lexical fallback until then."
869 .to_string(),
870 ),
871 embedder_id: base_embedder_id,
872 vector_index_path: base_vector_index_path,
873 model_dir: base_model_dir,
874 hnsw_path: base_hnsw_path,
875 };
876 }
877
878 SemanticRuntimeSurface {
879 status: base_status,
880 availability: base_availability,
881 summary: base_summary,
882 can_search: base_can_search,
883 fallback_mode: (!base_can_search).then_some("lexical"),
884 hint: base_hint,
885 embedder_id: base_embedder_id,
886 vector_index_path: base_vector_index_path,
887 model_dir: base_model_dir,
888 hnsw_path: base_hnsw_path,
889 }
890}
891
892fn active_policy_model_dir(data_dir: &Path) -> Option<PathBuf> {
893 let policy = SemanticPolicy::resolve(&CliSemanticOverrides::default());
894 let embedder_name = FastEmbedder::canonical_name(&policy.quality_tier_embedder)?;
895 FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
896}
897
898fn model_dir_for_embedder_id(data_dir: &Path, embedder_id: &str) -> Option<PathBuf> {
899 let embedder_name = FastEmbedder::canonical_name(embedder_id)?;
900 FastEmbedder::runtime_model_dir_for(data_dir, embedder_name)
901}
902
903fn semantic_tier_queryable(
904 availability: &SemanticAvailability,
905 tier: &SemanticTierAssetState,
906) -> bool {
907 if !tier.ready || tier.current_db_matches == Some(false) {
908 return false;
909 }
910 let Some(embedder_id) = tier.embedder_id.as_deref() else {
911 return false;
912 };
913 if semantic_embedder_is_hash(embedder_id) {
914 !matches!(
915 availability,
916 SemanticAvailability::Disabled { .. }
917 | SemanticAvailability::DatabaseUnavailable { .. }
918 | SemanticAvailability::LoadFailed { .. }
919 )
920 } else {
921 matches!(
922 availability,
923 SemanticAvailability::Ready { .. }
924 | SemanticAvailability::UpdateAvailable { .. }
925 | SemanticAvailability::IndexBuilding { .. }
926 | SemanticAvailability::IndexMissing { .. }
927 )
928 }
929}
930
931fn semantic_embedder_is_hash(embedder_id: &str) -> bool {
932 embedder_id == HashEmbedder::default().id()
933}
934
935fn semantic_manifest_progress(
936 data_dir: &Path,
937 current_db_fingerprint: Option<&str>,
938) -> (
939 SemanticTierAssetState,
940 SemanticTierAssetState,
941 SemanticBacklogProgressState,
942 SemanticCheckpointProgressState,
943) {
944 let manifest = SemanticManifest::load_or_default(data_dir).unwrap_or_default();
945 let fast_tier = semantic_tier_asset_state(manifest.fast_tier.as_ref(), current_db_fingerprint);
946 let quality_tier =
947 semantic_tier_asset_state(manifest.quality_tier.as_ref(), current_db_fingerprint);
948 let backlog = semantic_backlog_progress_state(&manifest, current_db_fingerprint);
949 let checkpoint =
950 semantic_checkpoint_progress_state(manifest.checkpoint.as_ref(), current_db_fingerprint);
951 (fast_tier, quality_tier, backlog, checkpoint)
952}
953
954fn semantic_tier_asset_state(
955 artifact: Option<&ArtifactRecord>,
956 current_db_fingerprint: Option<&str>,
957) -> SemanticTierAssetState {
958 let Some(artifact) = artifact else {
959 return SemanticTierAssetState::default();
960 };
961
962 SemanticTierAssetState {
963 present: true,
964 ready: artifact.ready,
965 current_db_matches: current_db_fingerprint.map(|fp| artifact.db_fingerprint == fp),
966 conversation_count: Some(artifact.conversation_count),
967 doc_count: Some(artifact.doc_count),
968 embedder_id: Some(artifact.embedder_id.clone()),
969 model_revision: Some(artifact.model_revision.clone()),
970 completed_at_ms: Some(artifact.completed_at_ms),
971 size_bytes: Some(artifact.size_bytes),
972 index_path: None,
973 }
974}
975
976fn resolve_semantic_artifact_path(data_dir: &Path, recorded_path: &str) -> Option<PathBuf> {
977 semantic_shard_artifact_path_is_safe(recorded_path).then(|| data_dir.join(recorded_path))
978}
979
980fn complete_shard_records_for_state(
981 data_dir: &Path,
982 tier: TierKind,
983 embedder_id: &str,
984 db_fingerprint: &str,
985) -> Option<Vec<SemanticShardRecord>> {
986 let manifest = SemanticShardManifest::load(data_dir).ok().flatten()?;
987 let summary = manifest.summary(tier, embedder_id, db_fingerprint);
988 if !summary.complete {
989 return None;
990 }
991 let mut records = manifest
992 .shards
993 .into_iter()
994 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
995 .collect::<Vec<_>>();
996 records.sort_by_key(|shard| shard.shard_index);
997 if records.len() != usize::try_from(summary.shard_count).unwrap_or(usize::MAX) {
998 return None;
999 }
1000 let first = records.first()?;
1001 for (expected_index, shard) in records.iter().enumerate() {
1002 if shard.shard_index != u32::try_from(expected_index).unwrap_or(u32::MAX)
1003 || !shard.ready
1004 || !shard.mmap_ready
1005 || shard.model_revision != first.model_revision
1006 || shard.schema_version != SEMANTIC_SCHEMA_VERSION
1007 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
1008 || shard.dimension == 0
1009 || shard.dimension != first.dimension
1010 || shard.total_conversations != first.total_conversations
1011 {
1012 return None;
1013 }
1014 let artifact_path = resolve_semantic_artifact_path(data_dir, &shard.index_path)?;
1015 if !artifact_path.is_file() {
1016 return None;
1017 }
1018 }
1019 Some(records)
1020}
1021
1022fn promote_complete_shard_generation_state(
1023 data_dir: &Path,
1024 tier: TierKind,
1025 embedder_id: &str,
1026 db_fingerprint: &str,
1027 state: &mut SemanticTierAssetState,
1028) {
1029 if state.ready && state.current_db_matches == Some(true) {
1030 return;
1031 }
1032 let Some(records) =
1033 complete_shard_records_for_state(data_dir, tier, embedder_id, db_fingerprint)
1034 else {
1035 return;
1036 };
1037 let doc_count = records
1038 .iter()
1039 .map(|shard| shard.doc_count)
1040 .fold(0, u64::saturating_add);
1041 let size_bytes = records
1042 .iter()
1043 .map(|shard| shard.size_bytes)
1044 .fold(0, u64::saturating_add);
1045 let completed_at_ms = records
1046 .iter()
1047 .map(|shard| shard.completed_at_ms)
1048 .max()
1049 .unwrap_or(0);
1050 let first = &records[0];
1051 let Some(first_index_path) = resolve_semantic_artifact_path(data_dir, &first.index_path) else {
1052 return;
1053 };
1054 *state = SemanticTierAssetState {
1055 present: true,
1056 ready: true,
1057 current_db_matches: Some(true),
1058 conversation_count: Some(first.total_conversations),
1059 doc_count: Some(doc_count),
1060 embedder_id: Some(first.embedder_id.clone()),
1061 model_revision: Some(first.model_revision.clone()),
1062 completed_at_ms: Some(completed_at_ms),
1063 size_bytes: Some(size_bytes),
1064 index_path: Some(first_index_path),
1065 };
1066}
1067
1068fn semantic_backlog_progress_state(
1069 manifest: &SemanticManifest,
1070 current_db_fingerprint: Option<&str>,
1071) -> SemanticBacklogProgressState {
1072 let backlog = &manifest.backlog;
1073 let current_db_matches = current_db_fingerprint.and_then(|fp| {
1074 (backlog.computed_at_ms > 0 || !backlog.db_fingerprint.is_empty())
1075 .then(|| backlog.is_current(fp))
1076 });
1077
1078 SemanticBacklogProgressState {
1079 total_conversations: backlog.total_conversations,
1080 fast_tier_processed: backlog.fast_tier_processed,
1081 fast_tier_remaining: backlog.fast_tier_remaining(),
1082 quality_tier_processed: backlog.quality_tier_processed,
1083 quality_tier_remaining: backlog.quality_tier_remaining(),
1084 pending_work: backlog.has_pending_work() || manifest.checkpoint.is_some(),
1085 current_db_matches,
1086 computed_at_ms: (backlog.computed_at_ms > 0).then_some(backlog.computed_at_ms),
1087 }
1088}
1089
1090fn semantic_checkpoint_progress_state(
1091 checkpoint: Option<&BuildCheckpoint>,
1092 current_db_fingerprint: Option<&str>,
1093) -> SemanticCheckpointProgressState {
1094 let Some(checkpoint) = checkpoint else {
1095 return SemanticCheckpointProgressState::default();
1096 };
1097
1098 SemanticCheckpointProgressState {
1099 active: true,
1100 tier: Some(checkpoint.tier.as_str()),
1101 current_db_matches: current_db_fingerprint.map(|fp| checkpoint.is_valid(fp)),
1102 completed: Some(checkpoint.is_complete()),
1103 conversations_processed: Some(checkpoint.conversations_processed),
1104 total_conversations: Some(checkpoint.total_conversations),
1105 progress_pct: Some(checkpoint.progress_pct()),
1106 docs_embedded: Some(checkpoint.docs_embedded),
1107 last_offset: Some(checkpoint.last_offset),
1108 saved_at_ms: Some(checkpoint.saved_at_ms),
1109 }
1110}
1111
/// Inputs for `inspect_lexical_assets`, bundled into one struct to keep the call site readable.
struct InspectLexicalAssetsInput<'a> {
    /// Data directory; the lexical index directory is derived from it via
    /// `crate::search::tantivy::expected_index_dir`.
    data_dir: &'a Path,
    /// Database path. It is fingerprinted and compared against the database
    /// recorded in the rebuild checkpoint and in the maintenance lock.
    db_path: &'a Path,
    /// Age threshold, in seconds, beyond which an idle index is reported stale.
    stale_threshold: u64,
    /// Timestamp (ms) of the last indexing run. `None` or a non-positive value
    /// makes the index count as age-stale.
    last_indexed_at_ms: Option<i64>,
    /// Current time in milliseconds, used for age and stall computations.
    now_ms: i64,
    /// Snapshot of the search-maintenance lock metadata.
    maintenance: SearchMaintenanceSnapshot,
    /// Whether the database is readable; fingerprinting is skipped when `false`.
    db_available: bool,
    /// Whether to compute the storage fingerprint (also requires `db_available`).
    compute_lexical_fingerprint: bool,
}
1126
1127fn inspect_lexical_assets(input: InspectLexicalAssetsInput<'_>) -> Result<LexicalAssetState> {
1128 let InspectLexicalAssetsInput {
1129 data_dir,
1130 db_path,
1131 stale_threshold,
1132 last_indexed_at_ms,
1133 now_ms,
1134 maintenance,
1135 db_available,
1136 compute_lexical_fingerprint,
1137 } = input;
1138 let index_path = crate::search::tantivy::expected_index_dir(data_dir);
1139 let checkpoint = load_lexical_rebuild_checkpoint(&index_path)
1140 .with_context(|| format!("loading lexical checkpoint from {}", index_path.display()))?;
1141 let current_db_fingerprint = if db_available && compute_lexical_fingerprint {
1142 Some(
1143 lexical_storage_fingerprint_for_db(db_path).with_context(|| {
1144 format!(
1145 "computing lexical storage fingerprint for {}",
1146 db_path.display()
1147 )
1148 })?,
1149 )
1150 } else {
1151 None
1152 };
1153
1154 Ok(lexical_state_from_observations(LexicalObservationInput {
1155 index_path: &index_path,
1156 db_path,
1157 stale_threshold,
1158 last_indexed_at_ms,
1159 now_ms,
1160 maintenance,
1161 checkpoint: checkpoint.as_ref(),
1162 current_db_fingerprint: current_db_fingerprint.as_deref(),
1163 }))
1164}
1165
/// Everything `lexical_state_from_observations` needs. This struct holds
/// already-gathered observations; that function only reads the filesystem to
/// check index existence.
struct LexicalObservationInput<'a> {
    /// Lexical (Tantivy) index directory.
    index_path: &'a Path,
    /// Database the caller is asking about.
    db_path: &'a Path,
    /// Age threshold, in seconds, beyond which an idle index is reported stale.
    stale_threshold: u64,
    /// Timestamp (ms) of the last indexing run, if known.
    last_indexed_at_ms: Option<i64>,
    /// Current time in milliseconds.
    now_ms: i64,
    /// Snapshot of the search-maintenance lock metadata.
    maintenance: SearchMaintenanceSnapshot,
    /// Persisted lexical rebuild checkpoint, if one exists.
    checkpoint: Option<&'a LexicalRebuildCheckpoint>,
    /// Current storage fingerprint of `db_path`, when it was computed.
    current_db_fingerprint: Option<&'a str>,
}
1177
/// Derive the reported lexical asset state from raw observations.
///
/// Status precedence is: `stalled` > `building` > `missing` > `stale` > `ready`.
/// Checkpoint-derived counters (pending/processed/total/indexed) are only exposed
/// when the checkpoint matches the current database, schema and page-size
/// contract. Outside an active rebuild, its storage fingerprint must also match.
fn lexical_state_from_observations(input: LexicalObservationInput<'_>) -> LexicalAssetState {
    let LexicalObservationInput {
        index_path,
        db_path,
        stale_threshold,
        last_indexed_at_ms,
        now_ms,
        maintenance,
        checkpoint,
        current_db_fingerprint,
    } = input;
    let exists = crate::search::tantivy::searchable_index_exists(index_path);
    // Each `*_matches` is `None` when there is no checkpoint to compare against.
    let checkpoint_db_matches =
        checkpoint.map(|state| crate::stored_path_identity_matches(&state.db_path, db_path));
    let schema_matches = checkpoint.map(|state| state.schema_hash == SCHEMA_HASH);
    let page_size_matches =
        checkpoint.map(|state| state.page_size == LEXICAL_REBUILD_PAGE_SIZE_PUBLIC);
    let page_size_compatible =
        checkpoint.map(|state| lexical_rebuild_page_size_is_compatible(state.page_size));
    let checkpoint_fingerprint = checkpoint.map(|state| state.storage_fingerprint.as_str());
    // Compared via `lexical_storage_fingerprints_match` rather than string equality.
    let fingerprint_matches = match (current_db_fingerprint, checkpoint_fingerprint) {
        (Some(current), Some(saved)) => Some(lexical_storage_fingerprints_match(current, saved)),
        _ => None,
    };
    let checkpoint_incomplete = checkpoint.is_some_and(|state| !state.completed);
    let checkpoint_db_mismatch = checkpoint_db_matches == Some(false);
    // The contract check uses page-size *compatibility*, not exact equality.
    let contract_mismatch = schema_matches == Some(false) || page_size_compatible == Some(false);
    let fingerprint_mismatch = fingerprint_matches == Some(false);
    // Negative clock readings are clamped to 0 before the unsigned conversion.
    let now_secs: u64 = now_ms.div_euclid(1000).max(0) as u64;
    // `None` when there is no positive last-indexed timestamp.
    let age_seconds = last_indexed_at_ms
        .and_then(|ts| (ts > 0).then(|| now_secs.saturating_sub((ts / 1000) as u64)));
    // An unknown age is treated as stale.
    let age_stale = match age_seconds {
        Some(age) => age > stale_threshold,
        None => true,
    };
    // A lock that records no database path is assumed to target this database.
    let maintenance_targets_current_db = maintenance
        .db_path
        .as_ref()
        .is_none_or(|lock_db_path| crate::path_identities_match(lock_db_path, db_path));
    let watch_active = maintenance.active
        && maintenance_targets_current_db
        && maintenance
            .mode
            .is_some_and(SearchMaintenanceMode::watch_active);
    let rebuilding = maintenance.active
        && maintenance_targets_current_db
        && maintenance
            .mode
            .is_some_and(SearchMaintenanceMode::rebuild_active);
    let active_rebuild_progress = rebuilding;
    // Stall detection only applies to a rebuild that targets this database.
    let stall_age_ms = if rebuilding && maintenance_targets_current_db {
        maintenance_stall_age_ms(&maintenance, now_ms)
    } else {
        None
    };
    let stalled = stall_age_ms.is_some();
    let last_progress_at_ms = maintenance
        .last_progress_at_ms
        .filter(|_| maintenance_targets_current_db);
    let last_progress_age_ms = last_progress_at_ms
        .filter(|_| rebuilding)
        .map(|ts| now_ms.saturating_sub(ts));
    // While a rebuild runs, an in-progress (incomplete / older) checkpoint is
    // expected. Only a missing index or a contract mismatch counts as stale. When
    // idle, an existing index is stale on any drift signal. A missing idle index
    // is reported as "missing" rather than stale.
    let stale = if rebuilding {
        !exists || contract_mismatch
    } else {
        exists
            && (age_stale
                || checkpoint_db_mismatch
                || checkpoint_incomplete
                || contract_mismatch
                || fingerprint_mismatch)
    };
    let fresh = exists && !stale && !rebuilding;
    let status = if stalled {
        "stalled"
    } else if rebuilding {
        "building"
    } else if !exists {
        "missing"
    } else if stale {
        "stale"
    } else {
        "ready"
    };
    // Reason precedence mirrors `status`; among staleness causes, database and
    // contract mismatches are reported before fingerprint, incompleteness and age.
    let status_reason = if stalled {
        let secs = stall_age_ms.unwrap_or(0) / 1000;
        Some(format!(
            "indexing thread has not posted forward progress for {secs}s while the lock heartbeat keeps refreshing — see issue #258 for diagnostics (run `cass doctor check --json` and capture a stack trace)"
        ))
    } else if rebuilding {
        Some("lexical rebuild is in progress".to_string())
    } else if !exists {
        Some("lexical Tantivy metadata missing".to_string())
    } else if checkpoint_db_mismatch {
        Some("lexical rebuild checkpoint points at a different database".to_string())
    } else if contract_mismatch {
        Some("lexical rebuild checkpoint no longer matches the active lexical contract".to_string())
    } else if fingerprint_mismatch {
        Some("database fingerprint changed since the last lexical checkpoint".to_string())
    } else if checkpoint_incomplete {
        Some("lexical rebuild checkpoint is incomplete".to_string())
    } else if age_stale {
        Some("lexical index is older than the stale threshold".to_string())
    } else {
        None
    };
    // During an active rebuild the fingerprint is expected to drift, so it is not
    // required; otherwise the checkpoint must be proven current by fingerprint.
    let checkpoint_progress_usable = checkpoint.is_some()
        && checkpoint_db_matches == Some(true)
        && schema_matches == Some(true)
        && page_size_compatible == Some(true)
        && if active_rebuild_progress {
            true
        } else {
            current_db_fingerprint.is_some() && fingerprint_matches == Some(true)
        };
    let pending_sessions = checkpoint
        .filter(|_| checkpoint_progress_usable)
        .map(|state| {
            state
                .total_conversations
                .saturating_sub(state.processed_conversations) as u64
        })
        .unwrap_or(0);
    // Lock activity only counts when the lock targets this database; prefer the
    // heartbeat timestamp over the start timestamp.
    let maintenance_activity_at_ms = maintenance_targets_current_db
        .then_some(())
        .and(maintenance.updated_at_ms.or(maintenance.started_at_ms));
    let checkpoint_activity_at_ms = checkpoint
        .filter(|_| checkpoint_progress_usable)
        .and_then(|state| (state.updated_at_ms > 0).then_some(state.updated_at_ms));
    // Latest activity seen from either source.
    let activity_at_ms = match (checkpoint_activity_at_ms, maintenance_activity_at_ms) {
        (Some(checkpoint_ts), Some(maintenance_ts)) => Some(checkpoint_ts.max(maintenance_ts)),
        (Some(checkpoint_ts), None) => Some(checkpoint_ts),
        (None, Some(maintenance_ts)) => Some(maintenance_ts),
        (None, None) => None,
    };

    LexicalAssetState {
        status,
        exists,
        fresh,
        stale,
        rebuilding,
        stalled,
        last_progress_age_ms,
        last_progress_at_ms,
        watch_active,
        last_indexed_at_ms,
        age_seconds,
        stale_threshold_seconds: stale_threshold,
        activity_at_ms,
        pending_sessions,
        processed_conversations: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.processed_conversations as u64),
        total_conversations: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.total_conversations as u64),
        indexed_docs: checkpoint
            .filter(|_| checkpoint_progress_usable)
            .map(|state| state.indexed_docs as u64),
        status_reason,
        fingerprint: LexicalFingerprintState {
            current_db_fingerprint: current_db_fingerprint.map(ToOwned::to_owned),
            checkpoint_fingerprint: checkpoint.map(|state| state.storage_fingerprint.clone()),
            matches_current_db_fingerprint: fingerprint_matches,
        },
        checkpoint: LexicalCheckpointState {
            present: checkpoint.is_some(),
            completed: checkpoint.map(|state| state.completed),
            db_matches: checkpoint_db_matches,
            schema_matches,
            page_size_matches,
            page_size_compatible,
        },
    }
}
1372
1373fn semantic_embedder_id(
1374 availability: &SemanticAvailability,
1375 preference: SemanticPreference,
1376) -> Option<String> {
1377 match availability {
1378 SemanticAvailability::Ready { embedder_id }
1379 | SemanticAvailability::UpdateAvailable { embedder_id, .. }
1380 | SemanticAvailability::IndexBuilding { embedder_id, .. } => Some(embedder_id.clone()),
1381 SemanticAvailability::HashFallback => Some(HashEmbedder::default().id().to_string()),
1382 _ => match preference {
1383 SemanticPreference::DefaultModel => {
1384 Some(FastEmbedder::embedder_id_static().to_string())
1385 }
1386 SemanticPreference::HashFallback => Some(HashEmbedder::default().id().to_string()),
1387 },
1388 }
1389}
1390
1391fn semantic_vector_index_path(
1392 data_dir: &Path,
1393 availability: &SemanticAvailability,
1394 preference: SemanticPreference,
1395) -> Option<PathBuf> {
1396 match availability {
1397 SemanticAvailability::IndexMissing { index_path } => Some(index_path.clone()),
1398 _ => semantic_embedder_id(availability, preference)
1399 .map(|embedder_id| vector_index_path(data_dir, &embedder_id)),
1400 }
1401}
1402
1403fn semantic_progressive_assets_ready(data_dir: &Path) -> bool {
1404 let vector_dir = data_dir.join(VECTOR_INDEX_DIR);
1405 vector_dir.join("vector.fast.idx").is_file() && vector_dir.join("vector.quality.idx").is_file()
1406}
1407
1408fn semantic_availability_code(availability: &SemanticAvailability) -> &'static str {
1409 match availability {
1410 SemanticAvailability::Ready { .. } => "ready",
1411 SemanticAvailability::NotInstalled => "not_installed",
1412 SemanticAvailability::NeedsConsent => "needs_consent",
1413 SemanticAvailability::Downloading { .. } => "downloading",
1414 SemanticAvailability::Verifying => "verifying",
1415 SemanticAvailability::IndexBuilding { .. } => "index_building",
1416 SemanticAvailability::HashFallback => "hash_fallback",
1417 SemanticAvailability::Disabled { .. } => "disabled",
1418 SemanticAvailability::ModelMissing { .. } => "model_missing",
1419 SemanticAvailability::IndexMissing { .. } => "index_missing",
1420 SemanticAvailability::DatabaseUnavailable { .. } => "database_unavailable",
1421 SemanticAvailability::LoadFailed { .. } => "load_failed",
1422 SemanticAvailability::UpdateAvailable { .. } => "update_available",
1423 }
1424}
1425
1426fn semantic_status_from_availability(availability: &SemanticAvailability) -> &'static str {
1427 match availability {
1428 SemanticAvailability::Ready { .. } => "ready",
1429 SemanticAvailability::HashFallback => "hash_fallback",
1430 SemanticAvailability::Downloading { .. }
1431 | SemanticAvailability::Verifying
1432 | SemanticAvailability::IndexBuilding { .. } => "building",
1433 SemanticAvailability::Disabled { .. } => "disabled",
1434 SemanticAvailability::UpdateAvailable { .. } => "stale",
1435 SemanticAvailability::NotInstalled
1436 | SemanticAvailability::NeedsConsent
1437 | SemanticAvailability::ModelMissing { .. }
1438 | SemanticAvailability::IndexMissing { .. } => "missing",
1439 SemanticAvailability::DatabaseUnavailable { .. }
1440 | SemanticAvailability::LoadFailed { .. } => "error",
1441 }
1442}
1443
1444fn semantic_hint(
1445 availability: &SemanticAvailability,
1446 preference: SemanticPreference,
1447) -> Option<String> {
1448 let hint = match (preference, availability) {
1449 (SemanticPreference::HashFallback, SemanticAvailability::IndexMissing { .. }) => {
1450 "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available without semantic assets"
1451 }
1452 (SemanticPreference::HashFallback, SemanticAvailability::LoadFailed { .. })
1453 | (SemanticPreference::HashFallback, SemanticAvailability::DatabaseUnavailable { .. }) => {
1454 "Run 'cass index --semantic --embedder hash' after the database is healthy; lexical search remains available"
1455 }
1456 (SemanticPreference::HashFallback, _) => {
1457 "Run 'cass index --semantic --embedder hash' to build the hash vector index; lexical search remains available"
1458 }
1459 (_, SemanticAvailability::NotInstalled)
1460 | (_, SemanticAvailability::NeedsConsent)
1461 | (_, SemanticAvailability::ModelMissing { .. }) => {
1462 "Run 'cass models install' and then 'cass index --semantic'; lexical search remains available without the model"
1463 }
1464 (_, SemanticAvailability::IndexMissing { .. })
1465 | (_, SemanticAvailability::UpdateAvailable { .. })
1466 | (_, SemanticAvailability::IndexBuilding { .. }) => {
1467 "Run 'cass index --semantic' to build or refresh vector assets; lexical search remains available"
1468 }
1469 (_, SemanticAvailability::Downloading { .. }) | (_, SemanticAvailability::Verifying) => {
1470 "Wait for the semantic model installation to finish; lexical search remains available"
1471 }
1472 (_, SemanticAvailability::Disabled { .. }) => {
1473 "Semantic search is disabled by policy; lexical search remains available, or re-enable semantic search"
1474 }
1475 (_, SemanticAvailability::DatabaseUnavailable { .. })
1476 | (_, SemanticAvailability::LoadFailed { .. }) => {
1477 "Restore the semantic assets and database; lexical search remains available when the archive database is healthy"
1478 }
1479 (_, SemanticAvailability::Ready { .. }) | (_, SemanticAvailability::HashFallback) => {
1480 return None;
1481 }
1482 };
1483 Some(hint.to_string())
1484}
1485
/// A maintenance heartbeat older than this (ms) marks the job as stale.
#[cfg_attr(not(test), allow(dead_code))]
const HEARTBEAT_STALE_THRESHOLD_MS: i64 = 30_000;
/// Default overall wait used by `poll_maintenance_until_idle` when no timeout is given.
#[cfg_attr(not(test), allow(dead_code))]
const BOUNDED_WAIT_DEFAULT: Duration = Duration::from_secs(5);
/// Default delay between polls in `poll_maintenance_until_idle`.
#[cfg_attr(not(test), allow(dead_code))]
const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);
1496
/// Result of evaluating a search-maintenance lock snapshot.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceCoordinationOutcome {
    /// The snapshot reports no active maintenance job.
    Idle,
    /// A job is active; its heartbeat is within the threshold and no stall was detected.
    Active {
        job_id: String,
        job_kind: SearchMaintenanceJobKind,
        phase: Option<String>,
        /// `0` when the snapshot carries no start time.
        started_at_ms: i64,
        /// Falls back to the evaluation time when the snapshot has no heartbeat.
        updated_at_ms: i64,
    },
    /// A job is marked active, but either its heartbeat is too old or it has
    /// stopped posting forward progress.
    Stale {
        job_id: String,
        /// Human-readable explanation of why the job is considered stale.
        reason: String,
    },
}
1513
/// What a caller should do given the current maintenance coordination state.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub(crate) enum MaintenanceDecision {
    /// No maintenance job is active; the caller may start one.
    Launch,
    /// Another job holds maintenance; attach to it or wait for it to finish.
    AttachOrWait {
        job_id: String,
        job_kind: SearchMaintenanceJobKind,
        phase: Option<String>,
        /// Milliseconds since the job started (0 when unknown).
        elapsed_ms: u64,
    },
    /// Proceed without waiting (e.g. serve lexical search) despite maintenance activity.
    FailOpen {
        reason: String,
    },
}
1528
1529#[cfg_attr(not(test), allow(dead_code))]
1530pub(crate) fn evaluate_maintenance_coordination(
1531 data_dir: &Path,
1532 now_ms: i64,
1533) -> MaintenanceCoordinationOutcome {
1534 evaluate_maintenance_coordination_from_snapshot(
1535 &read_search_maintenance_snapshot(data_dir),
1536 now_ms,
1537 )
1538}
1539
1540#[cfg_attr(not(test), allow(dead_code))]
1541pub(crate) fn evaluate_maintenance_coordination_from_snapshot(
1542 snapshot: &SearchMaintenanceSnapshot,
1543 now_ms: i64,
1544) -> MaintenanceCoordinationOutcome {
1545 if !snapshot.active {
1546 return MaintenanceCoordinationOutcome::Idle;
1547 }
1548 let job_id = maintenance_snapshot_job_id(snapshot);
1549 if let Some(updated_at_ms) = snapshot.updated_at_ms {
1550 let heartbeat_age_ms = now_ms.saturating_sub(updated_at_ms);
1551 if heartbeat_age_ms > HEARTBEAT_STALE_THRESHOLD_MS {
1552 return MaintenanceCoordinationOutcome::Stale {
1553 job_id,
1554 reason: format!(
1555 "heartbeat is {heartbeat_age_ms}ms old (threshold {HEARTBEAT_STALE_THRESHOLD_MS}ms)"
1556 ),
1557 };
1558 }
1559 }
1560 if let Some(stall_age_ms) = maintenance_stall_age_ms(snapshot, now_ms) {
1567 let threshold_ms = rebuild_stall_detect_threshold_ms().unwrap_or(0);
1568 return MaintenanceCoordinationOutcome::Stale {
1569 job_id,
1570 reason: format!(
1571 "indexing thread has not posted forward progress for {stall_age_ms}ms while the heartbeat keeps refreshing (stall threshold {threshold_ms}ms) — see issue #258"
1572 ),
1573 };
1574 }
1575 MaintenanceCoordinationOutcome::Active {
1576 job_id,
1577 job_kind: snapshot
1578 .job_kind
1579 .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh),
1580 phase: snapshot.phase.clone(),
1581 started_at_ms: snapshot.started_at_ms.unwrap_or(0),
1582 updated_at_ms: snapshot.updated_at_ms.unwrap_or(now_ms),
1583 }
1584}
1585
1586fn maintenance_snapshot_job_id(snapshot: &SearchMaintenanceSnapshot) -> String {
1587 snapshot
1588 .job_id
1589 .as_ref()
1590 .filter(|job_id| !job_id.is_empty())
1591 .cloned()
1592 .unwrap_or_else(|| {
1593 let mode = snapshot
1594 .mode
1595 .map(|mode| mode.as_lock_value())
1596 .unwrap_or("unknown");
1597 let owner = snapshot
1598 .pid
1599 .map(|pid| pid.to_string())
1600 .unwrap_or_else(|| "unknown-owner".to_string());
1601 format!("{mode}-active-lock-{owner}")
1602 })
1603}
1604
1605fn maintenance_snapshot_job_kind(snapshot: &SearchMaintenanceSnapshot) -> SearchMaintenanceJobKind {
1606 snapshot
1607 .job_kind
1608 .unwrap_or(SearchMaintenanceJobKind::LexicalRefresh)
1609}
1610
1611fn maintenance_elapsed_ms(snapshot: &SearchMaintenanceSnapshot, now_ms: i64) -> u64 {
1612 snapshot
1613 .started_at_ms
1614 .map(|started_at_ms| u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0))
1615 .unwrap_or(0)
1616}
1617
1618fn stale_heartbeat_phase(snapshot: &SearchMaintenanceSnapshot) -> Option<String> {
1619 snapshot
1620 .phase
1621 .clone()
1622 .or_else(|| Some("stale-heartbeat".to_string()))
1623}
1624
1625#[cfg_attr(not(test), allow(dead_code))]
1626pub(crate) fn decide_maintenance_action(data_dir: &Path, now_ms: i64) -> MaintenanceDecision {
1627 decide_maintenance_action_from_snapshot(&read_search_maintenance_snapshot(data_dir), now_ms)
1628}
1629
1630#[cfg_attr(not(test), allow(dead_code))]
1631pub(crate) fn decide_maintenance_action_from_snapshot(
1632 snapshot: &SearchMaintenanceSnapshot,
1633 now_ms: i64,
1634) -> MaintenanceDecision {
1635 match evaluate_maintenance_coordination_from_snapshot(snapshot, now_ms) {
1636 MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1637 MaintenanceCoordinationOutcome::Stale { job_id, .. } => MaintenanceDecision::AttachOrWait {
1638 job_id,
1639 job_kind: maintenance_snapshot_job_kind(snapshot),
1640 phase: stale_heartbeat_phase(snapshot),
1641 elapsed_ms: maintenance_elapsed_ms(snapshot, now_ms),
1642 },
1643 MaintenanceCoordinationOutcome::Active {
1644 job_id,
1645 job_kind,
1646 phase,
1647 started_at_ms,
1648 ..
1649 } => MaintenanceDecision::AttachOrWait {
1650 job_id,
1651 job_kind,
1652 phase,
1653 elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1654 },
1655 }
1656}
1657
1658#[cfg_attr(not(test), allow(dead_code))]
1659pub(crate) fn decide_search_failopen(
1660 data_dir: &Path,
1661 now_ms: i64,
1662 lexical_available: bool,
1663) -> MaintenanceDecision {
1664 let snapshot = read_search_maintenance_snapshot(data_dir);
1665 match evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms) {
1666 MaintenanceCoordinationOutcome::Idle => MaintenanceDecision::Launch,
1667 MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1668 if lexical_available {
1669 MaintenanceDecision::FailOpen {
1670 reason: format!(
1671 "maintenance job {job_id} has a stale heartbeat ({reason}); lexical index is available, failing open"
1672 ),
1673 }
1674 } else {
1675 MaintenanceDecision::AttachOrWait {
1676 job_id,
1677 job_kind: maintenance_snapshot_job_kind(&snapshot),
1678 phase: stale_heartbeat_phase(&snapshot),
1679 elapsed_ms: maintenance_elapsed_ms(&snapshot, now_ms),
1680 }
1681 }
1682 }
1683 MaintenanceCoordinationOutcome::Active {
1684 job_id,
1685 job_kind,
1686 phase,
1687 started_at_ms,
1688 ..
1689 } => {
1690 if lexical_available {
1691 MaintenanceDecision::FailOpen {
1692 reason: format!(
1693 "maintenance job {job_id} is active; lexical index is available, failing open"
1694 ),
1695 }
1696 } else {
1697 MaintenanceDecision::AttachOrWait {
1698 job_id,
1699 job_kind,
1700 phase,
1701 elapsed_ms: u64::try_from(now_ms.saturating_sub(started_at_ms)).unwrap_or(0),
1702 }
1703 }
1704 }
1705 }
1706}
1707
/// Outcome of `poll_maintenance_until_idle`.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) struct PollResult {
    /// The last coordination outcome observed.
    pub outcome: MaintenanceCoordinationOutcome,
    /// Number of evaluations performed.
    pub polls: u32,
    /// Wall time spent polling.
    pub elapsed: Duration,
    /// `true` when the deadline passed before maintenance became idle.
    pub timed_out: bool,
}
1715
1716#[cfg_attr(not(test), allow(dead_code))]
1717pub(crate) fn poll_maintenance_until_idle(
1718 data_dir: &Path,
1719 timeout: Option<Duration>,
1720 poll_interval: Option<Duration>,
1721) -> PollResult {
1722 let timeout = timeout.unwrap_or(BOUNDED_WAIT_DEFAULT);
1723 let interval = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
1724 let start = Instant::now();
1725 let deadline = start + timeout;
1726 let mut polls = 0u32;
1727 loop {
1728 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1729 let outcome = evaluate_maintenance_coordination(data_dir, now_ms);
1730 polls += 1;
1731 if matches!(outcome, MaintenanceCoordinationOutcome::Idle) {
1732 return PollResult {
1733 outcome,
1734 polls,
1735 elapsed: start.elapsed(),
1736 timed_out: false,
1737 };
1738 }
1739
1740 let now = Instant::now();
1741 if now >= deadline {
1742 return PollResult {
1743 outcome,
1744 polls,
1745 elapsed: start.elapsed(),
1746 timed_out: true,
1747 };
1748 }
1749 let remaining = deadline - now;
1750 std::thread::sleep(interval.min(remaining));
1751 }
1752}
1753
/// File name (under the data directory) of the newline-delimited JSON maintenance event log.
#[cfg_attr(not(test), allow(dead_code))]
const MAINTENANCE_EVENTS_FILE: &str = ".maintenance-events.jsonl";
/// File name (under the data directory) of the JSON yield-request signal.
#[cfg_attr(not(test), allow(dead_code))]
const YIELD_SIGNAL_FILE: &str = "maintenance-yield.signal";
/// Default read cap for `read_maintenance_events` and the number of lines kept by
/// `truncate_maintenance_event_log`.
#[cfg_attr(not(test), allow(dead_code))]
const MAX_EVENT_LOG_ENTRIES: usize = 500;
1764
/// One entry in the maintenance event log (serialized as a single JSON line).
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct MaintenanceEvent {
    /// Event time in milliseconds; `read_maintenance_events` filters on it.
    pub timestamp_ms: i64,
    /// Id of the maintenance job the event refers to.
    pub job_id: String,
    /// Process id of the process that recorded the event.
    pub actor_pid: u32,
    /// What happened.
    pub kind: MaintenanceEventKind,
}
1773
/// Kinds of maintenance lifecycle events recorded in the event log.
///
/// NOTE(review): variant meanings below are inferred from names and payload
/// fields; confirm against the code that emits them.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum MaintenanceEventKind {
    /// A job began, with its kind and initial phase.
    Started { job_kind: String, phase: String },
    /// The job moved from one phase to another.
    PhaseChanged { from: String, to: String },
    /// Progress counters for the job.
    Progress { processed: u64, total: u64 },
    /// Another process asked the job to yield.
    YieldRequested { requester_pid: u32, reason: String },
    /// The job paused.
    Paused { reason: String },
    /// The job resumed after a pause.
    Resumed,
    /// The job finished successfully.
    Completed { summary: String },
    /// The job failed.
    Failed { error: String },
    /// The job was cancelled.
    Cancelled { reason: String },
}
1787
1788#[cfg_attr(not(test), allow(dead_code))]
1789pub(crate) fn append_maintenance_event(data_dir: &Path, event: &MaintenanceEvent) -> Result<()> {
1790 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1791 let line = serde_json::to_string(event).with_context(|| "serializing maintenance event")?;
1792 let mut file = OpenOptions::new()
1793 .create(true)
1794 .append(true)
1795 .open(&path)
1796 .with_context(|| format!("opening maintenance event log at {}", path.display()))?;
1797 use std::io::Write;
1798 writeln!(file, "{line}")
1799 .with_context(|| format!("appending to maintenance event log at {}", path.display()))?;
1800 Ok(())
1801}
1802
1803#[cfg_attr(not(test), allow(dead_code))]
1804pub(crate) fn read_maintenance_events(
1805 data_dir: &Path,
1806 after_ms: Option<i64>,
1807 limit: Option<usize>,
1808) -> Vec<MaintenanceEvent> {
1809 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1810 let contents = match std::fs::read_to_string(&path) {
1811 Ok(c) => c,
1812 Err(_) => return Vec::new(),
1813 };
1814 let cap = limit.unwrap_or(MAX_EVENT_LOG_ENTRIES);
1815 contents
1816 .lines()
1817 .filter_map(|line| serde_json::from_str::<MaintenanceEvent>(line).ok())
1818 .filter(|e| after_ms.is_none_or(|threshold| e.timestamp_ms > threshold))
1819 .rev()
1820 .take(cap)
1821 .collect::<Vec<_>>()
1822 .into_iter()
1823 .rev()
1824 .collect()
1825}
1826
1827#[cfg_attr(not(test), allow(dead_code))]
1828pub(crate) fn truncate_maintenance_event_log(data_dir: &Path) -> Result<()> {
1829 let path = data_dir.join(MAINTENANCE_EVENTS_FILE);
1830 let contents = match std::fs::read_to_string(&path) {
1831 Ok(c) => c,
1832 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1833 Err(e) => {
1834 return Err(e).with_context(|| {
1835 format!("reading event log for truncation at {}", path.display())
1836 });
1837 }
1838 };
1839 let lines: Vec<&str> = contents.lines().collect();
1840 if lines.len() <= MAX_EVENT_LOG_ENTRIES {
1841 return Ok(());
1842 }
1843 let keep = &lines[lines.len() - MAX_EVENT_LOG_ENTRIES..];
1844 let mut output = keep.join("\n");
1845 output.push('\n');
1846 std::fs::write(&path, output)
1847 .with_context(|| format!("truncating event log at {}", path.display()))
1848}
1849
/// Contents of the yield signal file written by `request_yield`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct YieldRequest {
    /// Process id of the requester.
    pub requester_pid: u32,
    /// Time the request was written, in milliseconds.
    pub requested_at_ms: i64,
    /// Free-form reason supplied by the requester.
    pub reason: String,
}
1861
1862#[cfg_attr(not(test), allow(dead_code))]
1863pub(crate) fn request_yield(data_dir: &Path, reason: &str) -> Result<()> {
1864 let path = data_dir.join(YIELD_SIGNAL_FILE);
1865 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1866 let req = YieldRequest {
1867 requester_pid: std::process::id(),
1868 requested_at_ms: now_ms,
1869 reason: reason.to_string(),
1870 };
1871 let payload = serde_json::to_string(&req).with_context(|| "serializing yield request")?;
1872 std::fs::write(&path, payload)
1873 .with_context(|| format!("writing yield signal to {}", path.display()))
1874}
1875
1876#[cfg_attr(not(test), allow(dead_code))]
1877pub(crate) fn check_yield_requested(data_dir: &Path) -> Option<YieldRequest> {
1878 let path = data_dir.join(YIELD_SIGNAL_FILE);
1879 let contents = std::fs::read_to_string(&path).ok()?;
1880 serde_json::from_str::<YieldRequest>(&contents).ok()
1881}
1882
1883#[cfg_attr(not(test), allow(dead_code))]
1884pub(crate) fn clear_yield_signal(data_dir: &Path) -> Result<()> {
1885 let path = data_dir.join(YIELD_SIGNAL_FILE);
1886 match std::fs::remove_file(&path) {
1887 Ok(()) => Ok(()),
1888 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1889 Err(e) => Err(e).with_context(|| format!("clearing yield signal at {}", path.display())),
1890 }
1891}
1892
/// Combined view of maintenance state returned by `unified_maintenance_view`.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct UnifiedMaintenanceView {
    /// Coordination outcome evaluated from `snapshot`.
    pub coordination: MaintenanceCoordinationOutcome,
    /// Raw maintenance lock snapshot.
    pub snapshot: SearchMaintenanceSnapshot,
    /// Pending yield request, if any.
    pub yield_pending: Option<YieldRequest>,
    /// Up to the 20 most recent maintenance events, oldest first.
    pub recent_events: Vec<MaintenanceEvent>,
    /// Recommended action for the caller.
    pub decision: MaintenanceDecision,
}
1906
1907#[cfg_attr(not(test), allow(dead_code))]
1908pub(crate) fn unified_maintenance_view(
1909 data_dir: &Path,
1910 lexical_available: bool,
1911) -> UnifiedMaintenanceView {
1912 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
1913 let snapshot = read_search_maintenance_snapshot(data_dir);
1914 let coordination = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
1915 let yield_pending = check_yield_requested(data_dir);
1916 let recent_events = read_maintenance_events(data_dir, None, Some(20));
1917 let decision = if lexical_available {
1918 match &coordination {
1919 MaintenanceCoordinationOutcome::Active {
1920 job_id,
1921 job_kind,
1922 phase,
1923 ..
1924 } => MaintenanceDecision::FailOpen {
1925 reason: format!(
1926 "maintenance job {} ({:?}) is active (phase: {}); lexical available, failing open",
1927 job_id,
1928 job_kind,
1929 phase.as_deref().unwrap_or("unknown")
1930 ),
1931 },
1932 MaintenanceCoordinationOutcome::Stale { job_id, reason } => {
1933 MaintenanceDecision::FailOpen {
1934 reason: format!(
1935 "maintenance job {job_id} has a stale heartbeat ({reason}); lexical available, failing open"
1936 ),
1937 }
1938 }
1939 _ => decide_maintenance_action_from_snapshot(&snapshot, now_ms),
1940 }
1941 } else {
1942 decide_maintenance_action_from_snapshot(&snapshot, now_ms)
1943 };
1944
1945 UnifiedMaintenanceView {
1946 coordination,
1947 snapshot,
1948 yield_pending,
1949 recent_events,
1950 decision,
1951 }
1952}
1953
1954#[cfg(test)]
1955mod tests {
1956 use super::*;
1957
1958 #[test]
1959 fn maintenance_mode_round_trips_lock_values() {
1960 for mode in [
1961 SearchMaintenanceMode::Index,
1962 SearchMaintenanceMode::WatchStartup,
1963 SearchMaintenanceMode::Watch,
1964 SearchMaintenanceMode::WatchOnce,
1965 ] {
1966 assert_eq!(
1967 SearchMaintenanceMode::parse_lock_value(mode.as_lock_value()),
1968 Some(mode)
1969 );
1970 }
1971 }
1972
1973 #[test]
1974 fn maintenance_job_kind_round_trips_lock_values() {
1975 for kind in [
1976 SearchMaintenanceJobKind::LexicalRefresh,
1977 SearchMaintenanceJobKind::SemanticAcquire,
1978 ] {
1979 assert_eq!(
1980 SearchMaintenanceJobKind::parse_lock_value(kind.as_lock_value()),
1981 Some(kind)
1982 );
1983 }
1984 }
1985
1986 #[test]
1987 fn stale_lock_metadata_from_dead_owner_is_reaped_on_read() {
1988 let temp = tempfile::tempdir().expect("tempdir");
1995 let lock_path = temp.path().join("index-run.lock");
1996 std::fs::write(
2001 &lock_path,
2002 concat!(
2003 "pid=4242\n",
2004 "started_at_ms=1733000111000\n",
2005 "updated_at_ms=1733000112000\n",
2006 "db_path=/tmp/cass/agent_search.db\n",
2007 "mode=index\n",
2008 "job_id=lexical-refresh-1733000111000-4242\n",
2009 "job_kind=lexical_refresh\n",
2010 "phase=rebuilding\n"
2011 ),
2012 )
2013 .expect("write lock metadata");
2014
2015 let snapshot = read_search_maintenance_snapshot(temp.path());
2016 assert!(!snapshot.active, "no owner, must not be reported active");
2017 assert!(
2018 !snapshot.orphaned,
2019 "stale metadata must be reaped, not reported as orphaned"
2020 );
2021 assert!(snapshot.pid.is_none(), "pid must be cleared after reap");
2022 assert!(
2023 snapshot.job_id.is_none(),
2024 "job_id must be cleared after reap"
2025 );
2026 assert!(snapshot.phase.is_none(), "phase must be cleared after reap");
2027
2028 let post = std::fs::metadata(&lock_path).expect("lock file still present");
2031 assert_eq!(post.len(), 0, "stale metadata must be truncated in place");
2032
2033 let snapshot2 = read_search_maintenance_snapshot(temp.path());
2035 assert!(!snapshot2.active);
2036 assert!(!snapshot2.orphaned);
2037 }
2038
2039 #[test]
2040 fn live_owner_metadata_is_preserved_when_flock_is_held() {
2041 use fs2::FileExt;
2044 let temp = tempfile::tempdir().expect("tempdir");
2045 let lock_path = temp.path().join("index-run.lock");
2046 let owner = OpenOptions::new()
2047 .create(true)
2048 .read(true)
2049 .write(true)
2050 .truncate(true)
2051 .open(&lock_path)
2052 .expect("open owner handle");
2053 owner
2054 .try_lock_exclusive()
2055 .expect("owner acquires exclusive lock");
2056 std::fs::write(
2058 &lock_path,
2059 concat!(
2060 "pid=4242\n",
2061 "started_at_ms=1733000111000\n",
2062 "updated_at_ms=1733000112000\n",
2063 "db_path=/tmp/cass/agent_search.db\n",
2064 "mode=index\n",
2065 "job_id=lexical-refresh-1733000111000-4242\n",
2066 "job_kind=lexical_refresh\n",
2067 "phase=rebuilding\n"
2068 ),
2069 )
2070 .expect("write lock metadata");
2071
2072 let snapshot = read_search_maintenance_snapshot(temp.path());
2073 assert!(snapshot.active, "live owner must be reported active");
2074 assert!(!snapshot.orphaned);
2075 assert_eq!(snapshot.pid, Some(4242));
2076 assert_eq!(
2077 snapshot.job_id.as_deref(),
2078 Some("lexical-refresh-1733000111000-4242")
2079 );
2080 assert_eq!(
2081 snapshot.job_kind,
2082 Some(SearchMaintenanceJobKind::LexicalRefresh)
2083 );
2084 assert_eq!(snapshot.phase.as_deref(), Some("rebuilding"));
2085 assert_eq!(snapshot.updated_at_ms, Some(1_733_000_112_000));
2086
2087 let post = std::fs::metadata(&lock_path).expect("lock file still present");
2089 assert!(post.len() > 0, "live-owner metadata must not be truncated");
2090
2091 let _ = FileExt::unlock(&owner);
2092 }
2093
2094 #[test]
2095 fn lexical_storage_fingerprint_matching_handles_jitter_and_size_drift() {
2096 let cases = [
2097 (
2098 "small mtime settle jitter",
2099 "323584:1776310228000:329632:1776310227824",
2100 "323584:1776310227832:329632:1776310227824",
2101 true,
2102 ),
2103 (
2104 "wal size drift",
2105 "323584:1776310228000:329632:1776310227824",
2106 "323584:1776310227832:400000:1776310227824",
2107 false,
2108 ),
2109 ];
2110
2111 for (label, current, saved, expected) in cases {
2112 assert_eq!(
2113 lexical_storage_fingerprints_match(current, saved),
2114 expected,
2115 "{label}"
2116 );
2117 }
2118 }
2119
2120 #[test]
2121 fn lexical_state_marks_fingerprint_mismatch_stale() {
2122 let temp = tempfile::tempdir().expect("tempdir");
2123 let index_path = temp.path().join("index").join("v4");
2124 std::fs::create_dir_all(&index_path).expect("create index dir");
2125 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2129 let db_path = temp.path().join("agent_search.db");
2130 std::fs::write(&db_path, b"db").expect("write db file");
2131
2132 let checkpoint = LexicalRebuildCheckpoint {
2133 db_path: db_path.display().to_string(),
2134 total_conversations: 10,
2135 storage_fingerprint: "before".to_string(),
2136 committed_offset: 10,
2137 committed_conversation_id: Some(10),
2138 processed_conversations: 10,
2139 indexed_docs: 100,
2140 schema_hash: SCHEMA_HASH.to_string(),
2141 page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2142 completed: true,
2143 updated_at_ms: 1_733_000_000_000,
2144 };
2145
2146 let state = lexical_state_from_observations(LexicalObservationInput {
2147 index_path: &index_path,
2148 db_path: &db_path,
2149 stale_threshold: 60,
2150 last_indexed_at_ms: Some(1_733_000_000_000),
2151 now_ms: 1_733_000_001_000,
2152 maintenance: SearchMaintenanceSnapshot::default(),
2153 checkpoint: Some(&checkpoint),
2154 current_db_fingerprint: Some("after"),
2155 });
2156
2157 assert_eq!(state.status, "stale");
2158 assert_eq!(
2159 state.fingerprint.matches_current_db_fingerprint,
2160 Some(false)
2161 );
2162 assert!(
2163 state
2164 .status_reason
2165 .as_deref()
2166 .is_some_and(|reason| reason.contains("fingerprint"))
2167 );
2168 assert_eq!(state.pending_sessions, 0);
2169 assert_eq!(state.processed_conversations, None);
2170 assert_eq!(state.total_conversations, None);
2171 assert_eq!(state.indexed_docs, None);
2172 }
2173
2174 #[test]
2175 fn lexical_state_marks_checkpoint_db_mismatch_stale_without_fingerprint_probe() {
2176 let temp = tempfile::tempdir().expect("tempdir");
2177 let index_path = temp.path().join("index").join("v4");
2178 std::fs::create_dir_all(&index_path).expect("create index dir");
2179 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2180 let db_path = temp.path().join("agent_search.db");
2181 let other_db_path = temp.path().join("other_agent_search.db");
2182 std::fs::write(&db_path, b"db").expect("write db file");
2183 std::fs::write(&other_db_path, b"other db").expect("write other db file");
2184
2185 let checkpoint = LexicalRebuildCheckpoint {
2186 db_path: other_db_path.display().to_string(),
2187 total_conversations: 10,
2188 storage_fingerprint: "old-db-fingerprint".to_string(),
2189 committed_offset: 10,
2190 committed_conversation_id: Some(10),
2191 processed_conversations: 10,
2192 indexed_docs: 100,
2193 schema_hash: SCHEMA_HASH.to_string(),
2194 page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2195 completed: true,
2196 updated_at_ms: 1_733_000_000_000,
2197 };
2198
2199 let state = lexical_state_from_observations(LexicalObservationInput {
2200 index_path: &index_path,
2201 db_path: &db_path,
2202 stale_threshold: 60,
2203 last_indexed_at_ms: Some(1_733_000_000_000),
2204 now_ms: 1_733_000_001_000,
2205 maintenance: SearchMaintenanceSnapshot::default(),
2206 checkpoint: Some(&checkpoint),
2207 current_db_fingerprint: None,
2208 });
2209
2210 assert_eq!(state.status, "stale");
2211 assert!(state.stale);
2212 assert!(!state.fresh);
2213 assert_eq!(state.checkpoint.db_matches, Some(false));
2214 assert_eq!(state.fingerprint.matches_current_db_fingerprint, None);
2215 assert_eq!(state.pending_sessions, 0);
2216 assert_eq!(state.processed_conversations, None);
2217 assert_eq!(state.total_conversations, None);
2218 assert_eq!(state.indexed_docs, None);
2219 assert!(
2220 state
2221 .status_reason
2222 .as_deref()
2223 .is_some_and(|reason| reason.contains("different database"))
2224 );
2225 }
2226
2227 #[test]
2228 fn lexical_state_missing_index_is_not_marked_stale_until_initialized() {
2229 let temp = tempfile::tempdir().expect("tempdir");
2230 let index_path = temp.path().join("index").join("v4");
2231 std::fs::create_dir_all(&index_path).expect("create index dir");
2232 let db_path = temp.path().join("agent_search.db");
2233 std::fs::write(&db_path, b"db").expect("write db file");
2234
2235 let state = lexical_state_from_observations(LexicalObservationInput {
2236 index_path: &index_path,
2237 db_path: &db_path,
2238 stale_threshold: 60,
2239 last_indexed_at_ms: None,
2240 now_ms: 1_733_000_001_000,
2241 maintenance: SearchMaintenanceSnapshot::default(),
2242 checkpoint: None,
2243 current_db_fingerprint: None,
2244 });
2245
2246 assert_eq!(state.status, "missing");
2247 assert!(!state.exists);
2248 assert!(!state.stale);
2249 assert!(!state.fresh);
2250 assert_eq!(
2251 state.status_reason.as_deref(),
2252 Some("lexical Tantivy metadata missing")
2253 );
2254 }
2255
2256 #[test]
2257 fn lexical_state_keeps_progress_visible_during_active_rebuild_despite_fingerprint_drift() {
2258 let temp = tempfile::tempdir().expect("tempdir");
2259 let index_path = temp.path().join("index").join("v4");
2260 std::fs::create_dir_all(&index_path).expect("create index dir");
2261 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2262 let db_path = temp.path().join("agent_search.db");
2263 std::fs::write(&db_path, b"db").expect("write db file");
2264
2265 let checkpoint = LexicalRebuildCheckpoint {
2266 db_path: db_path.display().to_string(),
2267 total_conversations: 10,
2268 storage_fingerprint: "before".to_string(),
2269 committed_offset: 4,
2270 committed_conversation_id: Some(4),
2271 processed_conversations: 4,
2272 indexed_docs: 20,
2273 schema_hash: SCHEMA_HASH.to_string(),
2274 page_size: 200,
2275 completed: false,
2276 updated_at_ms: 1_733_000_123_000,
2277 };
2278
2279 let state = lexical_state_from_observations(LexicalObservationInput {
2280 index_path: &index_path,
2281 db_path: &db_path,
2282 stale_threshold: 60,
2283 last_indexed_at_ms: Some(1_733_000_000_000),
2284 now_ms: 1_733_000_001_000,
2285 maintenance: SearchMaintenanceSnapshot {
2286 active: true,
2287 pid: Some(std::process::id()),
2288 started_at_ms: Some(1_733_000_111_000),
2289 db_path: Some(db_path.clone()),
2290 mode: Some(SearchMaintenanceMode::Index),
2291 job_id: None,
2292 job_kind: None,
2293 phase: None,
2294 updated_at_ms: None,
2295 last_progress_at_ms: None,
2296 orphaned: false,
2297 },
2298 checkpoint: Some(&checkpoint),
2299 current_db_fingerprint: Some("after"),
2300 });
2301
2302 assert_eq!(state.status, "building");
2303 assert!(!state.stale);
2304 assert!(!state.fresh);
2305 assert_eq!(state.pending_sessions, 6);
2306 assert_eq!(state.processed_conversations, Some(4));
2307 assert_eq!(state.total_conversations, Some(10));
2308 assert_eq!(state.indexed_docs, Some(20));
2309 assert_eq!(state.checkpoint.page_size_matches, Some(false));
2310 assert_eq!(state.checkpoint.page_size_compatible, Some(true));
2311 assert_eq!(
2312 state.status_reason.as_deref(),
2313 Some("lexical rebuild is in progress")
2314 );
2315 }
2316
2317 #[test]
2318 fn lexical_state_hides_progress_for_incompatible_page_size_checkpoint() {
2319 let temp = tempfile::tempdir().expect("tempdir");
2320 let index_path = temp.path().join("index").join("v4");
2321 std::fs::create_dir_all(&index_path).expect("create index dir");
2322 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2323 let db_path = temp.path().join("agent_search.db");
2324 std::fs::write(&db_path, b"db").expect("write db file");
2325
2326 let checkpoint = LexicalRebuildCheckpoint {
2327 db_path: db_path.display().to_string(),
2328 total_conversations: 10,
2329 storage_fingerprint: "before".to_string(),
2330 committed_offset: 4,
2331 committed_conversation_id: Some(4),
2332 processed_conversations: 4,
2333 indexed_docs: 20,
2334 schema_hash: SCHEMA_HASH.to_string(),
2335 page_size: 13,
2336 completed: false,
2337 updated_at_ms: 1_733_000_123_000,
2338 };
2339
2340 let state = lexical_state_from_observations(LexicalObservationInput {
2341 index_path: &index_path,
2342 db_path: &db_path,
2343 stale_threshold: 60,
2344 last_indexed_at_ms: Some(1_733_000_000_000),
2345 now_ms: 1_733_000_001_000,
2346 maintenance: SearchMaintenanceSnapshot::default(),
2347 checkpoint: Some(&checkpoint),
2348 current_db_fingerprint: Some("before"),
2349 });
2350
2351 assert_eq!(state.status, "stale");
2352 assert!(state.stale);
2353 assert_eq!(state.pending_sessions, 0);
2354 assert_eq!(state.processed_conversations, None);
2355 assert_eq!(state.total_conversations, None);
2356 assert_eq!(state.indexed_docs, None);
2357 assert_eq!(state.checkpoint.page_size_matches, Some(false));
2358 assert_eq!(state.checkpoint.page_size_compatible, Some(false));
2359 assert!(
2360 state
2361 .status_reason
2362 .as_deref()
2363 .is_some_and(|reason| reason.contains("contract"))
2364 );
2365 }
2366
2367 #[test]
2368 fn lexical_state_prefers_newer_maintenance_heartbeat_over_stale_checkpoint_timestamp() {
2369 let temp = tempfile::tempdir().expect("tempdir");
2370 let index_path = temp.path().join("index").join("v4");
2371 std::fs::create_dir_all(&index_path).expect("create index dir");
2372 let db_path = temp.path().join("agent_search.db");
2373 std::fs::write(&db_path, b"db").expect("write db file");
2374
2375 let checkpoint = LexicalRebuildCheckpoint {
2376 db_path: db_path.display().to_string(),
2377 total_conversations: 10,
2378 storage_fingerprint: "before".to_string(),
2379 committed_offset: 4,
2380 committed_conversation_id: Some(4),
2381 processed_conversations: 4,
2382 indexed_docs: 20,
2383 schema_hash: SCHEMA_HASH.to_string(),
2384 page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2385 completed: false,
2386 updated_at_ms: 1_733_000_123_000,
2387 };
2388
2389 let state = lexical_state_from_observations(LexicalObservationInput {
2390 index_path: &index_path,
2391 db_path: &db_path,
2392 stale_threshold: 60,
2393 last_indexed_at_ms: Some(1_733_000_000_000),
2394 now_ms: 1_733_000_001_000,
2395 maintenance: SearchMaintenanceSnapshot {
2396 active: true,
2397 pid: Some(std::process::id()),
2398 started_at_ms: Some(1_733_000_111_000),
2399 db_path: Some(db_path.clone()),
2400 mode: Some(SearchMaintenanceMode::Index),
2401 job_id: None,
2402 job_kind: None,
2403 phase: None,
2404 updated_at_ms: Some(1_733_000_456_000),
2405 last_progress_at_ms: None,
2406 orphaned: false,
2407 },
2408 checkpoint: Some(&checkpoint),
2409 current_db_fingerprint: Some("after"),
2410 });
2411
2412 assert_eq!(state.status, "building");
2413 assert_eq!(state.activity_at_ms, Some(1_733_000_456_000));
2414 }
2415
2416 #[test]
2417 fn lexical_state_ignores_rebuild_lock_for_different_database() {
2418 let temp = tempfile::tempdir().expect("tempdir");
2419 let index_path = temp.path().join("index").join("v4");
2420 std::fs::create_dir_all(&index_path).expect("create index dir");
2421 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2422 let db_path = temp.path().join("agent_search.db");
2423 std::fs::write(&db_path, b"db").expect("write db file");
2424 let other_db_path = temp.path().join("other.db");
2425 std::fs::write(&other_db_path, b"other").expect("write other db file");
2426
2427 let checkpoint = LexicalRebuildCheckpoint {
2428 db_path: db_path.display().to_string(),
2429 total_conversations: 10,
2430 storage_fingerprint: "before".to_string(),
2431 committed_offset: 4,
2432 committed_conversation_id: Some(4),
2433 processed_conversations: 4,
2434 indexed_docs: 20,
2435 schema_hash: SCHEMA_HASH.to_string(),
2436 page_size: LEXICAL_REBUILD_PAGE_SIZE_PUBLIC,
2437 completed: false,
2438 updated_at_ms: 1_733_000_123_000,
2439 };
2440
2441 let state = lexical_state_from_observations(LexicalObservationInput {
2442 index_path: &index_path,
2443 db_path: &db_path,
2444 stale_threshold: 60,
2445 last_indexed_at_ms: Some(1_733_000_000_000),
2446 now_ms: 1_733_000_001_000,
2447 maintenance: SearchMaintenanceSnapshot {
2448 active: true,
2449 pid: Some(std::process::id()),
2450 started_at_ms: Some(1_733_000_111_000),
2451 db_path: Some(other_db_path),
2452 mode: Some(SearchMaintenanceMode::Index),
2453 job_id: None,
2454 job_kind: None,
2455 phase: None,
2456 updated_at_ms: None,
2457 last_progress_at_ms: None,
2458 orphaned: false,
2459 },
2460 checkpoint: Some(&checkpoint),
2461 current_db_fingerprint: Some("after"),
2462 });
2463
2464 assert_eq!(state.status, "stale");
2465 assert!(state.stale);
2466 assert!(!state.fresh);
2467 assert!(!state.rebuilding);
2468 assert!(!state.watch_active);
2469 assert_eq!(state.activity_at_ms, None);
2470 assert_eq!(state.pending_sessions, 0);
2471 assert_eq!(state.processed_conversations, None);
2472 assert_eq!(state.total_conversations, None);
2473 assert_eq!(state.indexed_docs, None);
2474 assert!(
2475 state
2476 .status_reason
2477 .as_deref()
2478 .is_some_and(|reason| reason.contains("fingerprint"))
2479 );
2480 }
2481
2482 #[test]
2483 fn lexical_state_ignores_watch_lock_for_different_database() {
2484 let temp = tempfile::tempdir().expect("tempdir");
2485 let index_path = temp.path().join("index").join("v4");
2486 std::fs::create_dir_all(&index_path).expect("create index dir");
2487 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2488 let db_path = temp.path().join("agent_search.db");
2489 std::fs::write(&db_path, b"db").expect("write db file");
2490 let other_db_path = temp.path().join("other.db");
2491 std::fs::write(&other_db_path, b"other").expect("write other db file");
2492
2493 let state = lexical_state_from_observations(LexicalObservationInput {
2494 index_path: &index_path,
2495 db_path: &db_path,
2496 stale_threshold: 60,
2497 last_indexed_at_ms: Some(1_733_000_000_000),
2498 now_ms: 1_733_000_020_000,
2499 maintenance: SearchMaintenanceSnapshot {
2500 active: true,
2501 pid: Some(std::process::id()),
2502 started_at_ms: Some(1_733_000_111_000),
2503 db_path: Some(other_db_path),
2504 mode: Some(SearchMaintenanceMode::Watch),
2505 job_id: None,
2506 job_kind: None,
2507 phase: None,
2508 updated_at_ms: None,
2509 last_progress_at_ms: None,
2510 orphaned: false,
2511 },
2512 checkpoint: None,
2513 current_db_fingerprint: None,
2514 });
2515
2516 assert_eq!(state.status, "ready");
2517 assert!(state.fresh);
2518 assert!(!state.stale);
2519 assert!(!state.rebuilding);
2520 assert!(!state.watch_active);
2521 assert_eq!(state.activity_at_ms, None);
2522 }
2523
2524 #[test]
2532 fn lexical_state_reports_stalled_when_progress_is_stale_despite_fresh_heartbeat() {
2533 let temp = tempfile::tempdir().expect("tempdir");
2534 let index_path = temp.path().join("index").join("v4");
2535 std::fs::create_dir_all(&index_path).expect("create index dir");
2536 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2537 let db_path = temp.path().join("agent_search.db");
2538 std::fs::write(&db_path, b"db").expect("write db file");
2539
2540 let now_ms: i64 = 1_733_000_300_000;
2547
2548 let state = lexical_state_from_observations(LexicalObservationInput {
2549 index_path: &index_path,
2550 db_path: &db_path,
2551 stale_threshold: 60,
2552 last_indexed_at_ms: Some(1_733_000_000_000),
2553 now_ms,
2554 maintenance: SearchMaintenanceSnapshot {
2555 active: true,
2556 pid: Some(std::process::id()),
2557 started_at_ms: Some(now_ms - 600_000),
2558 db_path: Some(db_path.clone()),
2559 mode: Some(SearchMaintenanceMode::WatchStartup),
2560 job_id: Some("lexical_refresh-1-1".to_string()),
2561 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2562 phase: Some("watch_startup".to_string()),
2563 updated_at_ms: Some(now_ms - 500),
2564 last_progress_at_ms: Some(now_ms - 300_000),
2565 orphaned: false,
2566 },
2567 checkpoint: None,
2568 current_db_fingerprint: None,
2569 });
2570
2571 assert!(state.rebuilding, "active rebuild lock must still register");
2572 assert!(
2573 state.stalled,
2574 "stale forward-progress timestamp must flip stalled=true",
2575 );
2576 assert_eq!(state.status, "stalled");
2577 assert_eq!(
2578 state.last_progress_at_ms,
2579 Some(now_ms - 300_000),
2580 "last_progress_at_ms must be surfaced to status callers",
2581 );
2582 let age = state
2583 .last_progress_age_ms
2584 .expect("last_progress_age_ms must be computed");
2585 assert!(
2586 (299_900..=300_100).contains(&age),
2587 "computed last_progress_age_ms ({age}ms) should equal now - last_progress_at_ms (300_000ms)",
2588 );
2589 let reason = state
2590 .status_reason
2591 .as_deref()
2592 .expect("stalled state should populate status_reason");
2593 assert!(
2594 reason.contains("forward progress")
2595 && (reason.contains("#258") || reason.contains("issue #258")),
2596 "status_reason should mention forward progress and reference #258 ({reason})",
2597 );
2598 }
2599
2600 #[test]
2604 fn lexical_state_stays_building_when_progress_is_recent() {
2605 let temp = tempfile::tempdir().expect("tempdir");
2606 let index_path = temp.path().join("index").join("v4");
2607 std::fs::create_dir_all(&index_path).expect("create index dir");
2608 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2609 let db_path = temp.path().join("agent_search.db");
2610 std::fs::write(&db_path, b"db").expect("write db file");
2611
2612 let now_ms: i64 = 1_733_000_300_000;
2616
2617 let state = lexical_state_from_observations(LexicalObservationInput {
2618 index_path: &index_path,
2619 db_path: &db_path,
2620 stale_threshold: 60,
2621 last_indexed_at_ms: Some(1_733_000_000_000),
2622 now_ms,
2623 maintenance: SearchMaintenanceSnapshot {
2624 active: true,
2625 pid: Some(std::process::id()),
2626 started_at_ms: Some(now_ms - 30_000),
2627 db_path: Some(db_path.clone()),
2628 mode: Some(SearchMaintenanceMode::Index),
2629 job_id: Some("lexical_refresh-1-1".to_string()),
2630 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2631 phase: Some("scanning".to_string()),
2632 updated_at_ms: Some(now_ms - 500),
2633 last_progress_at_ms: Some(now_ms - 1_000),
2634 orphaned: false,
2635 },
2636 checkpoint: None,
2637 current_db_fingerprint: None,
2638 });
2639
2640 assert!(state.rebuilding);
2641 assert!(!state.stalled, "fresh progress must not flip stalled");
2642 assert_eq!(state.status, "building");
2643 }
2644
2645 #[test]
2650 fn lexical_state_does_not_stall_when_legacy_lock_omits_progress_field() {
2651 let temp = tempfile::tempdir().expect("tempdir");
2652 let index_path = temp.path().join("index").join("v4");
2653 std::fs::create_dir_all(&index_path).expect("create index dir");
2654 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2655 let db_path = temp.path().join("agent_search.db");
2656 std::fs::write(&db_path, b"db").expect("write db file");
2657
2658 let now_ms: i64 = 1_733_000_300_000;
2662
2663 let state = lexical_state_from_observations(LexicalObservationInput {
2664 index_path: &index_path,
2665 db_path: &db_path,
2666 stale_threshold: 60,
2667 last_indexed_at_ms: Some(1_733_000_000_000),
2668 now_ms,
2669 maintenance: SearchMaintenanceSnapshot {
2670 active: true,
2671 pid: Some(std::process::id()),
2672 started_at_ms: Some(now_ms - 30_000),
2673 db_path: Some(db_path.clone()),
2674 mode: Some(SearchMaintenanceMode::Index),
2675 job_id: None,
2676 job_kind: None,
2677 phase: None,
2678 updated_at_ms: Some(now_ms - 500),
2679 last_progress_at_ms: None,
2680 orphaned: false,
2681 },
2682 checkpoint: None,
2683 current_db_fingerprint: None,
2684 });
2685
2686 assert!(state.rebuilding);
2687 assert!(
2688 !state.stalled,
2689 "legacy lock without last_progress_at_ms must NOT be misreported as stalled",
2690 );
2691 assert_eq!(state.status, "building");
2692 assert!(state.last_progress_age_ms.is_none());
2693 }
2694
2695 #[test]
2704 fn lexical_state_progress_age_is_ms_precision_not_seconds_quantised() {
2705 let temp = tempfile::tempdir().expect("tempdir");
2706 let index_path = temp.path().join("index").join("v4");
2707 std::fs::create_dir_all(&index_path).expect("create index dir");
2708 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2709 let db_path = temp.path().join("agent_search.db");
2710 std::fs::write(&db_path, b"db").expect("write db file");
2711
2712 let now_ms: i64 = 1_733_000_300_700;
2715 let last_progress_at_ms = now_ms - 119_500;
2716
2717 let state = lexical_state_from_observations(LexicalObservationInput {
2718 index_path: &index_path,
2719 db_path: &db_path,
2720 stale_threshold: 60,
2721 last_indexed_at_ms: Some(1_733_000_000_000),
2722 now_ms,
2723 maintenance: SearchMaintenanceSnapshot {
2724 active: true,
2725 pid: Some(std::process::id()),
2726 started_at_ms: Some(now_ms - 600_000),
2727 db_path: Some(db_path.clone()),
2728 mode: Some(SearchMaintenanceMode::WatchStartup),
2729 job_id: Some("lexical_refresh-1-1".to_string()),
2730 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2731 phase: Some("watch_startup".to_string()),
2732 updated_at_ms: Some(now_ms - 500),
2733 last_progress_at_ms: Some(last_progress_at_ms),
2734 orphaned: false,
2735 },
2736 checkpoint: None,
2737 current_db_fingerprint: None,
2738 });
2739
2740 let age = state
2741 .last_progress_age_ms
2742 .expect("forward-progress age must be computed");
2743 assert_eq!(
2744 age, 119_500,
2745 "ms-precision plumbing must surface the exact diff (no second-quantisation)"
2746 );
2747 assert!(
2754 !state.stalled,
2755 "119.5 s lag must remain `building`, not flip to `stalled`",
2756 );
2757 assert_eq!(state.status, "building");
2758 }
2759
2760 #[test]
2764 fn coordination_reports_stale_when_forward_progress_is_stuck() {
2765 let now_ms: i64 = 1_733_000_300_000;
2766 let snapshot = SearchMaintenanceSnapshot {
2767 active: true,
2768 pid: Some(12345),
2769 started_at_ms: Some(now_ms - 600_000),
2770 db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
2771 mode: Some(SearchMaintenanceMode::WatchStartup),
2772 job_id: Some("lexical_refresh-1-12345".to_string()),
2773 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
2774 phase: Some("watch_startup".to_string()),
2775 updated_at_ms: Some(now_ms - 500),
2776 last_progress_at_ms: Some(now_ms - 300_000),
2777 orphaned: false,
2778 };
2779
2780 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
2781 match outcome {
2782 MaintenanceCoordinationOutcome::Stale { ref reason, .. } => {
2783 assert!(
2784 reason.contains("forward progress")
2785 && (reason.contains("#258") || reason.contains("issue #258")),
2786 "stalled coordination reason should mention forward progress and #258: {reason}",
2787 );
2788 }
2789 other => {
2790 panic!("stalled forward-progress snapshot must coordinate as Stale, got {other:?}",)
2791 }
2792 }
2793 }
2794
2795 #[test]
2796 fn inspect_search_assets_preserves_semantic_database_unavailable_signal() {
2797 let temp = tempfile::tempdir().expect("tempdir");
2798 let index_path = temp.path().join("index").join("v4");
2799 std::fs::create_dir_all(&index_path).expect("create index dir");
2800 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2801
2802 let db_path = temp.path().join("agent_search.db");
2803 std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2804
2805 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2806 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2807 .expect("create vector dir");
2808 std::fs::write(&vector_path, b"index").expect("write vector index");
2809
2810 let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2811 data_dir: temp.path(),
2812 db_path: &db_path,
2813 stale_threshold: 60,
2814 last_indexed_at_ms: Some(1_733_000_000_000),
2815 now_ms: 1_733_000_001_000,
2816 maintenance: SearchMaintenanceSnapshot::default(),
2817 semantic_preference: SemanticPreference::HashFallback,
2818 db_available: false,
2819 compute_lexical_fingerprint: false,
2820 inspect_semantic: true,
2821 })
2822 .expect("asset inspection should not fail when db availability is already known");
2823
2824 assert_ne!(snapshot.lexical.status, "error");
2825 assert_eq!(snapshot.semantic.status, "error");
2826 assert_eq!(snapshot.semantic.availability, "database_unavailable");
2827 assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
2828 assert!(snapshot.semantic.summary.contains("db unavailable"));
2829 }
2830
2831 #[test]
2832 fn inspect_search_assets_can_skip_semantic_db_open_for_fast_paths() {
2833 let temp = tempfile::tempdir().expect("tempdir");
2834 let index_path = temp.path().join("index").join("v4");
2835 std::fs::create_dir_all(&index_path).expect("create index dir");
2836 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2837
2838 let db_path = temp.path().join("agent_search.db");
2839 std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2840
2841 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2842 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2843 .expect("create vector dir");
2844 std::fs::write(&vector_path, b"index").expect("write vector index");
2845
2846 let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2847 data_dir: temp.path(),
2848 db_path: &db_path,
2849 stale_threshold: 60,
2850 last_indexed_at_ms: Some(1_733_000_000_000),
2851 now_ms: 1_733_000_001_000,
2852 maintenance: SearchMaintenanceSnapshot::default(),
2853 semantic_preference: SemanticPreference::HashFallback,
2854 db_available: false,
2855 compute_lexical_fingerprint: false,
2856 inspect_semantic: false,
2857 })
2858 .expect("asset inspection should not open semantic DB when semantic inspection is skipped");
2859
2860 assert_ne!(snapshot.lexical.status, "error");
2861 assert_eq!(snapshot.semantic.status, "not_inspected");
2862 assert_eq!(snapshot.semantic.availability, "not_inspected");
2863 assert_eq!(snapshot.semantic.fallback_mode, Some("lexical"));
2864 }
2865
2866 #[test]
2867 fn inspect_search_assets_trusts_db_probe_for_semantic_metadata_probe() {
2868 let temp = tempfile::tempdir().expect("tempdir");
2869 let index_path = temp.path().join("index").join("v4");
2870 std::fs::create_dir_all(&index_path).expect("create index dir");
2871 std::fs::write(index_path.join("meta.json"), b"{}").expect("write meta.json");
2872
2873 let db_path = temp.path().join("agent_search.db");
2874 std::fs::create_dir_all(&db_path).expect("create unopenable db path");
2875
2876 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
2877 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
2878 .expect("create vector dir");
2879 std::fs::write(&vector_path, b"index").expect("write vector index");
2880
2881 let snapshot = inspect_search_assets(InspectSearchAssetsInput {
2882 data_dir: temp.path(),
2883 db_path: &db_path,
2884 stale_threshold: 60,
2885 last_indexed_at_ms: Some(1_733_000_000_000),
2886 now_ms: 1_733_000_001_000,
2887 maintenance: SearchMaintenanceSnapshot::default(),
2888 semantic_preference: SemanticPreference::HashFallback,
2889 db_available: true,
2890 compute_lexical_fingerprint: false,
2891 inspect_semantic: true,
2892 })
2893 .expect("semantic metadata probe should trust the existing DB availability signal");
2894
2895 assert_eq!(snapshot.semantic.status, "hash_fallback");
2896 assert_eq!(snapshot.semantic.availability, "hash_fallback");
2897 assert!(snapshot.semantic.can_search);
2898 }
2899
2900 #[test]
2901 fn semantic_state_reports_hash_fallback_as_searchable() {
2902 let state = semantic_state_from_availability(
2903 Path::new("/tmp/cass"),
2904 &SemanticAvailability::HashFallback,
2905 SemanticPreference::HashFallback,
2906 None,
2907 );
2908
2909 assert_eq!(state.status, "hash_fallback");
2910 assert_eq!(state.availability, "hash_fallback");
2911 assert!(state.available);
2912 assert!(state.can_search);
2913 assert_eq!(state.fallback_mode, None);
2914 }
2915
2916 #[test]
2917 fn semantic_preference_surface_preserves_backend_and_model_dir_projection() {
2918 let data_dir = Path::new("/tmp/cass");
2919 let cases = [
2920 (
2921 SemanticPreference::DefaultModel,
2922 "fastembed",
2923 Some(FastEmbedder::default_model_dir(data_dir)),
2924 ),
2925 (SemanticPreference::HashFallback, "hash", None),
2926 ];
2927
2928 for (preference, expected_backend, expected_model_dir) in cases {
2929 let surface = semantic_preference_surface(data_dir, preference);
2930
2931 assert_eq!(surface.preferred_backend, expected_backend);
2932 assert_eq!(surface.model_dir, expected_model_dir);
2933 }
2934 }
2935
2936 #[test]
2937 fn semantic_state_detects_progressive_and_hnsw_assets() {
2938 let temp = tempfile::tempdir().expect("tempdir");
2939 let vector_dir = temp.path().join(VECTOR_INDEX_DIR);
2940 std::fs::create_dir_all(&vector_dir).expect("create vector dir");
2941 std::fs::write(vector_dir.join("vector.fast.idx"), b"fast").expect("write fast tier");
2942 std::fs::write(vector_dir.join("vector.quality.idx"), b"quality")
2943 .expect("write quality tier");
2944 let hnsw_path = hnsw_index_path(temp.path(), FastEmbedder::embedder_id_static());
2945 std::fs::write(&hnsw_path, b"hnsw").expect("write hnsw");
2946
2947 let state = semantic_state_from_availability(
2948 temp.path(),
2949 &SemanticAvailability::Ready {
2950 embedder_id: FastEmbedder::embedder_id_static().to_string(),
2951 },
2952 SemanticPreference::DefaultModel,
2953 None,
2954 );
2955
2956 assert_eq!(state.status, "ready");
2957 assert!(state.progressive_ready);
2958 assert!(state.hnsw_ready);
2959 assert_eq!(
2960 state.embedder_id.as_deref(),
2961 Some(FastEmbedder::embedder_id_static())
2962 );
2963 }
2964
2965 #[test]
2966 fn semantic_state_reports_backfill_when_manifest_only_has_stale_assets() {
2967 let temp = tempfile::tempdir().expect("tempdir");
2968 let mut manifest = SemanticManifest {
2969 fast_tier: Some(ArtifactRecord {
2970 tier: crate::search::semantic_manifest::TierKind::Fast,
2971 embedder_id: HashEmbedder::default().id().to_string(),
2972 model_revision: "hash".to_string(),
2973 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
2974 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
2975 dimension: 256,
2976 doc_count: 12,
2977 conversation_count: 3,
2978 db_fingerprint: "stale-db".to_string(),
2979 index_path: "vector_index/vector.fast.idx".to_string(),
2980 size_bytes: 4096,
2981 started_at_ms: 1_733_100_000_000,
2982 completed_at_ms: 1_733_100_100_000,
2983 ready: true,
2984 }),
2985 backlog: crate::search::semantic_manifest::BacklogLedger {
2986 total_conversations: 20,
2987 fast_tier_processed: 3,
2988 quality_tier_processed: 0,
2989 db_fingerprint: "current-db".to_string(),
2990 computed_at_ms: 1_733_100_200_000,
2991 },
2992 checkpoint: Some(BuildCheckpoint {
2993 tier: crate::search::semantic_manifest::TierKind::Fast,
2994 embedder_id: HashEmbedder::default().id().to_string(),
2995 last_offset: 77,
2996 docs_embedded: 66,
2997 conversations_processed: 3,
2998 total_conversations: 20,
2999 db_fingerprint: "current-db".to_string(),
3000 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3001 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3002 saved_at_ms: 1_733_100_300_000,
3003 last_message_id: None,
3004 }),
3005 ..Default::default()
3006 };
3007 manifest.save(temp.path()).expect("save semantic manifest");
3008
3009 let state = semantic_state_from_availability(
3010 temp.path(),
3011 &SemanticAvailability::NeedsConsent,
3012 SemanticPreference::DefaultModel,
3013 Some("current-db"),
3014 );
3015
3016 assert_eq!(state.status, "building");
3017 assert_eq!(state.availability, "index_building");
3018 assert!(!state.can_search);
3019 assert_eq!(state.fallback_mode, Some("lexical"));
3020 assert!(state.summary.contains("backfill"));
3021 assert!(
3022 state
3023 .hint
3024 .as_deref()
3025 .is_some_and(|hint| hint.contains("finish backfilling"))
3026 );
3027 }
3028
3029 #[test]
3030 fn semantic_state_prefers_current_hash_tier_over_missing_model() {
3031 let temp = tempfile::tempdir().expect("tempdir");
3032 let mut manifest = SemanticManifest {
3033 fast_tier: Some(ArtifactRecord {
3034 tier: crate::search::semantic_manifest::TierKind::Fast,
3035 embedder_id: HashEmbedder::default().id().to_string(),
3036 model_revision: "hash".to_string(),
3037 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3038 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3039 dimension: 256,
3040 doc_count: 12,
3041 conversation_count: 3,
3042 db_fingerprint: "current-db".to_string(),
3043 index_path: "vector_index/vector.fast.idx".to_string(),
3044 size_bytes: 4096,
3045 started_at_ms: 1_733_100_000_000,
3046 completed_at_ms: 1_733_100_100_000,
3047 ready: true,
3048 }),
3049 ..Default::default()
3050 };
3051 manifest.save(temp.path()).expect("save semantic manifest");
3052 let vector_path = vector_index_path(temp.path(), HashEmbedder::default().id());
3053 std::fs::create_dir_all(vector_path.parent().expect("vector parent"))
3054 .expect("create vector dir");
3055 std::fs::write(&vector_path, b"fast").expect("write fast vector index");
3056
3057 let state = semantic_state_from_availability(
3058 temp.path(),
3059 &SemanticAvailability::NeedsConsent,
3060 SemanticPreference::DefaultModel,
3061 Some("current-db"),
3062 );
3063
3064 assert_eq!(state.status, "ready");
3065 assert_eq!(state.availability, "ready");
3066 assert!(state.can_search);
3067 assert_eq!(state.fallback_mode, None);
3068 assert_eq!(
3069 state.embedder_id.as_deref(),
3070 Some(HashEmbedder::default().id())
3071 );
3072 assert_eq!(state.model_dir, None);
3073 assert_eq!(
3074 state.vector_index_path.as_deref(),
3075 Some(vector_path.as_path())
3076 );
3077 assert_eq!(state.hint, None);
3078 }
3079
3080 #[test]
3081 fn semantic_state_treats_ready_quality_tier_with_unknown_db_match_as_queryable() {
3082 let temp = tempfile::tempdir().expect("tempdir");
3083 let mut manifest = SemanticManifest {
3084 quality_tier: Some(ArtifactRecord {
3085 tier: crate::search::semantic_manifest::TierKind::Quality,
3086 embedder_id: HashEmbedder::default().id().to_string(),
3087 model_revision: "hash".to_string(),
3088 schema_version: crate::search::policy::SEMANTIC_SCHEMA_VERSION,
3089 chunking_version: crate::search::policy::CHUNKING_STRATEGY_VERSION,
3090 dimension: 256,
3091 doc_count: 249,
3092 conversation_count: 21,
3093 db_fingerprint: "boxed-db".to_string(),
3094 index_path: "vector_index/vector.quality.idx".to_string(),
3095 size_bytes: 221_824,
3096 started_at_ms: 1_733_100_000_000,
3097 completed_at_ms: 1_733_100_100_000,
3098 ready: true,
3099 }),
3100 ..Default::default()
3101 };
3102 manifest.save(temp.path()).expect("save semantic manifest");
3103
3104 let state = semantic_state_from_availability(
3105 temp.path(),
3106 &SemanticAvailability::NeedsConsent,
3107 SemanticPreference::DefaultModel,
3108 None,
3109 );
3110
3111 assert_eq!(state.quality_tier.current_db_matches, None);
3112 assert!(
3113 state.quality_tier_published,
3114 "a ready quality tier with unknown DB match should remain visible as published in boxed data-dir status"
3115 );
3116 assert!(
3117 state.semantic_only_search_available,
3118 "semantic-only search can still run when the quality tier is ready and DB match is unknown"
3119 );
3120 assert!(state.can_search);
3121 assert_eq!(state.fallback_mode, None);
3122 }
3123
3124 #[test]
3125 fn semantic_state_promotes_complete_current_shard_generation() {
3126 let temp = tempfile::tempdir().expect("tempdir");
3127 let embedder_id = HashEmbedder::default().id().to_string();
3128 let mut records = Vec::new();
3129 for shard_index in 0..2_u32 {
3130 let relative_path = format!("vector_index/shards/fast-hash/shard-{shard_index}.fsvi");
3131 let path = temp.path().join(&relative_path);
3132 std::fs::create_dir_all(path.parent().expect("shard parent"))
3133 .expect("create shard parent");
3134 std::fs::write(&path, b"fsvi").expect("write shard placeholder");
3135 records.push(SemanticShardRecord {
3136 tier: TierKind::Fast,
3137 embedder_id: embedder_id.clone(),
3138 model_revision: "hash".to_string(),
3139 schema_version: SEMANTIC_SCHEMA_VERSION,
3140 chunking_version: CHUNKING_STRATEGY_VERSION,
3141 dimension: HashEmbedder::default().dimension(),
3142 shard_index,
3143 shard_count: 2,
3144 doc_count: 10 + u64::from(shard_index),
3145 total_conversations: 7,
3146 db_fingerprint: "current-db".to_string(),
3147 index_path: relative_path,
3148 quantization: "f16".to_string(),
3149 mmap_ready: true,
3150 ann_index_path: None,
3151 ann_size_bytes: 0,
3152 ann_ready: false,
3153 size_bytes: 100 + u64::from(shard_index),
3154 started_at_ms: 1_733_100_000_000,
3155 completed_at_ms: 1_733_100_000_000 + i64::from(shard_index),
3156 ready: true,
3157 });
3158 }
3159 let mut shards = SemanticShardManifest {
3160 shards: records,
3161 ..Default::default()
3162 };
3163 shards.save(temp.path()).expect("save shard manifest");
3164
3165 let state = semantic_state_from_availability(
3166 temp.path(),
3167 &SemanticAvailability::IndexMissing {
3168 index_path: vector_index_path(temp.path(), &embedder_id),
3169 },
3170 SemanticPreference::HashFallback,
3171 Some("current-db"),
3172 );
3173
3174 assert_eq!(state.status, "ready");
3175 assert_eq!(state.availability, "ready");
3176 assert!(state.can_search);
3177 assert_eq!(state.fallback_mode, None);
3178 assert_eq!(state.fast_tier.doc_count, Some(21));
3179 let expected_path = temp
3180 .path()
3181 .join("vector_index/shards/fast-hash/shard-0.fsvi");
3182 assert_eq!(
3183 state.vector_index_path.as_deref(),
3184 Some(expected_path.as_path())
3185 );
3186 }
3187
3188 #[test]
3189 fn semantic_state_rejects_complete_shard_generation_with_unsafe_path() {
3190 let temp = tempfile::tempdir().expect("tempdir");
3191 let outside = tempfile::tempdir().expect("outside tempdir");
3192 let outside_path = outside.path().join("outside.fsvi");
3193 std::fs::write(&outside_path, b"fsvi").expect("write outside placeholder");
3194 let embedder_id = HashEmbedder::default().id().to_string();
3195 let mut shards = SemanticShardManifest {
3196 shards: vec![SemanticShardRecord {
3197 tier: TierKind::Fast,
3198 embedder_id: embedder_id.clone(),
3199 model_revision: "hash".to_string(),
3200 schema_version: SEMANTIC_SCHEMA_VERSION,
3201 chunking_version: CHUNKING_STRATEGY_VERSION,
3202 dimension: HashEmbedder::default().dimension(),
3203 shard_index: 0,
3204 shard_count: 1,
3205 doc_count: 10,
3206 total_conversations: 7,
3207 db_fingerprint: "current-db".to_string(),
3208 index_path: outside_path.to_string_lossy().to_string(),
3209 quantization: "f16".to_string(),
3210 mmap_ready: true,
3211 ann_index_path: None,
3212 ann_size_bytes: 0,
3213 ann_ready: false,
3214 size_bytes: 100,
3215 started_at_ms: 1_733_100_000_000,
3216 completed_at_ms: 1_733_100_000_001,
3217 ready: true,
3218 }],
3219 ..Default::default()
3220 };
3221 shards.save(temp.path()).expect("save shard manifest");
3222
3223 let base_vector_path = vector_index_path(temp.path(), &embedder_id);
3224 let state = semantic_state_from_availability(
3225 temp.path(),
3226 &SemanticAvailability::IndexMissing {
3227 index_path: base_vector_path.clone(),
3228 },
3229 SemanticPreference::HashFallback,
3230 Some("current-db"),
3231 );
3232
3233 assert_ne!(state.status, "ready");
3234 assert!(!state.can_search);
3235 assert_eq!(
3236 state.vector_index_path.as_deref(),
3237 Some(base_vector_path.as_path())
3238 );
3239 assert_ne!(
3240 state.vector_index_path.as_deref(),
3241 Some(outside_path.as_path())
3242 );
3243 }
3244
3245 fn make_active_snapshot(now_ms: i64) -> SearchMaintenanceSnapshot {
3250 SearchMaintenanceSnapshot {
3251 active: true,
3252 pid: Some(12345),
3253 started_at_ms: Some(now_ms - 5_000),
3254 db_path: Some(PathBuf::from("/tmp/cass/agent_search.db")),
3255 mode: Some(SearchMaintenanceMode::Index),
3256 job_id: Some("lexical_refresh-1000-12345".to_string()),
3257 job_kind: Some(SearchMaintenanceJobKind::LexicalRefresh),
3258 phase: Some("scanning".to_string()),
3259 updated_at_ms: Some(now_ms - 500),
3260 last_progress_at_ms: Some(now_ms - 500),
3261 orphaned: false,
3262 }
3263 }
3264
3265 #[test]
3266 fn coordination_no_active_job_when_snapshot_inactive() {
3267 let snapshot = SearchMaintenanceSnapshot::default();
3268 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3269 assert_eq!(outcome, MaintenanceCoordinationOutcome::Idle);
3270 }
3271
3272 #[test]
3273 fn coordination_tracks_active_legacy_lock_without_job_id() {
3274 let snapshot = SearchMaintenanceSnapshot {
3275 active: true,
3276 pid: Some(12345),
3277 job_id: None,
3278 mode: Some(SearchMaintenanceMode::Index),
3279 ..Default::default()
3280 };
3281 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, 1_733_000_000_000);
3282 if let MaintenanceCoordinationOutcome::Active {
3283 ref job_id,
3284 job_kind,
3285 ..
3286 } = outcome
3287 {
3288 assert_eq!(job_id, "index-active-lock-12345");
3289 assert_eq!(job_kind, SearchMaintenanceJobKind::LexicalRefresh);
3290 } else {
3291 assert!(
3292 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3293 "legacy active lock must remain active, got {outcome:?}"
3294 );
3295 }
3296 }
3297
3298 #[test]
3299 fn coordination_active_job_with_fresh_heartbeat() {
3300 let now_ms = 1_733_000_000_000i64;
3301 let snapshot = make_active_snapshot(now_ms);
3302 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3303 if let MaintenanceCoordinationOutcome::Active {
3304 ref job_id,
3305 ref phase,
3306 ..
3307 } = outcome
3308 {
3309 assert_eq!(job_id, "lexical_refresh-1000-12345");
3310 assert_eq!(phase.as_deref(), Some("scanning"));
3311 } else {
3312 assert!(
3313 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3314 "expected ActiveJob, got {outcome:?}"
3315 );
3316 }
3317 }
3318
3319 #[test]
3320 fn coordination_stale_job_with_old_heartbeat() {
3321 let now_ms = 1_733_000_000_000i64;
3322 let snapshot = SearchMaintenanceSnapshot {
3323 updated_at_ms: Some(now_ms - 60_000),
3324 ..make_active_snapshot(now_ms)
3325 };
3326 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3327 if let MaintenanceCoordinationOutcome::Stale {
3328 ref job_id,
3329 ref reason,
3330 } = outcome
3331 {
3332 assert_eq!(job_id, "lexical_refresh-1000-12345");
3333 assert!(reason.contains("60000ms"), "reason={reason}");
3334 } else {
3335 assert!(
3336 matches!(outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3337 "expected StaleJob, got {outcome:?}"
3338 );
3339 }
3340 }
3341
3342 #[test]
3343 fn coordination_missing_heartbeat_timestamp_still_respects_active_flock() {
3344 let now_ms = 1_733_000_000_000i64;
3345 let snapshot = SearchMaintenanceSnapshot {
3346 updated_at_ms: None,
3347 ..make_active_snapshot(now_ms)
3348 };
3349 let outcome = evaluate_maintenance_coordination_from_snapshot(&snapshot, now_ms);
3350 if let MaintenanceCoordinationOutcome::Active { updated_at_ms, .. } = outcome {
3351 assert_eq!(updated_at_ms, now_ms);
3352 } else {
3353 assert!(
3354 matches!(outcome, MaintenanceCoordinationOutcome::Active { .. }),
3355 "missing heartbeat metadata must not hide an active flock, got {outcome:?}"
3356 );
3357 }
3358 }
3359
3360 #[test]
3361 fn decision_launch_when_no_job() {
3362 let snapshot = SearchMaintenanceSnapshot::default();
3363 let decision = decide_maintenance_action_from_snapshot(&snapshot, 1_733_000_000_000);
3364 assert_eq!(decision, MaintenanceDecision::Launch);
3365 }
3366
3367 #[test]
3368 fn decision_launch_when_no_lock_file() {
3369 let temp = tempfile::tempdir().expect("tempdir");
3370 let decision = decide_maintenance_action(temp.path(), 1_733_000_000_000);
3371 assert_eq!(decision, MaintenanceDecision::Launch);
3372 }
3373
3374 #[test]
3375 fn decision_attaches_when_active_lock_has_stale_heartbeat() {
3376 let now_ms = 1_733_000_000_000i64;
3377 let snapshot = SearchMaintenanceSnapshot {
3378 updated_at_ms: Some(now_ms - 60_000),
3379 ..make_active_snapshot(now_ms)
3380 };
3381 let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3382 if let MaintenanceDecision::AttachOrWait {
3383 ref job_id,
3384 ref phase,
3385 elapsed_ms,
3386 ..
3387 } = decision
3388 {
3389 assert_eq!(job_id, "lexical_refresh-1000-12345");
3390 assert_eq!(phase.as_deref(), Some("scanning"));
3391 assert_eq!(elapsed_ms, 5_000);
3392 } else {
3393 assert!(
3394 matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3395 "stale heartbeat still has an active lock, got {decision:?}"
3396 );
3397 }
3398 }
3399
3400 #[test]
3401 fn decision_attach_when_active_fresh_job() {
3402 let now_ms = 1_733_000_000_000i64;
3403 let snapshot = make_active_snapshot(now_ms);
3404 let decision = decide_maintenance_action_from_snapshot(&snapshot, now_ms);
3405 if let MaintenanceDecision::AttachOrWait {
3406 ref job_id,
3407 elapsed_ms,
3408 ..
3409 } = decision
3410 {
3411 assert_eq!(job_id, "lexical_refresh-1000-12345");
3412 assert_eq!(elapsed_ms, 5_000);
3413 } else {
3414 assert!(
3415 matches!(decision, MaintenanceDecision::AttachOrWait { .. }),
3416 "expected AttachOrWait, got {decision:?}"
3417 );
3418 }
3419 }
3420
3421 #[test]
3422 fn poll_returns_immediately_when_no_active_job() {
3423 let temp = tempfile::tempdir().expect("tempdir");
3424 let result = poll_maintenance_until_idle(
3425 temp.path(),
3426 Some(Duration::from_millis(500)),
3427 Some(Duration::from_millis(50)),
3428 );
3429 assert!(!result.timed_out);
3430 assert_eq!(result.polls, 1);
3431 assert!(
3432 matches!(result.outcome, MaintenanceCoordinationOutcome::Idle),
3433 "expected NoActiveJob"
3434 );
3435 assert!(
3436 result.elapsed <= Duration::from_millis(500),
3437 "immediate idle poll should finish before timeout, elapsed={:?}",
3438 result.elapsed
3439 );
3440 }
3441
3442 #[test]
3443 fn poll_returns_active_on_timeout_when_lock_held() {
3444 use fs2::FileExt;
3445 let temp = tempfile::tempdir().expect("tempdir");
3446 let lock_path = temp.path().join("index-run.lock");
3447 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3448 let owner = OpenOptions::new()
3449 .create(true)
3450 .read(true)
3451 .write(true)
3452 .truncate(true)
3453 .open(&lock_path)
3454 .expect("open owner handle");
3455 owner.try_lock_exclusive().expect("acquire lock");
3456 std::fs::write(
3457 &lock_path,
3458 format!(
3459 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-1\njob_kind=lexical_refresh\nphase=scanning\n",
3460 now_ms - 1_000,
3461 now_ms,
3462 ),
3463 )
3464 .expect("write lock metadata");
3465
3466 let result = poll_maintenance_until_idle(
3467 temp.path(),
3468 Some(Duration::from_millis(300)),
3469 Some(Duration::from_millis(50)),
3470 );
3471 assert!(result.timed_out, "should time out when lock is held");
3472 assert!(result.polls >= 2, "should have polled multiple times");
3473 assert!(
3474 matches!(
3475 result.outcome,
3476 MaintenanceCoordinationOutcome::Active { .. }
3477 ),
3478 "expected ActiveJob on timeout"
3479 );
3480
3481 let _ = FileExt::unlock(&owner);
3482 }
3483
3484 #[test]
3485 fn poll_times_out_instead_of_declaring_stale_held_lock_idle() {
3486 use fs2::FileExt;
3487 let temp = tempfile::tempdir().expect("tempdir");
3488 let lock_path = temp.path().join("index-run.lock");
3489 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3490 let owner = OpenOptions::new()
3491 .create(true)
3492 .read(true)
3493 .write(true)
3494 .truncate(true)
3495 .open(&lock_path)
3496 .expect("open owner handle");
3497 owner.try_lock_exclusive().expect("acquire lock");
3498 std::fs::write(
3499 &lock_path,
3500 format!(
3501 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-stale\njob_kind=lexical_refresh\nphase=scanning\n",
3502 now_ms - 120_000,
3503 now_ms - 120_000,
3504 ),
3505 )
3506 .expect("write lock metadata");
3507
3508 let result = poll_maintenance_until_idle(
3509 temp.path(),
3510 Some(Duration::from_millis(150)),
3511 Some(Duration::from_millis(25)),
3512 );
3513 assert!(result.timed_out, "held stale lock is still not idle");
3514 assert!(
3515 matches!(result.outcome, MaintenanceCoordinationOutcome::Stale { .. }),
3516 "expected stale held lock on timeout, got {:?}",
3517 result.outcome
3518 );
3519
3520 let _ = FileExt::unlock(&owner);
3521 }
3522
3523 #[test]
3524 fn poll_detects_release_mid_wait() {
3525 use fs2::FileExt;
3526 let temp = tempfile::tempdir().expect("tempdir");
3527 let lock_path = temp.path().join("index-run.lock");
3528 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3529 let owner = OpenOptions::new()
3530 .create(true)
3531 .read(true)
3532 .write(true)
3533 .truncate(true)
3534 .open(&lock_path)
3535 .expect("open owner handle");
3536 owner.try_lock_exclusive().expect("acquire lock");
3537 std::fs::write(
3538 &lock_path,
3539 format!(
3540 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=test-job-2\njob_kind=lexical_refresh\nphase=committing\n",
3541 now_ms - 1_000,
3542 now_ms,
3543 ),
3544 )
3545 .expect("write lock metadata");
3546
3547 let temp_path = temp.path().to_path_buf();
3548 let release_thread = std::thread::spawn(move || {
3549 std::thread::sleep(Duration::from_millis(150));
3550 let _ = owner.set_len(0);
3551 let _ = FileExt::unlock(&owner);
3552 drop(owner);
3553 });
3554
3555 let result = poll_maintenance_until_idle(
3556 &temp_path,
3557 Some(Duration::from_secs(2)),
3558 Some(Duration::from_millis(50)),
3559 );
3560 assert!(!result.timed_out, "should detect release before timeout");
3561 release_thread.join().expect("release thread");
3562 }
3563
3564 #[test]
3565 fn failopen_returns_failopen_when_lexical_available_and_job_active() {
3566 let temp = tempfile::tempdir().expect("tempdir");
3567 let lock_path = temp.path().join("index-run.lock");
3568 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3569
3570 use fs2::FileExt;
3571 let owner = OpenOptions::new()
3572 .create(true)
3573 .read(true)
3574 .write(true)
3575 .truncate(true)
3576 .open(&lock_path)
3577 .expect("open owner handle");
3578 owner.try_lock_exclusive().expect("acquire lock");
3579 std::fs::write(
3580 &lock_path,
3581 format!(
3582 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-job-1\njob_kind=lexical_refresh\nphase=indexing\n",
3583 now_ms - 1_000,
3584 now_ms,
3585 ),
3586 )
3587 .expect("write lock metadata");
3588
3589 let decision = decide_search_failopen(temp.path(), now_ms, true);
3590 if let MaintenanceDecision::FailOpen { ref reason } = decision {
3591 assert!(reason.contains("fo-job-1"), "reason={reason}");
3592 assert!(reason.contains("failing open"), "reason={reason}");
3593 } else {
3594 assert!(
3595 matches!(decision, MaintenanceDecision::FailOpen { .. }),
3596 "expected FailOpen, got {decision:?}"
3597 );
3598 }
3599
3600 let decision_no_lexical = decide_search_failopen(temp.path(), now_ms, false);
3601 assert!(
3602 matches!(
3603 decision_no_lexical,
3604 MaintenanceDecision::AttachOrWait { .. }
3605 ),
3606 "without lexical must attach, got {decision_no_lexical:?}"
3607 );
3608
3609 let _ = FileExt::unlock(&owner);
3610 }
3611
3612 #[test]
3613 fn failopen_handles_active_stale_heartbeat_without_launching_repair() {
3614 let temp = tempfile::tempdir().expect("tempdir");
3615 let lock_path = temp.path().join("index-run.lock");
3616 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3617
3618 use fs2::FileExt;
3619 let owner = OpenOptions::new()
3620 .create(true)
3621 .read(true)
3622 .write(true)
3623 .truncate(true)
3624 .open(&lock_path)
3625 .expect("open owner handle");
3626 owner.try_lock_exclusive().expect("acquire lock");
3627 std::fs::write(
3628 &lock_path,
3629 format!(
3630 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/test.db\nmode=index\njob_id=fo-stale-1\njob_kind=lexical_refresh\nphase=indexing\n",
3631 now_ms - 120_000,
3632 now_ms - 120_000,
3633 ),
3634 )
3635 .expect("write lock metadata");
3636
3637 let decision = decide_search_failopen(temp.path(), now_ms, true);
3638 if let MaintenanceDecision::FailOpen { ref reason } = decision {
3639 assert!(reason.contains("fo-stale-1"), "reason={reason}");
3640 assert!(reason.contains("stale heartbeat"), "reason={reason}");
3641 assert!(reason.contains("failing open"), "reason={reason}");
3642 } else {
3643 assert!(
3644 matches!(decision, MaintenanceDecision::FailOpen { .. }),
3645 "expected FailOpen for searchable stale active lock, got {decision:?}"
3646 );
3647 }
3648
3649 let decision_no_lexical = decide_search_failopen(temp.path(), now_ms, false);
3650 assert!(
3651 matches!(
3652 decision_no_lexical,
3653 MaintenanceDecision::AttachOrWait { .. }
3654 ),
3655 "without lexical must wait for the held lock, got {decision_no_lexical:?}"
3656 );
3657
3658 let _ = FileExt::unlock(&owner);
3659 }
3660
3661 #[test]
3666 fn event_log_append_and_read() {
3667 let temp = tempfile::tempdir().expect("tempdir");
3668 let event = MaintenanceEvent {
3669 timestamp_ms: 1_733_000_000_000,
3670 job_id: "test-job-1".to_string(),
3671 actor_pid: 42,
3672 kind: MaintenanceEventKind::Started {
3673 job_kind: "lexical_refresh".to_string(),
3674 phase: "scanning".to_string(),
3675 },
3676 };
3677 append_maintenance_event(temp.path(), &event).expect("append");
3678 let events = read_maintenance_events(temp.path(), None, None);
3679 assert_eq!(events.len(), 1);
3680 assert_eq!(events[0].job_id, "test-job-1");
3681 assert_eq!(events[0].actor_pid, 42);
3682 assert!(matches!(
3683 events[0].kind,
3684 MaintenanceEventKind::Started { .. }
3685 ));
3686 }
3687
3688 #[test]
3689 fn event_log_filters_by_timestamp() {
3690 let temp = tempfile::tempdir().expect("tempdir");
3691 for i in 0..5 {
3692 let event = MaintenanceEvent {
3693 timestamp_ms: 1_000 + i,
3694 job_id: format!("job-{i}"),
3695 actor_pid: 1,
3696 kind: MaintenanceEventKind::Progress {
3697 processed: i as u64,
3698 total: 5,
3699 },
3700 };
3701 append_maintenance_event(temp.path(), &event).expect("append");
3702 }
3703 let events = read_maintenance_events(temp.path(), Some(1_002), None);
3704 assert_eq!(events.len(), 2, "should only get events after ts 1002");
3705 assert_eq!(events[0].timestamp_ms, 1_003);
3706 assert_eq!(events[1].timestamp_ms, 1_004);
3707 }
3708
3709 #[test]
3710 fn event_log_respects_limit() {
3711 let temp = tempfile::tempdir().expect("tempdir");
3712 for i in 0..10 {
3713 let event = MaintenanceEvent {
3714 timestamp_ms: 1_000 + i,
3715 job_id: format!("job-{i}"),
3716 actor_pid: 1,
3717 kind: MaintenanceEventKind::Resumed,
3718 };
3719 append_maintenance_event(temp.path(), &event).expect("append");
3720 }
3721 let events = read_maintenance_events(temp.path(), None, Some(3));
3722 assert_eq!(events.len(), 3);
3723 assert_eq!(events[0].timestamp_ms, 1_007);
3724 assert_eq!(events[2].timestamp_ms, 1_009);
3725 }
3726
3727 #[test]
3728 fn event_log_returns_empty_when_missing() {
3729 let temp = tempfile::tempdir().expect("tempdir");
3730 let events = read_maintenance_events(temp.path(), None, None);
3731 assert!(events.is_empty());
3732 }
3733
3734 #[test]
3735 fn event_log_truncation_retains_tail() {
3736 let temp = tempfile::tempdir().expect("tempdir");
3737 for i in 0..550 {
3738 let event = MaintenanceEvent {
3739 timestamp_ms: i,
3740 job_id: format!("job-{i}"),
3741 actor_pid: 1,
3742 kind: MaintenanceEventKind::Resumed,
3743 };
3744 append_maintenance_event(temp.path(), &event).expect("append");
3745 }
3746 let before = read_maintenance_events(temp.path(), None, Some(600));
3747 assert_eq!(before.len(), 550);
3748 truncate_maintenance_event_log(temp.path()).expect("truncate");
3749 let after = read_maintenance_events(temp.path(), None, Some(600));
3750 assert_eq!(after.len(), MAX_EVENT_LOG_ENTRIES);
3751 assert_eq!(after[0].timestamp_ms, 50);
3752 assert_eq!(after[MAX_EVENT_LOG_ENTRIES - 1].timestamp_ms, 549);
3753 }
3754
3755 #[test]
3756 fn yield_signal_round_trip() {
3757 let temp = tempfile::tempdir().expect("tempdir");
3758 assert!(
3759 check_yield_requested(temp.path()).is_none(),
3760 "no signal initially"
3761 );
3762 request_yield(temp.path(), "foreground search pressure").expect("request yield");
3763 let req = check_yield_requested(temp.path()).expect("yield should be present");
3764 assert_eq!(req.requester_pid, std::process::id());
3765 assert_eq!(req.reason, "foreground search pressure");
3766 assert!(req.requested_at_ms > 0);
3767 clear_yield_signal(temp.path()).expect("clear");
3768 assert!(
3769 check_yield_requested(temp.path()).is_none(),
3770 "signal cleared"
3771 );
3772 }
3773
3774 #[test]
3775 fn clear_yield_signal_is_idempotent() {
3776 let temp = tempfile::tempdir().expect("tempdir");
3777 clear_yield_signal(temp.path()).expect("clear nonexistent");
3778 clear_yield_signal(temp.path()).expect("clear again");
3779 }
3780
3781 #[test]
3782 fn unified_view_idle_no_events() {
3783 let temp = tempfile::tempdir().expect("tempdir");
3784 let view = unified_maintenance_view(temp.path(), true);
3785 assert!(matches!(
3786 view.coordination,
3787 MaintenanceCoordinationOutcome::Idle
3788 ));
3789 assert!(view.yield_pending.is_none());
3790 assert!(view.recent_events.is_empty());
3791 assert_eq!(view.decision, MaintenanceDecision::Launch);
3792 }
3793
3794 #[test]
3795 fn unified_view_active_with_lexical_fails_open() {
3796 use fs2::FileExt;
3797 let temp = tempfile::tempdir().expect("tempdir");
3798 let lock_path = temp.path().join("index-run.lock");
3799 let now_ms = crate::storage::sqlite::FrankenStorage::now_millis();
3800 let owner = OpenOptions::new()
3801 .create(true)
3802 .read(true)
3803 .write(true)
3804 .truncate(true)
3805 .open(&lock_path)
3806 .expect("open");
3807 owner.try_lock_exclusive().expect("lock");
3808 std::fs::write(
3809 &lock_path,
3810 format!(
3811 "pid=99999\nstarted_at_ms={}\nupdated_at_ms={}\ndb_path=/tmp/t.db\nmode=index\njob_id=uv-1\njob_kind=lexical_refresh\nphase=indexing\n",
3812 now_ms - 1_000,
3813 now_ms,
3814 ),
3815 )
3816 .expect("write metadata");
3817
3818 let event = MaintenanceEvent {
3819 timestamp_ms: now_ms,
3820 job_id: "uv-1".to_string(),
3821 actor_pid: 99999,
3822 kind: MaintenanceEventKind::Started {
3823 job_kind: "lexical_refresh".to_string(),
3824 phase: "indexing".to_string(),
3825 },
3826 };
3827 append_maintenance_event(temp.path(), &event).expect("append");
3828
3829 let view = unified_maintenance_view(temp.path(), true);
3830 assert!(matches!(
3831 view.coordination,
3832 MaintenanceCoordinationOutcome::Active { .. }
3833 ));
3834 assert!(matches!(
3835 view.decision,
3836 MaintenanceDecision::FailOpen { .. }
3837 ));
3838 assert_eq!(view.recent_events.len(), 1);
3839
3840 let _ = FileExt::unlock(&owner);
3841 }
3842
3843 #[test]
3844 fn unified_view_includes_yield_signal() {
3845 let temp = tempfile::tempdir().expect("tempdir");
3846 request_yield(temp.path(), "test yield").expect("yield");
3847 let view = unified_maintenance_view(temp.path(), true);
3848 assert!(view.yield_pending.is_some());
3849 assert_eq!(view.yield_pending.as_ref().unwrap().reason, "test yield");
3850 clear_yield_signal(temp.path()).expect("clear");
3851 }
3852
3853 #[test]
3854 fn event_kinds_serialize_round_trip() {
3855 let temp = tempfile::tempdir().expect("tempdir");
3856 let kinds = vec![
3857 MaintenanceEventKind::Started {
3858 job_kind: "lexical_refresh".to_string(),
3859 phase: "init".to_string(),
3860 },
3861 MaintenanceEventKind::PhaseChanged {
3862 from: "init".to_string(),
3863 to: "scanning".to_string(),
3864 },
3865 MaintenanceEventKind::Progress {
3866 processed: 50,
3867 total: 100,
3868 },
3869 MaintenanceEventKind::YieldRequested {
3870 requester_pid: 42,
3871 reason: "foreground".to_string(),
3872 },
3873 MaintenanceEventKind::Paused {
3874 reason: "yield".to_string(),
3875 },
3876 MaintenanceEventKind::Resumed,
3877 MaintenanceEventKind::Completed {
3878 summary: "done".to_string(),
3879 },
3880 MaintenanceEventKind::Failed {
3881 error: "oops".to_string(),
3882 },
3883 MaintenanceEventKind::Cancelled {
3884 reason: "user".to_string(),
3885 },
3886 ];
3887 for (i, kind) in kinds.into_iter().enumerate() {
3888 let event = MaintenanceEvent {
3889 timestamp_ms: i as i64,
3890 job_id: "rt-test".to_string(),
3891 actor_pid: 1,
3892 kind,
3893 };
3894 append_maintenance_event(temp.path(), &event).expect("append");
3895 }
3896 let events = read_maintenance_events(temp.path(), None, None);
3897 assert_eq!(events.len(), 9);
3898 assert!(matches!(
3899 events[0].kind,
3900 MaintenanceEventKind::Started { .. }
3901 ));
3902 assert!(matches!(events[5].kind, MaintenanceEventKind::Resumed));
3903 assert!(matches!(
3904 events[8].kind,
3905 MaintenanceEventKind::Cancelled { .. }
3906 ));
3907 }
3908}