1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//! Vector index checkpoint methods for [`CoreLoop`].
//!
//! Contains HNSW build completion polling and checkpoint load/save operations.
use super::core_loop::CoreLoop;
impl CoreLoop {
/// Promote finished HNSW builds to sealed segments.
///
/// Drains, without blocking, every completion message currently queued on
/// the background builder channel and hands each finished index to the
/// collection registered under the message's key. Does nothing when no
/// builder channel is configured; a completion whose key has no matching
/// collection is discarded.
///
/// Called at the top of `tick()` before draining new requests.
pub fn poll_build_completions(&mut self) {
    let Some(completions) = self.build_rx.as_ref() else {
        return;
    };
    loop {
        let done = match completions.try_recv() {
            Ok(done) => done,
            // Any receive error ends the drain for this call.
            Err(_) => break,
        };
        let Some(collection) = self.vector_collections.get_mut(&done.key) else {
            continue;
        };
        collection.complete_build(done.segment_id, done.index);
        tracing::info!(
            core = self.core_id,
            key = %done.key,
            segment_id = done.segment_id,
            "HNSW build completed, segment promoted to sealed"
        );
    }
}
/// Write HNSW checkpoints for all vector indexes to disk.
///
/// Called periodically from the TPC event loop (e.g., every 5 minutes
/// or when idle). Each non-empty collection is serialized to
/// `{data_dir}/vector-ckpt/{index_key}.ckpt`. The bytes are first written
/// to a sibling `{index_key}.ckpt.tmp` file, synced, and then renamed onto
/// the final path, so the final path is only ever replaced by a completely
/// written file.
///
/// After checkpointing, WAL replay only needs to process entries
/// since the checkpoint — not the entire history.
///
/// Checkpointing is best-effort: a failure for one collection is logged
/// (with the underlying I/O error) and skipped without aborting the
/// remaining collections.
///
/// Returns the number of collections whose checkpoint was written
/// successfully; `0` when there are no collections or the checkpoint
/// directory cannot be created.
pub fn checkpoint_vector_indexes(&self) -> usize {
    /// Write `data` to `tmp`, sync it, then rename it onto `dst`.
    fn write_atomically(
        tmp: &std::path::Path,
        dst: &std::path::Path,
        data: impl AsRef<[u8]>,
    ) -> std::io::Result<()> {
        let mut file = std::fs::File::create(tmp)?;
        std::io::Write::write_all(&mut file, data.as_ref())?;
        // Sync before the rename: otherwise a crash shortly after the rename
        // could leave a truncated file at the final checkpoint path.
        file.sync_all()?;
        drop(file);
        std::fs::rename(tmp, dst)
    }

    if self.vector_collections.is_empty() {
        return 0;
    }
    let ckpt_dir = self.data_dir.join("vector-ckpt");
    if let Err(err) = std::fs::create_dir_all(&ckpt_dir) {
        tracing::warn!(
            core = self.core_id,
            path = %ckpt_dir.display(),
            error = %err,
            "failed to create vector checkpoint dir"
        );
        return 0;
    }
    let mut checkpointed = 0;
    for (key, collection) in &self.vector_collections {
        if collection.is_empty() {
            continue;
        }
        let bytes = collection.checkpoint_to_bytes();
        if bytes.is_empty() {
            continue;
        }
        // Write to temp file, then rename for atomicity.
        let ckpt_path = ckpt_dir.join(format!("{key}.ckpt"));
        let tmp_path = ckpt_dir.join(format!("{key}.ckpt.tmp"));
        match write_atomically(&tmp_path, &ckpt_path, &bytes) {
            Ok(()) => checkpointed += 1,
            Err(err) => {
                tracing::warn!(
                    core = self.core_id,
                    %key,
                    path = %ckpt_path.display(),
                    error = %err,
                    "failed to write vector checkpoint"
                );
                // Best-effort cleanup so a failed attempt does not leave a
                // stale temp file behind; the file may not exist at all.
                let _ = std::fs::remove_file(&tmp_path);
            }
        }
    }
    if checkpointed > 0 {
        tracing::info!(
            core = self.core_id,
            checkpointed,
            total = self.vector_collections.len(),
            "vector collections checkpointed"
        );
    }
    checkpointed
}
/// Load HNSW checkpoints from disk on startup, before WAL replay.
///
/// For each checkpoint file, loads the index. WAL replay then only
/// needs to process entries after the checkpoint LSN.
pub fn load_vector_checkpoints(&mut self) {
let ckpt_dir = self.data_dir.join("vector-ckpt");
if !ckpt_dir.exists() {
return;
}
let entries = match std::fs::read_dir(&ckpt_dir) {
Ok(e) => e,
Err(_) => return,
};
let mut loaded = 0;
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("ckpt") {
continue;
}
let key = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if key.is_empty() {
continue;
}
if let Ok(bytes) = std::fs::read(&path)
&& let Some(collection) =
crate::engine::vector::collection::VectorCollection::from_checkpoint(&bytes)
{
tracing::info!(
core = self.core_id,
%key,
vectors = collection.len(),
"loaded vector checkpoint"
);
self.vector_collections.insert(key, collection);
loaded += 1;
}
}
if loaded > 0 {
tracing::info!(core = self.core_id, loaded, "vector checkpoints loaded");
}
}
}