1use crate::config::Context;
2use crate::db;
3use crate::graph::code_graph::{self, GraphReadError};
4use crate::vector::code_symbols::{self, CodeSymbolVectorLifecycle, VectorLifecycleError};
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[serde(rename_all = "snake_case")]
9pub enum ProjectionTarget {
10 Graph,
11 Vectors,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct ProjectionSyncRequest {
16 pub project_id: String,
17 pub file_paths: Vec<String>,
18 pub targets: Vec<ProjectionTarget>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct ProjectionSyncStatus {
23 pub project_id: String,
24 pub file_paths: Vec<String>,
25 pub graph_pending: bool,
26 pub vectors_pending: bool,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ProjectionStatus {
32 Ok,
33 Degraded,
34 Failed,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct ProjectionSyncError {
39 pub kind: String,
40 pub message: String,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44pub struct ProjectionSyncReport {
45 pub status: ProjectionStatus,
46 pub synced_files: usize,
47 pub synced_symbols: usize,
48 pub degraded: bool,
49 pub error: Option<ProjectionSyncError>,
50}
51
52impl ProjectionSyncReport {
53 pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
54 Self {
55 status: ProjectionStatus::Ok,
56 synced_files,
57 synced_symbols,
58 degraded: false,
59 error: None,
60 }
61 }
62
63 pub fn degraded(
64 kind: impl Into<String>,
65 message: impl Into<String>,
66 synced_files: usize,
67 synced_symbols: usize,
68 ) -> Self {
69 Self {
70 status: ProjectionStatus::Degraded,
71 synced_files,
72 synced_symbols,
73 degraded: true,
74 error: Some(ProjectionSyncError {
75 kind: kind.into(),
76 message: message.into(),
77 }),
78 }
79 }
80
81 fn degraded_from_error(
82 error: &anyhow::Error,
83 synced_files: usize,
84 synced_symbols: usize,
85 ) -> Self {
86 let typed = typed_projection_error(error);
87 Self {
88 status: ProjectionStatus::Degraded,
89 synced_files,
90 synced_symbols,
91 degraded: true,
92 error: Some(typed),
93 }
94 }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98pub struct ProjectionSyncReports {
99 pub graph: ProjectionSyncReport,
100 pub vector: ProjectionSyncReport,
101}
102
103pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
104 ProjectionSyncStatus {
105 graph_pending: request.targets.contains(&ProjectionTarget::Graph),
106 vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
107 project_id: request.project_id,
108 file_paths: request.file_paths,
109 }
110}
111
112pub fn sync_after_index(
113 ctx: &Context,
114 file_paths: &[String],
115) -> anyhow::Result<ProjectionSyncReports> {
116 Ok(ProjectionSyncReports {
117 graph: sync_graph_files(ctx, file_paths)?,
118 vector: sync_vector_files(ctx, file_paths)?,
119 })
120}
121
122pub(crate) fn sync_files_with_state<S>(
123 file_paths: &[String],
124 state: &mut S,
125 mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<usize>,
126 mut mark_synced: impl FnMut(&mut S, &str) -> anyhow::Result<()>,
127) -> ProjectionSyncReport {
128 let mut synced_files = 0usize;
129 let mut synced_symbols = 0usize;
130
131 for file_path in file_paths {
132 let symbols = match sync_one(state, file_path)
133 .and_then(|symbols| mark_synced(state, file_path).map(|()| symbols))
134 {
135 Ok(symbols) => symbols,
136 Err(error) => {
137 return ProjectionSyncReport::degraded_from_error(
138 &error,
139 synced_files,
140 synced_symbols,
141 );
142 }
143 };
144 synced_files += 1;
145 synced_symbols += symbols;
146 }
147
148 ProjectionSyncReport::ok(synced_files, synced_symbols)
149}
150
151fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
152 if file_paths.is_empty() {
153 return Ok(ProjectionSyncReport::ok(0, 0));
154 }
155 if let Err(error) = code_graph::require_graph_reads(ctx) {
156 return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
157 }
158
159 let conn = db::connect_readwrite(&ctx.database_url)?;
160 let mut state = GraphProjectionState { ctx, conn };
161 Ok(sync_files_with_state(
162 file_paths,
163 &mut state,
164 GraphProjectionState::sync_file,
165 GraphProjectionState::mark_synced,
166 ))
167}
168
169fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
170 if file_paths.is_empty() {
171 return Ok(ProjectionSyncReport::ok(0, 0));
172 }
173
174 let lifecycle = match vector_lifecycle_from_context(ctx) {
175 Ok(lifecycle) => lifecycle,
176 Err(error) => {
177 return Ok(ProjectionSyncReport::degraded(
178 vector_error_kind(&error),
179 error.to_string(),
180 0,
181 0,
182 ));
183 }
184 };
185 let conn = db::connect_readwrite(&ctx.database_url)?;
186 let mut state = VectorProjectionState {
187 ctx,
188 conn,
189 lifecycle,
190 };
191 Ok(sync_files_with_state(
192 file_paths,
193 &mut state,
194 VectorProjectionState::sync_file,
195 VectorProjectionState::mark_synced,
196 ))
197}
198
199struct GraphProjectionState<'a> {
200 ctx: &'a Context,
201 conn: postgres::Client,
202}
203
204impl GraphProjectionState<'_> {
205 fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
206 let facts = db::read_graph_file_facts(&mut self.conn, &self.ctx.project_id, file_path)?;
207 if !db::mark_graph_sync_attempted(&mut self.conn, &self.ctx.project_id, file_path)? {
208 anyhow::bail!(
209 "indexed file `{file_path}` was not found for project {}",
210 self.ctx.project_id
211 );
212 }
213 code_graph::sync_file_graph(
214 self.ctx,
215 &facts.file_path,
216 &facts.imports,
217 &facts.definitions,
218 &facts.calls,
219 )?;
220 Ok(facts.definitions.len())
221 }
222
223 fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
224 if db::mark_graph_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
225 Ok(())
226 } else {
227 anyhow::bail!(
228 "indexed file `{file_path}` was not found for project {}",
229 self.ctx.project_id
230 )
231 }
232 }
233}
234
235struct VectorProjectionState<'a> {
236 ctx: &'a Context,
237 conn: postgres::Client,
238 lifecycle: CodeSymbolVectorLifecycle,
239}
240
241impl VectorProjectionState<'_> {
242 fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
243 if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
244 anyhow::bail!(
245 "indexed file `{file_path}` was not found for project {}",
246 self.ctx.project_id
247 );
248 }
249 let symbols =
250 code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
251 let symbol_count = symbols.len();
252 self.lifecycle.sync_file_symbols(file_path, &symbols)?;
253 Ok(symbol_count)
254 }
255
256 fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
257 if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
258 Ok(())
259 } else {
260 anyhow::bail!(
261 "indexed file `{file_path}` was not found for project {}",
262 self.ctx.project_id
263 )
264 }
265 }
266}
267
268fn vector_lifecycle_from_context(
269 ctx: &Context,
270) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
271 let qdrant = ctx
272 .qdrant
273 .clone()
274 .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
275 let embedding = ctx
276 .embedding
277 .clone()
278 .ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
279 CodeSymbolVectorLifecycle::new(
280 ctx.project_id.clone(),
281 qdrant,
282 embedding,
283 ctx.code_vectors.clone(),
284 )
285}
286
287fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
288 let kind = error
289 .downcast_ref::<VectorLifecycleError>()
290 .map(vector_error_kind)
291 .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
292 .unwrap_or("sync_failed");
293 ProjectionSyncError {
294 kind: kind.to_string(),
295 message: error.to_string(),
296 }
297}
298
299fn graph_error_kind(error: &GraphReadError) -> &'static str {
300 match error {
301 GraphReadError::NotConfigured => "missing_falkordb_config",
302 GraphReadError::Unreachable { .. } => "falkordb_unreachable",
303 GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
304 GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
305 }
306}
307
308fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
309 match error {
310 VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
311 VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
312 VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
313 VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
314 VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
315 VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
316 VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Records which files each callback was invoked for, in call order.
    #[derive(Default)]
    struct State {
        synced: Vec<String>,
        marked_synced: Vec<String>,
    }

    /// The first file succeeds and is marked; the second fails in `sync_one`,
    /// so it is never marked and the report is degraded with the totals of
    /// the first file only.
    #[test]
    fn sync_state_tracks_projection_success() {
        let files: Vec<String> = ["src/ok.rs", "src/fail.rs"]
            .iter()
            .map(|path| path.to_string())
            .collect();
        let mut state = State::default();

        let report = sync_files_with_state(
            &files,
            &mut state,
            |recorder, path| {
                recorder.synced.push(path.to_string());
                if path == "src/fail.rs" {
                    anyhow::bail!("projection write failed");
                }
                Ok(3)
            },
            |recorder, path| {
                recorder.marked_synced.push(path.to_string());
                Ok(())
            },
        );

        // Both files were attempted, but only the successful one was marked.
        assert_eq!(state.synced, ["src/ok.rs", "src/fail.rs"]);
        assert_eq!(state.marked_synced, ["src/ok.rs"]);

        // Totals cover the successful file only.
        assert_eq!(report.status, ProjectionStatus::Degraded);
        assert_eq!(report.synced_files, 1);
        assert_eq!(report.synced_symbols, 3);
        assert!(report.degraded);

        // An untyped anyhow error falls back to the generic kind.
        let kind = report.error.as_ref().map(|error| error.kind.as_str());
        assert_eq!(kind, Some("sync_failed"));
    }
}