Skip to main content

gobby_code/vector/code_symbols/
lifecycle.rs

1use std::time::Duration;
2
3use reqwest::StatusCode;
4use serde_json::{Map, Value, json};
5
6use crate::config::{CODE_SYMBOL_COLLECTION_PREFIX, CodeVectorSettings, QdrantConfig};
7use crate::models::Symbol;
8use gobby_core::degradation::ServiceState;
9use gobby_core::qdrant::UpsertRequest;
10
11use super::embedding::{
12    EmbeddingBackend, EmbeddingSource, dimension_probe_text, vector_text_for_symbol,
13};
14use super::qdrant::{
15    VECTOR_DISTANCE_COSINE, collection_name, collection_path, delete_vectors_for_filter,
16    delete_vectors_for_filter_excluding_ids, parse_collection_schema, qdrant_http_error,
17    qdrant_request_for_config,
18};
19use super::types::{
20    CodeSymbolVectorLifecycleAction, CodeSymbolVectorLifecycleOutput,
21    CodeSymbolVectorLifecycleStatus, CodeSymbolVectorPayload, ExistingVectorCollectionSchema,
22    VectorCollectionSchema, VectorLifecycleError,
23};
24
/// Request timeout configured on the blocking HTTP client that
/// `CodeSymbolVectorLifecycle::new` builds.
25const QDRANT_LIFECYCLE_TIMEOUT: Duration = Duration::from_secs(10);
/// Chunk size used when embedding symbols in `points_for_symbols`; aliases
/// `gobby_core::qdrant::DEFAULT_UPSERT_BATCH_SIZE`.
26const VECTOR_SYNC_BATCH_SIZE: usize = gobby_core::qdrant::DEFAULT_UPSERT_BATCH_SIZE;
27
/// Manages the vector collection that holds one project's code-symbol
/// embeddings: ensuring the collection exists, syncing a file's symbols,
/// rebuilding all symbols, and clearing the project's vectors.
28#[derive(Debug)]
29pub struct CodeSymbolVectorLifecycle {
    /// Project identifier; passed to the delete helpers and echoed in outputs.
30    project_id: String,
    /// Collection name derived in `new` from `CODE_SYMBOL_COLLECTION_PREFIX`
    /// and `project_id`.
31    collection: String,
    /// Qdrant configuration used for every request issued by this value.
32    qdrant: QdrantConfig,
    /// Backend used to embed the dimension probe text and symbol texts.
33    embedding: EmbeddingBackend,
    /// Vector settings; `vector_dim`, when set, fixes the expected vector size.
34    settings: CodeVectorSettings,
    /// Vector size learned from the dimension probe, cached so the probe is
    /// only embedded once when `settings.vector_dim` is unset.
35    probed_vector_size: Option<usize>,
    /// Blocking HTTP client built with `QDRANT_LIFECYCLE_TIMEOUT`.
36    client: reqwest::blocking::Client,
37}
38
39pub fn resolve_lifecycle_qdrant_config(
40    source: &mut impl gobby_core::config::ConfigSource,
41) -> Option<QdrantConfig> {
42    gobby_core::config::resolve_qdrant_config(source)
43}
44
45pub fn lifecycle_status(
46    project_id: impl Into<String>,
47    collection_prefix: &str,
48    action: CodeSymbolVectorLifecycleAction,
49) -> Result<CodeSymbolVectorLifecycleStatus, VectorLifecycleError> {
50    let project_id = project_id.into();
51    Ok(CodeSymbolVectorLifecycleStatus {
52        collection: collection_name(collection_prefix, &project_id)?,
53        project_id,
54        action,
55    })
56}
57
58impl CodeSymbolVectorLifecycle {
59    pub fn new(
60        project_id: String,
61        qdrant: QdrantConfig,
62        embedding: impl Into<EmbeddingSource>,
63        settings: CodeVectorSettings,
64    ) -> Result<Self, VectorLifecycleError> {
65        Self::require_qdrant_boundary_config(&qdrant)?;
66
67        let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, &project_id)?;
68        let embedding = EmbeddingBackend::new(embedding.into())?;
69        let client = reqwest::blocking::Client::builder()
70            .timeout(QDRANT_LIFECYCLE_TIMEOUT)
71            .build()
72            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
73        Ok(Self {
74            project_id,
75            collection,
76            qdrant,
77            embedding,
78            settings,
79            probed_vector_size: None,
80            client,
81        })
82    }
83
84    pub fn collection(&self) -> &str {
85        &self.collection
86    }
87
88    pub fn ensure_collection(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
89        let expected = self.expected_schema()?;
90        self.require_qdrant_boundary()?;
91        match self.get_collection_schema()? {
92            Some(found) => self.ensure_compatible_schema(expected, found),
93            None => {
94                self.create_collection(&expected)?;
95                Ok(expected)
96            }
97        }
98    }
99
100    pub fn sync_file_symbols(
101        &mut self,
102        file_path: &str,
103        symbols: &[Symbol],
104    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
105        let schema = self.ensure_collection()?;
106        let points = self.points_for_symbols(symbols, schema.size)?;
107        let point_ids = point_ids(&points);
108        let vectors_upserted = self.upsert_points(points)?;
109        let delete_operations_issued = self.delete_stale_vectors(Some(file_path), &point_ids)?;
110
111        Ok(self.output(
112            CodeSymbolVectorLifecycleAction::SyncFile,
113            Some(file_path.to_string()),
114            symbols.len(),
115            vectors_upserted,
116            delete_operations_issued,
117        ))
118    }
119
120    pub fn clear_project_vectors(
121        &mut self,
122    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
123        self.require_qdrant_boundary()?;
124        let deleted = match self.get_collection_schema()? {
125            Some(found) => {
126                if let Some(size) = self.settings.vector_dim {
127                    self.ensure_compatible_schema(
128                        VectorCollectionSchema {
129                            size,
130                            distance: VECTOR_DISTANCE_COSINE.to_string(),
131                        },
132                        found,
133                    )?;
134                }
135                self.delete_vectors(None)?
136            }
137            None => 0,
138        };
139
140        Ok(self.output(CodeSymbolVectorLifecycleAction::Clear, None, 0, 0, deleted))
141    }
142
143    pub fn rebuild_symbols(
144        &mut self,
145        symbols: &[Symbol],
146    ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
147        let schema = self.ensure_collection()?;
148        let points = self.points_for_symbols(symbols, schema.size)?;
149        let point_ids = point_ids(&points);
150        let vectors_upserted = self.upsert_points(points)?;
151        let delete_operations_issued = self.delete_stale_vectors(None, &point_ids)?;
152
153        Ok(self.output(
154            CodeSymbolVectorLifecycleAction::Rebuild,
155            None,
156            symbols.len(),
157            vectors_upserted,
158            delete_operations_issued,
159        ))
160    }
161
162    fn output(
163        &self,
164        action: CodeSymbolVectorLifecycleAction,
165        file_path: Option<String>,
166        symbols: usize,
167        vectors_upserted: usize,
168        delete_operations_issued: usize,
169    ) -> CodeSymbolVectorLifecycleOutput {
170        CodeSymbolVectorLifecycleOutput {
171            project_id: self.project_id.clone(),
172            collection: self.collection.clone(),
173            action,
174            file_path,
175            symbols,
176            vectors_upserted,
177            delete_operations_issued,
178            summary: format!(
179                "{vectors_upserted} vector(s) upserted, {delete_operations_issued} delete operation(s) issued"
180            ),
181        }
182    }
183
184    fn expected_schema(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
185        let size = match self.settings.vector_dim {
186            Some(size) => size,
187            None => match self.probed_vector_size {
188                Some(size) => size,
189                None => {
190                    let size = self.embedding.embed_text(dimension_probe_text())?.len();
191                    self.probed_vector_size = Some(size);
192                    size
193                }
194            },
195        };
196
197        Ok(VectorCollectionSchema {
198            size,
199            distance: VECTOR_DISTANCE_COSINE.to_string(),
200        })
201    }
202
203    fn require_qdrant_boundary(&self) -> Result<(), VectorLifecycleError> {
204        Self::require_qdrant_boundary_config(&self.qdrant)
205    }
206
207    fn require_qdrant_boundary_config(qdrant: &QdrantConfig) -> Result<(), VectorLifecycleError> {
208        let ((), state) = gobby_core::qdrant::with_qdrant(Some(qdrant), (), |_| Ok(()))
209            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
210        match state {
211            ServiceState::Available => Ok(()),
212            ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
213            other => Err(VectorLifecycleError::QdrantOperation(format!(
214                "unexpected Qdrant service state: {other:?}"
215            ))),
216        }
217    }
218
219    fn ensure_compatible_schema(
220        &self,
221        expected: VectorCollectionSchema,
222        found: ExistingVectorCollectionSchema,
223    ) -> Result<VectorCollectionSchema, VectorLifecycleError> {
224        if found.size == Some(expected.size)
225            && found.distance.as_deref() == Some(&expected.distance)
226        {
227            return Ok(VectorCollectionSchema {
228                size: expected.size,
229                distance: expected.distance,
230            });
231        }
232
233        Err(VectorLifecycleError::DimensionMismatch {
234            collection: self.collection.clone(),
235            expected_size: expected.size,
236            found_size: found.size,
237            expected_distance: VECTOR_DISTANCE_COSINE,
238            found_distance: found.distance,
239        })
240    }
241
242    fn get_collection_schema(
243        &self,
244    ) -> Result<Option<ExistingVectorCollectionSchema>, VectorLifecycleError> {
245        let resp = self
246            .qdrant_request(reqwest::Method::GET, &collection_path(&self.collection))?
247            .send()
248            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
249        let status = resp.status();
250        if status == StatusCode::NOT_FOUND {
251            return Ok(None);
252        }
253        if !status.is_success() {
254            return Err(qdrant_http_error("get collection", status, resp));
255        }
256
257        let data: Value = resp
258            .json()
259            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
260        Ok(parse_collection_schema(&data))
261    }
262
263    fn create_collection(
264        &self,
265        schema: &VectorCollectionSchema,
266    ) -> Result<(), VectorLifecycleError> {
267        let body = json!({
268            "vectors": {
269                "size": schema.size,
270                "distance": schema.distance,
271            },
272        });
273        let resp = self
274            .qdrant_request(reqwest::Method::PUT, &collection_path(&self.collection))?
275            .json(&body)
276            .send()
277            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
278        if !resp.status().is_success() {
279            return Err(qdrant_http_error("create collection", resp.status(), resp));
280        }
281        Ok(())
282    }
283
284    fn delete_vectors(&self, file_path: Option<&str>) -> Result<usize, VectorLifecycleError> {
285        delete_vectors_for_filter(
286            &self.client,
287            &self.qdrant,
288            &self.collection,
289            &self.project_id,
290            file_path,
291        )
292    }
293
294    fn delete_stale_vectors(
295        &self,
296        file_path: Option<&str>,
297        keep_point_ids: &[String],
298    ) -> Result<usize, VectorLifecycleError> {
299        delete_vectors_for_filter_excluding_ids(
300            &self.client,
301            &self.qdrant,
302            &self.collection,
303            &self.project_id,
304            file_path,
305            keep_point_ids,
306        )
307    }
308
309    fn upsert_points(&self, points: Vec<UpsertRequest>) -> Result<usize, VectorLifecycleError> {
310        if points.is_empty() {
311            return Ok(0);
312        }
313        let requested = points.len();
314        let (_result, state) =
315            gobby_core::qdrant::with_qdrant(Some(&self.qdrant), None, |config| {
316                gobby_core::qdrant::upsert_batched(config, &self.collection, points).map(Some)
317            })
318            .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
319        match state {
320            ServiceState::Available => Ok(requested),
321            ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
322            other => Err(VectorLifecycleError::QdrantOperation(format!(
323                "unexpected Qdrant service state: {other:?}"
324            ))),
325        }
326    }
327
328    fn points_for_symbols(
329        &self,
330        symbols: &[Symbol],
331        expected_vector_size: usize,
332    ) -> Result<Vec<UpsertRequest>, VectorLifecycleError> {
333        if symbols.is_empty() {
334            return Ok(Vec::new());
335        }
336
337        let mut points = Vec::with_capacity(symbols.len());
338        for batch in symbols.chunks(VECTOR_SYNC_BATCH_SIZE) {
339            let texts = batch.iter().map(vector_text_for_symbol).collect::<Vec<_>>();
340            let vectors = self.embedding.embed_text_batch(&texts)?;
341            if vectors.len() != batch.len() {
342                return Err(VectorLifecycleError::EmbeddingResponse(format!(
343                    "embedding batch returned {} vector(s) for {} symbol(s)",
344                    vectors.len(),
345                    batch.len()
346                )));
347            }
348            for (symbol, vector) in batch.iter().zip(vectors) {
349                if vector.len() != expected_vector_size {
350                    return Err(VectorLifecycleError::EmbeddingResponse(format!(
351                        "embedding for symbol {} returned {} dimension(s), expected {}",
352                        symbol.id,
353                        vector.len(),
354                        expected_vector_size
355                    )));
356                }
357                let payload = payload_map(CodeSymbolVectorPayload::from_symbol(symbol))?;
358                points.push(UpsertRequest {
359                    id: symbol.id.clone(),
360                    vector,
361                    payload,
362                });
363            }
364        }
365
366        Ok(points)
367    }
368
369    fn qdrant_request(
370        &self,
371        method: reqwest::Method,
372        path: &str,
373    ) -> Result<reqwest::blocking::RequestBuilder, VectorLifecycleError> {
374        qdrant_request_for_config(&self.client, &self.qdrant, method, path)
375    }
376}
377
378fn payload_map(
379    payload: CodeSymbolVectorPayload,
380) -> Result<Map<String, Value>, VectorLifecycleError> {
381    match serde_json::to_value(payload)
382        .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?
383    {
384        Value::Object(map) => Ok(map),
385        _ => Err(VectorLifecycleError::QdrantOperation(
386            "vector payload did not serialize to an object".to_string(),
387        )),
388    }
389}
390
391fn point_ids(points: &[UpsertRequest]) -> Vec<String> {
392    points.iter().map(|point| point.id.clone()).collect()
393}