1use std::time::Duration;
2
3use reqwest::StatusCode;
4use serde_json::{Map, Value, json};
5
6use crate::config::{CODE_SYMBOL_COLLECTION_PREFIX, CodeVectorSettings, QdrantConfig};
7use crate::models::Symbol;
8use gobby_core::degradation::ServiceState;
9use gobby_core::qdrant::UpsertRequest;
10
11use super::embedding::{
12 EmbeddingBackend, EmbeddingSource, dimension_probe_text, vector_text_for_symbol,
13};
14use super::qdrant::{
15 VECTOR_DISTANCE_COSINE, collection_name, collection_path, delete_vectors_for_filter,
16 delete_vectors_for_filter_excluding_ids, parse_collection_schema, qdrant_http_error,
17 qdrant_request_for_config,
18};
19use super::types::{
20 CodeSymbolVectorLifecycleAction, CodeSymbolVectorLifecycleOutput,
21 CodeSymbolVectorLifecycleStatus, CodeSymbolVectorPayload, ExistingVectorCollectionSchema,
22 VectorCollectionSchema, VectorLifecycleError,
23};
24
/// Request timeout (10 seconds) configured on the blocking HTTP client that the lifecycle
/// uses for its direct Qdrant REST calls.
const QDRANT_LIFECYCLE_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Number of symbols embedded per `embed_text_batch` call in `points_for_symbols`.
/// Set to gobby-core's `DEFAULT_UPSERT_BATCH_SIZE`.
const VECTOR_SYNC_BATCH_SIZE: usize = gobby_core::qdrant::DEFAULT_UPSERT_BATCH_SIZE;
27
/// Manages the Qdrant collection holding one project's code-symbol embeddings.
///
/// Its operations are: ensuring the collection exists with a compatible schema, syncing a
/// single file's symbols, rebuilding all symbols, and clearing the project's vectors.
#[derive(Debug)]
pub struct CodeSymbolVectorLifecycle {
    /// Project whose vectors this instance writes and deletes.
    project_id: String,
    /// Collection name derived from `CODE_SYMBOL_COLLECTION_PREFIX` and `project_id`.
    collection: String,
    /// Qdrant connection settings used for every request.
    qdrant: QdrantConfig,
    /// Backend that turns symbol text into embedding vectors.
    embedding: EmbeddingBackend,
    /// Vector settings; when `vector_dim` is set it fixes the expected vector size.
    settings: CodeVectorSettings,
    /// Vector size learned by embedding a probe text when `settings.vector_dim` is unset.
    /// It is cached after the first probe.
    probed_vector_size: Option<usize>,
    /// Blocking HTTP client (built with `QDRANT_LIFECYCLE_TIMEOUT`) for direct REST calls.
    client: reqwest::blocking::Client,
}
38
39pub fn resolve_lifecycle_qdrant_config(
40 source: &mut impl gobby_core::config::ConfigSource,
41) -> Option<QdrantConfig> {
42 gobby_core::config::resolve_qdrant_config(source)
43}
44
45pub fn lifecycle_status(
46 project_id: impl Into<String>,
47 collection_prefix: &str,
48 action: CodeSymbolVectorLifecycleAction,
49) -> Result<CodeSymbolVectorLifecycleStatus, VectorLifecycleError> {
50 let project_id = project_id.into();
51 Ok(CodeSymbolVectorLifecycleStatus {
52 collection: collection_name(collection_prefix, &project_id)?,
53 project_id,
54 action,
55 })
56}
57
58impl CodeSymbolVectorLifecycle {
59 pub fn new(
60 project_id: String,
61 qdrant: QdrantConfig,
62 embedding: impl Into<EmbeddingSource>,
63 settings: CodeVectorSettings,
64 ) -> Result<Self, VectorLifecycleError> {
65 Self::require_qdrant_boundary_config(&qdrant)?;
66
67 let collection = collection_name(CODE_SYMBOL_COLLECTION_PREFIX, &project_id)?;
68 let embedding = EmbeddingBackend::new(embedding.into())?;
69 let client = reqwest::blocking::Client::builder()
70 .timeout(QDRANT_LIFECYCLE_TIMEOUT)
71 .build()
72 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
73 Ok(Self {
74 project_id,
75 collection,
76 qdrant,
77 embedding,
78 settings,
79 probed_vector_size: None,
80 client,
81 })
82 }
83
84 pub fn collection(&self) -> &str {
85 &self.collection
86 }
87
88 pub fn ensure_collection(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
89 let expected = self.expected_schema()?;
90 self.require_qdrant_boundary()?;
91 match self.get_collection_schema()? {
92 Some(found) => self.ensure_compatible_schema(expected, found),
93 None => {
94 self.create_collection(&expected)?;
95 Ok(expected)
96 }
97 }
98 }
99
100 pub fn sync_file_symbols(
101 &mut self,
102 file_path: &str,
103 symbols: &[Symbol],
104 ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
105 let schema = self.ensure_collection()?;
106 let points = self.points_for_symbols(symbols, schema.size)?;
107 let point_ids = point_ids(&points);
108 let vectors_upserted = self.upsert_points(points)?;
109 let delete_operations_issued = self.delete_stale_vectors(Some(file_path), &point_ids)?;
110
111 Ok(self.output(
112 CodeSymbolVectorLifecycleAction::SyncFile,
113 Some(file_path.to_string()),
114 symbols.len(),
115 vectors_upserted,
116 delete_operations_issued,
117 ))
118 }
119
120 pub fn clear_project_vectors(
121 &mut self,
122 ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
123 self.require_qdrant_boundary()?;
124 let deleted = match self.get_collection_schema()? {
125 Some(found) => {
126 if let Some(size) = self.settings.vector_dim {
127 self.ensure_compatible_schema(
128 VectorCollectionSchema {
129 size,
130 distance: VECTOR_DISTANCE_COSINE.to_string(),
131 },
132 found,
133 )?;
134 }
135 self.delete_vectors(None)?
136 }
137 None => 0,
138 };
139
140 Ok(self.output(CodeSymbolVectorLifecycleAction::Clear, None, 0, 0, deleted))
141 }
142
143 pub fn rebuild_symbols(
144 &mut self,
145 symbols: &[Symbol],
146 ) -> Result<CodeSymbolVectorLifecycleOutput, VectorLifecycleError> {
147 let schema = self.ensure_collection()?;
148 let points = self.points_for_symbols(symbols, schema.size)?;
149 let point_ids = point_ids(&points);
150 let vectors_upserted = self.upsert_points(points)?;
151 let delete_operations_issued = self.delete_stale_vectors(None, &point_ids)?;
152
153 Ok(self.output(
154 CodeSymbolVectorLifecycleAction::Rebuild,
155 None,
156 symbols.len(),
157 vectors_upserted,
158 delete_operations_issued,
159 ))
160 }
161
162 fn output(
163 &self,
164 action: CodeSymbolVectorLifecycleAction,
165 file_path: Option<String>,
166 symbols: usize,
167 vectors_upserted: usize,
168 delete_operations_issued: usize,
169 ) -> CodeSymbolVectorLifecycleOutput {
170 CodeSymbolVectorLifecycleOutput {
171 project_id: self.project_id.clone(),
172 collection: self.collection.clone(),
173 action,
174 file_path,
175 symbols,
176 vectors_upserted,
177 delete_operations_issued,
178 summary: format!(
179 "{vectors_upserted} vector(s) upserted, {delete_operations_issued} delete operation(s) issued"
180 ),
181 }
182 }
183
184 fn expected_schema(&mut self) -> Result<VectorCollectionSchema, VectorLifecycleError> {
185 let size = match self.settings.vector_dim {
186 Some(size) => size,
187 None => match self.probed_vector_size {
188 Some(size) => size,
189 None => {
190 let size = self.embedding.embed_text(dimension_probe_text())?.len();
191 self.probed_vector_size = Some(size);
192 size
193 }
194 },
195 };
196
197 Ok(VectorCollectionSchema {
198 size,
199 distance: VECTOR_DISTANCE_COSINE.to_string(),
200 })
201 }
202
203 fn require_qdrant_boundary(&self) -> Result<(), VectorLifecycleError> {
204 Self::require_qdrant_boundary_config(&self.qdrant)
205 }
206
207 fn require_qdrant_boundary_config(qdrant: &QdrantConfig) -> Result<(), VectorLifecycleError> {
208 let ((), state) = gobby_core::qdrant::with_qdrant(Some(qdrant), (), |_| Ok(()))
209 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
210 match state {
211 ServiceState::Available => Ok(()),
212 ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
213 other => Err(VectorLifecycleError::QdrantOperation(format!(
214 "unexpected Qdrant service state: {other:?}"
215 ))),
216 }
217 }
218
219 fn ensure_compatible_schema(
220 &self,
221 expected: VectorCollectionSchema,
222 found: ExistingVectorCollectionSchema,
223 ) -> Result<VectorCollectionSchema, VectorLifecycleError> {
224 if found.size == Some(expected.size)
225 && found.distance.as_deref() == Some(&expected.distance)
226 {
227 return Ok(VectorCollectionSchema {
228 size: expected.size,
229 distance: expected.distance,
230 });
231 }
232
233 Err(VectorLifecycleError::DimensionMismatch {
234 collection: self.collection.clone(),
235 expected_size: expected.size,
236 found_size: found.size,
237 expected_distance: VECTOR_DISTANCE_COSINE,
238 found_distance: found.distance,
239 })
240 }
241
242 fn get_collection_schema(
243 &self,
244 ) -> Result<Option<ExistingVectorCollectionSchema>, VectorLifecycleError> {
245 let resp = self
246 .qdrant_request(reqwest::Method::GET, &collection_path(&self.collection))?
247 .send()
248 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
249 let status = resp.status();
250 if status == StatusCode::NOT_FOUND {
251 return Ok(None);
252 }
253 if !status.is_success() {
254 return Err(qdrant_http_error("get collection", status, resp));
255 }
256
257 let data: Value = resp
258 .json()
259 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
260 Ok(parse_collection_schema(&data))
261 }
262
263 fn create_collection(
264 &self,
265 schema: &VectorCollectionSchema,
266 ) -> Result<(), VectorLifecycleError> {
267 let body = json!({
268 "vectors": {
269 "size": schema.size,
270 "distance": schema.distance,
271 },
272 });
273 let resp = self
274 .qdrant_request(reqwest::Method::PUT, &collection_path(&self.collection))?
275 .json(&body)
276 .send()
277 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
278 if !resp.status().is_success() {
279 return Err(qdrant_http_error("create collection", resp.status(), resp));
280 }
281 Ok(())
282 }
283
284 fn delete_vectors(&self, file_path: Option<&str>) -> Result<usize, VectorLifecycleError> {
285 delete_vectors_for_filter(
286 &self.client,
287 &self.qdrant,
288 &self.collection,
289 &self.project_id,
290 file_path,
291 )
292 }
293
294 fn delete_stale_vectors(
295 &self,
296 file_path: Option<&str>,
297 keep_point_ids: &[String],
298 ) -> Result<usize, VectorLifecycleError> {
299 delete_vectors_for_filter_excluding_ids(
300 &self.client,
301 &self.qdrant,
302 &self.collection,
303 &self.project_id,
304 file_path,
305 keep_point_ids,
306 )
307 }
308
309 fn upsert_points(&self, points: Vec<UpsertRequest>) -> Result<usize, VectorLifecycleError> {
310 if points.is_empty() {
311 return Ok(0);
312 }
313 let requested = points.len();
314 let (_result, state) =
315 gobby_core::qdrant::with_qdrant(Some(&self.qdrant), None, |config| {
316 gobby_core::qdrant::upsert_batched(config, &self.collection, points).map(Some)
317 })
318 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?;
319 match state {
320 ServiceState::Available => Ok(requested),
321 ServiceState::NotConfigured => Err(VectorLifecycleError::MissingQdrantConfig),
322 other => Err(VectorLifecycleError::QdrantOperation(format!(
323 "unexpected Qdrant service state: {other:?}"
324 ))),
325 }
326 }
327
328 fn points_for_symbols(
329 &self,
330 symbols: &[Symbol],
331 expected_vector_size: usize,
332 ) -> Result<Vec<UpsertRequest>, VectorLifecycleError> {
333 if symbols.is_empty() {
334 return Ok(Vec::new());
335 }
336
337 let mut points = Vec::with_capacity(symbols.len());
338 for batch in symbols.chunks(VECTOR_SYNC_BATCH_SIZE) {
339 let texts = batch.iter().map(vector_text_for_symbol).collect::<Vec<_>>();
340 let vectors = self.embedding.embed_text_batch(&texts)?;
341 if vectors.len() != batch.len() {
342 return Err(VectorLifecycleError::EmbeddingResponse(format!(
343 "embedding batch returned {} vector(s) for {} symbol(s)",
344 vectors.len(),
345 batch.len()
346 )));
347 }
348 for (symbol, vector) in batch.iter().zip(vectors) {
349 if vector.len() != expected_vector_size {
350 return Err(VectorLifecycleError::EmbeddingResponse(format!(
351 "embedding for symbol {} returned {} dimension(s), expected {}",
352 symbol.id,
353 vector.len(),
354 expected_vector_size
355 )));
356 }
357 let payload = payload_map(CodeSymbolVectorPayload::from_symbol(symbol))?;
358 points.push(UpsertRequest {
359 id: symbol.id.clone(),
360 vector,
361 payload,
362 });
363 }
364 }
365
366 Ok(points)
367 }
368
369 fn qdrant_request(
370 &self,
371 method: reqwest::Method,
372 path: &str,
373 ) -> Result<reqwest::blocking::RequestBuilder, VectorLifecycleError> {
374 qdrant_request_for_config(&self.client, &self.qdrant, method, path)
375 }
376}
377
378fn payload_map(
379 payload: CodeSymbolVectorPayload,
380) -> Result<Map<String, Value>, VectorLifecycleError> {
381 match serde_json::to_value(payload)
382 .map_err(|err| VectorLifecycleError::QdrantOperation(err.to_string()))?
383 {
384 Value::Object(map) => Ok(map),
385 _ => Err(VectorLifecycleError::QdrantOperation(
386 "vector payload did not serialize to an object".to_string(),
387 )),
388 }
389}
390
391fn point_ids(points: &[UpsertRequest]) -> Vec<String> {
392 points.iter().map(|point| point.id.clone()).collect()
393}