1use std::collections::HashMap;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, Instant, SystemTime};
45
/// Opaque identifier of a segment.
///
/// A thin newtype over `u64`. The wrapped value is public, so callers may
/// build an id directly (`SegmentId(7)`) or read the raw number with `.0`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SegmentId(pub u64);

impl SegmentId {
    /// Hands out the next id from a process-wide counter.
    ///
    /// The counter starts at 1 and is advanced atomically, so concurrent
    /// callers never receive the same value. Ids constructed by hand through
    /// the tuple constructor are not tracked and may collide with generated
    /// ones.
    pub fn next() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);

        let raw = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        SegmentId(raw)
    }
}
61
/// Lifecycle state of a [`Segment`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SegmentState {
    /// Initial state assigned by `Segment::new`.
    Building,
    /// The segment is live; only `Active` segments are counted in statistics
    /// and considered by the compaction planner.
    Active,
    /// The segment is currently an input of a running compaction.
    Compacting,
    /// The segment has been superseded by a compaction result.
    Tombstoned,
    /// The tombstone has been processed by `SegmentManager::clean_tombstones`.
    Deleted,
}
76
/// Metadata describing the quantizer a segment was built with.
#[derive(Debug, Clone)]
pub struct QuantizerMeta {
    /// Quantizer version; [`Default`] starts at 1.
    pub version: u32,
    /// Training sample count (stored only; nothing in this file reads it).
    pub n_training_samples: usize,
    /// Error recorded at training time. [`SegmentStats::needs_retraining`]
    /// uses it as the drift baseline.
    pub training_error: f32,
    /// Error estimate that [`SegmentStats::estimated_error`] falls back to
    /// while no live samples have been recorded.
    pub current_error: f32,
    /// Wall-clock time at which this record was created.
    pub created_at: SystemTime,
}

impl Default for QuantizerMeta {
    /// Version 1, no training samples, zero errors, created "now".
    fn default() -> Self {
        Self {
            version: 1,
            n_training_samples: 0,
            training_error: 0.0,
            current_error: 0.0,
            created_at: SystemTime::now(),
        }
    }
}

/// Counters and quality measurements tracked for a single segment.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Total number of vectors stored, including deleted ones.
    pub n_vectors: usize,
    /// Number of vectors marked as deleted.
    pub n_deleted: usize,
    /// Size of the segment in bytes.
    pub size_bytes: u64,
    /// Wall-clock time at which the statistics were created.
    pub created_at: SystemTime,
    /// Wall-clock time of the most recent recorded access.
    pub last_accessed: SystemTime,
    /// Number of recorded accesses.
    pub access_count: u64,
    /// Quantizer metadata, including the drift baseline.
    pub quantizer_meta: QuantizerMeta,
    /// Sliding window of the most recent quantization error samples
    /// (oldest first).
    pub error_samples: Vec<f32>,
}

impl SegmentStats {
    /// Maximum number of samples kept in `error_samples`.
    const MAX_ERROR_SAMPLES: usize = 1000;

    /// Creates statistics for a segment holding `n_vectors` vectors in
    /// `size_bytes` bytes, with no deletions, accesses or error samples.
    pub fn new(n_vectors: usize, size_bytes: u64) -> Self {
        // One clock read so that `created_at == last_accessed` at birth.
        let now = SystemTime::now();
        Self {
            n_vectors,
            n_deleted: 0,
            size_bytes,
            created_at: now,
            last_accessed: now,
            access_count: 0,
            quantizer_meta: QuantizerMeta::default(),
            error_samples: Vec::new(),
        }
    }

    /// Fraction of stored vectors that are deleted, in `0.0..=1.0`.
    ///
    /// Returns `0.0` for an empty segment. `n_deleted` is clamped to
    /// `n_vectors` so the result agrees with [`Self::live_vectors`], which
    /// saturates at zero in the same situation.
    pub fn deletion_ratio(&self) -> f32 {
        if self.n_vectors == 0 {
            return 0.0;
        }
        let deleted = self.n_deleted.min(self.n_vectors);
        deleted as f32 / self.n_vectors as f32
    }

    /// Number of vectors that are not deleted (never underflows).
    pub fn live_vectors(&self) -> usize {
        self.n_vectors.saturating_sub(self.n_deleted)
    }

    /// Appends `error` to the sliding window, discarding the oldest samples
    /// so that at most 1000 remain.
    pub fn record_error(&mut self, error: f32) {
        self.error_samples.push(error);

        // `error_samples` is a public field and may have been grown past the
        // limit externally, so trim the whole excess rather than one entry.
        let excess = self
            .error_samples
            .len()
            .saturating_sub(Self::MAX_ERROR_SAMPLES);
        if excess > 0 {
            self.error_samples.drain(..excess);
        }
    }

    /// Mean of the recorded error samples, or `quantizer_meta.current_error`
    /// when no sample has been recorded yet.
    pub fn estimated_error(&self) -> f32 {
        if self.error_samples.is_empty() {
            return self.quantizer_meta.current_error;
        }
        let total: f32 = self.error_samples.iter().sum();
        total / self.error_samples.len() as f32
    }

    /// Returns `true` when the estimated error has grown, relative to the
    /// training error, by more than `threshold` (e.g. `0.2` means +20 %).
    ///
    /// A baseline that is zero, negative or NaN carries no usable
    /// information, so drift is never reported for it.
    pub fn needs_retraining(&self, threshold: f32) -> bool {
        let baseline = self.quantizer_meta.training_error;
        if baseline.is_nan() || baseline <= 0.0 {
            return false;
        }
        let relative_drift = (self.estimated_error() - baseline) / baseline;
        relative_drift > threshold
    }
}
185
186#[derive(Debug, Clone)]
192pub struct Segment {
193 pub id: SegmentId,
195 pub state: SegmentState,
197 pub stats: SegmentStats,
199 pub data_path: String,
201 pub index_path: String,
203 pub tombstone_path: String,
205 pub generation: u32,
207}
208
209impl Segment {
210 pub fn new(id: SegmentId, n_vectors: usize, size_bytes: u64, data_path: String) -> Self {
212 Self {
213 id,
214 state: SegmentState::Building,
215 stats: SegmentStats::new(n_vectors, size_bytes),
216 data_path: data_path.clone(),
217 index_path: format!("{}.idx", data_path),
218 tombstone_path: format!("{}.tomb", data_path),
219 generation: 1,
220 }
221 }
222
223 pub fn activate(&mut self) {
225 self.state = SegmentState::Active;
226 }
227
228 pub fn mark_deleted(&mut self, count: usize) {
230 self.stats.n_deleted += count;
231 }
232
233 pub fn record_access(&mut self) {
235 self.stats.access_count += 1;
236 self.stats.last_accessed = SystemTime::now();
237 }
238}
239
/// Tunable thresholds that drive compaction planning.
#[derive(Clone, Debug)]
pub struct CompactionPolicy {
    /// Segments whose deletion ratio exceeds this value are proposed for
    /// merging.
    pub deletion_ratio_threshold: f32,

    /// Segments larger than this many bytes are proposed for splitting.
    pub max_segment_size: u64,

    /// Segments smaller than this many bytes are candidates for being merged
    /// together.
    pub min_segment_size: u64,

    /// Upper bound, in bytes, on the combined size of one group of small
    /// segments merged together.
    pub target_segment_size: u64,

    /// When more segments than this exist, a full recompaction is proposed.
    pub max_segments: usize,

    /// Relative quantizer error growth above which retraining is proposed.
    pub quantizer_drift_threshold: f32,

    /// Minimum time between two planning rounds that produced work.
    pub compaction_cooldown: Duration,

    /// Thread budget for compaction (stored only; nothing in this file
    /// reads it).
    pub max_compaction_threads: usize,
}

impl Default for CompactionPolicy {
    /// Baseline policy: 30 % deletion threshold, 64 MiB / 256 MiB / 1 GiB
    /// min / target / max sizes, 100 segments, 20 % drift, 60 s cooldown and
    /// two compaction threads.
    fn default() -> Self {
        CompactionPolicy {
            deletion_ratio_threshold: 0.3,
            max_segment_size: 1024 * Self::MIB,
            min_segment_size: 64 * Self::MIB,
            target_segment_size: 256 * Self::MIB,
            max_segments: 100,
            quantizer_drift_threshold: 0.2,
            compaction_cooldown: Duration::from_secs(60),
            max_compaction_threads: 2,
        }
    }
}

impl CompactionPolicy {
    /// Number of bytes in one mebibyte.
    const MIB: u64 = 1024 * 1024;

    /// Default policy with a 25 % deletion threshold and a 512 MiB target
    /// segment size.
    pub fn ssd_optimized() -> Self {
        CompactionPolicy {
            deletion_ratio_threshold: 0.25,
            target_segment_size: 512 * Self::MIB,
            ..Self::default()
        }
    }

    /// Default policy with a 40 % deletion threshold, a 64 MiB target
    /// segment size and at most 50 segments.
    pub fn ram_optimized() -> Self {
        CompactionPolicy {
            deletion_ratio_threshold: 0.4,
            target_segment_size: 64 * Self::MIB,
            max_segments: 50,
            ..Self::default()
        }
    }
}
307
308#[derive(Debug)]
314pub enum CompactionDecision {
315 None,
317 Merge(Vec<SegmentId>),
319 Split(SegmentId),
321 Retrain(Vec<SegmentId>),
323 FullRecompact(Vec<SegmentId>),
325}
326
327#[derive(Debug)]
329pub struct CompactionJob {
330 pub id: u64,
332 pub decision: CompactionDecision,
334 pub source_segments: Vec<SegmentId>,
336 pub created_at: Instant,
338 pub priority: u32,
340}
341
342pub struct CompactionPlanner {
348 policy: CompactionPolicy,
349}
350
351impl CompactionPlanner {
352 pub fn new(policy: CompactionPolicy) -> Self {
354 Self { policy }
355 }
356
357 pub fn plan(&self, segments: &[&Segment]) -> Vec<CompactionDecision> {
359 let mut decisions = Vec::new();
360
361 let high_deletion: Vec<_> = segments
363 .iter()
364 .filter(|s| s.stats.deletion_ratio() > self.policy.deletion_ratio_threshold)
365 .map(|s| s.id)
366 .collect();
367
368 if !high_deletion.is_empty() {
369 decisions.push(CompactionDecision::Merge(high_deletion));
370 }
371
372 let small_segments: Vec<_> = segments
374 .iter()
375 .filter(|s| s.stats.size_bytes < self.policy.min_segment_size)
376 .collect();
377
378 if small_segments.len() >= 2 {
379 let mut current_group: Vec<SegmentId> = Vec::new();
381 let mut current_size = 0u64;
382
383 for seg in small_segments {
384 if current_size + seg.stats.size_bytes <= self.policy.target_segment_size {
385 current_group.push(seg.id);
386 current_size += seg.stats.size_bytes;
387 } else {
388 if current_group.len() >= 2 {
389 decisions.push(CompactionDecision::Merge(current_group.clone()));
390 }
391 current_group.clear();
392 current_group.push(seg.id);
393 current_size = seg.stats.size_bytes;
394 }
395 }
396
397 if current_group.len() >= 2 {
398 decisions.push(CompactionDecision::Merge(current_group));
399 }
400 }
401
402 for seg in segments {
404 if seg.stats.size_bytes > self.policy.max_segment_size {
405 decisions.push(CompactionDecision::Split(seg.id));
406 }
407 }
408
409 let drifted: Vec<_> = segments
411 .iter()
412 .filter(|s| {
413 s.stats
414 .needs_retraining(self.policy.quantizer_drift_threshold)
415 })
416 .map(|s| s.id)
417 .collect();
418
419 if !drifted.is_empty() {
420 decisions.push(CompactionDecision::Retrain(drifted));
421 }
422
423 if segments.len() > self.policy.max_segments {
425 let mut sorted: Vec<_> = segments.iter().collect();
427 sorted.sort_by_key(|s| s.stats.live_vectors());
428
429 let to_merge: Vec<_> = sorted
430 .iter()
431 .take(segments.len() / 2)
432 .map(|s| s.id)
433 .collect();
434
435 if to_merge.len() >= 2 {
436 decisions.push(CompactionDecision::FullRecompact(to_merge));
437 }
438 }
439
440 decisions
441 }
442
443 pub fn policy(&self) -> &CompactionPolicy {
445 &self.policy
446 }
447}
448
449pub struct VersionManager {
455 current_version: AtomicU64,
457 versions: parking_lot::RwLock<HashMap<u64, Vec<SegmentId>>>,
459}
460
461impl VersionManager {
462 pub fn new() -> Self {
464 Self {
465 current_version: AtomicU64::new(1),
466 versions: parking_lot::RwLock::new(HashMap::new()),
467 }
468 }
469
470 pub fn current(&self) -> u64 {
472 self.current_version.load(Ordering::SeqCst)
473 }
474
475 pub fn create_version(&self, segments: Vec<SegmentId>) -> u64 {
477 let version = self.current_version.fetch_add(1, Ordering::SeqCst) + 1;
478 self.versions.write().insert(version, segments);
479 version
480 }
481
482 pub fn switch_to(&self, version: u64) -> bool {
484 let versions = self.versions.read();
485 if versions.contains_key(&version) {
486 self.current_version.store(version, Ordering::SeqCst);
487 true
488 } else {
489 false
490 }
491 }
492
493 pub fn get_segments(&self, version: u64) -> Option<Vec<SegmentId>> {
495 self.versions.read().get(&version).cloned()
496 }
497
498 pub fn rollback(&self) -> bool {
500 let current = self.current_version.load(Ordering::SeqCst);
501 if current > 1 {
502 self.current_version.store(current - 1, Ordering::SeqCst);
503 true
504 } else {
505 false
506 }
507 }
508
509 pub fn clean_old_versions(&self, keep_n: usize) {
511 let current = self.current();
512 let mut versions = self.versions.write();
513
514 let to_remove: Vec<_> = versions
515 .keys()
516 .filter(|&&v| v + keep_n as u64 <= current)
517 .cloned()
518 .collect();
519
520 for v in to_remove {
521 versions.remove(&v);
522 }
523 }
524}
525
526impl Default for VersionManager {
527 fn default() -> Self {
528 Self::new()
529 }
530}
531
532pub struct SegmentManager {
538 segments: parking_lot::RwLock<HashMap<SegmentId, Segment>>,
540 planner: CompactionPlanner,
542 versions: VersionManager,
544 last_compaction: parking_lot::Mutex<Option<Instant>>,
546 job_counter: AtomicU64,
548}
549
550impl SegmentManager {
551 pub fn new(policy: CompactionPolicy) -> Self {
553 Self {
554 segments: parking_lot::RwLock::new(HashMap::new()),
555 planner: CompactionPlanner::new(policy),
556 versions: VersionManager::new(),
557 last_compaction: parking_lot::Mutex::new(None),
558 job_counter: AtomicU64::new(0),
559 }
560 }
561
562 pub fn add_segment(&self, segment: Segment) {
564 let id = segment.id;
565 self.segments.write().insert(id, segment);
566
567 let current_segments: Vec<_> = self
569 .segments
570 .read()
571 .iter()
572 .filter(|(_, s)| s.state == SegmentState::Active)
573 .map(|(id, _)| *id)
574 .collect();
575
576 self.versions.create_version(current_segments);
577 }
578
579 pub fn get_segment(&self, id: SegmentId) -> Option<Segment> {
581 self.segments.read().get(&id).cloned()
582 }
583
584 pub fn mark_deleted(&self, id: SegmentId, count: usize) {
586 if let Some(segment) = self.segments.write().get_mut(&id) {
587 segment.mark_deleted(count);
588 }
589 }
590
591 pub fn record_quantizer_error(&self, id: SegmentId, error: f32) {
593 if let Some(segment) = self.segments.write().get_mut(&id) {
594 segment.stats.record_error(error);
595 }
596 }
597
598 pub fn maybe_compact(&self) -> Vec<CompactionJob> {
600 let mut last = self.last_compaction.lock();
602 if let Some(last_time) = *last {
603 if last_time.elapsed() < self.planner.policy().compaction_cooldown {
604 return Vec::new();
605 }
606 }
607
608 let segments = self.segments.read();
610 let active: Vec<_> = segments
611 .values()
612 .filter(|s| s.state == SegmentState::Active)
613 .collect();
614
615 let decisions = self.planner.plan(&active);
616
617 if !decisions.is_empty() {
618 *last = Some(Instant::now());
619 }
620
621 decisions
622 .into_iter()
623 .map(|d| {
624 let source_segments = match &d {
625 CompactionDecision::None => Vec::new(),
626 CompactionDecision::Merge(ids) => ids.clone(),
627 CompactionDecision::Split(id) => vec![*id],
628 CompactionDecision::Retrain(ids) => ids.clone(),
629 CompactionDecision::FullRecompact(ids) => ids.clone(),
630 };
631
632 CompactionJob {
633 id: self.job_counter.fetch_add(1, Ordering::SeqCst),
634 decision: d,
635 source_segments,
636 created_at: Instant::now(),
637 priority: 0,
638 }
639 })
640 .collect()
641 }
642
643 pub fn execute_compaction(
645 &self,
646 job: &CompactionJob,
647 ) -> Result<Option<Segment>, CompactionError> {
648 match &job.decision {
649 CompactionDecision::None => Ok(None),
650
651 CompactionDecision::Merge(ids) => {
652 {
654 let mut segments = self.segments.write();
655 for id in ids {
656 if let Some(seg) = segments.get_mut(id) {
657 seg.state = SegmentState::Compacting;
658 }
659 }
660 }
661
662 let merged_id = SegmentId::next();
664 let segments = self.segments.read();
665
666 let total_size: u64 = ids
667 .iter()
668 .filter_map(|id| segments.get(id))
669 .map(|s| s.stats.size_bytes)
670 .sum();
671
672 let total_live: usize = ids
673 .iter()
674 .filter_map(|id| segments.get(id))
675 .map(|s| s.stats.live_vectors())
676 .sum();
677
678 let max_gen = ids
679 .iter()
680 .filter_map(|id| segments.get(id))
681 .map(|s| s.generation)
682 .max()
683 .unwrap_or(0);
684
685 drop(segments);
686
687 let mut merged = Segment::new(
688 merged_id,
689 total_live,
690 total_size,
691 format!("/segments/{}", merged_id.0),
692 );
693 merged.generation = max_gen + 1;
694 merged.state = SegmentState::Active;
695
696 {
698 let mut segments = self.segments.write();
699 for id in ids {
700 if let Some(seg) = segments.get_mut(id) {
701 seg.state = SegmentState::Tombstoned;
702 }
703 }
704 }
705
706 self.add_segment(merged.clone());
707 Ok(Some(merged))
708 }
709
710 CompactionDecision::Split(id) => {
711 let segment = self
713 .get_segment(*id)
714 .ok_or(CompactionError::SegmentNotFound(*id))?;
715
716 let half_size = segment.stats.size_bytes / 2;
717 let half_vectors = segment.stats.n_vectors / 2;
718
719 let seg1_id = SegmentId::next();
720 let seg2_id = SegmentId::next();
721
722 let mut seg1 = Segment::new(
723 seg1_id,
724 half_vectors,
725 half_size,
726 format!("/segments/{}", seg1_id.0),
727 );
728 seg1.generation = segment.generation + 1;
729 seg1.state = SegmentState::Active;
730
731 let mut seg2 = Segment::new(
732 seg2_id,
733 segment.stats.n_vectors - half_vectors,
734 segment.stats.size_bytes - half_size,
735 format!("/segments/{}", seg2_id.0),
736 );
737 seg2.generation = segment.generation + 1;
738 seg2.state = SegmentState::Active;
739
740 if let Some(seg) = self.segments.write().get_mut(id) {
742 seg.state = SegmentState::Tombstoned;
743 }
744
745 self.add_segment(seg1);
746 self.add_segment(seg2.clone());
747
748 Ok(Some(seg2))
749 }
750
751 CompactionDecision::Retrain(_ids) => {
752 Ok(None)
759 }
760
761 CompactionDecision::FullRecompact(ids) => {
762 self.execute_compaction(&CompactionJob {
764 id: job.id,
765 decision: CompactionDecision::Merge(ids.clone()),
766 source_segments: ids.clone(),
767 created_at: job.created_at,
768 priority: job.priority,
769 })
770 }
771 }
772 }
773
774 pub fn clean_tombstones(&self) -> Vec<SegmentId> {
776 let mut segments = self.segments.write();
777 let tombstoned: Vec<_> = segments
778 .iter()
779 .filter(|(_, s)| s.state == SegmentState::Tombstoned)
780 .map(|(id, _)| *id)
781 .collect();
782
783 for id in &tombstoned {
784 if let Some(seg) = segments.get_mut(id) {
785 seg.state = SegmentState::Deleted;
786 }
787 }
788
789 tombstoned
790 }
791
792 pub fn stats(&self) -> ManagerStats {
794 let segments = self.segments.read();
795
796 let total_segments = segments.len();
797 let active_segments = segments
798 .values()
799 .filter(|s| s.state == SegmentState::Active)
800 .count();
801
802 let total_vectors: usize = segments
803 .values()
804 .filter(|s| s.state == SegmentState::Active)
805 .map(|s| s.stats.n_vectors)
806 .sum();
807
808 let total_deleted: usize = segments
809 .values()
810 .filter(|s| s.state == SegmentState::Active)
811 .map(|s| s.stats.n_deleted)
812 .sum();
813
814 let total_size: u64 = segments
815 .values()
816 .filter(|s| s.state == SegmentState::Active)
817 .map(|s| s.stats.size_bytes)
818 .sum();
819
820 let avg_deletion_ratio = if active_segments > 0 {
821 segments
822 .values()
823 .filter(|s| s.state == SegmentState::Active)
824 .map(|s| s.stats.deletion_ratio())
825 .sum::<f32>()
826 / active_segments as f32
827 } else {
828 0.0
829 };
830
831 ManagerStats {
832 total_segments,
833 active_segments,
834 total_vectors,
835 live_vectors: total_vectors - total_deleted,
836 deleted_vectors: total_deleted,
837 total_size_bytes: total_size,
838 avg_deletion_ratio,
839 current_version: self.versions.current(),
840 }
841 }
842
843 pub fn versions(&self) -> &VersionManager {
845 &self.versions
846 }
847}
848
/// Aggregate figures reported by `SegmentManager::stats`.
#[derive(Clone, Debug)]
pub struct ManagerStats {
    /// Number of segments known to the manager, in any state.
    pub total_segments: usize,
    /// Number of segments in the `Active` state.
    pub active_segments: usize,
    /// Vectors stored in active segments, including deleted ones.
    pub total_vectors: usize,
    /// Vectors in active segments that are not deleted.
    pub live_vectors: usize,
    /// Vectors in active segments that are marked deleted.
    pub deleted_vectors: usize,
    /// Combined size of the active segments in bytes.
    pub total_size_bytes: u64,
    /// Mean deletion ratio over the active segments (0.0 when there are none).
    pub avg_deletion_ratio: f32,
    /// Version in effect when the statistics were taken.
    pub current_version: u64,
}
861
862#[derive(Debug)]
864pub enum CompactionError {
865 SegmentNotFound(SegmentId),
866 IoError(std::io::Error),
867 InvalidState(String),
868}
869
870impl std::fmt::Display for CompactionError {
871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872 match self {
873 Self::SegmentNotFound(id) => write!(f, "Segment not found: {:?}", id),
874 Self::IoError(e) => write!(f, "IO error: {}", e),
875 Self::InvalidState(s) => write!(f, "Invalid state: {}", s),
876 }
877 }
878}
879
880impl std::error::Error for CompactionError {}
881
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a segment that is already in the `Active` state.
    fn active_segment(id: SegmentId, n_vectors: usize, size_bytes: u64, path: &str) -> Segment {
        let mut segment = Segment::new(id, n_vectors, size_bytes, path.to_string());
        segment.state = SegmentState::Active;
        segment
    }

    /// A new segment starts in `Building`, can be activated and tracks
    /// deletions.
    #[test]
    fn test_segment_lifecycle() {
        let id = SegmentId::next();
        let mut segment = Segment::new(id, 1000, 1024 * 1024, "/data/segment1".to_string());
        assert_eq!(segment.state, SegmentState::Building);

        segment.activate();
        assert_eq!(segment.state, SegmentState::Active);

        segment.mark_deleted(100);
        assert_eq!(segment.stats.n_deleted, 100);
        assert_eq!(segment.stats.live_vectors(), 900);

        let ratio = segment.stats.deletion_ratio();
        assert!((ratio - 0.1).abs() < 0.001);
    }

    /// One heavily deleted segment plus two undersized ones must yield work.
    #[test]
    fn test_compaction_planner() {
        let planner = CompactionPlanner::new(CompactionPolicy {
            deletion_ratio_threshold: 0.3,
            min_segment_size: 1024,
            max_segment_size: 1024 * 1024,
            ..Default::default()
        });

        // 40 % deleted: above the 30 % threshold.
        let mut seg1 = active_segment(SegmentId(1), 1000, 2048, "/seg1");
        seg1.stats.n_deleted = 400;

        // Both below `min_segment_size`.
        let seg2 = active_segment(SegmentId(2), 100, 512, "/seg2");
        let seg3 = active_segment(SegmentId(3), 100, 512, "/seg3");

        let segments: Vec<&Segment> = vec![&seg1, &seg2, &seg3];
        let decisions = planner.plan(&segments);

        assert!(!decisions.is_empty());
    }

    /// Versions increase, can be switched to and rolled back, and keep
    /// their segment lists.
    #[test]
    fn test_version_manager() {
        let manager = VersionManager::new();

        let v1 = manager.create_version(vec![SegmentId(1), SegmentId(2)]);
        let v2 = manager.create_version(vec![SegmentId(1), SegmentId(2), SegmentId(3)]);
        assert!(v2 > v1);

        manager.switch_to(v2);
        assert_eq!(manager.current(), v2);

        manager.rollback();
        assert_eq!(manager.current(), v2 - 1);

        let segments = manager.get_segments(v1).unwrap();
        assert_eq!(segments.len(), 2);
    }

    /// Statistics cover every active segment that was added.
    #[test]
    fn test_segment_manager() {
        let manager = SegmentManager::new(CompactionPolicy::default());

        manager.add_segment(active_segment(
            SegmentId::next(),
            1000,
            1024 * 1024,
            "/seg1",
        ));
        manager.add_segment(active_segment(
            SegmentId::next(),
            500,
            512 * 1024,
            "/seg2",
        ));

        let stats = manager.stats();
        assert_eq!(stats.active_segments, 2);
        assert_eq!(stats.total_vectors, 1500);
    }

    /// Drift is reported only once the observed error exceeds the baseline
    /// by more than the threshold.
    #[test]
    fn test_quantizer_drift() {
        let mut stats = SegmentStats::new(1000, 1024);
        stats.quantizer_meta.training_error = 0.1;

        // No samples yet: the estimate falls back to `current_error` (0.0).
        assert!(!stats.needs_retraining(0.2));

        // 0.15 against a 0.1 baseline is a 50 % increase.
        for _ in 0..100 {
            stats.record_error(0.15);
        }

        assert!(stats.needs_retraining(0.2));
    }
}