1use super::fingerprint::DocumentFingerprint;
7use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
/// Load-leveling ("heijunka") reindex scheduler: documents are queued with a
/// staleness score and drained in fixed-size batches separated by a delay.
#[derive(Debug)]
pub struct HeijunkaReindexer {
    /// Maximum number of tasks returned by `next_batch` (copied from `config`).
    batch_size: usize,
    /// Pause between batches in milliseconds (copied from `config`).
    batch_delay_ms: u64,
    /// Max-heap of pending documents, ordered by staleness score (see `Ord` on the entry type).
    queue: BinaryHeap<StalenessEntry>,
    /// Last stored fingerprint per document id.
    fingerprints: HashMap<String, DocumentFingerprint>,
    /// Per-document query counters; used as the popularity weight when scoring.
    query_counts: HashMap<String, u64>,
    /// Full configuration (also retains copies of the fields hoisted above).
    config: HeijunkaConfig,
}
30
/// Tuning knobs for [`HeijunkaReindexer`].
#[derive(Debug, Clone)]
pub struct HeijunkaConfig {
    /// Maximum number of documents handed out per batch.
    pub batch_size: usize,
    /// Delay between batches, in milliseconds.
    pub batch_delay_ms: u64,
    /// Staleness ceiling in seconds. NOTE(review): not read anywhere in this
    /// file — presumably consumed by a caller; confirm before removing.
    pub max_staleness_seconds: u64,
    /// Multiplicative factor applied to every query counter by `decay_popularity`.
    pub popularity_decay: f64,
}
43
44impl Default for HeijunkaConfig {
45 fn default() -> Self {
46 Self {
47 batch_size: 50,
48 batch_delay_ms: 100,
49 max_staleness_seconds: 86400, popularity_decay: 0.95,
51 }
52 }
53}
54
/// Internal priority-queue entry; heap ordering is driven by the `Ord` impl below.
#[derive(Debug, Clone)]
struct StalenessEntry {
    /// Identifier of the document to reindex.
    doc_id: String,
    /// Priority: higher means staler and/or more frequently queried.
    staleness_score: f64,
    /// Filesystem path of the document.
    path: PathBuf,
}
65
66impl From<StalenessEntry> for ReindexTask {
67 fn from(entry: StalenessEntry) -> Self {
68 Self { doc_id: entry.doc_id, path: entry.path, staleness_score: entry.staleness_score }
69 }
70}
71
72impl PartialEq for StalenessEntry {
73 fn eq(&self, other: &Self) -> bool {
74 self.doc_id == other.doc_id
75 }
76}
77
78impl Eq for StalenessEntry {}
79
80impl PartialOrd for StalenessEntry {
81 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
82 Some(self.cmp(other))
83 }
84}
85
86impl Ord for StalenessEntry {
87 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
88 self.staleness_score
90 .partial_cmp(&other.staleness_score)
91 .unwrap_or(std::cmp::Ordering::Equal)
92 }
93}
94
95impl HeijunkaReindexer {
96 pub fn new() -> Self {
98 Self::with_config(HeijunkaConfig::default())
99 }
100
101 pub fn with_config(config: HeijunkaConfig) -> Self {
103 Self {
104 batch_size: config.batch_size,
105 batch_delay_ms: config.batch_delay_ms,
106 queue: BinaryHeap::new(),
107 fingerprints: HashMap::new(),
108 query_counts: HashMap::new(),
109 config,
110 }
111 }
112
113 pub fn staleness_score(age_seconds: u64, query_count: u64) -> f64 {
119 let recency_weight = 1.0 - (-(age_seconds as f64) / 86400.0).exp();
120 let popularity_weight = (query_count as f64 + 1.0).ln();
121 recency_weight * popularity_weight
122 }
123
124 pub fn enqueue(&mut self, doc_id: &str, path: PathBuf, age_seconds: u64) {
126 let query_count = self.query_counts.get(doc_id).copied().unwrap_or(0);
127 let staleness_score = Self::staleness_score(age_seconds, query_count);
128
129 self.queue.push(StalenessEntry { doc_id: doc_id.to_string(), staleness_score, path });
130 }
131
132 pub fn record_query(&mut self, doc_id: &str) {
134 *self.query_counts.entry(doc_id.to_string()).or_insert(0) += 1;
135 }
136
137 pub fn decay_popularity(&mut self) {
139 for count in self.query_counts.values_mut() {
140 *count = (*count as f64 * self.config.popularity_decay) as u64;
141 }
142 }
143
144 pub fn next_batch(&mut self) -> Vec<ReindexTask> {
146 let mut batch = Vec::with_capacity(self.batch_size);
147
148 while batch.len() < self.batch_size {
149 if let Some(entry) = self.queue.pop() {
150 batch.push(entry.into());
151 } else {
152 break;
153 }
154 }
155
156 batch
157 }
158
159 pub fn is_empty(&self) -> bool {
161 self.queue.is_empty()
162 }
163
164 pub fn queue_size(&self) -> usize {
166 self.queue.len()
167 }
168
169 pub fn store_fingerprint(&mut self, doc_id: &str, fingerprint: DocumentFingerprint) {
171 self.fingerprints.insert(doc_id.to_string(), fingerprint);
172 }
173
174 pub fn get_fingerprint(&self, doc_id: &str) -> Option<&DocumentFingerprint> {
176 self.fingerprints.get(doc_id)
177 }
178
179 pub fn calculate_delta<'a>(
181 old_hashes: &HashSet<[u8; 32]>,
182 new_chunks: &'a [(String, [u8; 32])],
183 ) -> DeltaSet<'a> {
184 let new_hashes: HashSet<[u8; 32]> = new_chunks.iter().map(|(_, h)| *h).collect();
185
186 DeltaSet {
187 to_add: new_chunks.iter().filter(|(_, h)| !old_hashes.contains(h)).collect(),
188 to_remove: old_hashes.iter().filter(|h| !new_hashes.contains(*h)).copied().collect(),
189 }
190 }
191
192 pub fn batch_delay(&self) -> Duration {
194 Duration::from_millis(self.batch_delay_ms)
195 }
196
197 pub fn stats(&self) -> ReindexerStats {
199 ReindexerStats {
200 queue_size: self.queue.len(),
201 tracked_documents: self.fingerprints.len(),
202 total_queries: self.query_counts.values().sum(),
203 }
204 }
205}
206
207impl Default for HeijunkaReindexer {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
/// A unit of reindex work handed out by `next_batch`.
#[derive(Debug, Clone)]
pub struct ReindexTask {
    /// Identifier of the document to reindex.
    pub doc_id: String,
    /// Filesystem path of the document.
    pub path: PathBuf,
    /// Score the document carried when it was dequeued (higher = staler).
    pub staleness_score: f64,
}
223
/// Result of `calculate_delta`: the minimal set of chunk changes to apply.
#[derive(Debug)]
pub struct DeltaSet<'a> {
    /// Chunks (label + hash) present in the new set but not previously indexed.
    pub to_add: Vec<&'a (String, [u8; 32])>,
    /// Previously indexed hashes that no longer appear in the new chunk set.
    pub to_remove: Vec<[u8; 32]>,
}
232
233impl DeltaSet<'_> {
234 pub fn efficiency(&self, _total_old: usize, total_new: usize) -> f64 {
236 if total_new == 0 {
237 return 100.0;
238 }
239 let unchanged = total_new - self.to_add.len();
240 unchanged as f64 / total_new as f64 * 100.0
241 }
242}
243
/// Point-in-time counters reported by `HeijunkaReindexer::stats`.
#[derive(Debug, Clone)]
pub struct ReindexerStats {
    /// Documents currently waiting in the priority queue.
    pub queue_size: usize,
    /// Documents with a stored fingerprint.
    pub tracked_documents: usize,
    /// Sum of all per-document query counters.
    pub total_queries: u64,
}
254
/// Progress tracker for a reindex run over a known number of documents.
#[derive(Debug)]
pub struct ReindexProgress {
    /// Total documents expected in this run.
    pub total: usize,
    /// Documents processed so far.
    pub processed: usize,
    /// Processed documents that were actually modified.
    pub modified: usize,
    /// Chunks added during the run.
    pub added: usize,
    /// Chunks removed during the run.
    pub removed: usize,
    /// Wall-clock start of the run; drives `elapsed`/`rate`/`eta`.
    start_time: Instant,
}
271
272impl ReindexProgress {
273 pub fn new(total: usize) -> Self {
275 Self {
276 total,
277 processed: 0,
278 modified: 0,
279 added: 0,
280 removed: 0,
281 start_time: crate::timing::start_timer(),
282 }
283 }
284
285 pub fn record_processed(&mut self, was_modified: bool) {
287 self.processed += 1;
288 if was_modified {
289 self.modified += 1;
290 }
291 }
292
293 pub fn percent_complete(&self) -> f64 {
295 if self.total == 0 {
296 return 100.0;
297 }
298 self.processed as f64 / self.total as f64 * 100.0
299 }
300
301 pub fn elapsed(&self) -> Duration {
303 self.start_time.elapsed()
304 }
305
306 pub fn rate(&self) -> f64 {
308 let elapsed = self.elapsed().as_secs_f64();
309 if elapsed > 0.0 {
310 self.processed as f64 / elapsed
311 } else {
312 0.0
313 }
314 }
315
316 pub fn eta(&self) -> Duration {
318 let rate = self.rate();
319 if rate > 0.0 {
320 let remaining = self.total - self.processed;
321 Duration::from_secs_f64(remaining as f64 / rate)
322 } else {
323 Duration::from_secs(0)
324 }
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    // Synthetic document id: "doc0", "doc1", ...
    fn test_doc_id(i: usize) -> String {
        format!("doc{i}")
    }

    // Synthetic absolute path matching `test_doc_id`.
    fn test_doc_path(i: usize) -> PathBuf {
        PathBuf::from(format!("/doc{i}"))
    }

    // Enqueue `count` synthetic documents with ages 0, age_step, 2*age_step, ...
    fn enqueue_synthetic(reindexer: &mut HeijunkaReindexer, count: usize, age_step: u64) {
        for i in 0..count {
            reindexer.enqueue(&test_doc_id(i), test_doc_path(i), i as u64 * age_step);
        }
    }

    // Build a hash set where each hash is a 32-byte array filled with one byte value.
    fn hash_set_from_fills(fills: impl IntoIterator<Item = u8>) -> HashSet<[u8; 32]> {
        fills.into_iter().map(|b| [b; 32]).collect()
    }

    // Build labelled chunks whose hashes are 32-byte arrays filled with one byte value.
    fn chunks_from_fills(pairs: &[(&str, u8)]) -> Vec<(String, [u8; 32])> {
        pairs.iter().map(|(label, b)| (label.to_string(), [*b; 32])).collect()
    }

    #[test]
    fn test_heijunka_creation() {
        let reindexer = HeijunkaReindexer::new();
        assert!(reindexer.is_empty());
        assert_eq!(reindexer.queue_size(), 0);
    }

    #[test]
    fn test_staleness_score_new_document() {
        // Age 0 => recency weight 0 => score 0.
        let score = HeijunkaReindexer::staleness_score(0, 0);
        assert!(score < 0.1);
    }

    #[test]
    fn test_staleness_score_old_document() {
        // One day old with one query: (1 - e^-1) * ln(2) ~= 0.44.
        let score = HeijunkaReindexer::staleness_score(86400, 1);
        assert!(score > 0.3);
    }

    #[test]
    fn test_staleness_score_popular_document() {
        // Same age, more queries => strictly higher score.
        let score_low = HeijunkaReindexer::staleness_score(3600, 1);
        let score_high = HeijunkaReindexer::staleness_score(3600, 100);
        assert!(score_high > score_low);
    }

    #[test]
    fn test_enqueue_and_batch() {
        let mut reindexer = HeijunkaReindexer::new();

        for id in ["doc1", "doc2", "doc3"] {
            reindexer.record_query(id);
        }

        reindexer.enqueue("doc1", test_doc_path(1), 1000);
        reindexer.enqueue("doc2", test_doc_path(2), 5000);
        reindexer.enqueue("doc3", test_doc_path(3), 100);

        assert_eq!(reindexer.queue_size(), 3);

        let batch = reindexer.next_batch();

        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 3);
        // All three have one query each, so the oldest (doc2) scores highest.
        assert_eq!(batch[0].doc_id, "doc2");
    }

    #[test]
    fn test_batch_size_limit() {
        let config = HeijunkaConfig { batch_size: 2, ..Default::default() };
        let mut reindexer = HeijunkaReindexer::with_config(config);

        enqueue_synthetic(&mut reindexer, 10, 100);

        let batch = reindexer.next_batch();
        assert_eq!(batch.len(), 2);
    }

    #[test]
    fn test_record_query() {
        let mut reindexer = HeijunkaReindexer::new();

        for _ in 0..3 {
            reindexer.record_query("doc1");
        }
        reindexer.record_query("doc2");

        assert_eq!(*reindexer.query_counts.get("doc1").expect("key not found"), 3);
        assert_eq!(*reindexer.query_counts.get("doc2").expect("key not found"), 1);
    }

    #[test]
    fn test_popularity_decay() {
        let mut reindexer = HeijunkaReindexer::new();

        for _ in 0..4 {
            reindexer.record_query("doc1");
        }

        // 4 * 0.95 = 3.8, truncating to 3 — strictly less than before.
        let before = *reindexer.query_counts.get("doc1").expect("key not found");
        reindexer.decay_popularity();
        let after = *reindexer.query_counts.get("doc1").expect("key not found");

        assert!(after < before);
    }

    #[test]
    fn test_delta_calculation() {
        // Old {1,2,3}, new {2,3,4}: add fill-4, remove fill-1.
        let old_hashes = hash_set_from_fills(1..=3);
        let new_chunks = chunks_from_fills(&[("chunk1", 2), ("chunk2", 3), ("chunk3", 4)]);

        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);

        assert_eq!(delta.to_add.len(), 1);
        assert_eq!(delta.to_add[0].1, [4u8; 32]);

        assert_eq!(delta.to_remove.len(), 1);
        assert!(delta.to_remove.contains(&[1u8; 32]));
    }

    #[test]
    fn test_delta_efficiency() {
        // 3 of 4 new chunks unchanged => 75% efficiency.
        let old_hashes = hash_set_from_fills(1..=4);
        let new_chunks =
            chunks_from_fills(&[("c1", 1), ("c2", 2), ("c3", 3), ("c4", 5)]);

        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
        let efficiency = delta.efficiency(4, 4);

        assert!((efficiency - 75.0).abs() < 0.1);
    }

    #[test]
    fn test_progress_tracking() {
        let mut progress = ReindexProgress::new(100);

        progress.record_processed(false);
        progress.record_processed(true);
        progress.record_processed(false);

        assert_eq!(progress.processed, 3);
        assert_eq!(progress.modified, 1);
        assert!((progress.percent_complete() - 3.0).abs() < 0.1);
    }

    #[test]
    fn test_progress_rate() {
        let progress = ReindexProgress::new(100);
        assert!(progress.rate() >= 0.0);
    }

    #[test]
    fn test_fingerprint_storage() {
        let mut reindexer = HeijunkaReindexer::new();
        let fp = DocumentFingerprint {
            content_hash: [1u8; 32],
            chunker_config_hash: [2u8; 32],
            embedding_model_hash: [3u8; 32],
            indexed_at: 12345,
        };

        reindexer.store_fingerprint("doc1", fp.clone());

        let retrieved = reindexer.get_fingerprint("doc1");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.expect("unexpected failure").content_hash, [1u8; 32]);
    }

    #[test]
    fn test_heijunka_default() {
        let reindexer = HeijunkaReindexer::default();
        assert!(reindexer.is_empty());
    }

    #[test]
    fn test_heijunka_config_default() {
        let config = HeijunkaConfig::default();
        assert_eq!(config.batch_size, 50);
        assert_eq!(config.batch_delay_ms, 100);
        assert_eq!(config.max_staleness_seconds, 86400);
        assert!((config.popularity_decay - 0.95).abs() < 0.01);
    }

    #[test]
    fn test_batch_delay() {
        let reindexer = HeijunkaReindexer::new();
        let delay = reindexer.batch_delay();
        assert_eq!(delay, Duration::from_millis(100));
    }

    #[test]
    fn test_stats() {
        let mut reindexer = HeijunkaReindexer::new();
        reindexer.record_query("doc1");
        reindexer.record_query("doc2");

        let stats = reindexer.stats();
        assert_eq!(stats.queue_size, 0);
        assert_eq!(stats.tracked_documents, 0);
        assert_eq!(stats.total_queries, 2);
    }

    #[test]
    fn test_progress_empty() {
        // A zero-document run reports 100% complete by definition.
        let progress = ReindexProgress::new(0);
        assert!((progress.percent_complete() - 100.0).abs() < 0.01);
    }

    #[test]
    fn test_delta_efficiency_empty() {
        let old_hashes = hash_set_from_fills(std::iter::empty());
        let new_chunks = chunks_from_fills(&[]);
        let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
        let efficiency = delta.efficiency(0, 0);
        assert!((efficiency - 100.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_eta() {
        // Smoke test only: eta() must not panic mid-run.
        let mut progress = ReindexProgress::new(100);
        progress.processed = 50;
        let _ = progress.eta();
    }

    #[test]
    fn test_get_fingerprint_not_found() {
        let reindexer = HeijunkaReindexer::new();
        assert!(reindexer.get_fingerprint("nonexistent").is_none());
    }

    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(50))]

            #[test]
            fn prop_staleness_score_non_negative(age_seconds in 0u64..1000000, query_count in 0u64..10000) {
                let score = HeijunkaReindexer::staleness_score(age_seconds, query_count);
                prop_assert!(score >= 0.0, "Staleness score {} should be >= 0", score);
            }

            #[test]
            fn prop_higher_age_higher_staleness(
                low_age in 0u64..10000,
                high_age in 50000u64..100000,
                query_count in 1u64..100
            ) {
                let low_score = HeijunkaReindexer::staleness_score(low_age, query_count);
                let high_score = HeijunkaReindexer::staleness_score(high_age, query_count);
                prop_assert!(high_score >= low_score, "Age {} score {} < age {} score {}", high_age, high_score, low_age, low_score);
            }

            #[test]
            fn prop_higher_popularity_higher_staleness(
                age_seconds in 1000u64..50000,
                low_count in 0u64..10,
                high_count in 100u64..1000
            ) {
                let low_score = HeijunkaReindexer::staleness_score(age_seconds, low_count);
                let high_score = HeijunkaReindexer::staleness_score(age_seconds, high_count);
                prop_assert!(high_score >= low_score);
            }

            #[test]
            fn prop_batch_size_respected(batch_size in 1usize..20, num_docs in 1usize..100) {
                let config = HeijunkaConfig {
                    batch_size,
                    ..Default::default()
                };
                let mut reindexer = HeijunkaReindexer::with_config(config);

                enqueue_synthetic(&mut reindexer, num_docs, 100);

                let batch = reindexer.next_batch();
                prop_assert!(batch.len() <= batch_size);
            }

            #[test]
            fn prop_enqueue_increases_size(num_docs in 1usize..50) {
                let mut reindexer = HeijunkaReindexer::new();

                enqueue_synthetic(&mut reindexer, num_docs, 0);

                prop_assert_eq!(reindexer.queue_size(), num_docs);
            }

            #[test]
            fn prop_progress_percentage_valid(total in 0usize..1000, processed in 0usize..500) {
                let mut progress = ReindexProgress::new(total);
                for _ in 0..processed.min(total) {
                    progress.record_processed(false);
                }
                let pct = progress.percent_complete();
                prop_assert!((0.0..=100.0).contains(&pct), "Progress {} not in [0, 100]", pct);
            }

            #[test]
            fn prop_delta_efficiency_valid(
                old_count in 0usize..10,
                new_count in 0usize..10,
                overlap in 0usize..10
            ) {
                let overlap = overlap.min(old_count).min(new_count);

                // First `overlap` chunks reuse old fills; the rest use fresh fills.
                let old_hashes = hash_set_from_fills((0..old_count).map(|i| i as u8));
                let new_chunks: Vec<(String, [u8; 32])> = (0..new_count)
                    .map(|i| {
                        let fill = if i < overlap { i as u8 } else { (old_count + i) as u8 };
                        (format!("c{i}"), [fill; 32])
                    })
                    .collect();

                let delta = HeijunkaReindexer::calculate_delta(&old_hashes, &new_chunks);
                let efficiency = delta.efficiency(old_count, new_count);
                prop_assert!((0.0..=100.0).contains(&efficiency), "Efficiency {} not in [0, 100]", efficiency);
            }
        }
    }
}