Skip to main content

nodedb_vector/sieve/
workload.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! WorkloadAnalyzer — tracks historical query predicates and exposes a 3D cost
4//! model (memory × latency × recall) to guide SIEVE subindex build decisions.
5
6use std::collections::HashMap;
7
8use super::collection::PredicateSignature;
9
10/// A single query event recorded in the workload log.
11pub struct QueryRecord {
12    /// Predicate signature observed in this query (e.g. `"tenant_id=42"`).
13    pub predicate_signature: PredicateSignature,
14    /// Unix timestamp (seconds) at which the query arrived.
15    pub timestamp_secs: u64,
16}
17
18/// Tracks historical query logs and exposes heuristics for deciding which
19/// predicates warrant a dedicated SIEVE subindex.
20pub struct WorkloadAnalyzer {
21    log: Vec<QueryRecord>,
22    /// A predicate is considered "stable" when its share of total queries is at
23    /// or above this fraction.  Typical value: `0.05` (5%).
24    stable_fraction_threshold: f32,
25}
26
27impl WorkloadAnalyzer {
28    /// Create a new analyzer.  `stable_fraction_threshold` is the minimum
29    /// fraction of total queries a predicate must represent to be considered
30    /// stable.
31    pub fn new(stable_fraction_threshold: f32) -> Self {
32        Self {
33            log: Vec::new(),
34            stable_fraction_threshold,
35        }
36    }
37
38    /// Record a query that used the given predicate signature.
39    pub fn record(&mut self, signature: PredicateSignature, timestamp_secs: u64) {
40        self.log.push(QueryRecord {
41            predicate_signature: signature,
42            timestamp_secs,
43        });
44    }
45
46    /// Returns `(signature, frequency)` pairs sorted by frequency descending,
47    /// filtered to those whose frequency exceeds `stable_fraction_threshold`.
48    ///
49    /// Frequency is the fraction of total log entries contributed by that
50    /// predicate.  Returns an empty Vec if the log is empty.
51    pub fn stable_predicates(&self) -> Vec<(PredicateSignature, f32)> {
52        if self.log.is_empty() {
53            return Vec::new();
54        }
55
56        let total = self.log.len() as f32;
57        let mut counts: HashMap<&str, usize> = HashMap::new();
58        for record in &self.log {
59            *counts
60                .entry(record.predicate_signature.as_str())
61                .or_insert(0) += 1;
62        }
63
64        let mut result: Vec<(PredicateSignature, f32)> = counts
65            .into_iter()
66            .filter_map(|(sig, count)| {
67                let freq = count as f32 / total;
68                if freq >= self.stable_fraction_threshold {
69                    Some((sig.to_owned(), freq))
70                } else {
71                    None
72                }
73            })
74            .collect();
75
76        result.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
77        result
78    }
79
80    /// Rough 3D cost model for a candidate subindex.
81    ///
82    /// Returns `(memory_bytes, latency_ms, recall)` estimates:
83    ///
84    /// - `memory_bytes` ≈ `n * (dim * 4 + sub_m * 4 * avg_layers)` where
85    ///   `avg_layers ≈ 2`.
86    /// - `latency_ms`   ≈ `log2(n) * 0.01`.
87    /// - `recall`       = `0.95` (assumed for a correctly built HNSW index).
88    ///
89    /// These are intentionally coarse: they guide the build/no-build decision
90    /// for a planner, not a precision benchmark.
91    pub fn estimate_subindex_cost(
92        &self,
93        vectors_in_subindex: usize,
94        dim: usize,
95    ) -> (usize, f32, f32) {
96        const AVG_LAYERS: usize = 2;
97        const SUB_M: usize = 16; // representative default
98        const BYTES_PER_FLOAT: usize = 4;
99        const ASSUMED_RECALL: f32 = 0.95;
100
101        if vectors_in_subindex == 0 {
102            return (0, 0.0, ASSUMED_RECALL);
103        }
104
105        let memory_bytes =
106            vectors_in_subindex * (dim * BYTES_PER_FLOAT + SUB_M * BYTES_PER_FLOAT * AVG_LAYERS);
107
108        let latency_ms = (vectors_in_subindex as f64).log2() as f32 * 0.01;
109
110        (memory_bytes, latency_ms, ASSUMED_RECALL)
111    }
112
113    /// Drop all log entries older than `retention_secs` seconds before `now`.
114    ///
115    /// Entries with `timestamp_secs < now - retention_secs` are removed.
116    /// Entries at or after that boundary are kept.
117    pub fn compact(&mut self, now_secs: u64, retention_secs: u64) {
118        let cutoff = now_secs.saturating_sub(retention_secs);
119        self.log.retain(|r| r.timestamp_secs >= cutoff);
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    /// Record 100 queries: 80 with signature "A", 20 with "B".
128    /// `stable_predicates(0.5)` must return only "A".
129    #[test]
130    fn stable_predicates_returns_dominant_signature() {
131        let mut analyzer = WorkloadAnalyzer::new(0.5);
132        for i in 0u64..80 {
133            analyzer.record("A".to_string(), i);
134        }
135        for i in 80u64..100 {
136            analyzer.record("B".to_string(), i);
137        }
138
139        let stable = analyzer.stable_predicates();
140        assert_eq!(stable.len(), 1, "only A should exceed 50% threshold");
141        assert_eq!(stable[0].0, "A");
142        let freq = stable[0].1;
143        assert!(
144            (freq - 0.8).abs() < 1e-5,
145            "frequency should be ~0.80, got {freq}"
146        );
147    }
148
149    /// With a 0.1 threshold both A (80%) and B (20%) are stable.
150    #[test]
151    fn stable_predicates_multiple_signatures() {
152        let mut analyzer = WorkloadAnalyzer::new(0.1);
153        for i in 0u64..80 {
154            analyzer.record("A".to_string(), i);
155        }
156        for i in 80u64..100 {
157            analyzer.record("B".to_string(), i);
158        }
159
160        let stable = analyzer.stable_predicates();
161        assert_eq!(stable.len(), 2);
162        // Sorted descending by frequency: A first.
163        assert_eq!(stable[0].0, "A");
164        assert_eq!(stable[1].0, "B");
165    }
166
167    /// Empty log returns an empty result.
168    #[test]
169    fn empty_log_returns_empty() {
170        let analyzer = WorkloadAnalyzer::new(0.05);
171        assert!(analyzer.stable_predicates().is_empty());
172    }
173
174    /// `compact` drops entries older than the retention window.
175    #[test]
176    fn compact_drops_old_records() {
177        let mut analyzer = WorkloadAnalyzer::new(0.05);
178        // Timestamps: 0..50 old, 150..200 recent.
179        for i in 0u64..50 {
180            analyzer.record("old".to_string(), i);
181        }
182        for i in 150u64..200 {
183            analyzer.record("recent".to_string(), i);
184        }
185
186        // now=200, retention=100 → cutoff=100; entries with ts < 100 removed.
187        analyzer.compact(200, 100);
188
189        let stable = analyzer.stable_predicates();
190        // Only "recent" entries remain.
191        assert_eq!(stable.len(), 1);
192        assert_eq!(stable[0].0, "recent");
193    }
194
195    /// After compacting everything, log is empty.
196    #[test]
197    fn compact_all_yields_empty() {
198        let mut analyzer = WorkloadAnalyzer::new(0.05);
199        for i in 0u64..10 {
200            analyzer.record("X".to_string(), i);
201        }
202        // cutoff = 1000 - 0 = 1000; all ts < 1000 dropped.
203        analyzer.compact(1000, 0);
204        assert!(analyzer.stable_predicates().is_empty());
205    }
206
207    /// `estimate_subindex_cost` returns plausible values.
208    #[test]
209    fn estimate_subindex_cost_plausible() {
210        let analyzer = WorkloadAnalyzer::new(0.05);
211        let (mem, lat, recall) = analyzer.estimate_subindex_cost(1000, 128);
212
213        // memory_bytes = 1000 * (128*4 + 16*4*2) = 1000 * (512 + 128) = 640_000
214        assert_eq!(mem, 640_000);
215
216        // latency_ms ≈ log2(1000) * 0.01 ≈ 9.97 * 0.01 ≈ 0.0997
217        assert!(lat > 0.0 && lat < 1.0, "latency_ms={lat} should be sub-ms");
218
219        assert!((recall - 0.95).abs() < 1e-6);
220    }
221
222    #[test]
223    fn estimate_subindex_cost_zero_vectors() {
224        let analyzer = WorkloadAnalyzer::new(0.05);
225        let (mem, lat, recall) = analyzer.estimate_subindex_cost(0, 128);
226        assert_eq!(mem, 0);
227        assert_eq!(lat, 0.0);
228        assert!((recall - 0.95).abs() < 1e-6);
229    }
230}