Skip to main content

gobby_code/projection/
sync.rs

1use super::reconcile_deleted_file;
2use crate::config::Context;
3use crate::db;
4use crate::graph::code_graph::{self, GraphReadError};
5use crate::vector::code_symbols::{
6    self, CodeSymbolVectorLifecycle, VectorLifecycleError, embedding_source_from_context,
7};
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum ProjectionTarget {
13    Graph,
14    Vectors,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18pub struct ProjectionSyncRequest {
19    pub project_id: String,
20    pub file_paths: Vec<String>,
21    pub targets: Vec<ProjectionTarget>,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct ProjectionSyncStatus {
26    pub project_id: String,
27    pub file_paths: Vec<String>,
28    pub graph_pending: bool,
29    pub vectors_pending: bool,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
33#[serde(rename_all = "snake_case")]
34pub enum ProjectionStatus {
35    Ok,
36    Degraded,
37    Failed,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub struct ProjectionSyncError {
42    pub kind: String,
43    pub message: String,
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct ProjectionSyncReport {
48    pub status: ProjectionStatus,
49    pub synced_files: usize,
50    pub synced_symbols: usize,
51    pub skipped_files: usize,
52    pub failed_files: usize,
53    pub degraded: bool,
54    pub error: Option<ProjectionSyncError>,
55}
56
57impl ProjectionSyncReport {
58    pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
59        Self::ok_with_counts(synced_files, synced_symbols, 0, 0)
60    }
61
62    pub fn ok_with_counts(
63        synced_files: usize,
64        synced_symbols: usize,
65        skipped_files: usize,
66        failed_files: usize,
67    ) -> Self {
68        Self {
69            status: ProjectionStatus::Ok,
70            synced_files,
71            synced_symbols,
72            skipped_files,
73            failed_files,
74            degraded: false,
75            error: None,
76        }
77    }
78
79    pub fn degraded(
80        kind: impl Into<String>,
81        message: impl Into<String>,
82        synced_files: usize,
83        synced_symbols: usize,
84    ) -> Self {
85        Self::degraded_with_counts(kind, message, synced_files, synced_symbols, 0, 0)
86    }
87
88    pub fn degraded_with_counts(
89        kind: impl Into<String>,
90        message: impl Into<String>,
91        synced_files: usize,
92        synced_symbols: usize,
93        skipped_files: usize,
94        failed_files: usize,
95    ) -> Self {
96        Self {
97            status: ProjectionStatus::Degraded,
98            synced_files,
99            synced_symbols,
100            skipped_files,
101            failed_files,
102            degraded: true,
103            error: Some(ProjectionSyncError {
104                kind: kind.into(),
105                message: message.into(),
106            }),
107        }
108    }
109
110    fn degraded_from_error(
111        error: &anyhow::Error,
112        synced_files: usize,
113        synced_symbols: usize,
114    ) -> Self {
115        Self::degraded_from_error_with_counts(error, synced_files, synced_symbols, 0, 0)
116    }
117
118    fn degraded_from_error_with_counts(
119        error: &anyhow::Error,
120        synced_files: usize,
121        synced_symbols: usize,
122        skipped_files: usize,
123        failed_files: usize,
124    ) -> Self {
125        let typed = typed_projection_error(error);
126        Self {
127            status: ProjectionStatus::Degraded,
128            synced_files,
129            synced_symbols,
130            skipped_files,
131            failed_files,
132            degraded: true,
133            error: Some(typed),
134        }
135    }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139pub struct ProjectionSyncReports {
140    pub graph: ProjectionSyncReport,
141    pub vector: ProjectionSyncReport,
142}
143
/// Non-error result of syncing a single file into a projection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum ProjectionFileSyncOutcome {
    /// The file was synced; `symbols` is the number of symbols it contributed.
    Synced { symbols: usize },
    /// The file had no indexed row (or lost it mid-sync), so nothing was
    /// counted as synced.
    SkippedMissingIndexedFile,
}
149
150pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
151    ProjectionSyncStatus {
152        graph_pending: request.targets.contains(&ProjectionTarget::Graph),
153        vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
154        project_id: request.project_id,
155        file_paths: request.file_paths,
156    }
157}
158
159pub fn sync_after_index(
160    ctx: &Context,
161    file_paths: &[String],
162) -> anyhow::Result<ProjectionSyncReports> {
163    Ok(ProjectionSyncReports {
164        graph: sync_graph_files(ctx, file_paths)?,
165        vector: sync_vector_files(ctx, file_paths)?,
166    })
167}
168
169pub(crate) fn sync_files_with_state<S>(
170    ctx: &Context,
171    file_paths: &[String],
172    state: &mut S,
173    mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<ProjectionFileSyncOutcome>,
174) -> ProjectionSyncReport {
175    let mut synced_files = 0usize;
176    let mut synced_symbols = 0usize;
177    let mut skipped_files = 0usize;
178    let mut failed_files = 0usize;
179    let mut errors = Vec::new();
180    let mut error_kind = None;
181
182    for file_path in file_paths {
183        match sync_one(state, file_path) {
184            Ok(ProjectionFileSyncOutcome::Synced { symbols }) => {
185                synced_files += 1;
186                synced_symbols += symbols;
187            }
188            Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile) => {
189                skipped_files += 1;
190                for failure in reconcile_deleted_file(ctx, file_path) {
191                    error_kind.get_or_insert_with(|| "projection_reconcile_failed".to_string());
192                    errors.push(format!(
193                        "{file_path}: failed to reconcile {:?} projection: {}",
194                        failure.target, failure.message
195                    ));
196                }
197            }
198            Err(error) => {
199                failed_files += 1;
200                let typed = typed_projection_error(&error);
201                error_kind.get_or_insert(typed.kind);
202                errors.push(format!("{file_path}: {}", typed.message));
203            }
204        }
205    }
206
207    if errors.is_empty() {
208        ProjectionSyncReport::ok_with_counts(
209            synced_files,
210            synced_symbols,
211            skipped_files,
212            failed_files,
213        )
214    } else {
215        ProjectionSyncReport::degraded_with_counts(
216            error_kind.unwrap_or_else(|| "sync_failed".to_string()),
217            errors.join("; "),
218            synced_files,
219            synced_symbols,
220            skipped_files,
221            failed_files,
222        )
223    }
224}
225
226fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
227    if file_paths.is_empty() {
228        return Ok(ProjectionSyncReport::ok(0, 0));
229    }
230    if let Err(error) = code_graph::require_graph_reads(ctx) {
231        return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
232    }
233
234    let mut conn = db::connect_readwrite(&ctx.database_url)?;
235    let report = match code_graph::with_code_graph(ctx, |graph| {
236        let mut synced_files = 0usize;
237        let mut synced_symbols = 0usize;
238        let mut skipped_files = 0usize;
239        let mut failed_files = 0usize;
240        let mut errors = Vec::new();
241        let mut error_kind = None;
242
243        for file_path in file_paths {
244            match sync_graph_file(ctx, &mut conn, graph, file_path) {
245                Ok(ProjectionFileSyncOutcome::Synced { symbols }) => {
246                    synced_files += 1;
247                    synced_symbols += symbols;
248                }
249                Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile) => {
250                    skipped_files += 1;
251                    for failure in reconcile_deleted_file(ctx, file_path) {
252                        error_kind.get_or_insert_with(|| "projection_reconcile_failed".to_string());
253                        errors.push(format!(
254                            "{file_path}: failed to reconcile {:?} projection: {}",
255                            failure.target, failure.message
256                        ));
257                    }
258                }
259                Err(error) => {
260                    failed_files += 1;
261                    let typed = typed_projection_error(&error);
262                    error_kind.get_or_insert(typed.kind);
263                    errors.push(format!("{file_path}: {}", typed.message));
264                }
265            }
266        }
267
268        if errors.is_empty() {
269            Ok(ProjectionSyncReport::ok_with_counts(
270                synced_files,
271                synced_symbols,
272                skipped_files,
273                failed_files,
274            ))
275        } else {
276            Ok(ProjectionSyncReport::degraded_with_counts(
277                error_kind.unwrap_or_else(|| "sync_failed".to_string()),
278                errors.join("; "),
279                synced_files,
280                synced_symbols,
281                skipped_files,
282                failed_files,
283            ))
284        }
285    }) {
286        Ok(report) => report,
287        Err(error)
288            if matches!(
289                error.downcast_ref::<GraphReadError>(),
290                Some(GraphReadError::Unreachable { .. })
291            ) =>
292        {
293            return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
294        }
295        Err(error) => return Err(error),
296    };
297    if report.synced_files > 0
298        && report.error.is_none()
299        && let Err(error) = code_graph::cleanup_orphans(ctx)
300    {
301        return Ok(ProjectionSyncReport::degraded_from_error_with_counts(
302            &error,
303            report.synced_files,
304            report.synced_symbols,
305            report.skipped_files,
306            report.failed_files,
307        ));
308    }
309    Ok(report)
310}
311
312fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
313    if file_paths.is_empty() {
314        return Ok(ProjectionSyncReport::ok(0, 0));
315    }
316
317    let lifecycle = match vector_lifecycle_from_context(ctx) {
318        Ok(lifecycle) => lifecycle,
319        Err(error) => {
320            return Ok(ProjectionSyncReport::degraded(
321                vector_error_kind(&error),
322                error.to_string(),
323                0,
324                0,
325            ));
326        }
327    };
328    let conn = db::connect_readwrite(&ctx.database_url)?;
329    let mut state = VectorProjectionState {
330        ctx,
331        conn,
332        lifecycle,
333    };
334    Ok(sync_files_with_state(
335        ctx,
336        file_paths,
337        &mut state,
338        VectorProjectionState::sync_file,
339    ))
340}
341
342fn sync_graph_file(
343    ctx: &Context,
344    conn: &mut postgres::Client,
345    graph: &mut code_graph::CodeGraph<'_>,
346    file_path: &str,
347) -> anyhow::Result<ProjectionFileSyncOutcome> {
348    if !db::mark_graph_sync_attempted(conn, &ctx.project_id, file_path)? {
349        return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
350    }
351    let facts = db::read_graph_file_facts(conn, &ctx.project_id, file_path)?;
352    graph.sync_file(
353        &facts.file_path,
354        &facts.imports,
355        &facts.definitions,
356        &facts.calls,
357        false,
358    )?;
359    if !db::mark_graph_synced(conn, &ctx.project_id, file_path)? {
360        return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
361    }
362    Ok(ProjectionFileSyncOutcome::Synced {
363        symbols: facts.definitions.len(),
364    })
365}
366
367struct VectorProjectionState<'a> {
368    ctx: &'a Context,
369    conn: postgres::Client,
370    lifecycle: CodeSymbolVectorLifecycle,
371}
372
373impl VectorProjectionState<'_> {
374    fn sync_file(&mut self, file_path: &str) -> anyhow::Result<ProjectionFileSyncOutcome> {
375        if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
376            return Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile);
377        }
378        let symbols =
379            code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
380        let symbol_count = symbols.len();
381        self.lifecycle.sync_file_symbols(file_path, &symbols)?;
382        if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
383            Ok(ProjectionFileSyncOutcome::Synced {
384                symbols: symbol_count,
385            })
386        } else {
387            Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile)
388        }
389    }
390}
391
392fn vector_lifecycle_from_context(
393    ctx: &Context,
394) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
395    let qdrant = ctx
396        .qdrant
397        .clone()
398        .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
399    let embedding =
400        embedding_source_from_context(ctx).ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
401    CodeSymbolVectorLifecycle::new(
402        ctx.project_id.clone(),
403        qdrant,
404        embedding,
405        ctx.code_vectors.clone(),
406    )
407}
408
409fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
410    let kind = error
411        .downcast_ref::<VectorLifecycleError>()
412        .map(vector_error_kind)
413        .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
414        .unwrap_or("sync_failed");
415    ProjectionSyncError {
416        kind: kind.to_string(),
417        message: error.to_string(),
418    }
419}
420
421fn graph_error_kind(error: &GraphReadError) -> &'static str {
422    match error {
423        GraphReadError::NotConfigured => "missing_falkordb_config",
424        GraphReadError::Unreachable { .. } => "falkordb_unreachable",
425        GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
426        GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
427    }
428}
429
430fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
431    match error {
432        VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
433        VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
434        VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
435        VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
436        VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
437        VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
438        VectorLifecycleError::InvalidCollectionName(_) => "invalid_collection_name",
439        VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
440    }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Records, in call order, every path handed to the per-file callback.
    #[derive(Default)]
    struct Visited {
        paths: Vec<String>,
    }

    /// Context with no graph, vector or embedding backend configured and a
    /// database URL that is never connected to by these tests.
    fn test_context() -> Context {
        Context {
            database_url: "postgresql://localhost/nonexistent".to_string(),
            project_root: PathBuf::from("/nonexistent"),
            project_id: "project-1".to_string(),
            quiet: true,
            falkordb: None,
            qdrant: None,
            embedding: None,
            code_vectors: crate::config::CodeVectorSettings { vector_dim: None },
            indexing: gobby_core::config::IndexingConfig::default(),
            daemon_url: None,
            index_scope: crate::config::ProjectIndexScope::Single,
        }
    }

    /// Turns string literals into the owned path list the sync API expects.
    fn owned_paths(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// One failing file must not stop later files from being processed, and
    /// the report must be degraded with the generic error kind.
    #[test]
    fn sync_state_continues_after_projection_errors() {
        let files = owned_paths(&["src/ok.rs", "src/fail.rs", "src/next.rs"]);
        let ctx = test_context();
        let mut visited = Visited::default();

        let report = sync_files_with_state(&ctx, &files, &mut visited, |visited, file_path| {
            visited.paths.push(file_path.to_string());
            match file_path {
                "src/fail.rs" => Err(anyhow::anyhow!("projection write failed")),
                _ => Ok(ProjectionFileSyncOutcome::Synced { symbols: 3 }),
            }
        });

        assert_eq!(
            visited.paths,
            vec!["src/ok.rs", "src/fail.rs", "src/next.rs"]
        );
        assert_eq!(report.status, ProjectionStatus::Degraded);
        assert!(report.degraded);
        assert_eq!(report.synced_files, 2);
        assert_eq!(report.synced_symbols, 6);
        assert_eq!(report.skipped_files, 0);
        assert_eq!(report.failed_files, 1);
        let kind = report.error.as_ref().map(|error| error.kind.as_str());
        assert_eq!(kind, Some("sync_failed"));
    }

    /// A file without an indexed row is counted as skipped and, when
    /// reconciliation reports no failures, leaves the report ok.
    #[test]
    fn sync_state_treats_missing_indexed_file_as_non_degraded_skip() {
        let files = owned_paths(&["src/missing.rs", "src/ok.rs"]);
        let ctx = test_context();
        let mut visited = Visited::default();

        let report = sync_files_with_state(&ctx, &files, &mut visited, |visited, file_path| {
            visited.paths.push(file_path.to_string());
            match file_path {
                "src/missing.rs" => Ok(ProjectionFileSyncOutcome::SkippedMissingIndexedFile),
                _ => Ok(ProjectionFileSyncOutcome::Synced { symbols: 2 }),
            }
        });

        assert_eq!(visited.paths, vec!["src/missing.rs", "src/ok.rs"]);
        assert_eq!(report.status, ProjectionStatus::Ok);
        assert!(!report.degraded);
        assert!(report.error.is_none());
        assert_eq!(report.synced_files, 1);
        assert_eq!(report.synced_symbols, 2);
        assert_eq!(report.skipped_files, 1);
        assert_eq!(report.failed_files, 0);
    }
}
529}