1use reqwest::StatusCode;
2use serde_json::{Value, json};
3use std::collections::{BTreeSet, HashSet};
4use std::sync::OnceLock;
5use std::time::Duration;
6
7use crate::config::{CODE_SYMBOL_COLLECTION_PREFIX, QdrantConfig};
8use gobby_core::degradation::ServiceState;
9use gobby_core::qdrant::{CollectionNameError, CollectionScope, SearchRequest};
10
11use super::types::{ExistingVectorCollectionSchema, VectorLifecycleError};
12
/// Distance metric name Qdrant uses for cosine similarity.
pub const VECTOR_DISTANCE_COSINE: &str = "Cosine";

/// Page size requested from the Qdrant `points/scroll` endpoint per round trip.
const QDRANT_SCROLL_LIMIT: usize = 256;

/// Environment variable overriding the shared Qdrant HTTP client timeout, in whole seconds.
const QDRANT_DELETE_TIMEOUT_SECS_ENV: &str = "GCODE_QDRANT_DELETE_TIMEOUT_SECS";

/// Client timeout used when the override variable is unset, unparsable, or zero.
const DEFAULT_QDRANT_DELETE_TIMEOUT: Duration = Duration::from_secs(60);
/// Shared blocking HTTP client for every Qdrant request in this module. It is filled lazily by
/// `qdrant_http_client` and never cleared, so later calls reuse the same instance.
static QDRANT_HTTP_CLIENT: OnceLock<reqwest::blocking::Client> = OnceLock::new();
19
/// Summary of one `cleanup_orphan_file_vectors` run for a single project.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorOrphanCleanup {
    /// Project whose vectors were scanned.
    pub project_id: String,
    /// Qdrant collection name derived from `project_id`.
    pub collection: String,
    /// Number of distinct `file_path` payload values found for this project in the collection.
    pub vector_files_scanned: usize,
    /// Number of scanned file paths absent from the indexed set and therefore passed to the
    /// per-file delete. Counted even if that delete ended up removing zero points.
    pub orphan_files_deleted: usize,
    /// Total points removed across all orphan files, as reported by the per-file delete helper.
    pub vectors_deleted: usize,
}
28
29pub fn collection_name(
30 collection_prefix: &str,
31 project_id: &str,
32) -> Result<String, CollectionNameError> {
33 let collection = format!("{collection_prefix}{project_id}");
34 gobby_core::qdrant::collection_name("gcode", CollectionScope::Custom(&collection))
35}
36
37pub(super) fn collection_path(collection: &str) -> String {
38 format!("/collections/{}", urlencoding::encode(collection))
39}
40
41pub fn delete_project_collection(
42 qdrant: &QdrantConfig,
43 project_id: &str,
44) -> Result<usize, VectorLifecycleError> {
45 let client = qdrant_http_client()?;
46 let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
47 delete_qdrant_collection(&client, qdrant, &collection)
48}
49
50pub fn delete_file_vectors(
51 qdrant: &QdrantConfig,
52 project_id: &str,
53 file_path: &str,
54) -> Result<usize, VectorLifecycleError> {
55 let client = qdrant_http_client()?;
56 let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
57 delete_vectors_for_filter(&client, qdrant, &collection, project_id, Some(file_path))
58}
59
60pub fn cleanup_orphan_file_vectors(
61 qdrant: &QdrantConfig,
62 project_id: &str,
63 indexed_file_paths: &HashSet<String>,
64) -> Result<VectorOrphanCleanup, VectorLifecycleError> {
65 let vector_file_paths = list_project_vector_file_paths(qdrant, project_id)?;
66 let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
67 let mut orphan_files_deleted = 0;
68 let mut vectors_deleted = 0;
69
70 for file_path in vector_file_paths
71 .iter()
72 .filter(|file_path| !indexed_file_paths.contains(*file_path))
73 {
74 orphan_files_deleted += 1;
75 vectors_deleted += delete_file_vectors(qdrant, project_id, file_path)?;
76 }
77
78 Ok(VectorOrphanCleanup {
79 project_id: project_id.to_string(),
80 collection,
81 vector_files_scanned: vector_file_paths.len(),
82 orphan_files_deleted,
83 vectors_deleted,
84 })
85}
86
87pub(super) fn list_project_vector_file_paths(
88 qdrant: &QdrantConfig,
89 project_id: &str,
90) -> Result<BTreeSet<String>, VectorLifecycleError> {
91 let client = qdrant_http_client()?;
92 let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, project_id)?;
93 let mut file_paths = BTreeSet::new();
94 let mut offset = None;
95
96 loop {
97 let mut body = json!({
98 "filter": {
99 "must": [
100 {
101 "key": "project_id",
102 "match": {"value": project_id},
103 },
104 ],
105 },
106 "with_payload": ["project_id", "file_path"],
107 "with_vector": false,
108 "limit": QDRANT_SCROLL_LIMIT,
109 });
110 if let Some(next_offset) = offset.take() {
111 body["offset"] = next_offset;
112 }
113
114 let resp = qdrant_request_for_config(
115 &client,
116 qdrant,
117 reqwest::Method::POST,
118 &format!("{}/points/scroll", collection_path(&collection)),
119 )?
120 .json(&body)
121 .send()
122 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
123 let status = resp.status();
124 if status == StatusCode::NOT_FOUND {
125 return Ok(file_paths);
126 }
127 if !status.is_success() {
128 return Err(qdrant_http_error("scroll points", status, resp));
129 }
130
131 let data: Value = resp
132 .json()
133 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
134 collect_file_paths_from_scroll_page(&data, &mut file_paths)?;
135 offset = data
136 .pointer("/result/next_page_offset")
137 .filter(|offset| !offset.is_null())
138 .cloned();
139 if offset.is_none() {
140 return Ok(file_paths);
141 }
142 }
143}
144
145fn collect_file_paths_from_scroll_page(
146 data: &Value,
147 file_paths: &mut BTreeSet<String>,
148) -> Result<(), VectorLifecycleError> {
149 let points = data
150 .pointer("/result/points")
151 .and_then(Value::as_array)
152 .ok_or_else(|| {
153 VectorLifecycleError::QdrantOperation(
154 "missing result.points in Qdrant scroll response".to_string(),
155 )
156 })?;
157 for point in points {
158 if let Some(file_path) = point.pointer("/payload/file_path").and_then(Value::as_str) {
159 file_paths.insert(file_path.to_string());
160 }
161 }
162 Ok(())
163}
164
165pub fn delete_code_symbol_collections_with_prefix(
166 qdrant: &QdrantConfig,
167) -> Result<Vec<String>, VectorLifecycleError> {
168 let client = qdrant_http_client()?;
169 let resp = qdrant_request_for_config(&client, qdrant, reqwest::Method::GET, "/collections")?
170 .send()
171 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
172 let status = resp.status();
173 if !status.is_success() {
174 return Err(qdrant_http_error("list collections", status, resp));
175 }
176
177 let data: Value = resp
178 .json()
179 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
180 let collections = parse_collection_names(&data)
181 .into_iter()
182 .filter(|name| name.starts_with(CODE_SYMBOL_COLLECTION_PREFIX))
183 .collect::<Vec<_>>();
184
185 let mut deleted = Vec::new();
186 for collection in collections {
187 if delete_qdrant_collection(&client, qdrant, &collection)? > 0 {
188 deleted.push(collection);
189 }
190 }
191 Ok(deleted)
192}
193
194pub fn vector_search(
195 config: &QdrantConfig,
196 collection: &str,
197 query_vector: &[f32],
198 limit: usize,
199) -> anyhow::Result<Vec<(String, f64)>> {
200 let request = SearchRequest {
201 vector: query_vector.to_vec(),
202 limit,
203 filter: None,
204 };
205 let (hits, state) = gobby_core::qdrant::with_qdrant(Some(config), Vec::new(), |config| {
206 gobby_core::qdrant::search(config, collection, request)
207 })?;
208 if let Some(message) = vector_search_degradation_warning(&state) {
209 log::warn!("{message}");
210 }
211 Ok(hits
212 .into_iter()
213 .map(|hit| (hit.id, f64::from(hit.score)))
214 .collect())
215}
216
217fn vector_search_degradation_warning(state: &ServiceState) -> Option<String> {
218 match state {
219 ServiceState::Available => None,
220 ServiceState::NotConfigured => {
221 Some("semantic vector search skipped: Qdrant is not configured".to_string())
222 }
223 ServiceState::Unreachable { message } => Some(format!(
224 "semantic vector search skipped: Qdrant is unreachable: {message}"
225 )),
226 }
227}
228
229pub(super) fn parse_collection_schema(data: &Value) -> Option<ExistingVectorCollectionSchema> {
230 let vectors = data.pointer("/result/config/params/vectors")?;
231 let size = vectors
232 .get("size")
233 .and_then(Value::as_u64)
234 .and_then(|size| usize::try_from(size).ok());
235 let distance = vectors
236 .get("distance")
237 .and_then(Value::as_str)
238 .map(str::to_string);
239 Some(ExistingVectorCollectionSchema { size, distance })
240}
241
242fn parse_collection_names(data: &Value) -> Vec<String> {
243 data.pointer("/result/collections")
244 .and_then(Value::as_array)
245 .map(|collections| {
246 collections
247 .iter()
248 .filter_map(|collection| {
249 collection
250 .get("name")
251 .and_then(Value::as_str)
252 .map(str::to_string)
253 })
254 .collect()
255 })
256 .unwrap_or_default()
257}
258
259fn parse_points_count(data: &Value) -> Result<usize, VectorLifecycleError> {
260 data.pointer("/result/count")
261 .and_then(Value::as_u64)
262 .and_then(|count| usize::try_from(count).ok())
263 .ok_or_else(|| {
264 VectorLifecycleError::QdrantOperation(
265 "count points response did not include result.count".to_string(),
266 )
267 })
268}
269
270fn qdrant_http_client() -> Result<reqwest::blocking::Client, VectorLifecycleError> {
271 if let Some(client) = QDRANT_HTTP_CLIENT.get() {
272 return Ok(client.clone());
273 }
274 let client = reqwest::blocking::Client::builder()
275 .timeout(qdrant_delete_timeout())
276 .build()
277 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
278 let _ = QDRANT_HTTP_CLIENT.set(client.clone());
279 Ok(client)
280}
281
282fn qdrant_delete_timeout() -> Duration {
283 std::env::var(QDRANT_DELETE_TIMEOUT_SECS_ENV)
284 .ok()
285 .and_then(|value| value.parse::<u64>().ok())
286 .filter(|seconds| *seconds > 0)
287 .map(Duration::from_secs)
288 .unwrap_or(DEFAULT_QDRANT_DELETE_TIMEOUT)
289}
290
291pub(super) fn qdrant_request_for_config(
292 client: &reqwest::blocking::Client,
293 qdrant: &QdrantConfig,
294 method: reqwest::Method,
295 path: &str,
296) -> Result<reqwest::blocking::RequestBuilder, VectorLifecycleError> {
297 let base = qdrant
298 .url
299 .as_deref()
300 .map(str::trim)
301 .filter(|url| !url.is_empty())
302 .ok_or(VectorLifecycleError::MissingQdrantConfig)?
303 .trim_end_matches('/');
304 let url = format!("{base}{path}");
305 let mut req = client.request(method, url);
306 if let Some(key) = &qdrant.api_key {
307 req = req.header("api-key", key);
308 }
309 Ok(req)
310}
311
312fn delete_qdrant_collection(
313 client: &reqwest::blocking::Client,
314 qdrant: &QdrantConfig,
315 collection: &str,
316) -> Result<usize, VectorLifecycleError> {
317 let resp = qdrant_request_for_config(
318 client,
319 qdrant,
320 reqwest::Method::DELETE,
321 &collection_path(collection),
322 )?
323 .send()
324 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
325 let status = resp.status();
326 if status == StatusCode::NOT_FOUND {
327 return Ok(0);
328 }
329 if !status.is_success() {
330 return Err(qdrant_http_error("delete collection", status, resp));
331 }
332 Ok(1)
333}
334
335pub(super) fn delete_vectors_for_filter(
336 client: &reqwest::blocking::Client,
337 qdrant: &QdrantConfig,
338 collection: &str,
339 project_id: &str,
340 file_path: Option<&str>,
341) -> Result<usize, VectorLifecycleError> {
342 delete_vectors_for_filter_excluding_ids(client, qdrant, collection, project_id, file_path, &[])
343}
344
345pub(super) fn delete_vectors_for_filter_excluding_ids(
346 client: &reqwest::blocking::Client,
347 qdrant: &QdrantConfig,
348 collection: &str,
349 project_id: &str,
350 file_path: Option<&str>,
351 keep_point_ids: &[String],
352) -> Result<usize, VectorLifecycleError> {
353 let mut must = vec![json!({
354 "key": "project_id",
355 "match": {"value": project_id},
356 })];
357 if let Some(file_path) = file_path {
358 must.push(json!({
359 "key": "file_path",
360 "match": {"value": file_path},
361 }));
362 }
363 let mut filter_object = serde_json::Map::new();
364 filter_object.insert("must".to_string(), Value::Array(must));
365 if !keep_point_ids.is_empty() {
366 filter_object.insert(
367 "must_not".to_string(),
368 json!([{ "has_id": keep_point_ids }]),
369 );
370 }
371 let filter = Value::Object(filter_object);
372 let count_body = json!({ "filter": filter.clone(), "exact": true });
373 let resp = qdrant_request_for_config(
374 client,
375 qdrant,
376 reqwest::Method::POST,
377 &format!("{}/points/count", collection_path(collection)),
378 )?
379 .json(&count_body)
380 .send()
381 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
382 let status = resp.status();
383 if status == StatusCode::NOT_FOUND {
384 return Ok(0);
385 }
386 if !status.is_success() {
387 return Err(qdrant_http_error("count points", status, resp));
388 }
389 let data: Value = resp
390 .json()
391 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
392 let count = parse_points_count(&data)?;
393 if count == 0 {
394 return Ok(0);
395 }
396
397 let body = json!({ "filter": filter });
398 let resp = qdrant_request_for_config(
399 client,
400 qdrant,
401 reqwest::Method::POST,
402 &format!("{}/points/delete?wait=true", collection_path(collection)),
403 )?
404 .json(&body)
405 .send()
406 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
407 let status = resp.status();
408 if status == StatusCode::NOT_FOUND {
409 return Ok(0);
410 }
411 if !status.is_success() {
412 return Err(qdrant_http_error("delete points", status, resp));
413 }
414 Ok(count)
415}
416
417pub(super) fn qdrant_http_error(
418 operation: &'static str,
419 status: StatusCode,
420 resp: reqwest::blocking::Response,
421) -> VectorLifecycleError {
422 VectorLifecycleError::QdrantHttp {
423 operation,
424 status: status.as_u16(),
425 body: resp.text().unwrap_or_default(),
426 }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// The healthy path must stay silent so normal searches do not log warnings.
    #[test]
    fn vector_search_degradation_warning_is_silent_when_qdrant_available() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::Available),
            None
        );
    }

    #[test]
    fn vector_search_degradation_warning_mentions_missing_qdrant_config() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::NotConfigured),
            Some("semantic vector search skipped: Qdrant is not configured".to_string())
        );
    }

    #[test]
    fn vector_search_degradation_warning_mentions_unreachable_qdrant() {
        assert_eq!(
            vector_search_degradation_warning(&ServiceState::Unreachable {
                message: "connection refused".to_string()
            }),
            Some(
                "semantic vector search skipped: Qdrant is unreachable: connection refused"
                    .to_string()
            )
        );
    }
}