1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3
4use crate::common::budget::ResourceBudget;
5use crate::common::progress_tracker::new_progress_tracker;
6use crate::segment::common::operation_error::{OperationError, OperationResult};
7use crate::segment::types::HnswGlobalConfig;
8use crate::shard::optimizers::config::{
9 DEFAULT_DELETED_THRESHOLD, DEFAULT_VACUUM_MIN_VECTOR_NUMBER, TEMP_SEGMENTS_PATH,
10};
11use crate::shard::optimizers::config_mismatch_optimizer::ConfigMismatchOptimizer;
12use crate::shard::optimizers::indexing_optimizer::IndexingOptimizer;
13use crate::shard::optimizers::merge_optimizer::MergeOptimizer;
14use crate::shard::optimizers::segment_optimizer::{
15 Optimizer, max_num_indexing_threads, plan_optimizations,
16};
17use crate::shard::optimizers::vacuum_optimizer::VacuumOptimizer;
18use uuid::Uuid;
19
20use crate::edge::{EdgeShard, SEGMENTS_PATH};
21
22impl EdgeShard {
23 pub fn optimize(&self) -> OperationResult<bool> {
27 let optimizers = self.build_blocking_optimizers();
28 let stopped = AtomicBool::new(false);
29 let mut optimized_any = false;
30
31 loop {
32 let planned = {
33 let segments = self.segments.read();
34 plan_optimizations(&segments, &optimizers)
35 };
36
37 if planned.is_empty() {
38 return Ok(optimized_any);
39 }
40
41 let mut optimized_in_iteration = false;
42
43 for (optimizer, segment_ids) in planned {
44 let num_indexing_threads = optimizer.num_indexing_threads();
45 let desired_io = num_indexing_threads;
46 let budget = ResourceBudget::new(num_indexing_threads, desired_io);
48 let permit = budget.try_acquire(0, desired_io).ok_or_else(|| {
49 OperationError::service_error(format!(
50 "failed to acquire resource permit for {} optimizer",
51 optimizer.name(),
52 ))
53 })?;
54
55 let (_, progress) = new_progress_tracker();
56 let points_optimized = optimizer.as_ref().optimize(
57 self.segments.clone(),
58 segment_ids,
59 Uuid::new_v4(),
60 permit,
61 budget,
62 &stopped,
63 progress,
64 Box::new(|| ()),
65 )?;
66
67 if points_optimized > 0 {
68 optimized_in_iteration = true;
69 optimized_any = true;
70 }
71 }
72
73 if !optimized_in_iteration {
75 return Ok(optimized_any);
76 }
77 }
78 }
79
80 fn build_blocking_optimizers(&self) -> Vec<Arc<Optimizer>> {
81 let segments_path = self.path.join(SEGMENTS_PATH);
82 let temp_segments_path = self.path.join(TEMP_SEGMENTS_PATH);
83
84 let cfg = self.config();
85 let segment_optimizer_config = cfg.segment_optimizer_config();
86 let global_hnsw_config = cfg.hnsw_config;
87 let hnsw_global_config = HnswGlobalConfig::default();
88 let num_indexing_threads = max_num_indexing_threads(&segment_optimizer_config);
89 let threshold_config = cfg.optimizer_thresholds(num_indexing_threads);
90 let default_segments_number = cfg.optimizers.get_number_segments();
91
92 vec![
93 Arc::new(MergeOptimizer::new(
94 default_segments_number,
95 threshold_config,
96 segments_path.clone(),
97 temp_segments_path.clone(),
98 segment_optimizer_config.clone(),
99 hnsw_global_config.clone(),
100 )),
101 Arc::new(IndexingOptimizer::new(
102 default_segments_number,
103 threshold_config,
104 segments_path.clone(),
105 temp_segments_path.clone(),
106 segment_optimizer_config.clone(),
107 hnsw_global_config.clone(),
108 )),
109 Arc::new(VacuumOptimizer::new(
110 cfg.optimizers
111 .deleted_threshold
112 .unwrap_or(DEFAULT_DELETED_THRESHOLD),
113 cfg.optimizers
114 .vacuum_min_vector_number
115 .unwrap_or(DEFAULT_VACUUM_MIN_VECTOR_NUMBER),
116 threshold_config,
117 segments_path.clone(),
118 temp_segments_path.clone(),
119 segment_optimizer_config.clone(),
120 hnsw_global_config.clone(),
121 )),
122 Arc::new(ConfigMismatchOptimizer::new(
123 threshold_config,
124 segments_path,
125 temp_segments_path,
126 segment_optimizer_config,
127 global_hnsw_config,
128 hnsw_global_config,
129 )),
130 ]
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use std::collections::HashMap;
137 use std::path::Path;
138
139 use fs_err as fs;
140 use crate::segment::data_types::vectors::{VectorInternal, VectorStructInternal};
141 use crate::segment::types::{Distance, ExtendedPointId, WithPayloadInterface, WithVector};
142 use crate::shard::count::CountRequestInternal;
143 use crate::shard::operations::CollectionUpdateOperations::PointOperation;
144 use crate::shard::operations::point_ops::PointInsertOperationsInternal::PointsList;
145 use crate::shard::operations::point_ops::PointOperations::{DeletePoints, UpsertPoints};
146 use crate::shard::operations::point_ops::{PointStructPersisted, VectorStructPersisted};
147 use crate::shard::optimizers::config::default_segment_number;
148 use uuid::Uuid;
149
150 use crate::edge::config::vectors::EdgeVectorParams;
151 use crate::edge::{EdgeConfig, EdgeShard};
152
153 const VECTOR_NAME: &str = "edge-test-vector";
154
    // Two identical on-disk segments (created by duplicating the only
    // segment at the file level) must NOT be merged down to one: the merge
    // optimizer should only act when the segment count exceeds its target.
    #[test]
    fn does_not_force_merge_all_segments_into_one() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-do-not-force-one")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard so the segment files can be copied safely.
        drop(shard);

        duplicate_single_segment(dir.path());

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        assert_eq!(reopened.info().segments_count, 2);

        let optimized = reopened.optimize().unwrap();
        assert!(!optimized, "optimizer should not force-merge all segments");
        assert_eq!(reopened.info().segments_count, 2);

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
179
    // 25% deleted points (above the default 20% vacuum threshold, with more
    // than the minimum vector count) must trigger a vacuum on the first
    // optimize() call; a second call must then find nothing left to do.
    #[test]
    fn vacuum_optimizer_runs_in_blocking_mode_until_idle() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized, "vacuum candidate should be optimized");

        let optimized_again = shard.optimize().unwrap();
        assert!(
            !optimized_again,
            "second run should be idle after blocking optimization"
        );

        assert_points_retrievable_with_vectors(&shard, &[251, 500, 999, 1000]);
    }
212
213 #[test]
216 fn no_op_on_single_segment_without_deletions() {
217 let dir = tempfile::Builder::new()
218 .prefix("edge-opt-noop-single")
219 .tempdir()
220 .unwrap();
221
222 let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
223
224 let points = (1..=100).map(point).collect::<Vec<_>>();
225 shard
226 .update(PointOperation(UpsertPoints(PointsList(points))))
227 .unwrap();
228
229 let optimized = shard.optimize().unwrap();
230 assert!(!optimized, "single clean segment should not be optimized");
231 assert_eq!(shard.info().points_count, 100);
232 assert_eq!(shard.info().segments_count, 1);
233
234 assert_points_retrievable_with_vectors(&shard, &[1, 50, 100]);
235 }
236
237 #[test]
239 fn no_op_on_empty_shard() {
240 let dir = tempfile::Builder::new()
241 .prefix("edge-opt-noop-empty")
242 .tempdir()
243 .unwrap();
244
245 let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
246
247 let optimized = shard.optimize().unwrap();
248 assert!(!optimized, "empty shard should not trigger optimization");
249 assert_eq!(shard.info().points_count, 0);
250 }
251
    // Inflate the shard to 6 segments over the default target by copying the
    // only segment on disk, then check that optimize() merges back down to
    // at most default + 1 (one extra appendable segment is tolerated).
    #[test]
    fn merge_reduces_excess_segments() {
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-excess")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard before manipulating segment files on disk.
        drop(shard);

        multiply_segments(dir.path(), target_count);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        reopened.optimize().unwrap();
        let info = reopened.info();
        assert!(
            info.segments_count <= default_segment_number() + 1,
            "segments should be reduced after merge: got {} segments, \
             expected at most {} (default_segment_number={}, +1 for appendable)",
            info.segments_count,
            default_segment_number() + 1,
            default_segment_number(),
        );

        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert!(count >= 1, "shard should still have data after merge");

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
297
    // After a merge pass has run, a second optimize() call must be idle and
    // must leave the segment count unchanged (idempotence).
    #[test]
    fn optimization_is_idempotent_after_merge() {
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-idempotent")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard before manipulating segment files on disk.
        drop(shard);

        multiply_segments(dir.path(), target_count);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        reopened.optimize().unwrap();
        let segments_after_first = reopened.info().segments_count;

        let optimized = reopened.optimize().unwrap();
        assert!(
            !optimized,
            "second optimization run should be idle after merge"
        );
        assert_eq!(reopened.info().segments_count, segments_after_first);

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
331
    // Only 5% of the points are deleted — well below the default 20% vacuum
    // threshold — so optimize() must not rewrite the segment.
    #[test]
    fn vacuum_below_threshold_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-below")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=50).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "5% deletion should not trigger vacuum (threshold is 20%)"
        );

        assert_points_retrievable_with_vectors(&shard, &[51, 500, 1000]);
    }
363
    // 50% of points deleted, but the segment holds only 100 points — below
    // the vacuum optimizer's minimum vector count — so no vacuum must run.
    #[test]
    fn vacuum_below_min_vector_count_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-min-vecs")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=100).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=50).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "high deletion ratio with only 100 total points should not trigger vacuum \
             (min_vectors_number=1000)"
        );

        assert_points_retrievable_with_vectors(&shard, &[51, 75, 100]);
    }
396
    // A triggered vacuum must drop exactly the deleted points: survivors stay
    // retrievable with their vectors, deleted ids return nothing.
    #[test]
    fn vacuum_preserves_remaining_points() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-data")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints {
                ids: deleted_ids.clone(),
            }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized, "25% deletion should trigger vacuum");

        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 750, "should have 750 remaining points after vacuum");

        // Deleted ids must be gone entirely, not just hidden.
        let deleted_results = shard
            .retrieve(
                &deleted_ids,
                Some(WithPayloadInterface::Bool(false)),
                Some(WithVector::Bool(false)),
            )
            .unwrap();
        assert!(
            deleted_results.is_empty(),
            "deleted points should not be retrievable"
        );

        assert_points_retrievable_with_vectors(&shard, &[251, 500, 750, 1000]);
    }
449
    // Deleting every point and optimizing must leave an empty but fully
    // functional shard that still accepts new points afterwards.
    #[test]
    fn vacuum_after_all_points_deleted() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-all-deleted")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=1000).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        // Whether this reports work done is not asserted; only the end state
        // matters here.
        let _optimized = shard.optimize().unwrap();

        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 0, "all points should be gone after vacuum");

        // The shard must remain writable after a full vacuum.
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(9999)]))))
            .unwrap();
        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 1, "shard should accept new points after full vacuum");

        assert_points_retrievable_with_vectors(&shard, &[9999]);
    }
501
    // Exactly 20% deleted: the vacuum threshold is strict ("greater than"),
    // so the boundary value itself must NOT trigger a vacuum.
    #[test]
    fn vacuum_at_exact_threshold_boundary_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-boundary")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=200).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "exactly 20% deletion (not strictly greater) should not trigger vacuum"
        );

        assert_points_retrievable_with_vectors(&shard, &[201, 500, 1000]);
    }
533
    // One point past the 20% boundary (201/1000 deleted) must trigger the
    // vacuum — complements the exact-boundary no-op test above.
    #[test]
    fn vacuum_just_above_threshold_triggers() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-above")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=201).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            optimized,
            "20.1% deletion should trigger vacuum (threshold is >20%)"
        );

        assert_points_retrievable_with_vectors(&shard, &[202, 500, 1000]);
    }
564
    // Combined scenario: excess segments (file-level duplication) AND a 25%
    // deletion ratio. One optimize() must both merge segments down and drop
    // the deleted points, and a second run must then be idle.
    #[test]
    fn merge_and_vacuum_cooperate() {
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-vacuum")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();
        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();
        // Close the shard before manipulating segment files on disk.
        drop(shard);

        multiply_segments(dir.path(), target_count);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        reopened.optimize().unwrap();

        let info = reopened.info();
        assert!(
            info.segments_count <= default_segment_number() + 1,
            "excess segments should be merged: got {}",
            info.segments_count,
        );

        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert!(
            count >= 750,
            "merged shard should preserve surviving points"
        );

        assert_points_retrievable_with_vectors(&reopened, &[251, 500, 1000]);

        let optimized = reopened.optimize().unwrap();
        assert!(!optimized, "second run should be idle after merge+vacuum");
    }
625
    // The result of an optimization must be durable: after vacuuming and
    // reopening the shard from disk, counts and vectors are unchanged.
    #[test]
    fn data_survives_optimize_and_reload() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-reload")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized);
        // Drop to release the shard, then reload from the same directory.
        drop(shard);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();

        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 750, "point count should be preserved across reload");

        assert_points_retrievable_with_vectors(&reopened, &[251, 500, 750, 1000]);
    }
665
666 fn assert_points_retrievable_with_vectors(shard: &EdgeShard, ids: &[u64]) {
669 let point_ids = ids
670 .iter()
671 .map(|id| ExtendedPointId::NumId(*id))
672 .collect::<Vec<_>>();
673 let results = shard
674 .retrieve(
675 &point_ids,
676 Some(WithPayloadInterface::Bool(false)),
677 Some(WithVector::Bool(true)),
678 )
679 .unwrap();
680 assert_eq!(
681 results.len(),
682 ids.len(),
683 "expected {} retrievable points, got {}",
684 ids.len(),
685 results.len(),
686 );
687 for (result, &expected_id) in results.iter().zip(ids) {
688 assert_eq!(result.id, ExtendedPointId::NumId(expected_id));
689 let vectors = match result.vector.as_ref().expect("vector should be present") {
690 VectorStructInternal::Named(named) => named,
691 other => panic!("expected Named vectors, got {other:?}"),
692 };
693 let vec = match vectors.get(VECTOR_NAME).expect("vector name should exist") {
694 VectorInternal::Dense(v) => v,
695 other => panic!("expected Dense vector, got {other:?}"),
696 };
697 assert_eq!(
698 vec,
699 &vec![expected_id as f32],
700 "vector value mismatch for point {expected_id}"
701 );
702 }
703 }
704
705 fn test_config() -> EdgeConfig {
706 EdgeConfig {
707 on_disk_payload: false,
708 vectors: HashMap::from([(
709 VECTOR_NAME.to_string(),
710 EdgeVectorParams {
711 size: 1,
712 distance: Distance::Dot,
713 quantization_config: None,
714 multivector_config: None,
715 datatype: None,
716 on_disk: None,
717 hnsw_config: None,
718 },
719 )]),
720 sparse_vectors: HashMap::new(),
721 hnsw_config: Default::default(),
722 quantization_config: None,
723 optimizers: Default::default(),
724 }
725 }
726
727 fn point(id: u64) -> PointStructPersisted {
728 PointStructPersisted {
729 id: ExtendedPointId::NumId(id),
730 vector: VectorStructPersisted::from(VectorStructInternal::Named(HashMap::from([(
731 VECTOR_NAME.to_string(),
732 VectorInternal::from(vec![id as f32]),
733 )]))),
734 payload: None,
735 }
736 }
737
738 fn multiply_segments(shard_dir: &Path, target_count: usize) {
740 let segments_path = shard_dir.join("segments");
741 let segment_dirs = fs::read_dir(&segments_path)
742 .unwrap()
743 .filter_map(Result::ok)
744 .map(|entry| entry.path())
745 .filter(|path| path.is_dir())
746 .collect::<Vec<_>>();
747 assert!(!segment_dirs.is_empty(), "need at least one source segment");
748
749 let source = &segment_dirs[0];
750 let current_count = segment_dirs.len();
751 for _ in current_count..target_count {
752 let target = segments_path.join(Uuid::new_v4().to_string());
753 copy_dir_recursive(source, &target);
754 }
755 }
756
757 fn duplicate_single_segment(shard_dir: &Path) {
758 let segments_path = shard_dir.join("segments");
759 let segment_dirs = fs::read_dir(&segments_path)
760 .unwrap()
761 .filter_map(Result::ok)
762 .map(|entry| entry.path())
763 .filter(|path| path.is_dir())
764 .collect::<Vec<_>>();
765 assert_eq!(segment_dirs.len(), 1, "expected exactly one source segment");
766
767 let source = &segment_dirs[0];
768 let target = segments_path.join(Uuid::new_v4().to_string());
769 copy_dir_recursive(source, &target);
770 }
771
772 fn copy_dir_recursive(from: &Path, to: &Path) {
773 fs::create_dir_all(to).unwrap();
774 for entry in fs::read_dir(from).unwrap().filter_map(Result::ok) {
775 let from_path = entry.path();
776 let to_path = to.join(entry.file_name());
777 if entry.file_type().unwrap().is_dir() {
778 copy_dir_recursive(&from_path, &to_path);
779 } else {
780 fs::copy(&from_path, &to_path).unwrap();
781 }
782 }
783 }
784}