// nodedb_vector/sieve/workload.rs
use std::collections::HashMap;

use super::collection::PredicateSignature;
10pub struct QueryRecord {
12 pub predicate_signature: PredicateSignature,
14 pub timestamp_secs: u64,
16}
17
18pub struct WorkloadAnalyzer {
21 log: Vec<QueryRecord>,
22 stable_fraction_threshold: f32,
25}
26
27impl WorkloadAnalyzer {
28 pub fn new(stable_fraction_threshold: f32) -> Self {
32 Self {
33 log: Vec::new(),
34 stable_fraction_threshold,
35 }
36 }
37
38 pub fn record(&mut self, signature: PredicateSignature, timestamp_secs: u64) {
40 self.log.push(QueryRecord {
41 predicate_signature: signature,
42 timestamp_secs,
43 });
44 }
45
46 pub fn stable_predicates(&self) -> Vec<(PredicateSignature, f32)> {
52 if self.log.is_empty() {
53 return Vec::new();
54 }
55
56 let total = self.log.len() as f32;
57 let mut counts: HashMap<&str, usize> = HashMap::new();
58 for record in &self.log {
59 *counts
60 .entry(record.predicate_signature.as_str())
61 .or_insert(0) += 1;
62 }
63
64 let mut result: Vec<(PredicateSignature, f32)> = counts
65 .into_iter()
66 .filter_map(|(sig, count)| {
67 let freq = count as f32 / total;
68 if freq >= self.stable_fraction_threshold {
69 Some((sig.to_owned(), freq))
70 } else {
71 None
72 }
73 })
74 .collect();
75
76 result.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
77 result
78 }
79
80 pub fn estimate_subindex_cost(
92 &self,
93 vectors_in_subindex: usize,
94 dim: usize,
95 ) -> (usize, f32, f32) {
96 const AVG_LAYERS: usize = 2;
97 const SUB_M: usize = 16; const BYTES_PER_FLOAT: usize = 4;
99 const ASSUMED_RECALL: f32 = 0.95;
100
101 if vectors_in_subindex == 0 {
102 return (0, 0.0, ASSUMED_RECALL);
103 }
104
105 let memory_bytes =
106 vectors_in_subindex * (dim * BYTES_PER_FLOAT + SUB_M * BYTES_PER_FLOAT * AVG_LAYERS);
107
108 let latency_ms = (vectors_in_subindex as f64).log2() as f32 * 0.01;
109
110 (memory_bytes, latency_ms, ASSUMED_RECALL)
111 }
112
113 pub fn compact(&mut self, now_secs: u64, retention_secs: u64) {
118 let cutoff = now_secs.saturating_sub(retention_secs);
119 self.log.retain(|r| r.timestamp_secs >= cutoff);
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Logs `signature` once for every timestamp in `times`.
    fn record_range(
        analyzer: &mut WorkloadAnalyzer,
        signature: &str,
        times: std::ops::Range<u64>,
    ) {
        for ts in times {
            analyzer.record(signature.to_string(), ts);
        }
    }

    /// Builds the shared workload of 80 x "A" followed by 20 x "B".
    fn eighty_twenty(threshold: f32) -> WorkloadAnalyzer {
        let mut analyzer = WorkloadAnalyzer::new(threshold);
        record_range(&mut analyzer, "A", 0..80);
        record_range(&mut analyzer, "B", 80..100);
        analyzer
    }

    #[test]
    fn stable_predicates_returns_dominant_signature() {
        let stable = eighty_twenty(0.5).stable_predicates();
        assert_eq!(stable.len(), 1, "only A should exceed 50% threshold");

        let (signature, freq) = (&stable[0].0, stable[0].1);
        assert_eq!(signature, "A");
        assert!(
            (freq - 0.8).abs() < 1e-5,
            "frequency should be ~0.80, got {freq}"
        );
    }

    #[test]
    fn stable_predicates_multiple_signatures() {
        let ranked: Vec<String> = eighty_twenty(0.1)
            .stable_predicates()
            .into_iter()
            .map(|(signature, _)| signature)
            .collect();

        assert_eq!(ranked.len(), 2);
        assert_eq!(ranked[0], "A");
        assert_eq!(ranked[1], "B");
    }

    #[test]
    fn empty_log_returns_empty() {
        assert!(WorkloadAnalyzer::new(0.05).stable_predicates().is_empty());
    }

    #[test]
    fn compact_drops_old_records() {
        let mut analyzer = WorkloadAnalyzer::new(0.05);
        record_range(&mut analyzer, "old", 0..50);
        record_range(&mut analyzer, "recent", 150..200);

        // Cutoff is 200 - 100 = 100, so only the 150..200 batch survives.
        analyzer.compact(200, 100);

        let survivors = analyzer.stable_predicates();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].0, "recent");
    }

    #[test]
    fn compact_all_yields_empty() {
        let mut analyzer = WorkloadAnalyzer::new(0.05);
        record_range(&mut analyzer, "X", 0..10);

        // Zero retention at t=1000 puts the cutoff past every record.
        analyzer.compact(1000, 0);

        assert!(analyzer.stable_predicates().is_empty());
    }

    #[test]
    fn estimate_subindex_cost_plausible() {
        let (mem, lat, recall) = WorkloadAnalyzer::new(0.05).estimate_subindex_cost(1000, 128);

        // 1000 * (128 * 4 + 16 * 4 * 2) = 640_000 bytes.
        assert_eq!(mem, 640_000);
        assert!(lat > 0.0 && lat < 1.0, "latency_ms={lat} should be sub-ms");
        assert!((recall - 0.95).abs() < 1e-6);
    }

    #[test]
    fn estimate_subindex_cost_zero_vectors() {
        let (mem, lat, recall) = WorkloadAnalyzer::new(0.05).estimate_subindex_cost(0, 128);

        assert_eq!(mem, 0);
        assert_eq!(lat, 0.0);
        assert!((recall - 0.95).abs() < 1e-6);
    }
}