1use super::reconcile_deleted_file;
2use crate::config::Context;
3use crate::db;
4use crate::graph::code_graph::{self, GraphReadError};
5use crate::vector::code_symbols::{
6 self, CodeSymbolVectorLifecycle, VectorLifecycleError, embedding_source_from_context,
7};
8use serde::{Deserialize, Serialize};
9
/// Kind of projection that a sync request can target.
///
/// Serialized with snake_case variant names (`graph`, `vectors`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionTarget {
    /// The code-graph projection.
    Graph,
    /// The code-symbol vector projection.
    Vectors,
}
16
/// Request naming the files and projections that need to be synced.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionSyncRequest {
    /// Identifier of the project the files belong to.
    pub project_id: String,
    /// Paths of the files covered by the request.
    pub file_paths: Vec<String>,
    /// Projections requested; a target missing from this list is not marked pending.
    pub targets: Vec<ProjectionTarget>,
}
23
/// Pending-projection status derived from a [`ProjectionSyncRequest`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionSyncStatus {
    /// Identifier of the project, copied from the request.
    pub project_id: String,
    /// File paths, copied from the request.
    pub file_paths: Vec<String>,
    /// `true` when the request listed [`ProjectionTarget::Graph`].
    pub graph_pending: bool,
    /// `true` when the request listed [`ProjectionTarget::Vectors`].
    pub vectors_pending: bool,
}
31
/// Overall outcome of a projection sync.
///
/// Serialized with snake_case variant names (`ok`, `degraded`, `failed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionStatus {
    /// The sync finished without recording any error.
    Ok,
    /// The sync recorded at least one error; the report carries the details.
    Degraded,
    /// NOTE(review): no constructor visible in this file produces this variant —
    /// confirm where (or whether) it is used.
    Failed,
}
39
/// Error details attached to a degraded [`ProjectionSyncReport`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionSyncError {
    /// Short machine-readable category such as `sync_failed` or `qdrant_http`.
    pub kind: String,
    /// Human-readable description; per-file messages are joined with `"; "`.
    pub message: String,
}
45
/// Summary of one projection sync run over a list of files.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionSyncReport {
    /// Overall outcome of the run.
    pub status: ProjectionStatus,
    /// Number of files whose projection was synced.
    pub synced_files: usize,
    /// Total number of symbols across the synced files.
    pub synced_symbols: usize,
    /// Number of files skipped because no indexed file record was found for them.
    pub skipped_files: usize,
    /// Number of files whose sync returned an error.
    pub failed_files: usize,
    /// Set to `true` by the degraded constructors and `false` by the ok constructors.
    pub degraded: bool,
    /// Error details; `Some` for reports built by the degraded constructors.
    pub error: Option<ProjectionSyncError>,
}
56
57impl ProjectionSyncReport {
58 pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
59 Self::ok_with_counts(synced_files, synced_symbols, 0, 0)
60 }
61
62 pub fn ok_with_counts(
63 synced_files: usize,
64 synced_symbols: usize,
65 skipped_files: usize,
66 failed_files: usize,
67 ) -> Self {
68 Self {
69 status: ProjectionStatus::Ok,
70 synced_files,
71 synced_symbols,
72 skipped_files,
73 failed_files,
74 degraded: false,
75 error: None,
76 }
77 }
78
79 pub fn degraded(
80 kind: impl Into<String>,
81 message: impl Into<String>,
82 synced_files: usize,
83 synced_symbols: usize,
84 ) -> Self {
85 Self::degraded_with_counts(kind, message, synced_files, synced_symbols, 0, 0)
86 }
87
88 pub fn degraded_with_counts(
89 kind: impl Into<String>,
90 message: impl Into<String>,
91 synced_files: usize,
92 synced_symbols: usize,
93 skipped_files: usize,
94 failed_files: usize,
95 ) -> Self {
96 Self {
97 status: ProjectionStatus::Degraded,
98 synced_files,
99 synced_symbols,
100 skipped_files,
101 failed_files,
102 degraded: true,
103 error: Some(ProjectionSyncError {
104 kind: kind.into(),
105 message: message.into(),
106 }),
107 }
108 }
109
110 fn degraded_from_error(
111 error: &anyhow::Error,
112 synced_files: usize,
113 synced_symbols: usize,
114 ) -> Self {
115 Self::degraded_from_error_with_counts(error, synced_files, synced_symbols, 0, 0)
116 }
117
118 fn degraded_from_error_with_counts(
119 error: &anyhow::Error,
120 synced_files: usize,
121 synced_symbols: usize,
122 skipped_files: usize,
123 failed_files: usize,
124 ) -> Self {
125 let typed = typed_projection_error(error);
126 Self {
127 status: ProjectionStatus::Degraded,
128 synced_files,
129 synced_symbols,
130 skipped_files,
131 failed_files,
132 degraded: true,
133 error: Some(typed),
134 }
135 }
136}
137
/// Pair of reports returned by [`sync_after_index`], one per projection.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionSyncReports {
    /// Report for the graph projection.
    pub graph: ProjectionSyncReport,
    /// Report for the vector projection.
    pub vector: ProjectionSyncReport,
}
143
/// Result of syncing a single file into one projection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ProjectionFileSyncOutcome {
    /// The file was synced; `symbols` is the number of symbols it contributed.
    Synced { symbols: usize },
    /// The file had no indexed record, so it was skipped rather than synced.
    SkippedMissingIndexedFile,
}
149
150pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
151 ProjectionSyncStatus {
152 graph_pending: request.targets.contains(&ProjectionTarget::Graph),
153 vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
154 project_id: request.project_id,
155 file_paths: request.file_paths,
156 }
157}
158
159pub fn sync_after_index(
160 ctx: &Context,
161 file_paths: &[String],
162) -> anyhow::Result<ProjectionSyncReports> {
163 Ok(ProjectionSyncReports {
164 graph: sync_graph_files(ctx, file_paths)?,
165 vector: sync_vector_files(ctx, file_paths)?,
166 })
167}
168
169pub(crate) fn sync_files_with_state<S>(
170 ctx: &Context,
171 file_paths: &[String],
172 state: &mut S,
173 mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<ProjectionFileSyncOutcome>,
174) -> ProjectionSyncReport {
175 let mut synced_files = 0usize;
176 let mut synced_symbols = 0usize;
177 let mut skipped_files = 0usize;
178 let mut failed_files = 0usize;
179 let mut errors = Vec::new();
180 let mut error_kind = None;
181
182 for file_path in file_paths {
183 match sync_one(state, file_path) {
184 Ok(ProjectionFileSyncOutcome::Synced { symbols }) => {
185 synced_files += 1;
186 synced_symbols += symbols;
187 }
188 Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile) => {
189 skipped_files += 1;
190 for failure in reconcile_deleted_file(ctx, file_path) {
191 error_kind.get_or_insert_with(|| "projection_reconcile_failed".to_string());
192 errors.push(format!(
193 "{file_path}: failed to reconcile {:?} projection: {}",
194 failure.target, failure.message
195 ));
196 }
197 }
198 Err(error) => {
199 failed_files += 1;
200 let typed = typed_projection_error(&error);
201 error_kind.get_or_insert(typed.kind);
202 errors.push(format!("{file_path}: {}", typed.message));
203 }
204 }
205 }
206
207 if errors.is_empty() {
208 ProjectionSyncReport::ok_with_counts(
209 synced_files,
210 synced_symbols,
211 skipped_files,
212 failed_files,
213 )
214 } else {
215 ProjectionSyncReport::degraded_with_counts(
216 error_kind.unwrap_or_else(|| "sync_failed".to_string()),
217 errors.join("; "),
218 synced_files,
219 synced_symbols,
220 skipped_files,
221 failed_files,
222 )
223 }
224}
225
226fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
227 if file_paths.is_empty() {
228 return Ok(ProjectionSyncReport::ok(0, 0));
229 }
230 if let Err(error) = code_graph::require_graph_reads(ctx) {
231 return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
232 }
233
234 let mut conn = db::connect_readwrite(&ctx.database_url)?;
235 let report = match code_graph::with_code_graph(ctx, |graph| {
236 let mut synced_files = 0usize;
237 let mut synced_symbols = 0usize;
238 let mut skipped_files = 0usize;
239 let mut failed_files = 0usize;
240 let mut errors = Vec::new();
241 let mut error_kind = None;
242
243 for file_path in file_paths {
244 match sync_graph_file(ctx, &mut conn, graph, file_path) {
245 Ok(ProjectionFileSyncOutcome::Synced { symbols }) => {
246 synced_files += 1;
247 synced_symbols += symbols;
248 }
249 Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile) => {
250 skipped_files += 1;
251 for failure in reconcile_deleted_file(ctx, file_path) {
252 error_kind.get_or_insert_with(|| "projection_reconcile_failed".to_string());
253 errors.push(format!(
254 "{file_path}: failed to reconcile {:?} projection: {}",
255 failure.target, failure.message
256 ));
257 }
258 }
259 Err(error) => {
260 failed_files += 1;
261 let typed = typed_projection_error(&error);
262 error_kind.get_or_insert(typed.kind);
263 errors.push(format!("{file_path}: {}", typed.message));
264 }
265 }
266 }
267
268 if errors.is_empty() {
269 Ok(ProjectionSyncReport::ok_with_counts(
270 synced_files,
271 synced_symbols,
272 skipped_files,
273 failed_files,
274 ))
275 } else {
276 Ok(ProjectionSyncReport::degraded_with_counts(
277 error_kind.unwrap_or_else(|| "sync_failed".to_string()),
278 errors.join("; "),
279 synced_files,
280 synced_symbols,
281 skipped_files,
282 failed_files,
283 ))
284 }
285 }) {
286 Ok(report) => report,
287 Err(error)
288 if matches!(
289 error.downcast_ref::<GraphReadError>(),
290 Some(GraphReadError::Unreachable { .. })
291 ) =>
292 {
293 return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
294 }
295 Err(error) => return Err(error),
296 };
297 if report.synced_files > 0
298 && report.error.is_none()
299 && let Err(error) = code_graph::cleanup_orphans(ctx)
300 {
301 return Ok(ProjectionSyncReport::degraded_from_error_with_counts(
302 &error,
303 report.synced_files,
304 report.synced_symbols,
305 report.skipped_files,
306 report.failed_files,
307 ));
308 }
309 Ok(report)
310}
311
312fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
313 if file_paths.is_empty() {
314 return Ok(ProjectionSyncReport::ok(0, 0));
315 }
316
317 let lifecycle = match vector_lifecycle_from_context(ctx) {
318 Ok(lifecycle) => lifecycle,
319 Err(error) => {
320 return Ok(ProjectionSyncReport::degraded(
321 vector_error_kind(&error),
322 error.to_string(),
323 0,
324 0,
325 ));
326 }
327 };
328 let conn = db::connect_readwrite(&ctx.database_url)?;
329 let mut state = VectorProjectionState {
330 ctx,
331 conn,
332 lifecycle,
333 };
334 Ok(sync_files_with_state(
335 ctx,
336 file_paths,
337 &mut state,
338 VectorProjectionState::sync_file,
339 ))
340}
341
342fn sync_graph_file(
343 ctx: &Context,
344 conn: &mut postgres::Client,
345 graph: &mut code_graph::CodeGraph<'_>,
346 file_path: &str,
347) -> anyhow::Result<ProjectionFileSyncOutcome> {
348 if !db::mark_graph_sync_attempted(conn, &ctx.project_id, file_path)? {
349 return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
350 }
351 let facts = db::read_graph_file_facts(conn, &ctx.project_id, file_path)?;
352 graph.sync_file(
353 &facts.file_path,
354 &facts.imports,
355 &facts.definitions,
356 &facts.calls,
357 false,
358 )?;
359 if !db::mark_graph_synced(conn, &ctx.project_id, file_path)? {
360 return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
361 }
362 Ok(ProjectionFileSyncOutcome::Synced {
363 symbols: facts.definitions.len(),
364 })
365}
366
/// Per-run state shared across files while syncing the vector projection.
struct VectorProjectionState<'a> {
    /// Context supplying the project id used in database lookups.
    ctx: &'a Context,
    /// Database connection reused for every file in the run.
    conn: postgres::Client,
    /// Lifecycle handle that writes a file's symbols to the vector store.
    lifecycle: CodeSymbolVectorLifecycle,
}
372
373impl VectorProjectionState<'_> {
374 fn sync_file(&mut self, file_path: &str) -> anyhow::Result<ProjectionFileSyncOutcome> {
375 if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
376 return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
377 }
378 let symbols =
379 code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
380 let symbol_count = symbols.len();
381 self.lifecycle.sync_file_symbols(file_path, &symbols)?;
382 if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
383 Ok(ProjectionFileSyncOutcome::Synced {
384 symbols: symbol_count,
385 })
386 } else {
387 Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile)
388 }
389 }
390}
391
392fn vector_lifecycle_from_context(
393 ctx: &Context,
394) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
395 let qdrant = ctx
396 .qdrant
397 .clone()
398 .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
399 let embedding =
400 embedding_source_from_context(ctx).ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
401 CodeSymbolVectorLifecycle::new(
402 ctx.project_id.clone(),
403 qdrant,
404 embedding,
405 ctx.code_vectors.clone(),
406 )
407}
408
409fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
410 let kind = error
411 .downcast_ref::<VectorLifecycleError>()
412 .map(vector_error_kind)
413 .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
414 .unwrap_or("sync_failed");
415 ProjectionSyncError {
416 kind: kind.to_string(),
417 message: error.to_string(),
418 }
419}
420
421fn graph_error_kind(error: &GraphReadError) -> &'static str {
422 match error {
423 GraphReadError::NotConfigured => "missing_falkordb_config",
424 GraphReadError::Unreachable { .. } => "falkordb_unreachable",
425 GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
426 GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
427 }
428}
429
430fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
431 match error {
432 VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
433 VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
434 VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
435 VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
436 VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
437 VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
438 VectorLifecycleError::InvalidCollectionName(_) => "invalid_collection_name",
439 VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Records, in order, every file path handed to the sync callback.
    #[derive(Default)]
    struct VisitLog {
        visited: Vec<String>,
    }

    /// Context with no graph, vector or embedding backends configured.
    fn test_context() -> Context {
        Context {
            database_url: "postgresql://localhost/nonexistent".to_string(),
            project_root: PathBuf::from("/nonexistent"),
            project_id: "project-1".to_string(),
            quiet: true,
            falkordb: None,
            qdrant: None,
            embedding: None,
            code_vectors: crate::config::CodeVectorSettings { vector_dim: None },
            indexing: gobby_core::config::IndexingConfig::default(),
            daemon_url: None,
            index_scope: crate::config::ProjectIndexScope::Single,
        }
    }

    /// Turns string literals into the owned path list the sync API expects.
    fn owned_paths(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn sync_state_continues_after_projection_errors() {
        let files = owned_paths(&["src/ok.rs", "src/fail.rs", "src/next.rs"]);
        let ctx = test_context();
        let mut log = VisitLog::default();

        let report = sync_files_with_state(&ctx, &files, &mut log, |log, file_path| {
            log.visited.push(file_path.to_string());
            match file_path {
                "src/fail.rs" => Err(anyhow::anyhow!("projection write failed")),
                _ => Ok(ProjectionFileSyncOutcome::Synced { symbols: 3 }),
            }
        });

        // Every file is visited, including the ones after the failure.
        assert_eq!(
            log.visited,
            vec!["src/ok.rs", "src/fail.rs", "src/next.rs"]
        );
        assert_eq!(report.status, ProjectionStatus::Degraded);
        assert!(report.degraded);
        assert_eq!(report.synced_files, 2);
        assert_eq!(report.synced_symbols, 6);
        assert_eq!(report.skipped_files, 0);
        assert_eq!(report.failed_files, 1);
        let kind = report.error.as_ref().map(|error| error.kind.as_str());
        assert_eq!(kind, Some("sync_failed"));
    }

    #[test]
    fn sync_state_treats_missing_indexed_file_as_non_degraded_skip() {
        let files = owned_paths(&["src/missing.rs", "src/ok.rs"]);
        let ctx = test_context();
        let mut log = VisitLog::default();

        let report = sync_files_with_state(&ctx, &files, &mut log, |log, file_path| {
            log.visited.push(file_path.to_string());
            let outcome = if file_path == "src/missing.rs" {
                ProjectionFileSyncOutcome::SkippedMissingIndexedFile
            } else {
                ProjectionFileSyncOutcome::Synced { symbols: 2 }
            };
            Ok(outcome)
        });

        assert_eq!(log.visited, vec!["src/missing.rs", "src/ok.rs"]);
        assert_eq!(report.status, ProjectionStatus::Ok);
        assert!(!report.degraded);
        assert!(report.error.is_none());
        assert_eq!(report.synced_files, 1);
        assert_eq!(report.synced_symbols, 2);
        assert_eq!(report.skipped_files, 1);
        assert_eq!(report.failed_files, 0);
    }
}