Skip to main content

gobby_code/vector/code_symbols/
qdrant.rs

1use reqwest::StatusCode;
2use serde_json::{Value, json};
3use std::collections::{BTreeSet, HashSet};
4use std::sync::OnceLock;
5use std::time::Duration;
6
7use crate::config::{CODE_SYMBOL_COLLECTION_PREFIX, QdrantConfig};
8use gobby_core::degradation::ServiceState;
9use gobby_core::qdrant::{CollectionNameError, CollectionScope, SearchRequest};
10
11use super::types::{ExistingVectorCollectionSchema, VectorLifecycleError};
12
/// Qdrant distance metric used for code-symbol collections.
///
/// Kept as `"Cosine"` so code-symbol collections stay compatible with the Python daemon's
/// Qdrant schema.
pub const VECTOR_DISTANCE_COSINE: &str = "Cosine";

/// Number of points requested per page from the `points/scroll` endpoint.
const QDRANT_SCROLL_LIMIT: usize = 256;

/// Environment variable overriding the shared Qdrant HTTP client's request timeout, in whole
/// seconds.
const QDRANT_DELETE_TIMEOUT_SECS_ENV: &str = "GCODE_QDRANT_DELETE_TIMEOUT_SECS";

/// Request timeout used when the override above is unset or not a positive integer.
const DEFAULT_QDRANT_DELETE_TIMEOUT: Duration = Duration::new(60, 0);
/// Blocking HTTP client shared by this module's direct Qdrant REST calls; it is filled lazily
/// by `qdrant_http_client` on first use. (`vector_search` goes through `gobby_core` instead.)
static QDRANT_HTTP_CLIENT: OnceLock<reqwest::blocking::Client> = OnceLock::new();
19
/// Outcome of one `cleanup_orphan_file_vectors` pass over a project's code-symbol collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VectorOrphanCleanup {
    /// Project whose points were inspected.
    pub project_id: String,
    /// Name of the Qdrant collection that was scanned.
    pub collection: String,
    /// Number of distinct `file_path` payload values found for the project in Qdrant.
    pub vector_files_scanned: usize,
    /// Number of those paths that were absent from the indexed set and had a delete issued.
    pub orphan_files_deleted: usize,
    /// Sum of the per-file point counts reported by those deletes.
    pub vectors_deleted: usize,
}
28
29pub fn collection_name(
30    collection_prefix: &str,
31    project_id: &str,
32) -> Result<String, CollectionNameError> {
33    let collection = format!("{collection_prefix}{project_id}");
34    gobby_core::qdrant::collection_name("gcode", CollectionScope::Custom(&collection))
35}
36
37pub(super) fn collection_path(collection: &str) -> String {
38    format!("/collections/{}", urlencoding::encode(collection))
39}
40
41pub fn delete_project_collection(
42    qdrant: &QdrantConfig,
43    project_id: &str,
44) -> Result<usize, VectorLifecycleError> {
45    let client = qdrant_http_client()?;
46    let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
47    delete_qdrant_collection(&client, qdrant, &collection)
48}
49
50pub fn delete_file_vectors(
51    qdrant: &QdrantConfig,
52    project_id: &str,
53    file_path: &str,
54) -> Result<usize, VectorLifecycleError> {
55    let client = qdrant_http_client()?;
56    let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
57    delete_vectors_for_filter(&client, qdrant, &collection, project_id, Some(file_path))
58}
59
60pub fn cleanup_orphan_file_vectors(
61    qdrant: &QdrantConfig,
62    project_id: &str,
63    indexed_file_paths: &HashSet<String>,
64) -> Result<VectorOrphanCleanup, VectorLifecycleError> {
65    let vector_file_paths = list_project_vector_file_paths(qdrant, project_id)?;
66    let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
67    let mut orphan_files_deleted = 0;
68    let mut vectors_deleted = 0;
69
70    for file_path in vector_file_paths
71        .iter()
72        .filter(|file_path| !indexed_file_paths.contains(*file_path))
73    {
74        orphan_files_deleted += 1;
75        vectors_deleted += delete_file_vectors(qdrant, project_id, file_path)?;
76    }
77
78    Ok(VectorOrphanCleanup {
79        project_id: project_id.to_string(),
80        collection,
81        vector_files_scanned: vector_file_paths.len(),
82        orphan_files_deleted,
83        vectors_deleted,
84    })
85}
86
87pub(super) fn list_project_vector_file_paths(
88    qdrant: &QdrantConfig,
89    project_id: &str,
90) -> Result<BTreeSet<String>, VectorLifecycleError> {
91    let client = qdrant_http_client()?;
92    let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
93    let mut file_paths = BTreeSet::new();
94    let mut offset = None;
95
96    loop {
97        let mut body = json!({
98            "filter": {
99                "must": [
100                    {
101                        "key": "project_id",
102                        "match": {"value": project_id},
103                    },
104                ],
105            },
106            "with_payload": ["project_id", "file_path"],
107            "with_vector": false,
108            "limit": QDRANT_SCROLL_LIMIT,
109        });
110        if let Some(next_offset) = offset.take() {
111            body["offset"] = next_offset;
112        }
113
114        let resp = qdrant_request_for_config(
115            &client,
116            qdrant,
117            reqwest::Method::POST,
118            &format!("{}/points/scroll", collection_path(&collection)),
119        )?
120        .json(&body)
121        .send()
122        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
123        let status = resp.status();
124        if status == StatusCode::NOT_FOUND {
125            return Ok(file_paths);
126        }
127        if !status.is_success() {
128            return Err(qdrant_http_error("scroll points", status, resp));
129        }
130
131        let data: Value = resp
132            .json()
133            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
134        collect_file_paths_from_scroll_page(&data, &mut file_paths)?;
135        offset = data
136            .pointer("/result/next_page_offset")
137            .filter(|offset| !offset.is_null())
138            .cloned();
139        if offset.is_none() {
140            return Ok(file_paths);
141        }
142    }
143}
144
145fn collect_file_paths_from_scroll_page(
146    data: &Value,
147    file_paths: &mut BTreeSet<String>,
148) -> Result<(), VectorLifecycleError> {
149    let points = data
150        .pointer("/result/points")
151        .and_then(Value::as_array)
152        .ok_or_else(|| {
153            VectorLifecycleError::QdrantOperation(
154                "missing result.points in Qdrant scroll response".to_string(),
155            )
156        })?;
157    for point in points {
158        if let Some(file_path) = point.pointer("/payload/file_path").and_then(Value::as_str) {
159            file_paths.insert(file_path.to_string());
160        }
161    }
162    Ok(())
163}
164
165pub fn delete_code_symbol_collections_with_prefix(
166    qdrant: &QdrantConfig,
167) -> Result<Vec<String>, VectorLifecycleError> {
168    let client = qdrant_http_client()?;
169    let resp = qdrant_request_for_config(&client, qdrant, reqwest::Method::GET, "/collections")?
170        .send()
171        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
172    let status = resp.status();
173    if !status.is_success() {
174        return Err(qdrant_http_error("list collections", status, resp));
175    }
176
177    let data: Value = resp
178        .json()
179        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
180    let collections = parse_collection_names(&data)
181        .into_iter()
182        .filter(|name| name.starts_with(CODE_SYMBOL_COLLECTION_PREFIX))
183        .collect::<Vec<_>>();
184
185    let mut deleted = Vec::new();
186    for collection in collections {
187        if delete_qdrant_collection(&client, qdrant, &collection)? > 0 {
188            deleted.push(collection);
189        }
190    }
191    Ok(deleted)
192}
193
194pub fn vector_search(
195    config: &QdrantConfig,
196    collection: &str,
197    query_vector: &[f32],
198    limit: usize,
199) -> anyhow::Result<Vec<(String, f64)>> {
200    let request = SearchRequest {
201        vector: query_vector.to_vec(),
202        limit,
203        filter: None,
204    };
205    let (hits, state) = gobby_core::qdrant::with_qdrant(Some(config), Vec::new(), |config| {
206        gobby_core::qdrant::search(config, collection, request)
207    })?;
208    if let Some(message) = vector_search_degradation_warning(&state) {
209        log::warn!("{message}");
210    }
211    Ok(hits
212        .into_iter()
213        .map(|hit| (hit.id, f64::from(hit.score)))
214        .collect())
215}
216
217fn vector_search_degradation_warning(state: &ServiceState) -> Option<String> {
218    match state {
219        ServiceState::Available => None,
220        ServiceState::NotConfigured => {
221            Some("semantic vector search skipped: Qdrant is not configured".to_string())
222        }
223        ServiceState::Unreachable { message } => Some(format!(
224            "semantic vector search skipped: Qdrant is unreachable: {message}"
225        )),
226    }
227}
228
229pub(super) fn parse_collection_schema(data: &Value) -> Option<ExistingVectorCollectionSchema> {
230    let vectors = data.pointer("/result/config/params/vectors")?;
231    let size = vectors
232        .get("size")
233        .and_then(Value::as_u64)
234        .and_then(|size| usize::try_from(size).ok());
235    let distance = vectors
236        .get("distance")
237        .and_then(Value::as_str)
238        .map(str::to_string);
239    Some(ExistingVectorCollectionSchema { size, distance })
240}
241
242fn parse_collection_names(data: &Value) -> Vec<String> {
243    data.pointer("/result/collections")
244        .and_then(Value::as_array)
245        .map(|collections| {
246            collections
247                .iter()
248                .filter_map(|collection| {
249                    collection
250                        .get("name")
251                        .and_then(Value::as_str)
252                        .map(str::to_string)
253                })
254                .collect()
255        })
256        .unwrap_or_default()
257}
258
259fn parse_points_count(data: &Value) -> Result<usize, VectorLifecycleError> {
260    data.pointer("/result/count")
261        .and_then(Value::as_u64)
262        .and_then(|count| usize::try_from(count).ok())
263        .ok_or_else(|| {
264            VectorLifecycleError::QdrantOperation(
265                "count points response did not include result.count".to_string(),
266            )
267        })
268}
269
270fn qdrant_http_client() -> Result<reqwest::blocking::Client, VectorLifecycleError> {
271    if let Some(client) = QDRANT_HTTP_CLIENT.get() {
272        return Ok(client.clone());
273    }
274    let client = reqwest::blocking::Client::builder()
275        .timeout(qdrant_delete_timeout())
276        .build()
277        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
278    let _ = QDRANT_HTTP_CLIENT.set(client.clone());
279    Ok(client)
280}
281
282fn qdrant_delete_timeout() -> Duration {
283    std::env::var(QDRANT_DELETE_TIMEOUT_SECS_ENV)
284        .ok()
285        .and_then(|value| value.parse::<u64>().ok())
286        .filter(|seconds| *seconds > 0)
287        .map(Duration::from_secs)
288        .unwrap_or(DEFAULT_QDRANT_DELETE_TIMEOUT)
289}
290
291pub(super) fn qdrant_request_for_config(
292    client: &reqwest::blocking::Client,
293    qdrant: &QdrantConfig,
294    method: reqwest::Method,
295    path: &str,
296) -> Result<reqwest::blocking::RequestBuilder, VectorLifecycleError> {
297    let base = qdrant
298        .url
299        .as_deref()
300        .map(str::trim)
301        .filter(|url| !url.is_empty())
302        .ok_or(VectorLifecycleError::MissingQdrantConfig)?
303        .trim_end_matches('/');
304    let url = format!("{base}{path}");
305    let mut req = client.request(method, url);
306    if let Some(key) = &qdrant.api_key {
307        req = req.header("api-key", key);
308    }
309    Ok(req)
310}
311
312fn delete_qdrant_collection(
313    client: &reqwest::blocking::Client,
314    qdrant: &QdrantConfig,
315    collection: &str,
316) -> Result<usize, VectorLifecycleError> {
317    let resp = qdrant_request_for_config(
318        client,
319        qdrant,
320        reqwest::Method::DELETE,
321        &collection_path(collection),
322    )?
323    .send()
324    .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
325    let status = resp.status();
326    if status == StatusCode::NOT_FOUND {
327        return Ok(0);
328    }
329    if !status.is_success() {
330        return Err(qdrant_http_error("delete collection", status, resp));
331    }
332    Ok(1)
333}
334
335pub(super) fn delete_vectors_for_filter(
336    client: &reqwest::blocking::Client,
337    qdrant: &QdrantConfig,
338    collection: &str,
339    project_id: &str,
340    file_path: Option<&str>,
341) -> Result<usize, VectorLifecycleError> {
342    delete_vectors_for_filter_excluding_ids(client, qdrant, collection, project_id, file_path, &[])
343}
344
345pub(super) fn delete_vectors_for_filter_excluding_ids(
346    client: &reqwest::blocking::Client,
347    qdrant: &QdrantConfig,
348    collection: &str,
349    project_id: &str,
350    file_path: Option<&str>,
351    keep_point_ids: &[String],
352) -> Result<usize, VectorLifecycleError> {
353    let mut must = vec![json!({
354        "key": "project_id",
355        "match": {"value": project_id},
356    })];
357    if let Some(file_path) = file_path {
358        must.push(json!({
359            "key": "file_path",
360            "match": {"value": file_path},
361        }));
362    }
363    let mut filter_object = serde_json::Map::new();
364    filter_object.insert("must".to_string(), Value::Array(must));
365    if !keep_point_ids.is_empty() {
366        filter_object.insert(
367            "must_not".to_string(),
368            json!([{ "has_id": keep_point_ids }]),
369        );
370    }
371    let filter = Value::Object(filter_object);
372    let count_body = json!({ "filter": filter.clone(), "exact": true });
373    let resp = qdrant_request_for_config(
374        client,
375        qdrant,
376        reqwest::Method::POST,
377        &format!("{}/points/count", collection_path(collection)),
378    )?
379    .json(&count_body)
380    .send()
381    .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
382    let status = resp.status();
383    if status == StatusCode::NOT_FOUND {
384        return Ok(0);
385    }
386    if !status.is_success() {
387        return Err(qdrant_http_error("count points", status, resp));
388    }
389    let data: Value = resp
390        .json()
391        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
392    let count = parse_points_count(&data)?;
393    if count == 0 {
394        return Ok(0);
395    }
396
397    let body = json!({ "filter": filter });
398    let resp = qdrant_request_for_config(
399        client,
400        qdrant,
401        reqwest::Method::POST,
402        &format!("{}/points/delete?wait=true", collection_path(collection)),
403    )?
404    .json(&body)
405    .send()
406    .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
407    let status = resp.status();
408    if status == StatusCode::NOT_FOUND {
409        return Ok(0);
410    }
411    if !status.is_success() {
412        return Err(qdrant_http_error("delete points", status, resp));
413    }
414    Ok(count)
415}
416
417pub(super) fn qdrant_http_error(
418    operation: &'static str,
419    status: StatusCode,
420    resp: reqwest::blocking::Response,
421) -> VectorLifecycleError {
422    VectorLifecycleError::QdrantHttp {
423        operation,
424        status: status.as_u16(),
425        body: resp.text().unwrap_or_default(),
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn vector_search_degradation_warning_mentions_missing_qdrant_config() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::NotConfigured),
            Some("semantic vector search skipped: Qdrant is not configured".to_string())
        );
    }

    #[test]
    fn vector_search_degradation_warning_mentions_unreachable_qdrant() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::Unreachable {
                message: "connection refused".to_string()
            }),
            Some(
                "semantic vector search skipped: Qdrant is unreachable: connection refused"
                    .to_string()
            )
        );
    }

    #[test]
    fn vector_search_degradation_warning_is_silent_when_available() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::Available),
            None
        );
    }

    #[test]
    fn collection_path_percent_encodes_the_collection_name() {
        assert_eq!(collection_path("plain_name"), "/collections/plain_name");
        assert_eq!(collection_path("with space"), "/collections/with%20space");
    }

    #[test]
    fn parse_collection_names_skips_entries_without_a_name() {
        let data = json!({
            "result": {
                "collections": [
                    {"name": "code_symbols_a"},
                    {"vectors_count": 3},
                    {"name": "code_symbols_b"},
                ]
            }
        });
        assert_eq!(
            parse_collection_names(&data),
            vec!["code_symbols_a".to_string(), "code_symbols_b".to_string()]
        );
        assert!(parse_collection_names(&json!({"result": {}})).is_empty());
    }

    #[test]
    fn parse_points_count_reads_result_count() {
        assert!(matches!(
            parse_points_count(&json!({"result": {"count": 7}})),
            Ok(7)
        ));
        assert!(matches!(
            parse_points_count(&json!({"result": {}})),
            Err(VectorLifecycleError::QdrantOperation(_))
        ));
    }

    #[test]
    fn parse_collection_schema_reads_unnamed_vector_params() {
        let data = json!({
            "result": {"config": {"params": {"vectors": {"size": 768, "distance": "Cosine"}}}}
        });
        let schema = parse_collection_schema(&data).expect("vectors config is present");
        assert_eq!(schema.size, Some(768));
        assert_eq!(schema.distance.as_deref(), Some(VECTOR_DISTANCE_COSINE));
        assert!(parse_collection_schema(&json!({"result": {}})).is_none());
    }

    #[test]
    fn collect_file_paths_from_scroll_page_dedups_and_skips_missing_paths() {
        let data = json!({
            "result": {
                "points": [
                    {"payload": {"project_id": "p", "file_path": "src/b.rs"}},
                    {"payload": {"project_id": "p"}},
                    {"payload": {"project_id": "p", "file_path": "src/a.rs"}},
                    {"payload": {"project_id": "p", "file_path": "src/b.rs"}},
                ]
            }
        });
        let mut paths = BTreeSet::new();
        assert!(collect_file_paths_from_scroll_page(&data, &mut paths).is_ok());
        assert_eq!(
            paths.into_iter().collect::<Vec<_>>(),
            vec!["src/a.rs".to_string(), "src/b.rs".to_string()]
        );

        let mut untouched = BTreeSet::new();
        assert!(matches!(
            collect_file_paths_from_scroll_page(&json!({"result": {}}), &mut untouched),
            Err(VectorLifecycleError::QdrantOperation(_))
        ));
        assert!(untouched.is_empty());
    }
}