// Source: gobby_code/vector/code_symbols/lifecycle.rs

1use std::time::Duration;
2
3use reqwest::StatusCode;
4use serde_json::{Map, Value, json};
5
6use crate::config::{CODE_SYMBOL_COLLECTION_PREFIX, CodeVectorSettings, QdrantConfig};
7use crate::models::Symbol;
8use gobby_core::degradation::ServiceState;
9use gobby_core::qdrant::UpsertRequest;
10
11use super::embedding::{
12    EmbeddingBackend, EmbeddingSource, dimension_probe_text, vector_text_for_symbol,
13};
14use super::qdrant::{
15    VECTOR_DISTANCE_COSINE, collection_name, collection_path, delete_vectors_for_filter,
16    delete_vectors_for_filter_excluding_ids, parse_collection_schema, qdrant_http_error,
17    qdrant_request_for_config,
18};
19use super::types::{
20    CodeSymbolVectorLifecycleAction, CodeSymbolVectorLifecycleOutput,
21    CodeSymbolVectorLifecycleStatus, CodeSymbolVectorPayload, ExistingVectorCollectionSchema,
22    VectorCollectionSchema, VectorLifecycleError,
23};
24
/// Timeout (10 seconds) set on the blocking `reqwest` client that
/// `CodeSymbolVectorLifecycle::new` builds.
const QDRANT_LIFECYCLE_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Manages the Qdrant collection holding one project's code-symbol vectors: ensuring the
/// collection exists, upserting symbol embeddings, and deleting stale or all project vectors.
#[derive(Debug)]
pub struct CodeSymbolVectorLifecycle {
    /// Project identifier; reported in lifecycle output and passed to the delete helpers.
    project_id: String,
    /// Collection name, derived from `CODE_SYMBOL_COLLECTION_PREFIX` and `project_id` in `new`.
    collection: String,
    /// Qdrant configuration used for every request issued by this lifecycle.
    qdrant: QdrantConfig,
    /// Backend used to embed symbol text and, when needed, the dimension probe text.
    embedding: EmbeddingBackend,
    /// Vector settings; `vector_dim`, when set, fixes the expected vector size.
    settings: CodeVectorSettings,
    /// Vector size learned from an embedding probe; cached so the probe runs at most once.
    probed_vector_size: Option<usize>,
    /// Blocking HTTP client built with `QDRANT_LIFECYCLE_TIMEOUT`.
    client: reqwest::blocking::Client,
}
37
/// Resolves the Qdrant configuration used by the vector lifecycle.
///
/// Thin wrapper: forwards `source` to `gobby_core::config::resolve_qdrant_config` and returns
/// its result unchanged.
// NOTE(review): `None` presumably means no Qdrant configuration could be resolved from
// `source` — confirm against `gobby_core::config::resolve_qdrant_config`.
pub fn resolve_lifecycle_qdrant_config(
    source: &mut impl gobby_core::config::ConfigSource,
) -> Option<QdrantConfig> {
    gobby_core::config::resolve_qdrant_config(source)
}
43
44pub fn lifecycle_status(
45    project_id: impl Into<String>,
46    collection_prefix: &str,
47    action: CodeSymbolVectorLifecycleAction,
48) -> CodeSymbolVectorLifecycleStatus {
49    let project_id = project_id.into();
50    CodeSymbolVectorLifecycleStatus {
51        collection: collection_name(collection_prefix, &project_id),
52        project_id,
53        action,
54    }
55}
56
57impl CodeSymbolVectorLifecycle {
58    pub fn new(
59        project_id: String,
60        qdrant: QdrantConfig,
61        embedding: impl Into<EmbeddingSource>,
62        settings: CodeVectorSettings,
63    ) -> Result<Self, VectorLifecycleError> {
64        Self::require_qdrant_boundary_config(&qdrant)?;
65
66        let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, &project_id);
67        let embedding = EmbeddingBackend::new(embedding.into())?;
68        let client = reqwest::blocking::Client::builder()
69            .timeout(QDRANT_LIFECYCLE_TIMEOUT)
70            .build()
71            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
72        Ok(Self {
73            project_id,
74            collection,
75            qdrant,
76            embedding,
77            settings,
78            probed_vector_size: None,
79            client,
80        })
81    }
82
83    pub fn collection(&self) -> &str {
84        &self.collection
85    }
86
87    pub fn ensure_collection(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
88        let expected = self.expected_schema()?;
89        self.require_qdrant_boundary()?;
90        match self.get_collection_schema()? {
91            Some(found) => self.ensure_compatible_schema(expected, found),
92            None => {
93                self.create_collection(&expected)?;
94                Ok(expected)
95            }
96        }
97    }
98
99    pub fn sync_file_symbols(
100        &mut self,
101        file_path: &str,
102        symbols: &[Symbol],
103    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
104        let schema = self.ensure_collection()?;
105        let points = self.points_for_symbols(symbols, schema.size)?;
106        let point_ids = point_ids(&points);
107        let vectors_upserted = self.upsert_points(points)?;
108        let delete_operations_issued = self.delete_stale_vectors(Some(file_path), &point_ids)?;
109
110        Ok(self.output(
111            CodeSymbolVectorLifecycleAction::SyncFile,
112            Some(file_path.to_string()),
113            symbols.len(),
114            vectors_upserted,
115            delete_operations_issued,
116        ))
117    }
118
119    pub fn clear_project_vectors(
120        &mut self,
121    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
122        self.require_qdrant_boundary()?;
123        let deleted = match self.get_collection_schema()? {
124            Some(found) => {
125                if let Some(size) = self.settings.vector_dim {
126                    self.ensure_compatible_schema(
127                        VectorCollectionSchema {
128                            size,
129                            distance: VECTOR_DISTANCE_COSINE.to_string(),
130                        },
131                        found,
132                    )?;
133                }
134                self.delete_vectors(None)?
135            }
136            None => 0,
137        };
138
139        Ok(self.output(CodeSymbolVectorLifecycleAction::Clear, None, 0, 0, deleted))
140    }
141
142    pub fn rebuild_symbols(
143        &mut self,
144        symbols: &[Symbol],
145    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
146        let schema = self.ensure_collection()?;
147        let points = self.points_for_symbols(symbols, schema.size)?;
148        let point_ids = point_ids(&points);
149        let vectors_upserted = self.upsert_points(points)?;
150        let delete_operations_issued = self.delete_stale_vectors(None, &point_ids)?;
151
152        Ok(self.output(
153            CodeSymbolVectorLifecycleAction::Rebuild,
154            None,
155            symbols.len(),
156            vectors_upserted,
157            delete_operations_issued,
158        ))
159    }
160
161    fn output(
162        &self,
163        action: CodeSymbolVectorLifecycleAction,
164        file_path: Option<String>,
165        symbols: usize,
166        vectors_upserted: usize,
167        delete_operations_issued: usize,
168    ) -> CodeSymbolVectorLifecycleOutput {
169        CodeSymbolVectorLifecycleOutput {
170            project_id: self.project_id.clone(),
171            collection: self.collection.clone(),
172            action,
173            file_path,
174            symbols,
175            vectors_upserted,
176            delete_operations_issued,
177            summary: format!(
178                "{vectors_upserted} vector(s) upserted, {delete_operations_issued} delete operation(s) issued"
179            ),
180        }
181    }
182
183    fn expected_schema(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
184        let size = match self.settings.vector_dim {
185            Some(size) => size,
186            None => match self.probed_vector_size {
187                Some(size) => size,
188                None => {
189                    let size = self.embedding.embed_text(dimension_probe_text())?.len();
190                    self.probed_vector_size = Some(size);
191                    size
192                }
193            },
194        };
195
196        Ok(VectorCollectionSchema {
197            size,
198            distance: VECTOR_DISTANCE_COSINE.to_string(),
199        })
200    }
201
202    fn require_qdrant_boundary(&self) -> Result<(), VectorLifecycleError> {
203        Self::require_qdrant_boundary_config(&self.qdrant)
204    }
205
206    fn require_qdrant_boundary_config(qdrant: &QdrantConfig) -> Result<(), VectorLifecycleError> {
207        let ((), state) = gobby_core::qdrant::with_qdrant(Some(qdrant), (), |_| Ok(()))
208            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
209        match state {
210            ServiceState::Available => Ok(()),
211            ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
212            other => Err(VectorLifecycleError::QdrantOperation(format!(
213                "unexpected Qdrant service state: {other:?}"
214            ))),
215        }
216    }
217
218    fn ensure_compatible_schema(
219        &self,
220        expected: VectorCollectionSchema,
221        found: ExistingVectorCollectionSchema,
222    ) -> Result<VectorCollectionSchema, VectorLifecycleError> {
223        if found.size == Some(expected.size)
224            && found.distance.as_deref() == Some(&expected.distance)
225        {
226            return Ok(VectorCollectionSchema {
227                size: expected.size,
228                distance: expected.distance,
229            });
230        }
231
232        Err(VectorLifecycleError::DimensionMismatch {
233            collection: self.collection.clone(),
234            expected_size: expected.size,
235            found_size: found.size,
236            expected_distance: VECTOR_DISTANCE_COSINE,
237            found_distance: found.distance,
238        })
239    }
240
241    fn get_collection_schema(
242        &self,
243    ) -> Result<Option<ExistingVectorCollectionSchema>, VectorLifecycleError> {
244        let resp = self
245            .qdrant_request(reqwest::Method::GET, &collection_path(&self.collection))?
246            .send()
247            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
248        let status = resp.status();
249        if status == StatusCode::NOT_FOUND {
250            return Ok(None);
251        }
252        if !status.is_success() {
253            return Err(qdrant_http_error("get collection", status, resp));
254        }
255
256        let data: Value = resp
257            .json()
258            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
259        Ok(parse_collection_schema(&data))
260    }
261
262    fn create_collection(
263        &self,
264        schema: &VectorCollectionSchema,
265    ) -> Result<(), VectorLifecycleError> {
266        let body = json!({
267            "vectors": {
268                "size": schema.size,
269                "distance": schema.distance,
270            },
271        });
272        let resp = self
273            .qdrant_request(reqwest::Method::PUT, &collection_path(&self.collection))?
274            .json(&body)
275            .send()
276            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
277        if !resp.status().is_success() {
278            return Err(qdrant_http_error("create collection", resp.status(), resp));
279        }
280        Ok(())
281    }
282
283    fn delete_vectors(&self, file_path: Option<&str>) -> Result<usize, VectorLifecycleError> {
284        delete_vectors_for_filter(
285            &self.client,
286            &self.qdrant,
287            &self.collection,
288            &self.project_id,
289            file_path,
290        )
291    }
292
293    fn delete_stale_vectors(
294        &self,
295        file_path: Option<&str>,
296        keep_point_ids: &[String],
297    ) -> Result<usize, VectorLifecycleError> {
298        delete_vectors_for_filter_excluding_ids(
299            &self.client,
300            &self.qdrant,
301            &self.collection,
302            &self.project_id,
303            file_path,
304            keep_point_ids,
305        )
306    }
307
308    fn upsert_points(&self, points: Vec<UpsertRequest>) -> Result<usize, VectorLifecycleError> {
309        if points.is_empty() {
310            return Ok(0);
311        }
312        let requested = points.len();
313        let (_result, state) =
314            gobby_core::qdrant::with_qdrant(Some(&self.qdrant), None, |config| {
315                gobby_core::qdrant::upsert(config, &self.collection, points).map(Some)
316            })
317            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
318        match state {
319            ServiceState::Available => Ok(requested),
320            ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
321            other => Err(VectorLifecycleError::QdrantOperation(format!(
322                "unexpected Qdrant service state: {other:?}"
323            ))),
324        }
325    }
326
327    fn points_for_symbols(
328        &self,
329        symbols: &[Symbol],
330        expected_vector_size: usize,
331    ) -> Result<Vec<UpsertRequest>, VectorLifecycleError> {
332        if symbols.is_empty() {
333            return Ok(Vec::new());
334        }
335
336        let texts = symbols
337            .iter()
338            .map(vector_text_for_symbol)
339            .collect::<Vec<_>>();
340        let vectors = self.embedding.embed_text_batch(&texts)?;
341        if vectors.len() != symbols.len() {
342            return Err(VectorLifecycleError::EmbeddingResponse(format!(
343                "embedding batch returned {} vector(s) for {} symbol(s)",
344                vectors.len(),
345                symbols.len()
346            )));
347        }
348        symbols
349            .iter()
350            .zip(vectors)
351            .map(|(symbol, vector)| {
352                if vector.len() != expected_vector_size {
353                    return Err(VectorLifecycleError::EmbeddingResponse(format!(
354                        "embedding for symbol {} returned {} dimension(s), expected {}",
355                        symbol.id,
356                        vector.len(),
357                        expected_vector_size
358                    )));
359                }
360                let payload = payload_map(CodeSymbolVectorPayload::from_symbol(symbol))?;
361                Ok(UpsertRequest {
362                    id: symbol.id.clone(),
363                    vector,
364                    payload,
365                })
366            })
367            .collect()
368    }
369
370    fn qdrant_request(
371        &self,
372        method: reqwest::Method,
373        path: &str,
374    ) -> Result<reqwest::blocking::RequestBuilder, VectorLifecycleError> {
375        qdrant_request_for_config(&self.client, &self.qdrant, method, path)
376    }
377}
378
379fn payload_map(
380    payload: CodeSymbolVectorPayload,
381) -> Result<Map<String, Value>, VectorLifecycleError> {
382    match serde_json::to_value(payload)
383        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?
384    {
385        Value::Object(map) => Ok(map),
386        _ => Err(VectorLifecycleError::QdrantOperation(
387            "vector payload did not serialize to an object".to_string(),
388        )),
389    }
390}
391
392fn point_ids(points: &[UpsertRequest]) -> Vec<String> {
393    points.iter().map(|point| point.id.clone()).collect()
394}