1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! Secondary index management for the KV engine.
//!
//! Implements register, drop, lookup, and stats methods on [`super::engine::KvEngine`].
use super::engine::KvEngine;
use super::engine_helpers::{extract_field_values_from_msgpack, table_key};
impl KvEngine {
/// Register a secondary index on `field` for a tenant's collection.
///
/// The index is added to the collection's index set, which is created on demand.
/// When `backfill` is true, the collection's table is scanned in pages of up to
/// 1000 entries and every field value extracted from each entry's msgpack value is
/// inserted into the index. Entries are indexed page by page, so the whole
/// collection is never buffered in memory at once.
///
/// # Parameters
///
/// * `tenant_id`, `collection` - identify the table / index set via `table_key`.
/// * `field` - name of the field to index.
/// * `field_position` - forwarded unchanged to `add_index`.
/// * `backfill` - whether to populate the index from existing entries.
/// * `now_ms` - forwarded unchanged to `table.scan` (presumably the current time
///   used for expiry checks; confirm against the table implementation).
///
/// # Returns
///
/// The number of index entries written during backfill: one per field value
/// extracted from a scanned entry. Returns 0 when the index already existed, when
/// `backfill` is false, or when the collection has no table.
///
/// **Note**: backfill scans all entries synchronously. For large collections
/// (> 10k entries), consider `backfill=false` and rebuilding offline.
pub fn register_index(
    &mut self,
    tenant_id: u32,
    collection: &str,
    field: &str,
    field_position: usize,
    backfill: bool,
    now_ms: u64,
) -> usize {
    let tkey = table_key(tenant_id, collection);
    let idx_set = self.indexes.entry(tkey).or_default();
    if !idx_set.add_index(field, field_position) {
        return 0; // Already indexed.
    }
    if !backfill {
        return 0;
    }
    // `self.indexes` and `self.tables` are distinct fields, so the mutable borrow
    // held by `idx_set` can coexist with this shared borrow of the table. That lets
    // each page be indexed as soon as it is read instead of copying the entire
    // collection into a temporary buffer first.
    let Some(table) = self.tables.get(&tkey) else {
        return 0;
    };
    let mut backfilled = 0;
    let mut cursor = 0;
    loop {
        let (entries, next) = table.scan(cursor, 1000, now_ms, None);
        if entries.is_empty() {
            break;
        }
        for (k, v) in entries {
            // Own the bytes of this single entry before handing them to the index.
            let key: Vec<u8> = k.to_vec();
            let value: Vec<u8> = v.to_vec();
            let field_values = extract_field_values_from_msgpack(&value, field);
            for fv in &field_values {
                let fv_pairs: Vec<(&str, &[u8])> = vec![(field, fv.as_slice())];
                idx_set.on_put(&key, &fv_pairs, None);
                backfilled += 1;
            }
        }
        // A zero cursor from `scan` marks the end of the table.
        if next == 0 {
            break;
        }
        cursor = next;
    }
    backfilled
}
/// Drop the secondary index on `field` for a tenant's collection.
///
/// Returns the `entry_count()` of the removed index, or 0 when the collection has
/// no index set or `remove_index` yields nothing for `field`.
pub fn drop_index(&mut self, tenant_id: u32, collection: &str, field: &str) -> usize {
    self.indexes
        .get_mut(&table_key(tenant_id, collection))
        .and_then(|set| set.remove_index(field))
        .map_or(0, |dropped| dropped.entry_count())
}
/// Find primary keys whose indexed `field` equals `value` exactly.
///
/// Each key yielded by the index set's `lookup_eq` is copied into an owned
/// `Vec<u8>`. Returns an empty vector when the collection has no index set;
/// an unindexed field likewise produces an empty result.
pub fn index_lookup_eq(
    &self,
    tenant_id: u32,
    collection: &str,
    field: &str,
    value: &[u8],
) -> Vec<Vec<u8>> {
    let Some(index_set) = self.indexes.get(&table_key(tenant_id, collection)) else {
        return Vec::new();
    };
    let matches = index_set.lookup_eq(field, value);
    matches.into_iter().map(|pk| pk.to_vec()).collect()
}
/// Report whether the collection has at least one secondary index.
///
/// Returns `false` when no index set exists for the collection or when the
/// existing set reports itself empty.
pub fn has_indexes(&self, tenant_id: u32, collection: &str) -> bool {
    match self.indexes.get(&table_key(tenant_id, collection)) {
        Some(index_set) => !index_set.is_empty(),
        None => false,
    }
}
/// Write amplification ratio reported by the collection's index set.
///
/// Returns `0.0` when the collection has no index set.
pub fn write_amp_ratio(&self, tenant_id: u32, collection: &str) -> f64 {
    if let Some(index_set) = self.indexes.get(&table_key(tenant_id, collection)) {
        index_set.write_amp_ratio()
    } else {
        0.0
    }
}
/// Number of secondary indexes registered for the collection.
///
/// Returns 0 when the collection has no index set.
pub fn index_count(&self, tenant_id: u32, collection: &str) -> usize {
    let key = table_key(tenant_id, collection);
    self.indexes
        .get(&key)
        .map_or(0, |index_set| index_set.index_count())
}
}