//! qdrant_edge/edge/optimize.rs — blocking, in-process shard optimization for Edge shards.

1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3
4use crate::common::budget::ResourceBudget;
5use crate::common::progress_tracker::new_progress_tracker;
6use crate::segment::common::operation_error::{OperationError, OperationResult};
7use crate::segment::types::HnswGlobalConfig;
8use crate::shard::optimizers::config::{
9    DEFAULT_DELETED_THRESHOLD, DEFAULT_VACUUM_MIN_VECTOR_NUMBER, TEMP_SEGMENTS_PATH,
10};
11use crate::shard::optimizers::config_mismatch_optimizer::ConfigMismatchOptimizer;
12use crate::shard::optimizers::indexing_optimizer::IndexingOptimizer;
13use crate::shard::optimizers::merge_optimizer::MergeOptimizer;
14use crate::shard::optimizers::segment_optimizer::{
15    Optimizer, max_num_indexing_threads, plan_optimizations,
16};
17use crate::shard::optimizers::vacuum_optimizer::VacuumOptimizer;
18use uuid::Uuid;
19
20use crate::edge::{EdgeShard, SEGMENTS_PATH};
21
22impl EdgeShard {
23    /// Run shard optimizers in-process and blocking until no more optimization plans are produced.
24    ///
25    /// This is synchronous and does not spawn background optimization workers.
26    pub fn optimize(&self) -> OperationResult<bool> {
27        let optimizers = self.build_blocking_optimizers();
28        let stopped = AtomicBool::new(false);
29        let mut optimized_any = false;
30
31        loop {
32            let planned = {
33                let segments = self.segments.read();
34                plan_optimizations(&segments, &optimizers)
35            };
36
37            if planned.is_empty() {
38                return Ok(optimized_any);
39            }
40
41            let mut optimized_in_iteration = false;
42
43            for (optimizer, segment_ids) in planned {
44                let num_indexing_threads = optimizer.num_indexing_threads();
45                let desired_io = num_indexing_threads;
46                // Bypass budget in Edge, always allocate the full desired IO for the optimizer.
47                let budget = ResourceBudget::new(num_indexing_threads, desired_io);
48                let permit = budget.try_acquire(0, desired_io).ok_or_else(|| {
49                    OperationError::service_error(format!(
50                        "failed to acquire resource permit for {} optimizer",
51                        optimizer.name(),
52                    ))
53                })?;
54
55                let (_, progress) = new_progress_tracker();
56                let points_optimized = optimizer.as_ref().optimize(
57                    self.segments.clone(),
58                    segment_ids,
59                    Uuid::new_v4(),
60                    permit,
61                    budget,
62                    &stopped,
63                    progress,
64                    Box::new(|| ()),
65                )?;
66
67                if points_optimized > 0 {
68                    optimized_in_iteration = true;
69                    optimized_any = true;
70                }
71            }
72
73            // Avoid repeating the same plan forever if no optimizer made effective progress.
74            if !optimized_in_iteration {
75                return Ok(optimized_any);
76            }
77        }
78    }
79
80    fn build_blocking_optimizers(&self) -> Vec<Arc<Optimizer>> {
81        let segments_path = self.path.join(SEGMENTS_PATH);
82        let temp_segments_path = self.path.join(TEMP_SEGMENTS_PATH);
83
84        let cfg = self.config();
85        let segment_optimizer_config = cfg.segment_optimizer_config();
86        let global_hnsw_config = cfg.hnsw_config;
87        let hnsw_global_config = HnswGlobalConfig::default();
88        let num_indexing_threads = max_num_indexing_threads(&segment_optimizer_config);
89        let threshold_config = cfg.optimizer_thresholds(num_indexing_threads);
90        let default_segments_number = cfg.optimizers.get_number_segments();
91
92        vec![
93            Arc::new(MergeOptimizer::new(
94                default_segments_number,
95                threshold_config,
96                segments_path.clone(),
97                temp_segments_path.clone(),
98                segment_optimizer_config.clone(),
99                hnsw_global_config.clone(),
100            )),
101            Arc::new(IndexingOptimizer::new(
102                default_segments_number,
103                threshold_config,
104                segments_path.clone(),
105                temp_segments_path.clone(),
106                segment_optimizer_config.clone(),
107                hnsw_global_config.clone(),
108            )),
109            Arc::new(VacuumOptimizer::new(
110                cfg.optimizers
111                    .deleted_threshold
112                    .unwrap_or(DEFAULT_DELETED_THRESHOLD),
113                cfg.optimizers
114                    .vacuum_min_vector_number
115                    .unwrap_or(DEFAULT_VACUUM_MIN_VECTOR_NUMBER),
116                threshold_config,
117                segments_path.clone(),
118                temp_segments_path.clone(),
119                segment_optimizer_config.clone(),
120                hnsw_global_config.clone(),
121            )),
122            Arc::new(ConfigMismatchOptimizer::new(
123                threshold_config,
124                segments_path,
125                temp_segments_path,
126                segment_optimizer_config,
127                global_hnsw_config,
128                hnsw_global_config,
129            )),
130        ]
131    }
132}
133
134#[cfg(test)]
135mod tests {
136    use std::collections::HashMap;
137    use std::path::Path;
138
139    use fs_err as fs;
140    use crate::segment::data_types::vectors::{VectorInternal, VectorStructInternal};
141    use crate::segment::types::{Distance, ExtendedPointId, WithPayloadInterface, WithVector};
142    use crate::shard::count::CountRequestInternal;
143    use crate::shard::operations::CollectionUpdateOperations::PointOperation;
144    use crate::shard::operations::point_ops::PointInsertOperationsInternal::PointsList;
145    use crate::shard::operations::point_ops::PointOperations::{DeletePoints, UpsertPoints};
146    use crate::shard::operations::point_ops::{PointStructPersisted, VectorStructPersisted};
147    use crate::shard::optimizers::config::default_segment_number;
148    use uuid::Uuid;
149
150    use crate::edge::config::vectors::EdgeVectorParams;
151    use crate::edge::{EdgeConfig, EdgeShard};
152
153    const VECTOR_NAME: &str = "edge-test-vector";
154
    /// Two identical on-disk copies of a single-point segment should stay
    /// separate: two segments fit within the default segment budget, so the
    /// merge optimizer must not collapse everything into one segment.
    #[test]
    fn does_not_force_merge_all_segments_into_one() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-do-not-force-one")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard so its segment files can be copied safely on disk.
        drop(shard);

        duplicate_single_segment(dir.path());

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        assert_eq!(reopened.info().segments_count, 2);

        let optimized = reopened.optimize().unwrap();
        assert!(!optimized, "optimizer should not force-merge all segments");
        assert_eq!(reopened.info().segments_count, 2);

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
179
    /// Vacuum must run inside the blocking `optimize` call and leave the shard
    /// idle: a second call should report that nothing was left to optimize.
    #[test]
    fn vacuum_optimizer_runs_in_blocking_mode_until_idle() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete 250/1000 = 25%, above DEFAULT_DELETED_THRESHOLD (20%)
        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized, "vacuum candidate should be optimized");

        // The blocking loop should have fully drained all plans already.
        let optimized_again = shard.optimize().unwrap();
        assert!(
            !optimized_again,
            "second run should be idle after blocking optimization"
        );

        // Verify surviving points are queryable with correct vectors
        assert_points_retrievable_with_vectors(&shard, &[251, 500, 999, 1000]);
    }
212
213    /// A fresh shard with a single small segment and no deletions should not
214    /// trigger any optimizer.
215    #[test]
216    fn no_op_on_single_segment_without_deletions() {
217        let dir = tempfile::Builder::new()
218            .prefix("edge-opt-noop-single")
219            .tempdir()
220            .unwrap();
221
222        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
223
224        let points = (1..=100).map(point).collect::<Vec<_>>();
225        shard
226            .update(PointOperation(UpsertPoints(PointsList(points))))
227            .unwrap();
228
229        let optimized = shard.optimize().unwrap();
230        assert!(!optimized, "single clean segment should not be optimized");
231        assert_eq!(shard.info().points_count, 100);
232        assert_eq!(shard.info().segments_count, 1);
233
234        assert_points_retrievable_with_vectors(&shard, &[1, 50, 100]);
235    }
236
237    /// An empty shard (no data at all) should be a no-op.
238    #[test]
239    fn no_op_on_empty_shard() {
240        let dir = tempfile::Builder::new()
241            .prefix("edge-opt-noop-empty")
242            .tempdir()
243            .unwrap();
244
245        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
246
247        let optimized = shard.optimize().unwrap();
248        assert!(!optimized, "empty shard should not trigger optimization");
249        assert_eq!(shard.info().points_count, 0);
250    }
251
    /// Creating more segments than `default_segment_number` should trigger
    /// the merge optimizer to reduce the segment count.
    #[test]
    fn merge_reduces_excess_segments() {
        // Six more segments than the default budget guarantees a merge plan.
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-excess")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard before copying its segment directories on disk.
        drop(shard);

        multiply_segments(dir.path(), target_count);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        reopened.optimize().unwrap();
        let info = reopened.info();
        assert!(
            info.segments_count <= default_segment_number() + 1,
            "segments should be reduced after merge: got {} segments, \
             expected at most {} (default_segment_number={}, +1 for appendable)",
            info.segments_count,
            default_segment_number() + 1,
            default_segment_number(),
        );

        // All duplicated segments contained the same point (id=1). After merge,
        // the exact info().points_count depends on how many segments remain
        // (info sums per-segment counts without cross-segment deduplication).
        // The important invariant is that the shard is functional.
        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert!(count >= 1, "shard should still have data after merge");

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
297
    /// After a merge optimization, a second run should be a no-op.
    #[test]
    fn optimization_is_idempotent_after_merge() {
        // Six more segments than the default budget guarantees a merge plan.
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-idempotent")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(1)]))))
            .unwrap();
        // Close the shard before copying its segment directories on disk.
        drop(shard);

        multiply_segments(dir.path(), target_count);

        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        // First explicit optimization triggers merge.
        reopened.optimize().unwrap();
        let segments_after_first = reopened.info().segments_count;

        // Second explicit optimization should be a no-op.
        let optimized = reopened.optimize().unwrap();
        assert!(
            !optimized,
            "second optimization run should be idle after merge"
        );
        assert_eq!(reopened.info().segments_count, segments_after_first);

        assert_points_retrievable_with_vectors(&reopened, &[1]);
    }
331
    /// Deleting less than 20% of points (below the vacuum threshold)
    /// should NOT trigger the vacuum optimizer.
    #[test]
    fn vacuum_below_threshold_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-below")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete 5% — below the 20% threshold (DEFAULT_DELETED_THRESHOLD)
        let deleted_ids = (1..=50).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "5% deletion should not trigger vacuum (threshold is 20%)"
        );

        // Surviving points should still have correct vectors
        assert_points_retrievable_with_vectors(&shard, &[51, 500, 1000]);
    }
363
    /// Deleting below the minimum vector count (< 1000 total points)
    /// should NOT trigger the vacuum optimizer even with a high deletion ratio.
    #[test]
    fn vacuum_below_min_vector_count_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-min-vecs")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        // Only 100 points total (below DEFAULT_VACUUM_MIN_VECTOR_NUMBER=1000)
        let points = (1..=100).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete 50% — above ratio threshold (20%), but total count is below minimum
        let deleted_ids = (1..=50).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "high deletion ratio with only 100 total points should not trigger vacuum \
             (min_vectors_number=1000)"
        );

        assert_points_retrievable_with_vectors(&shard, &[51, 75, 100]);
    }
396
    /// After vacuum optimization, all non-deleted points should still be
    /// retrievable and deleted points should be gone.
    #[test]
    fn vacuum_preserves_remaining_points() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-data")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete points 1..=250 (25%, above DEFAULT_DELETED_THRESHOLD=20%)
        // Keep the id list for verifying the deletions below.
        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect::<Vec<_>>();
        shard
            .update(PointOperation(DeletePoints {
                ids: deleted_ids.clone(),
            }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized, "25% deletion should trigger vacuum");

        // Verify point count
        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 750, "should have 750 remaining points after vacuum");

        // Verify deleted points are gone
        let deleted_results = shard
            .retrieve(
                &deleted_ids,
                Some(WithPayloadInterface::Bool(false)),
                Some(WithVector::Bool(false)),
            )
            .unwrap();
        assert!(
            deleted_results.is_empty(),
            "deleted points should not be retrievable"
        );

        // Verify surviving points are accessible with correct vectors
        assert_points_retrievable_with_vectors(&shard, &[251, 500, 750, 1000]);
    }
449
    /// Deleting all points from a segment should be handled gracefully.
    /// The vacuum optimizer plans the segment for rebuild, but because the
    /// resulting segment has 0 points, `optimize` reports `false`
    /// (zero points processed). The shard should still be valid and accept
    /// new data afterward.
    #[test]
    fn vacuum_after_all_points_deleted() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-all-deleted")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete ALL points
        let deleted_ids = (1..=1000).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        // The vacuum optimizer rebuilds the segment, but since 0 points remain
        // in the result, `points_optimized == 0` and the function returns false.
        let _optimized = shard.optimize().unwrap();

        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 0, "all points should be gone after vacuum");

        // Shard should still be functional — can insert new points
        shard
            .update(PointOperation(UpsertPoints(PointsList(vec![point(9999)]))))
            .unwrap();
        let count = shard
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 1, "shard should accept new points after full vacuum");

        assert_points_retrievable_with_vectors(&shard, &[9999]);
    }
501
    /// Vacuum at exactly the threshold boundary (20% deleted, 1000 total).
    /// The threshold check is strictly greater-than, so exactly 20% should
    /// NOT trigger vacuum.
    #[test]
    fn vacuum_at_exact_threshold_boundary_is_noop() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-boundary")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete exactly 20% (200 out of 1000) — matches DEFAULT_DELETED_THRESHOLD
        let deleted_ids = (1..=200).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            !optimized,
            "exactly 20% deletion (not strictly greater) should not trigger vacuum"
        );

        assert_points_retrievable_with_vectors(&shard, &[201, 500, 1000]);
    }
533
    /// Just above the vacuum threshold should trigger optimization.
    /// Complements `vacuum_at_exact_threshold_boundary_is_noop`: the check is
    /// strictly greater-than, so 20.1% is the smallest step that fires.
    #[test]
    fn vacuum_just_above_threshold_triggers() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-vacuum-above")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete 201 out of 1000 = 20.1% — just above DEFAULT_DELETED_THRESHOLD (20%)
        let deleted_ids = (1..=201).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(
            optimized,
            "20.1% deletion should trigger vacuum (threshold is >20%)"
        );

        // Points 202..=1000 should survive with correct vectors
        assert_points_retrievable_with_vectors(&shard, &[202, 500, 1000]);
    }
564
    /// When there are excess segments AND some have high deletion ratios,
    /// optimization should handle both (merge + vacuum).
    #[test]
    fn merge_and_vacuum_cooperate() {
        // Six more segments than the default budget guarantees a merge plan.
        let target_count = default_segment_number() + 6;

        let dir = tempfile::Builder::new()
            .prefix("edge-opt-merge-vacuum")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        // Insert 1000 points, then delete 250 (25% — above DEFAULT_DELETED_THRESHOLD=20%)
        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();
        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();
        // Close the shard before copying its segment directories on disk.
        drop(shard);

        // Create excess segments
        multiply_segments(dir.path(), target_count);

        // Explicit optimization (both merge and vacuum should run)
        let reopened = EdgeShard::load(dir.path(), None).unwrap();
        reopened.optimize().unwrap();

        let info = reopened.info();
        assert!(
            info.segments_count <= default_segment_number() + 1,
            "excess segments should be merged: got {}",
            info.segments_count,
        );

        // The duplicated segments each had 750 surviving points (same IDs).
        // After merge, the shard should be functional with correct data.
        // We use count(exact=true) since info().points_count sums per-segment
        // counts without cross-segment deduplication.
        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert!(
            count >= 750,
            "merged shard should preserve surviving points"
        );

        // Surviving points (251..=1000) should be queryable with correct vectors
        assert_points_retrievable_with_vectors(&reopened, &[251, 500, 1000]);

        // Second run should be idle
        let optimized = reopened.optimize().unwrap();
        assert!(!optimized, "second run should be idle after merge+vacuum");
    }
625
    /// Optimized shard should survive a reload and still serve correct data.
    #[test]
    fn data_survives_optimize_and_reload() {
        let dir = tempfile::Builder::new()
            .prefix("edge-opt-reload")
            .tempdir()
            .unwrap();

        let shard = EdgeShard::new(dir.path(), test_config()).unwrap();

        let points = (1..=1000).map(point).collect::<Vec<_>>();
        shard
            .update(PointOperation(UpsertPoints(PointsList(points))))
            .unwrap();

        // Delete 250 points (25%, above DEFAULT_DELETED_THRESHOLD=20%), then optimize
        let deleted_ids = (1..=250).map(ExtendedPointId::NumId).collect();
        shard
            .update(PointOperation(DeletePoints { ids: deleted_ids }))
            .unwrap();

        let optimized = shard.optimize().unwrap();
        assert!(optimized);
        // Close the shard so the reload below starts from on-disk state only.
        drop(shard);

        // Reload the shard
        let reopened = EdgeShard::load(dir.path(), None).unwrap();

        let count = reopened
            .count(CountRequestInternal {
                filter: None,
                exact: true,
            })
            .unwrap();
        assert_eq!(count, 750, "point count should be preserved across reload");

        // Verify specific points survive reload with correct vectors
        assert_points_retrievable_with_vectors(&reopened, &[251, 500, 750, 1000]);
    }
665
666    /// Retrieve points by ID and verify each one is present with the correct
667    /// vector value. Every test point was created with vector `[id as f32]`.
668    fn assert_points_retrievable_with_vectors(shard: &EdgeShard, ids: &[u64]) {
669        let point_ids = ids
670            .iter()
671            .map(|id| ExtendedPointId::NumId(*id))
672            .collect::<Vec<_>>();
673        let results = shard
674            .retrieve(
675                &point_ids,
676                Some(WithPayloadInterface::Bool(false)),
677                Some(WithVector::Bool(true)),
678            )
679            .unwrap();
680        assert_eq!(
681            results.len(),
682            ids.len(),
683            "expected {} retrievable points, got {}",
684            ids.len(),
685            results.len(),
686        );
687        for (result, &expected_id) in results.iter().zip(ids) {
688            assert_eq!(result.id, ExtendedPointId::NumId(expected_id));
689            let vectors = match result.vector.as_ref().expect("vector should be present") {
690                VectorStructInternal::Named(named) => named,
691                other => panic!("expected Named vectors, got {other:?}"),
692            };
693            let vec = match vectors.get(VECTOR_NAME).expect("vector name should exist") {
694                VectorInternal::Dense(v) => v,
695                other => panic!("expected Dense vector, got {other:?}"),
696            };
697            assert_eq!(
698                vec,
699                &vec![expected_id as f32],
700                "vector value mismatch for point {expected_id}"
701            );
702        }
703    }
704
705    fn test_config() -> EdgeConfig {
706        EdgeConfig {
707            on_disk_payload: false,
708            vectors: HashMap::from([(
709                VECTOR_NAME.to_string(),
710                EdgeVectorParams {
711                    size: 1,
712                    distance: Distance::Dot,
713                    quantization_config: None,
714                    multivector_config: None,
715                    datatype: None,
716                    on_disk: None,
717                    hnsw_config: None,
718                },
719            )]),
720            sparse_vectors: HashMap::new(),
721            hnsw_config: Default::default(),
722            quantization_config: None,
723            optimizers: Default::default(),
724        }
725    }
726
727    fn point(id: u64) -> PointStructPersisted {
728        PointStructPersisted {
729            id: ExtendedPointId::NumId(id),
730            vector: VectorStructPersisted::from(VectorStructInternal::Named(HashMap::from([(
731                VECTOR_NAME.to_string(),
732                VectorInternal::from(vec![id as f32]),
733            )]))),
734            payload: None,
735        }
736    }
737
738    /// Copy the first segment on disk to reach `target_count` total segments.
739    fn multiply_segments(shard_dir: &Path, target_count: usize) {
740        let segments_path = shard_dir.join("segments");
741        let segment_dirs = fs::read_dir(&segments_path)
742            .unwrap()
743            .filter_map(Result::ok)
744            .map(|entry| entry.path())
745            .filter(|path| path.is_dir())
746            .collect::<Vec<_>>();
747        assert!(!segment_dirs.is_empty(), "need at least one source segment");
748
749        let source = &segment_dirs[0];
750        let current_count = segment_dirs.len();
751        for _ in current_count..target_count {
752            let target = segments_path.join(Uuid::new_v4().to_string());
753            copy_dir_recursive(source, &target);
754        }
755    }
756
757    fn duplicate_single_segment(shard_dir: &Path) {
758        let segments_path = shard_dir.join("segments");
759        let segment_dirs = fs::read_dir(&segments_path)
760            .unwrap()
761            .filter_map(Result::ok)
762            .map(|entry| entry.path())
763            .filter(|path| path.is_dir())
764            .collect::<Vec<_>>();
765        assert_eq!(segment_dirs.len(), 1, "expected exactly one source segment");
766
767        let source = &segment_dirs[0];
768        let target = segments_path.join(Uuid::new_v4().to_string());
769        copy_dir_recursive(source, &target);
770    }
771
772    fn copy_dir_recursive(from: &Path, to: &Path) {
773        fs::create_dir_all(to).unwrap();
774        for entry in fs::read_dir(from).unwrap().filter_map(Result::ok) {
775            let from_path = entry.path();
776            let to_path = to.join(entry.file_name());
777            if entry.file_type().unwrap().is_dir() {
778                copy_dir_recursive(&from_path, &to_path);
779            } else {
780                fs::copy(&from_path, &to_path).unwrap();
781            }
782        }
783    }
784}