Skip to main content

gobby_code/projection/
sync.rs

1use crate::config::Context;
2use crate::db;
3use crate::graph::code_graph::{self, GraphReadError};
4use crate::vector::code_symbols::{self, CodeSymbolVectorLifecycle, VectorLifecycleError};
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[serde(rename_all = "snake_case")]
9pub enum ProjectionTarget {
10    Graph,
11    Vectors,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct ProjectionSyncRequest {
16    pub project_id: String,
17    pub file_paths: Vec<String>,
18    pub targets: Vec<ProjectionTarget>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct ProjectionSyncStatus {
23    pub project_id: String,
24    pub file_paths: Vec<String>,
25    pub graph_pending: bool,
26    pub vectors_pending: bool,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ProjectionStatus {
32    Ok,
33    Degraded,
34    Failed,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct ProjectionSyncError {
39    pub kind: String,
40    pub message: String,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44pub struct ProjectionSyncReport {
45    pub status: ProjectionStatus,
46    pub synced_files: usize,
47    pub synced_symbols: usize,
48    pub degraded: bool,
49    pub error: Option<ProjectionSyncError>,
50}
51
52impl ProjectionSyncReport {
53    pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
54        Self {
55            status: ProjectionStatus::Ok,
56            synced_files,
57            synced_symbols,
58            degraded: false,
59            error: None,
60        }
61    }
62
63    pub fn degraded(
64        kind: impl Into<String>,
65        message: impl Into<String>,
66        synced_files: usize,
67        synced_symbols: usize,
68    ) -> Self {
69        Self {
70            status: ProjectionStatus::Degraded,
71            synced_files,
72            synced_symbols,
73            degraded: true,
74            error: Some(ProjectionSyncError {
75                kind: kind.into(),
76                message: message.into(),
77            }),
78        }
79    }
80
81    fn degraded_from_error(
82        error: &anyhow::Error,
83        synced_files: usize,
84        synced_symbols: usize,
85    ) -> Self {
86        let typed = typed_projection_error(error);
87        Self {
88            status: ProjectionStatus::Degraded,
89            synced_files,
90            synced_symbols,
91            degraded: true,
92            error: Some(typed),
93        }
94    }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98pub struct ProjectionSyncReports {
99    pub graph: ProjectionSyncReport,
100    pub vector: ProjectionSyncReport,
101}
102
103pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
104    ProjectionSyncStatus {
105        graph_pending: request.targets.contains(&ProjectionTarget::Graph),
106        vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
107        project_id: request.project_id,
108        file_paths: request.file_paths,
109    }
110}
111
112pub fn sync_after_index(
113    ctx: &Context,
114    file_paths: &[String],
115) -> anyhow::Result<ProjectionSyncReports> {
116    Ok(ProjectionSyncReports {
117        graph: sync_graph_files(ctx, file_paths)?,
118        vector: sync_vector_files(ctx, file_paths)?,
119    })
120}
121
122pub(crate) fn sync_files_with_state<S>(
123    file_paths: &[String],
124    state: &mut S,
125    mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<usize>,
126    mut mark_synced: impl FnMut(&mut S, &str) -> anyhow::Result<()>,
127) -> ProjectionSyncReport {
128    let mut synced_files = 0usize;
129    let mut synced_symbols = 0usize;
130
131    for file_path in file_paths {
132        let symbols = match sync_one(state, file_path)
133            .and_then(|symbols| mark_synced(state, file_path).map(|()| symbols))
134        {
135            Ok(symbols) => symbols,
136            Err(error) => {
137                return ProjectionSyncReport::degraded_from_error(
138                    &error,
139                    synced_files,
140                    synced_symbols,
141                );
142            }
143        };
144        synced_files += 1;
145        synced_symbols += symbols;
146    }
147
148    ProjectionSyncReport::ok(synced_files, synced_symbols)
149}
150
151fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
152    if file_paths.is_empty() {
153        return Ok(ProjectionSyncReport::ok(0, 0));
154    }
155    if let Err(error) = code_graph::require_graph_reads(ctx) {
156        return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
157    }
158
159    let conn = db::connect_readwrite(&ctx.database_url)?;
160    let mut state = GraphProjectionState { ctx, conn };
161    Ok(sync_files_with_state(
162        file_paths,
163        &mut state,
164        GraphProjectionState::sync_file,
165        GraphProjectionState::mark_synced,
166    ))
167}
168
169fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
170    if file_paths.is_empty() {
171        return Ok(ProjectionSyncReport::ok(0, 0));
172    }
173
174    let lifecycle = match vector_lifecycle_from_context(ctx) {
175        Ok(lifecycle) => lifecycle,
176        Err(error) => {
177            return Ok(ProjectionSyncReport::degraded(
178                vector_error_kind(&error),
179                error.to_string(),
180                0,
181                0,
182            ));
183        }
184    };
185    let conn = db::connect_readwrite(&ctx.database_url)?;
186    let mut state = VectorProjectionState {
187        ctx,
188        conn,
189        lifecycle,
190    };
191    Ok(sync_files_with_state(
192        file_paths,
193        &mut state,
194        VectorProjectionState::sync_file,
195        VectorProjectionState::mark_synced,
196    ))
197}
198
199struct GraphProjectionState<'a> {
200    ctx: &'a Context,
201    conn: postgres::Client,
202}
203
204impl GraphProjectionState<'_> {
205    fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
206        let facts = db::read_graph_file_facts(&mut self.conn, &self.ctx.project_id, file_path)?;
207        if !db::mark_graph_sync_attempted(&mut self.conn, &self.ctx.project_id, file_path)? {
208            anyhow::bail!(
209                "indexed file `{file_path}` was not found for project {}",
210                self.ctx.project_id
211            );
212        }
213        code_graph::sync_file_graph(
214            self.ctx,
215            &facts.file_path,
216            &facts.imports,
217            &facts.definitions,
218            &facts.calls,
219        )?;
220        Ok(facts.definitions.len())
221    }
222
223    fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
224        if db::mark_graph_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
225            Ok(())
226        } else {
227            anyhow::bail!(
228                "indexed file `{file_path}` was not found for project {}",
229                self.ctx.project_id
230            )
231        }
232    }
233}
234
235struct VectorProjectionState<'a> {
236    ctx: &'a Context,
237    conn: postgres::Client,
238    lifecycle: CodeSymbolVectorLifecycle,
239}
240
241impl VectorProjectionState<'_> {
242    fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
243        if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
244            anyhow::bail!(
245                "indexed file `{file_path}` was not found for project {}",
246                self.ctx.project_id
247            );
248        }
249        let symbols =
250            code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
251        let symbol_count = symbols.len();
252        self.lifecycle.sync_file_symbols(file_path, &symbols)?;
253        Ok(symbol_count)
254    }
255
256    fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
257        if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
258            Ok(())
259        } else {
260            anyhow::bail!(
261                "indexed file `{file_path}` was not found for project {}",
262                self.ctx.project_id
263            )
264        }
265    }
266}
267
268fn vector_lifecycle_from_context(
269    ctx: &Context,
270) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
271    let qdrant = ctx
272        .qdrant
273        .clone()
274        .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
275    let embedding = ctx
276        .embedding
277        .clone()
278        .ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
279    CodeSymbolVectorLifecycle::new(
280        ctx.project_id.clone(),
281        qdrant,
282        embedding,
283        ctx.code_vectors.clone(),
284    )
285}
286
287fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
288    let kind = error
289        .downcast_ref::<VectorLifecycleError>()
290        .map(vector_error_kind)
291        .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
292        .unwrap_or("sync_failed");
293    ProjectionSyncError {
294        kind: kind.to_string(),
295        message: error.to_string(),
296    }
297}
298
299fn graph_error_kind(error: &GraphReadError) -> &'static str {
300    match error {
301        GraphReadError::NotConfigured => "missing_falkordb_config",
302        GraphReadError::Unreachable { .. } => "falkordb_unreachable",
303        GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
304        GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
305    }
306}
307
308fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
309    match error {
310        VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
311        VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
312        VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
313        VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
314        VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
315        VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
316        VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323
324    #[test]
325    fn sync_state_tracks_projection_success() {
326        let files = vec!["src/ok.rs".to_string(), "src/fail.rs".to_string()];
327        #[derive(Default)]
328        struct State {
329            synced: Vec<String>,
330            marked_synced: Vec<String>,
331        }
332        let mut state = State::default();
333
334        let report = sync_files_with_state(
335            &files,
336            &mut state,
337            |state, file_path| {
338                state.synced.push(file_path.to_string());
339                if file_path == "src/fail.rs" {
340                    anyhow::bail!("projection write failed");
341                }
342                Ok(3)
343            },
344            |state, file_path| {
345                state.marked_synced.push(file_path.to_string());
346                Ok(())
347            },
348        );
349
350        assert_eq!(state.synced, vec!["src/ok.rs", "src/fail.rs"]);
351        assert_eq!(state.marked_synced, vec!["src/ok.rs"]);
352        assert_eq!(report.status, ProjectionStatus::Degraded);
353        assert_eq!(report.synced_files, 1);
354        assert_eq!(report.synced_symbols, 3);
355        assert!(report.degraded);
356        assert_eq!(
357            report.error.as_ref().map(|error| error.kind.as_str()),
358            Some("sync_failed")
359        );
360    }
361}