Skip to main content

batuta/oracle/rag/
indexer.rs

1//! Heijunka Reindexer - Load-Leveled Incremental Indexing
2//!
3//! Implements Toyota Way Heijunka (平準化) principle for smooth workload distribution.
4//! Prevents thundering herd during bulk updates through batched processing.
5
6use super::fingerprint::DocumentFingerprint;
7use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
/// Heijunka reindexer for load-leveled updates
///
/// Batches reindex work and prioritizes documents by a staleness score so a
/// bulk update is spread over many small batches instead of one burst.
///
/// Following queueing theory principles from Harchol-Balter (2013)
/// and tail latency management from Dean & Barroso (2013).
#[derive(Debug)]
pub struct HeijunkaReindexer {
    /// Maximum documents per batch (load leveling)
    // NOTE(review): mirrors `config.batch_size`; both are set from the same
    // value in `with_config` and never diverge afterwards.
    batch_size: usize,
    /// Inter-batch delay for backpressure (milliseconds)
    // NOTE(review): mirrors `config.batch_delay_ms` (see `with_config`).
    batch_delay_ms: u64,
    /// Priority queue ordered by staleness; a max-heap, so the entry with
    /// the highest staleness score is popped first by `next_batch`.
    queue: BinaryHeap<StalenessEntry>,
    /// Document fingerprints for change detection, keyed by document ID
    fingerprints: HashMap<String, DocumentFingerprint>,
    /// Query counts for popularity-weighted staleness, keyed by document ID;
    /// read by `enqueue`, written by `record_query` / `decay_popularity`
    query_counts: HashMap<String, u64>,
    /// Configuration (also holds the decay factor and staleness cap)
    config: HeijunkaConfig,
}
30
/// Tuning knobs for the Heijunka reindexer.
#[derive(Debug, Clone)]
pub struct HeijunkaConfig {
    /// Upper bound on documents returned per batch.
    pub batch_size: usize,
    /// Pause between consecutive batches, in milliseconds.
    pub batch_delay_ms: u64,
    /// Age (seconds) after which a document must be reindexed.
    pub max_staleness_seconds: u64,
    /// Multiplicative factor applied when aging query popularity.
    pub popularity_decay: f64,
}

impl Default for HeijunkaConfig {
    /// Defaults: 50-document batches, 100 ms backpressure delay, a one-day
    /// staleness ceiling, and 5% popularity decay per aging pass.
    fn default() -> Self {
        HeijunkaConfig {
            batch_size: 50,
            batch_delay_ms: 100,
            max_staleness_seconds: 24 * 60 * 60, // one full day
            popularity_decay: 0.95,
        }
    }
}
54
/// Entry in the staleness priority queue
///
/// Heap ordering comes from the `Ord` impl below, which compares
/// `staleness_score`, so the `BinaryHeap` pops the stalest entry first.
#[derive(Debug, Clone)]
struct StalenessEntry {
    /// Document ID
    doc_id: String,
    /// Staleness score (higher = more stale, process first)
    staleness_score: f64,
    /// Filesystem path of the document to reindex
    path: PathBuf,
}
65
66impl From<StalenessEntry> for ReindexTask {
67    fn from(entry: StalenessEntry) -> Self {
68        Self { doc_id: entry.doc_id, path: entry.path, staleness_score: entry.staleness_score }
69    }
70}
71
72impl PartialEq for StalenessEntry {
73    fn eq(&self, other: &Self) -> bool {
74        self.doc_id == other.doc_id
75    }
76}
77
78impl Eq for StalenessEntry {}
79
80impl PartialOrd for StalenessEntry {
81    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
82        Some(self.cmp(other))
83    }
84}
85
86impl Ord for StalenessEntry {
87    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
88        // Higher staleness score = higher priority
89        self.staleness_score
90            .partial_cmp(&other.staleness_score)
91            .unwrap_or(std::cmp::Ordering::Equal)
92    }
93}
94
95impl HeijunkaReindexer {
96    /// Create a new Heijunka reindexer
97    pub fn new() -> Self {
98        Self::with_config(HeijunkaConfig::default())
99    }
100
101    /// Create with custom configuration
102    pub fn with_config(config: HeijunkaConfig) -> Self {
103        Self {
104            batch_size: config.batch_size,
105            batch_delay_ms: config.batch_delay_ms,
106            queue: BinaryHeap::new(),
107            fingerprints: HashMap::new(),
108            query_counts: HashMap::new(),
109            config,
110        }
111    }
112
113    /// Calculate staleness score for a document
114    ///
115    /// Score formula: recency_weight * popularity_weight
116    /// - recency_weight: exponential decay based on age
117    /// - popularity_weight: log-scaled query count
118    pub fn staleness_score(age_seconds: u64, query_count: u64) -> f64 {
119        let recency_weight = 1.0 - (-(age_seconds as f64) / 86400.0).exp();
120        let popularity_weight = (query_count as f64 + 1.0).ln();
121        recency_weight * popularity_weight
122    }
123
124    /// Add a document to the reindex queue
125    pub fn enqueue(&mut self, doc_id: &str, path: PathBuf, age_seconds: u64) {
126        let query_count = self.query_counts.get(doc_id).copied().unwrap_or(0);
127        let staleness_score = Self::staleness_score(age_seconds, query_count);
128
129        self.queue.push(StalenessEntry { doc_id: doc_id.to_string(), staleness_score, path });
130    }
131
132    /// Record a query for a document (affects staleness priority)
133    pub fn record_query(&mut self, doc_id: &str) {
134        *self.query_counts.entry(doc_id.to_string()).or_insert(0) += 1;
135    }
136
137    /// Apply popularity decay to all query counts
138    pub fn decay_popularity(&mut self) {
139        for count in self.query_counts.values_mut() {
140            *count = (*count as f64 * self.config.popularity_decay) as u64;
141        }
142    }
143
144    /// Get the next batch of documents to reindex
145    pub fn next_batch(&mut self) -> Vec<ReindexTask> {
146        let mut batch = Vec::with_capacity(self.batch_size);
147
148        while batch.len() < self.batch_size {
149            if let Some(entry) = self.queue.pop() {
150                batch.push(entry.into());
151            } else {
152                break;
153            }
154        }
155
156        batch
157    }
158
159    /// Check if queue is empty
160    pub fn is_empty(&self) -> bool {
161        self.queue.is_empty()
162    }
163
164    /// Get queue size
165    pub fn queue_size(&self) -> usize {
166        self.queue.len()
167    }
168
169    /// Store fingerprint for a document
170    pub fn store_fingerprint(&mut self, doc_id: &str, fingerprint: DocumentFingerprint) {
171        self.fingerprints.insert(doc_id.to_string(), fingerprint);
172    }
173
174    /// Get fingerprint for a document
175    pub fn get_fingerprint(&self, doc_id: &str) -> Option<&DocumentFingerprint> {
176        self.fingerprints.get(doc_id)
177    }
178
179    /// Calculate delta between old and new chunks (Muda elimination)
180    pub fn calculate_delta<'a>(
181        old_hashes: &HashSet<[u8; 32]>,
182        new_chunks: &'a [(String, [u8; 32])],
183    ) -> DeltaSet<'a> {
184        let new_hashes: HashSet<[u8; 32]> = new_chunks.iter().map(|(_, h)| *h).collect();
185
186        DeltaSet {
187            to_add: new_chunks.iter().filter(|(_, h)| !old_hashes.contains(h)).collect(),
188            to_remove: old_hashes.iter().filter(|h| !new_hashes.contains(*h)).copied().collect(),
189        }
190    }
191
192    /// Get batch delay as Duration
193    pub fn batch_delay(&self) -> Duration {
194        Duration::from_millis(self.batch_delay_ms)
195    }
196
197    /// Get reindexing statistics
198    pub fn stats(&self) -> ReindexerStats {
199        ReindexerStats {
200            queue_size: self.queue.len(),
201            tracked_documents: self.fingerprints.len(),
202            total_queries: self.query_counts.values().sum(),
203        }
204    }
205}
206
207impl Default for HeijunkaReindexer {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
/// A task to reindex a document
///
/// Produced by `HeijunkaReindexer::next_batch`, converted from the internal
/// queue entry via `From<StalenessEntry>`.
#[derive(Debug, Clone)]
pub struct ReindexTask {
    /// Document ID
    pub doc_id: String,
    /// Document path
    pub path: PathBuf,
    /// Staleness score at the time the task was dequeued (for logging/metrics)
    pub staleness_score: f64,
}
223
/// Delta set for incremental updates (Muda elimination)
///
/// Produced by `HeijunkaReindexer::calculate_delta`: only changed chunks
/// are re-embedded; unchanged ones appear in neither list.
#[derive(Debug)]
pub struct DeltaSet<'a> {
    /// Chunks to add (new or modified), borrowed from the new chunk list
    pub to_add: Vec<&'a (String, [u8; 32])>,
    /// Chunk hashes to remove (present before, absent now)
    pub to_remove: Vec<[u8; 32]>,
}

impl DeltaSet<'_> {
    /// Calculate efficiency: the percentage of new chunks that were
    /// unchanged and therefore need no re-embedding.
    ///
    /// Returns 100.0 when `total_new` is 0 (nothing had to be done).
    /// Uses `saturating_sub` so an inconsistent `total_new` smaller than
    /// `to_add.len()` yields 0% instead of underflowing `usize` (which
    /// panics in debug builds and wraps in release).
    pub fn efficiency(&self, _total_old: usize, total_new: usize) -> f64 {
        if total_new == 0 {
            return 100.0;
        }
        let unchanged = total_new.saturating_sub(self.to_add.len());
        unchanged as f64 / total_new as f64 * 100.0
    }
}
243
/// Reindexer statistics
///
/// Point-in-time snapshot returned by `HeijunkaReindexer::stats`.
#[derive(Debug, Clone)]
pub struct ReindexerStats {
    /// Documents currently waiting in the reindex queue
    pub queue_size: usize,
    /// Documents with stored fingerprints
    pub tracked_documents: usize,
    /// Total queries recorded across all documents
    pub total_queries: u64,
}
254
/// Progress tracker for reindexing
///
/// Counters are public; `record_processed` updates `processed`/`modified`,
/// while `added`/`removed` are left to callers (nothing in this file
/// increments them).
#[derive(Debug)]
pub struct ReindexProgress {
    /// Total documents to process
    pub total: usize,
    /// Documents processed
    pub processed: usize,
    /// Documents modified
    pub modified: usize,
    /// Documents added
    pub added: usize,
    /// Documents removed
    pub removed: usize,
    /// Start time, captured at construction.
    // NOTE(review): set via `crate::timing::start_timer()`, which presumably
    // wraps `Instant::now()` — confirm in `crate::timing`.
    start_time: Instant,
}
271
272impl ReindexProgress {
273    /// Create a new progress tracker
274    pub fn new(total: usize) -> Self {
275        Self {
276            total,
277            processed: 0,
278            modified: 0,
279            added: 0,
280            removed: 0,
281            start_time: crate::timing::start_timer(),
282        }
283    }
284
285    /// Record a processed document
286    pub fn record_processed(&mut self, was_modified: bool) {
287        self.processed += 1;
288        if was_modified {
289            self.modified += 1;
290        }
291    }
292
293    /// Get completion percentage
294    pub fn percent_complete(&self) -> f64 {
295        if self.total == 0 {
296            return 100.0;
297        }
298        self.processed as f64 / self.total as f64 * 100.0
299    }
300
301    /// Get elapsed time
302    pub fn elapsed(&self) -> Duration {
303        self.start_time.elapsed()
304    }
305
306    /// Get processing rate (docs/second)
307    pub fn rate(&self) -> f64 {
308        let elapsed = self.elapsed().as_secs_f64();
309        if elapsed > 0.0 {
310            self.processed as f64 / elapsed
311        } else {
312            0.0
313        }
314    }
315
316    /// Estimate time remaining
317    pub fn eta(&self) -> Duration {
318        let rate = self.rate();
319        if rate > 0.0 {
320            let remaining = self.total - self.processed;
321            Duration::from_secs_f64(remaining as f64 / rate)
322        } else {
323            Duration::from_secs(0)
324        }
325    }
326}
327
// Unit and property-based tests for the Heijunka reindexer.
// Fixture helpers at the top build synthetic doc IDs, paths, and chunk
// hashes so individual tests stay short.
#[cfg(test)]
mod tests {
    use super::*;

    /// Generate a synthetic doc ID for test use.
    fn test_doc_id(i: usize) -> String {
        format!("doc{i}")
    }

    /// Generate a synthetic path for test use.
    fn test_doc_path(i: usize) -> PathBuf {
        PathBuf::from(format!("/doc{i}"))
    }

    /// Enqueue `count` synthetic documents with linearly increasing age.
    fn enqueue_synthetic(reindexer: &mut HeijunkaReindexer, count: usize, age_step: u64) {
        for i in 0..count {
            reindexer.enqueue(&test_doc_id(i), test_doc_path(i), i as u64 * age_step);
        }
    }

    /// Build a `HashSet<[u8; 32]>` from an iterator of single-byte fill values.
    fn hash_set_from_fills(fills: impl IntoIterator<Item = u8>) -> HashSet<[u8; 32]> {
        fills.into_iter().map(|b| [b; 32]).collect()
    }

    /// Build a `Vec<(String, [u8; 32])>` of chunks from `(label, fill_byte)` pairs.
    fn chunks_from_fills(pairs: &[(&str, u8)]) -> Vec<(String, [u8; 32])> {
        pairs.iter().map(|(label, b)| (label.to_string(), [*b; 32])).collect()
    }

    #[test]
    fn test_heijunka_creation() {
        let reindexer = HeijunkaReindexer::new();
        assert!(reindexer.is_empty());
        assert_eq!(reindexer.queue_size(), 0);
    }

    #[test]
    fn test_staleness_score_new_document() {
        // Brand new document (0 age) should have low staleness
        // (recency_weight is exactly 0 at age 0, so the product is 0).
        let score = HeijunkaReindexer::staleness_score(0, 0);
        assert!(score < 0.1);
    }

    #[test]
    fn test_staleness_score_old_document() {
        // Old document (1 day) should have higher staleness
        let score = HeijunkaReindexer::staleness_score(86400, 1);
        // With 1 day age and query_count=1: recency ~0.63, popularity ~0.69
        assert!(score > 0.3);
    }

    #[test]
    fn test_staleness_score_popular_document() {
        // Popular document should have higher staleness (more important to update)
        let score_low = HeijunkaReindexer::staleness_score(3600, 1);
        let score_high = HeijunkaReindexer::staleness_score(3600, 100);
        assert!(score_high > score_low);
    }

    #[test]
    fn test_enqueue_and_batch() {
        let mut reindexer = HeijunkaReindexer::new();

        // Add query counts so popularity factor is non-zero
        for id in ["doc1", "doc2", "doc3"] {
            reindexer.record_query(id);
        }

        reindexer.enqueue("doc1", test_doc_path(1), 1000);
        reindexer.enqueue("doc2", test_doc_path(2), 5000);
        reindexer.enqueue("doc3", test_doc_path(3), 100);

        assert_eq!(reindexer.queue_size(), 3);

        let batch = reindexer.next_batch();

        // Higher staleness (older) should come first
        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 3);
        // doc2 has highest age (5000s) so should be first
        assert_eq!(batch[0].doc_id, "doc2");
    }

    #[test]
    fn test_batch_size_limit() {
        let config = HeijunkaConfig { batch_size: 2, ..Default::default() };
        let mut reindexer = HeijunkaReindexer::with_config(config);

        enqueue_synthetic(&mut reindexer, 10, 100);

        let batch = reindexer.next_batch();
        assert_eq!(batch.len(), 2); // Limited to batch_size
    }

    #[test]
    fn test_record_query() {
        let mut reindexer = HeijunkaReindexer::new();

        for _ in 0..3 {
            reindexer.record_query("doc1");
        }
        reindexer.record_query("doc2");

        // doc1 should have higher query count
        // (reads the private query_counts field directly — fine in-module)
        assert_eq!(*reindexer.query_counts.get("doc1").expect("key not found"), 3);
        assert_eq!(*reindexer.query_counts.get("doc2").expect("key not found"), 1);
    }

    #[test]
    fn test_popularity_decay() {
        let mut reindexer = HeijunkaReindexer::new();

        for _ in 0..4 {
            reindexer.record_query("doc1");
        }

        // 4 * 0.95 = 3.8, truncated to 3 — strictly less than before
        let before = *reindexer.query_counts.get("doc1").expect("key not found");
        reindexer.decay_popularity();
        let after = *reindexer.query_counts.get("doc1").expect("key not found");

        assert!(after < before);
    }

    #[test]
    fn test_delta_calculation() {
        let old_hashes = hash_set_from_fills(1..=3);
        let new_chunks = chunks_from_fills(&[("chunk1", 2), ("chunk2", 3), ("chunk3", 4)]);

        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);

        // One chunk to add (hash 4)
        assert_eq!(delta.to_add.len(), 1);
        assert_eq!(delta.to_add[0].1, [4u8; 32]);

        // One chunk to remove (hash 1)
        assert_eq!(delta.to_remove.len(), 1);
        assert!(delta.to_remove.contains(&[1u8; 32]));
    }

    #[test]
    fn test_delta_efficiency() {
        let old_hashes = hash_set_from_fills(1..=4);
        let new_chunks =
            chunks_from_fills(&[("c1", 1), ("c2", 2), ("c3", 3), ("c4", 5) /* one changed */]);

        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
        let efficiency = delta.efficiency(4, 4);

        // 3/4 unchanged = 75% efficiency
        assert!((efficiency - 75.0).abs() < 0.1);
    }

    #[test]
    fn test_progress_tracking() {
        let mut progress = ReindexProgress::new(100);

        progress.record_processed(false);
        progress.record_processed(true);
        progress.record_processed(false);

        assert_eq!(progress.processed, 3);
        assert_eq!(progress.modified, 1);
        assert!((progress.percent_complete() - 3.0).abs() < 0.1);
    }

    #[test]
    fn test_progress_rate() {
        let progress = ReindexProgress::new(100);
        // Just created, rate should be 0 or very low
        assert!(progress.rate() >= 0.0);
    }

    #[test]
    fn test_fingerprint_storage() {
        let mut reindexer = HeijunkaReindexer::new();
        let fp = DocumentFingerprint {
            content_hash: [1u8; 32],
            chunker_config_hash: [2u8; 32],
            embedding_model_hash: [3u8; 32],
            indexed_at: 12345,
        };

        reindexer.store_fingerprint("doc1", fp.clone());

        let retrieved = reindexer.get_fingerprint("doc1");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.expect("unexpected failure").content_hash, [1u8; 32]);
    }

    #[test]
    fn test_heijunka_default() {
        let reindexer = HeijunkaReindexer::default();
        assert!(reindexer.is_empty());
    }

    #[test]
    fn test_heijunka_config_default() {
        let config = HeijunkaConfig::default();
        assert_eq!(config.batch_size, 50);
        assert_eq!(config.batch_delay_ms, 100);
        assert_eq!(config.max_staleness_seconds, 86400);
        assert!((config.popularity_decay - 0.95).abs() < 0.01);
    }

    #[test]
    fn test_batch_delay() {
        let reindexer = HeijunkaReindexer::new();
        let delay = reindexer.batch_delay();
        assert_eq!(delay, Duration::from_millis(100));
    }

    #[test]
    fn test_stats() {
        let mut reindexer = HeijunkaReindexer::new();
        reindexer.record_query("doc1");
        reindexer.record_query("doc2");

        let stats = reindexer.stats();
        assert_eq!(stats.queue_size, 0);
        assert_eq!(stats.tracked_documents, 0);
        assert_eq!(stats.total_queries, 2);
    }

    #[test]
    fn test_progress_empty() {
        // Zero-document job is trivially 100% complete
        let progress = ReindexProgress::new(0);
        assert!((progress.percent_complete() - 100.0).abs() < 0.01);
    }

    #[test]
    fn test_delta_efficiency_empty() {
        let old_hashes = hash_set_from_fills(std::iter::empty());
        let new_chunks = chunks_from_fills(&[]);
        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
        let efficiency = delta.efficiency(0, 0);
        assert!((efficiency - 100.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_eta() {
        let mut progress = ReindexProgress::new(100);
        progress.processed = 50;
        // ETA depends on elapsed time, which is instant here;
        // this only checks that eta() does not panic.
        let _ = progress.eta();
    }

    #[test]
    fn test_get_fingerprint_not_found() {
        let reindexer = HeijunkaReindexer::new();
        assert!(reindexer.get_fingerprint("nonexistent").is_none());
    }

    // Property-based tests for Heijunka reindexer
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(50))]

            /// Property: Staleness score is non-negative
            #[test]
            fn prop_staleness_score_non_negative(age_seconds in 0u64..1000000, query_count in 0u64..10000) {
                let score = HeijunkaReindexer::staleness_score(age_seconds, query_count);
                prop_assert!(score >= 0.0, "Staleness score {} should be >= 0", score);
            }

            /// Property: Higher age produces higher staleness
            #[test]
            fn prop_higher_age_higher_staleness(
                low_age in 0u64..10000,
                high_age in 50000u64..100000,
                query_count in 1u64..100
            ) {
                let low_score = HeijunkaReindexer::staleness_score(low_age, query_count);
                let high_score = HeijunkaReindexer::staleness_score(high_age, query_count);
                prop_assert!(high_score >= low_score, "Age {} score {} < age {} score {}", high_age, high_score, low_age, low_score);
            }

            /// Property: Higher query count produces higher staleness (for same age)
            #[test]
            fn prop_higher_popularity_higher_staleness(
                age_seconds in 1000u64..50000,
                low_count in 0u64..10,
                high_count in 100u64..1000
            ) {
                let low_score = HeijunkaReindexer::staleness_score(age_seconds, low_count);
                let high_score = HeijunkaReindexer::staleness_score(age_seconds, high_count);
                prop_assert!(high_score >= low_score);
            }

            /// Property: Batch size is respected
            #[test]
            fn prop_batch_size_respected(batch_size in 1usize..20, num_docs in 1usize..100) {
                let config = HeijunkaConfig {
                    batch_size,
                    ..Default::default()
                };
                let mut reindexer = HeijunkaReindexer::with_config(config);

                enqueue_synthetic(&mut reindexer, num_docs, 100);

                let batch = reindexer.next_batch();
                prop_assert!(batch.len() <= batch_size);
            }

            /// Property: Enqueue increases queue size
            #[test]
            fn prop_enqueue_increases_size(num_docs in 1usize..50) {
                let mut reindexer = HeijunkaReindexer::new();

                enqueue_synthetic(&mut reindexer, num_docs, 0);

                prop_assert_eq!(reindexer.queue_size(), num_docs);
            }

            /// Property: Progress percentage is in [0, 100]
            #[test]
            fn prop_progress_percentage_valid(total in 0usize..1000, processed in 0usize..500) {
                let mut progress = ReindexProgress::new(total);
                for _ in 0..processed.min(total) {
                    progress.record_processed(false);
                }
                let pct = progress.percent_complete();
                prop_assert!((0.0..=100.0).contains(&pct), "Progress {} not in [0, 100]", pct);
            }

            /// Property: Delta efficiency is in [0, 100]
            #[test]
            fn prop_delta_efficiency_valid(
                old_count in 0usize..10,
                new_count in 0usize..10,
                overlap in 0usize..10
            ) {
                let overlap = overlap.min(old_count).min(new_count);

                let old_hashes = hash_set_from_fills((0..old_count).map(|i| i as u8));
                let new_chunks: Vec<(String, [u8; 32])> = (0..new_count)
                    .map(|i| {
                        let fill = if i < overlap { i as u8 } else { (old_count + i) as u8 };
                        (format!("c{i}"), [fill; 32])
                    })
                    .collect();

                let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
                let efficiency = delta.efficiency(old_count, new_count);
                prop_assert!((0.0..=100.0).contains(&efficiency), "Efficiency {} not in [0, 100]", efficiency);
            }
        }
    }
}