Skip to main content

qdrant_edge/edge/
search.rs

1use std::cmp;
2
3use crate::common::counter::hardware_accumulator::HwMeasurementAcc;
4use crate::segment::common::operation_error::OperationResult;
5use crate::segment::data_types::modifier::Modifier;
6use crate::segment::data_types::vectors::QueryVector;
7use crate::segment::types::{DEFAULT_FULL_SCAN_THRESHOLD, ScoredPoint, WithPayload};
8use crate::shard::common::stopping_guard::StoppingGuard;
9use crate::shard::query::query_context::{fill_query_context, init_query_context};
10use crate::shard::search::CoreSearchRequest;
11use crate::shard::search_result_aggregator::BatchResultAggregator;
12
13use crate::edge::{DEFAULT_EDGE_TIMEOUT, EdgeShard};
14
15impl EdgeShard {
16    /// This method is DEPRECATED and should be replaced with query.
17    pub fn search(&self, search: CoreSearchRequest) -> OperationResult<Vec<ScoredPoint>> {
18        let is_stopped_guard = StoppingGuard::new();
19        let searches = [search];
20        let query_context = init_query_context(
21            &searches,
22            DEFAULT_FULL_SCAN_THRESHOLD,
23            &is_stopped_guard,
24            HwMeasurementAcc::disposable_edge(),
25            |vector_name| {
26                self.config
27                    .read()
28                    .sparse_vectors
29                    .get(vector_name)
30                    .is_some_and(|v| v.modifier == Some(Modifier::Idf))
31            },
32        );
33        let [search] = searches;
34        let Some(context) = fill_query_context(
35            query_context,
36            self.segments.clone(),
37            DEFAULT_EDGE_TIMEOUT,
38            &is_stopped_guard.get_is_stopped(),
39        )?
40        else {
41            // No segments to search
42            return Ok(vec![]);
43        };
44
45        let segments: Vec<_> = self
46            .segments
47            .read()
48            .non_appendable_then_appendable_segments()
49            .collect();
50
51        let CoreSearchRequest {
52            query,
53            filter,
54            params,
55            limit,
56            offset,
57            with_payload,
58            with_vector,
59            score_threshold,
60        } = search;
61
62        let vector_name = query.get_vector_name().to_string();
63        let query_vector = QueryVector::from(query);
64        let with_payload = WithPayload::from(with_payload.unwrap_or_default());
65        let with_vector = with_vector.unwrap_or_default();
66
67        let mut points_by_segment = Vec::with_capacity(segments.len());
68
69        for segment in segments {
70            let batched_points = segment.get().read().search_batch(
71                &vector_name,
72                &[&query_vector],
73                &with_payload,
74                &with_vector,
75                filter.as_ref(),
76                offset + limit,
77                params.as_ref(),
78                &context.get_segment_query_context(),
79            )?;
80
81            debug_assert_eq!(batched_points.len(), 1);
82
83            let [points] = batched_points
84                .try_into()
85                .expect("single batched search result");
86
87            points_by_segment.push(points);
88        }
89
90        let mut aggregator = BatchResultAggregator::new([offset + limit]);
91        aggregator.update_point_versions(points_by_segment.iter().flatten());
92
93        for points in points_by_segment {
94            aggregator.update_batch_results(0, points);
95        }
96
97        let [mut points] = aggregator
98            .into_topk()
99            .try_into()
100            .expect("single batched search result");
101
102        let distance = {
103            let config = self.config.read();
104            if let Some(dense) = config.vectors.get(&vector_name) {
105                dense.distance
106            } else if config.sparse_vectors.contains_key(&vector_name) {
107                crate::segment::types::Distance::Dot
108            } else {
109                return Err(
110                    crate::segment::common::operation_error::OperationError::service_error(format!(
111                        "vector config for '{vector_name}' does not exist"
112                    )),
113                );
114            }
115        };
116
117        match &query_vector {
118            QueryVector::Nearest(_) => {
119                for point in &mut points {
120                    point.score = distance.postprocess_score(point.score);
121                }
122            }
123            QueryVector::RecommendBestScore(_) => (),
124            QueryVector::RecommendSumScores(_) => (),
125            QueryVector::Discover(_) => (),
126            QueryVector::Context(_) => (),
127            QueryVector::FeedbackNaive(_) => (),
128        }
129
130        if let Some(score_threshold) = score_threshold {
131            debug_assert!(
132                points.is_sorted_by(|left, right| distance.is_ordered(left.score, right.score)),
133            );
134
135            let below_threshold = points
136                .iter()
137                .enumerate()
138                .find(|(_, point)| !distance.check_threshold(point.score, score_threshold));
139
140            if let Some((below_threshold_idx, _)) = below_threshold {
141                points.truncate(below_threshold_idx);
142            }
143        }
144
145        let _ = points.drain(..cmp::min(points.len(), offset));
146
147        Ok(points)
148    }
149}