1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// SPDX-License-Identifier: BUSL-1.1
//! Vector index checkpoint methods for [`CoreLoop`].
//!
//! Contains HNSW build completion polling and checkpoint load/save operations.
use super::core_loop::CoreLoop;
/// Split a `"{tid}:{coll_key}"` string into the `(TenantId, String)` tuple
/// used as the collection map key.
///
/// The same textual form appears in `BuildComplete.key` and in on-disk
/// checkpoint filenames. Only the first `':'` separates the two parts, so
/// any further colons remain part of the collection key.
///
/// Fallbacks:
/// - no `':'` at all: the whole input becomes the collection key under
///   tenant 0 (should not happen in practice);
/// - tenant part is not a valid `u64`: tenant 0 is used.
fn parse_build_key(s: &str) -> (crate::types::TenantId, String) {
    let Some((tenant_part, collection_part)) = s.split_once(':') else {
        // No separator: treat the entire string as the collection key.
        return (crate::types::TenantId::new(0_u64), s.to_string());
    };
    let tenant_raw: u64 = tenant_part.parse().unwrap_or(0);
    (
        crate::types::TenantId::new(tenant_raw),
        collection_part.to_string(),
    )
}
impl CoreLoop {
    /// Drain completed HNSW builds from the background builder thread and
    /// promote the corresponding building segments to sealed segments.
    ///
    /// Called at the top of `tick()` before draining new requests. Does
    /// nothing when no build receiver is configured.
    ///
    /// `BuildComplete.key` is the old-style `"{tid}:{coll}"` string (produced
    /// by `VectorCollection::seal` which still takes `&str`). It is parsed
    /// back to the tuple key to look up the collection map.
    ///
    /// A completion whose collection is no longer present in the map is
    /// discarded, with a warning so the dropped build is visible in logs.
    pub fn poll_build_completions(&mut self) {
        let Some(rx) = &self.build_rx else { return };
        // `try_recv` never blocks; the loop ends as soon as it returns an error.
        while let Ok(complete) = rx.try_recv() {
            // Parse the string key `"{tid}:{coll_key}"` back into the tuple.
            let tuple_key = parse_build_key(&complete.key);
            let Some(coll) = self.vector_collections.get_mut(&tuple_key) else {
                tracing::warn!(
                    core = self.core_id,
                    key = %complete.key,
                    segment_id = complete.segment_id,
                    "HNSW build completed for unknown collection, result discarded"
                );
                continue;
            };
            coll.complete_build(complete.segment_id, complete.index);
            tracing::info!(
                core = self.core_id,
                key = %complete.key,
                segment_id = complete.segment_id,
                "HNSW build completed, segment promoted to sealed"
            );
        }
    }

    /// Write HNSW checkpoints for all vector indexes to disk.
    ///
    /// Called periodically from the TPC event loop (e.g., every 5 minutes
    /// or when idle). Each index is serialized to a file at
    /// `{data_dir}/vector-ckpt/{index_key}.ckpt`, written via a sibling
    /// `.ckpt.tmp` path.
    ///
    /// After checkpointing, WAL replay only needs to process entries
    /// since the checkpoint — not the entire history.
    ///
    /// Returns the number of collections whose checkpoint was written
    /// successfully. Empty collections and collections that serialize to
    /// zero bytes are skipped. Failures (directory creation, individual
    /// writes) are logged as warnings and never abort the remaining
    /// collections.
    pub fn checkpoint_vector_indexes(&self) -> usize {
        if self.vector_collections.is_empty() {
            return 0;
        }
        let ckpt_dir = self.data_dir.join("vector-ckpt");
        if let Err(e) = std::fs::create_dir_all(&ckpt_dir) {
            tracing::warn!(
                core = self.core_id,
                error = %e,
                "failed to create vector checkpoint dir"
            );
            return 0;
        }
        let mut checkpointed = 0;
        for (key, collection) in &self.vector_collections {
            if collection.is_empty() {
                continue;
            }
            let bytes = collection.checkpoint_to_bytes(self.vector_checkpoint_kek.as_ref());
            if bytes.is_empty() {
                continue;
            }
            // Checkpoint filename uses the old-style `"{tid}:{coll}"` form so
            // existing on-disk checkpoint files remain valid across upgrades.
            let filename = CoreLoop::vector_checkpoint_filename(key);
            let ckpt_path = ckpt_dir.join(format!("{filename}.ckpt"));
            let tmp_path = ckpt_dir.join(format!("{filename}.ckpt.tmp"));
            match nodedb_wal::segment::atomic_write_fsync(&tmp_path, &ckpt_path, &bytes) {
                Ok(_) => {
                    checkpointed += 1;
                }
                Err(e) => {
                    // Best-effort: report the failure and keep going with
                    // the other collections.
                    tracing::warn!(
                        core = self.core_id,
                        key = %filename,
                        error = %e,
                        "failed to write vector checkpoint"
                    );
                }
            }
        }
        if checkpointed > 0 {
            tracing::info!(
                core = self.core_id,
                checkpointed,
                total = self.vector_collections.len(),
                "vector collections checkpointed"
            );
        }
        checkpointed
    }

    /// Load HNSW checkpoints from disk on startup, before WAL replay.
    ///
    /// Scans `{data_dir}/vector-ckpt` for `*.ckpt` files and loads each one
    /// into `vector_collections`, keyed by the tuple parsed from the file
    /// stem. WAL replay then only needs to process entries after the
    /// checkpoint LSN.
    ///
    /// Loading is best-effort: a missing directory is a no-op, and an
    /// unreadable directory, unreadable file, or rejected checkpoint is
    /// logged as a warning and skipped.
    pub fn load_vector_checkpoints(&mut self) {
        let ckpt_dir = self.data_dir.join("vector-ckpt");
        if !ckpt_dir.exists() {
            return;
        }
        let entries = match std::fs::read_dir(&ckpt_dir) {
            Ok(entries) => entries,
            Err(e) => {
                tracing::warn!(
                    core = self.core_id,
                    error = %e,
                    "failed to read vector checkpoint dir"
                );
                return;
            }
        };
        let mut loaded = 0;
        // `flatten` skips directory entries that could not be read.
        for entry in entries.flatten() {
            let path = entry.path();
            // Anything without a `.ckpt` extension (including leftover
            // `.ckpt.tmp` files) is ignored.
            if path.extension().and_then(|e| e.to_str()) != Some("ckpt") {
                continue;
            }
            // Checkpoint filenames are `"{tid}:{coll}.ckpt"` — parse back to tuple.
            // Stems that are empty or not valid UTF-8 are skipped.
            let Some(filename) = path
                .file_stem()
                .and_then(|s| s.to_str())
                .filter(|s| !s.is_empty())
            else {
                continue;
            };
            let tuple_key = parse_build_key(filename);
            let bytes = match nodedb_wal::segment::read_checkpoint_dontneed(&path) {
                Ok(bytes) => bytes,
                Err(e) => {
                    tracing::warn!(
                        core = self.core_id,
                        key = %filename,
                        error = %e,
                        "failed to read vector checkpoint"
                    );
                    continue;
                }
            };
            let kek = self.vector_checkpoint_kek.as_ref();
            let load_result =
                crate::engine::vector::collection::VectorCollection::from_checkpoint(&bytes, kek);
            let collection = match load_result {
                Ok(Some(c)) => c,
                // `from_checkpoint` produced no collection; nothing to insert.
                Ok(None) => continue,
                Err(e) => {
                    tracing::warn!(
                        core = self.core_id,
                        key = %filename,
                        error = %e,
                        "vector checkpoint rejected"
                    );
                    continue;
                }
            };
            tracing::info!(
                core = self.core_id,
                key = %filename,
                vectors = collection.len(),
                "loaded vector checkpoint"
            );
            self.vector_collections.insert(tuple_key, collection);
            loaded += 1;
        }
        if loaded > 0 {
            tracing::info!(core = self.core_id, loaded, "vector checkpoints loaded");
        }
    }
}