Skip to main content

gobby_code/projection/
sync.rs

1use crate::config::Context;
2use crate::db;
3use crate::graph::code_graph::{self, GraphReadError};
4use crate::vector::code_symbols::{
5    self, CodeSymbolVectorLifecycle, VectorLifecycleError, embedding_source_from_context,
6};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum ProjectionTarget {
12    Graph,
13    Vectors,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
17pub struct ProjectionSyncRequest {
18    pub project_id: String,
19    pub file_paths: Vec<String>,
20    pub targets: Vec<ProjectionTarget>,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct ProjectionSyncStatus {
25    pub project_id: String,
26    pub file_paths: Vec<String>,
27    pub graph_pending: bool,
28    pub vectors_pending: bool,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum ProjectionStatus {
34    Ok,
35    Degraded,
36    Failed,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct ProjectionSyncError {
41    pub kind: String,
42    pub message: String,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct ProjectionSyncReport {
47    pub status: ProjectionStatus,
48    pub synced_files: usize,
49    pub synced_symbols: usize,
50    pub degraded: bool,
51    pub error: Option<ProjectionSyncError>,
52}
53
54impl ProjectionSyncReport {
55    pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
56        Self {
57            status: ProjectionStatus::Ok,
58            synced_files,
59            synced_symbols,
60            degraded: false,
61            error: None,
62        }
63    }
64
65    pub fn degraded(
66        kind: impl Into<String>,
67        message: impl Into<String>,
68        synced_files: usize,
69        synced_symbols: usize,
70    ) -> Self {
71        Self {
72            status: ProjectionStatus::Degraded,
73            synced_files,
74            synced_symbols,
75            degraded: true,
76            error: Some(ProjectionSyncError {
77                kind: kind.into(),
78                message: message.into(),
79            }),
80        }
81    }
82
83    fn degraded_from_error(
84        error: &anyhow::Error,
85        synced_files: usize,
86        synced_symbols: usize,
87    ) -> Self {
88        let typed = typed_projection_error(error);
89        Self {
90            status: ProjectionStatus::Degraded,
91            synced_files,
92            synced_symbols,
93            degraded: true,
94            error: Some(typed),
95        }
96    }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct ProjectionSyncReports {
101    pub graph: ProjectionSyncReport,
102    pub vector: ProjectionSyncReport,
103}
104
105pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
106    ProjectionSyncStatus {
107        graph_pending: request.targets.contains(&ProjectionTarget::Graph),
108        vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
109        project_id: request.project_id,
110        file_paths: request.file_paths,
111    }
112}
113
114pub fn sync_after_index(
115    ctx: &Context,
116    file_paths: &[String],
117) -> anyhow::Result<ProjectionSyncReports> {
118    Ok(ProjectionSyncReports {
119        graph: sync_graph_files(ctx, file_paths)?,
120        vector: sync_vector_files(ctx, file_paths)?,
121    })
122}
123
124pub(crate) fn sync_files_with_state<S>(
125    file_paths: &[String],
126    state: &mut S,
127    mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<usize>,
128    mut mark_synced: impl FnMut(&mut S, &str) -> anyhow::Result<()>,
129) -> ProjectionSyncReport {
130    let mut synced_files = 0usize;
131    let mut synced_symbols = 0usize;
132
133    for file_path in file_paths {
134        let symbols = match sync_one(state, file_path)
135            .and_then(|symbols| mark_synced(state, file_path).map(|()| symbols))
136        {
137            Ok(symbols) => symbols,
138            Err(error) => {
139                return ProjectionSyncReport::degraded_from_error(
140                    &error,
141                    synced_files,
142                    synced_symbols,
143                );
144            }
145        };
146        synced_files += 1;
147        synced_symbols += symbols;
148    }
149
150    ProjectionSyncReport::ok(synced_files, synced_symbols)
151}
152
153fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
154    if file_paths.is_empty() {
155        return Ok(ProjectionSyncReport::ok(0, 0));
156    }
157    if let Err(error) = code_graph::require_graph_reads(ctx) {
158        return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
159    }
160
161    let mut conn = db::connect_readwrite(&ctx.database_url)?;
162    let report = match code_graph::with_code_graph(ctx, |graph| {
163        let mut synced_files = 0usize;
164        let mut synced_symbols = 0usize;
165
166        for file_path in file_paths {
167            let symbols = match sync_graph_file(ctx, &mut conn, graph, file_path) {
168                Ok(symbols) => symbols,
169                Err(error) => {
170                    return Ok(ProjectionSyncReport::degraded_from_error(
171                        &error,
172                        synced_files,
173                        synced_symbols,
174                    ));
175                }
176            };
177            synced_files += 1;
178            synced_symbols += symbols;
179        }
180
181        Ok(ProjectionSyncReport::ok(synced_files, synced_symbols))
182    }) {
183        Ok(report) => report,
184        Err(error)
185            if matches!(
186                error.downcast_ref::<GraphReadError>(),
187                Some(GraphReadError::Unreachable { .. })
188            ) =>
189        {
190            return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
191        }
192        Err(error) => return Err(error),
193    };
194    if report.synced_files > 0
195        && report.error.is_none()
196        && let Err(error) = code_graph::cleanup_orphans(ctx)
197    {
198        return Ok(ProjectionSyncReport::degraded_from_error(
199            &error,
200            report.synced_files,
201            report.synced_symbols,
202        ));
203    }
204    Ok(report)
205}
206
207fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
208    if file_paths.is_empty() {
209        return Ok(ProjectionSyncReport::ok(0, 0));
210    }
211
212    let lifecycle = match vector_lifecycle_from_context(ctx) {
213        Ok(lifecycle) => lifecycle,
214        Err(error) => {
215            return Ok(ProjectionSyncReport::degraded(
216                vector_error_kind(&error),
217                error.to_string(),
218                0,
219                0,
220            ));
221        }
222    };
223    let conn = db::connect_readwrite(&ctx.database_url)?;
224    let mut state = VectorProjectionState {
225        ctx,
226        conn,
227        lifecycle,
228    };
229    Ok(sync_files_with_state(
230        file_paths,
231        &mut state,
232        VectorProjectionState::sync_file,
233        VectorProjectionState::mark_synced,
234    ))
235}
236
237fn sync_graph_file(
238    ctx: &Context,
239    conn: &mut postgres::Client,
240    graph: &mut code_graph::CodeGraph<'_>,
241    file_path: &str,
242) -> anyhow::Result<usize> {
243    let facts = db::read_graph_file_facts(conn, &ctx.project_id, file_path)?;
244    if !db::mark_graph_sync_attempted(conn, &ctx.project_id, file_path)? {
245        anyhow::bail!(
246            "indexed file `{file_path}` was not found for project {}",
247            ctx.project_id
248        );
249    }
250    graph.sync_file(
251        &facts.file_path,
252        &facts.imports,
253        &facts.definitions,
254        &facts.calls,
255        false,
256    )?;
257    if !db::mark_graph_synced(conn, &ctx.project_id, file_path)? {
258        anyhow::bail!(
259            "indexed file `{file_path}` was not found for project {}",
260            ctx.project_id
261        );
262    }
263    Ok(facts.definitions.len())
264}
265
266struct VectorProjectionState<'a> {
267    ctx: &'a Context,
268    conn: postgres::Client,
269    lifecycle: CodeSymbolVectorLifecycle,
270}
271
272impl VectorProjectionState<'_> {
273    fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
274        if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
275            anyhow::bail!(
276                "indexed file `{file_path}` was not found for project {}",
277                self.ctx.project_id
278            );
279        }
280        let symbols =
281            code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
282        let symbol_count = symbols.len();
283        self.lifecycle.sync_file_symbols(file_path, &symbols)?;
284        Ok(symbol_count)
285    }
286
287    fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
288        if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
289            Ok(())
290        } else {
291            anyhow::bail!(
292                "indexed file `{file_path}` was not found for project {}",
293                self.ctx.project_id
294            )
295        }
296    }
297}
298
299fn vector_lifecycle_from_context(
300    ctx: &Context,
301) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
302    let qdrant = ctx
303        .qdrant
304        .clone()
305        .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
306    let embedding =
307        embedding_source_from_context(ctx).ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
308    CodeSymbolVectorLifecycle::new(
309        ctx.project_id.clone(),
310        qdrant,
311        embedding,
312        ctx.code_vectors.clone(),
313    )
314}
315
316fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
317    let kind = error
318        .downcast_ref::<VectorLifecycleError>()
319        .map(vector_error_kind)
320        .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
321        .unwrap_or("sync_failed");
322    ProjectionSyncError {
323        kind: kind.to_string(),
324        message: error.to_string(),
325    }
326}
327
328fn graph_error_kind(error: &GraphReadError) -> &'static str {
329    match error {
330        GraphReadError::NotConfigured => "missing_falkordb_config",
331        GraphReadError::Unreachable { .. } => "falkordb_unreachable",
332        GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
333        GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
334    }
335}
336
337fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
338    match error {
339        VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
340        VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
341        VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
342        VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
343        VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
344        VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
345        VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use super::*;
352
353    #[test]
354    fn sync_state_tracks_projection_success() {
355        let files = vec!["src/ok.rs".to_string(), "src/fail.rs".to_string()];
356        #[derive(Default)]
357        struct State {
358            synced: Vec<String>,
359            marked_synced: Vec<String>,
360        }
361        let mut state = State::default();
362
363        let report = sync_files_with_state(
364            &files,
365            &mut state,
366            |state, file_path| {
367                state.synced.push(file_path.to_string());
368                if file_path == "src/fail.rs" {
369                    anyhow::bail!("projection write failed");
370                }
371                Ok(3)
372            },
373            |state, file_path| {
374                state.marked_synced.push(file_path.to_string());
375                Ok(())
376            },
377        );
378
379        assert_eq!(state.synced, vec!["src/ok.rs", "src/fail.rs"]);
380        assert_eq!(state.marked_synced, vec!["src/ok.rs"]);
381        assert_eq!(report.status, ProjectionStatus::Degraded);
382        assert_eq!(report.synced_files, 1);
383        assert_eq!(report.synced_symbols, 3);
384        assert!(report.degraded);
385        assert_eq!(
386            report.error.as_ref().map(|error| error.kind.as_str()),
387            Some("sync_failed")
388        );
389    }
390}