1use crate::config::Context;
2use crate::db;
3use crate::graph::code_graph::{self, GraphReadError};
4use crate::vector::code_symbols::{
5 self, CodeSymbolVectorLifecycle, VectorLifecycleError, embedding_source_from_context,
6};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum ProjectionTarget {
12 Graph,
13 Vectors,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
17pub struct ProjectionSyncRequest {
18 pub project_id: String,
19 pub file_paths: Vec<String>,
20 pub targets: Vec<ProjectionTarget>,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct ProjectionSyncStatus {
25 pub project_id: String,
26 pub file_paths: Vec<String>,
27 pub graph_pending: bool,
28 pub vectors_pending: bool,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum ProjectionStatus {
34 Ok,
35 Degraded,
36 Failed,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct ProjectionSyncError {
41 pub kind: String,
42 pub message: String,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct ProjectionSyncReport {
47 pub status: ProjectionStatus,
48 pub synced_files: usize,
49 pub synced_symbols: usize,
50 pub degraded: bool,
51 pub error: Option<ProjectionSyncError>,
52}
53
54impl ProjectionSyncReport {
55 pub fn ok(synced_files: usize, synced_symbols: usize) -> Self {
56 Self {
57 status: ProjectionStatus::Ok,
58 synced_files,
59 synced_symbols,
60 degraded: false,
61 error: None,
62 }
63 }
64
65 pub fn degraded(
66 kind: impl Into<String>,
67 message: impl Into<String>,
68 synced_files: usize,
69 synced_symbols: usize,
70 ) -> Self {
71 Self {
72 status: ProjectionStatus::Degraded,
73 synced_files,
74 synced_symbols,
75 degraded: true,
76 error: Some(ProjectionSyncError {
77 kind: kind.into(),
78 message: message.into(),
79 }),
80 }
81 }
82
83 fn degraded_from_error(
84 error: &anyhow::Error,
85 synced_files: usize,
86 synced_symbols: usize,
87 ) -> Self {
88 let typed = typed_projection_error(error);
89 Self {
90 status: ProjectionStatus::Degraded,
91 synced_files,
92 synced_symbols,
93 degraded: true,
94 error: Some(typed),
95 }
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct ProjectionSyncReports {
101 pub graph: ProjectionSyncReport,
102 pub vector: ProjectionSyncReport,
103}
104
105pub fn pending_after_code_fact_write(request: ProjectionSyncRequest) -> ProjectionSyncStatus {
106 ProjectionSyncStatus {
107 graph_pending: request.targets.contains(&ProjectionTarget::Graph),
108 vectors_pending: request.targets.contains(&ProjectionTarget::Vectors),
109 project_id: request.project_id,
110 file_paths: request.file_paths,
111 }
112}
113
114pub fn sync_after_index(
115 ctx: &Context,
116 file_paths: &[String],
117) -> anyhow::Result<ProjectionSyncReports> {
118 Ok(ProjectionSyncReports {
119 graph: sync_graph_files(ctx, file_paths)?,
120 vector: sync_vector_files(ctx, file_paths)?,
121 })
122}
123
124pub(crate) fn sync_files_with_state<S>(
125 file_paths: &[String],
126 state: &mut S,
127 mut sync_one: impl FnMut(&mut S, &str) -> anyhow::Result<usize>,
128 mut mark_synced: impl FnMut(&mut S, &str) -> anyhow::Result<()>,
129) -> ProjectionSyncReport {
130 let mut synced_files = 0usize;
131 let mut synced_symbols = 0usize;
132
133 for file_path in file_paths {
134 let symbols = match sync_one(state, file_path)
135 .and_then(|symbols| mark_synced(state, file_path).map(|()| symbols))
136 {
137 Ok(symbols) => symbols,
138 Err(error) => {
139 return ProjectionSyncReport::degraded_from_error(
140 &error,
141 synced_files,
142 synced_symbols,
143 );
144 }
145 };
146 synced_files += 1;
147 synced_symbols += symbols;
148 }
149
150 ProjectionSyncReport::ok(synced_files, synced_symbols)
151}
152
153fn sync_graph_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
154 if file_paths.is_empty() {
155 return Ok(ProjectionSyncReport::ok(0, 0));
156 }
157 if let Err(error) = code_graph::require_graph_reads(ctx) {
158 return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
159 }
160
161 let mut conn = db::connect_readwrite(&ctx.database_url)?;
162 let report = match code_graph::with_code_graph(ctx, |graph| {
163 let mut synced_files = 0usize;
164 let mut synced_symbols = 0usize;
165
166 for file_path in file_paths {
167 let symbols = match sync_graph_file(ctx, &mut conn, graph, file_path) {
168 Ok(symbols) => symbols,
169 Err(error) => {
170 return Ok(ProjectionSyncReport::degraded_from_error(
171 &error,
172 synced_files,
173 synced_symbols,
174 ));
175 }
176 };
177 synced_files += 1;
178 synced_symbols += symbols;
179 }
180
181 Ok(ProjectionSyncReport::ok(synced_files, synced_symbols))
182 }) {
183 Ok(report) => report,
184 Err(error)
185 if matches!(
186 error.downcast_ref::<GraphReadError>(),
187 Some(GraphReadError::Unreachable { .. })
188 ) =>
189 {
190 return Ok(ProjectionSyncReport::degraded_from_error(&error, 0, 0));
191 }
192 Err(error) => return Err(error),
193 };
194 if report.synced_files > 0
195 && report.error.is_none()
196 && let Err(error) = code_graph::cleanup_orphans(ctx)
197 {
198 return Ok(ProjectionSyncReport::degraded_from_error(
199 &error,
200 report.synced_files,
201 report.synced_symbols,
202 ));
203 }
204 Ok(report)
205}
206
207fn sync_vector_files(ctx: &Context, file_paths: &[String]) -> anyhow::Result<ProjectionSyncReport> {
208 if file_paths.is_empty() {
209 return Ok(ProjectionSyncReport::ok(0, 0));
210 }
211
212 let lifecycle = match vector_lifecycle_from_context(ctx) {
213 Ok(lifecycle) => lifecycle,
214 Err(error) => {
215 return Ok(ProjectionSyncReport::degraded(
216 vector_error_kind(&error),
217 error.to_string(),
218 0,
219 0,
220 ));
221 }
222 };
223 let conn = db::connect_readwrite(&ctx.database_url)?;
224 let mut state = VectorProjectionState {
225 ctx,
226 conn,
227 lifecycle,
228 };
229 Ok(sync_files_with_state(
230 file_paths,
231 &mut state,
232 VectorProjectionState::sync_file,
233 VectorProjectionState::mark_synced,
234 ))
235}
236
237fn sync_graph_file(
238 ctx: &Context,
239 conn: &mut postgres::Client,
240 graph: &mut code_graph::CodeGraph<'_>,
241 file_path: &str,
242) -> anyhow::Result<usize> {
243 let facts = db::read_graph_file_facts(conn, &ctx.project_id, file_path)?;
244 if !db::mark_graph_sync_attempted(conn, &ctx.project_id, file_path)? {
245 anyhow::bail!(
246 "indexed file `{file_path}` was not found for project {}",
247 ctx.project_id
248 );
249 }
250 graph.sync_file(
251 &facts.file_path,
252 &facts.imports,
253 &facts.definitions,
254 &facts.calls,
255 false,
256 )?;
257 if !db::mark_graph_synced(conn, &ctx.project_id, file_path)? {
258 anyhow::bail!(
259 "indexed file `{file_path}` was not found for project {}",
260 ctx.project_id
261 );
262 }
263 Ok(facts.definitions.len())
264}
265
266struct VectorProjectionState<'a> {
267 ctx: &'a Context,
268 conn: postgres::Client,
269 lifecycle: CodeSymbolVectorLifecycle,
270}
271
272impl VectorProjectionState<'_> {
273 fn sync_file(&mut self, file_path: &str) -> anyhow::Result<usize> {
274 if !db::indexed_file_exists(&mut self.conn, &self.ctx.project_id, file_path)? {
275 anyhow::bail!(
276 "indexed file `{file_path}` was not found for project {}",
277 self.ctx.project_id
278 );
279 }
280 let symbols =
281 code_symbols::fetch_symbols_for_file(&mut self.conn, &self.ctx.project_id, file_path)?;
282 let symbol_count = symbols.len();
283 self.lifecycle.sync_file_symbols(file_path, &symbols)?;
284 Ok(symbol_count)
285 }
286
287 fn mark_synced(&mut self, file_path: &str) -> anyhow::Result<()> {
288 if db::mark_vectors_synced(&mut self.conn, &self.ctx.project_id, file_path)? {
289 Ok(())
290 } else {
291 anyhow::bail!(
292 "indexed file `{file_path}` was not found for project {}",
293 self.ctx.project_id
294 )
295 }
296 }
297}
298
299fn vector_lifecycle_from_context(
300 ctx: &Context,
301) -> Result<CodeSymbolVectorLifecycle, VectorLifecycleError> {
302 let qdrant = ctx
303 .qdrant
304 .clone()
305 .ok_or(VectorLifecycleError::MissingQdrantConfig)?;
306 let embedding =
307 embedding_source_from_context(ctx).ok_or(VectorLifecycleError::MissingEmbeddingConfig)?;
308 CodeSymbolVectorLifecycle::new(
309 ctx.project_id.clone(),
310 qdrant,
311 embedding,
312 ctx.code_vectors.clone(),
313 )
314}
315
316fn typed_projection_error(error: &anyhow::Error) -> ProjectionSyncError {
317 let kind = error
318 .downcast_ref::<VectorLifecycleError>()
319 .map(vector_error_kind)
320 .or_else(|| error.downcast_ref::<GraphReadError>().map(graph_error_kind))
321 .unwrap_or("sync_failed");
322 ProjectionSyncError {
323 kind: kind.to_string(),
324 message: error.to_string(),
325 }
326}
327
328fn graph_error_kind(error: &GraphReadError) -> &'static str {
329 match error {
330 GraphReadError::NotConfigured => "missing_falkordb_config",
331 GraphReadError::Unreachable { .. } => "falkordb_unreachable",
332 GraphReadError::QueryFailed { .. } => "falkordb_query_failed",
333 GraphReadError::InvalidTarget { .. } => "invalid_graph_target",
334 }
335}
336
337fn vector_error_kind(error: &VectorLifecycleError) -> &'static str {
338 match error {
339 VectorLifecycleError::MissingQdrantConfig => "missing_qdrant_config",
340 VectorLifecycleError::MissingEmbeddingConfig => "missing_embedding_config",
341 VectorLifecycleError::EmbeddingHttp { .. } => "embedding_http",
342 VectorLifecycleError::EmbeddingResponse(_) => "embedding_response",
343 VectorLifecycleError::QdrantHttp { .. } => "qdrant_http",
344 VectorLifecycleError::QdrantOperation(_) => "qdrant_operation",
345 VectorLifecycleError::DimensionMismatch { .. } => "dimension_mismatch",
346 }
347}
348
#[cfg(test)]
mod tests {
    use super::*;

    /// Records which files each callback of `sync_files_with_state` saw.
    #[derive(Default)]
    struct Recorder {
        synced: Vec<String>,
        marked_synced: Vec<String>,
    }

    /// A failing `sync_one` stops the run, skips `mark_synced` for that file,
    /// and yields a degraded report that keeps the counts of earlier files.
    #[test]
    fn sync_state_tracks_projection_success() {
        let files = vec![String::from("src/ok.rs"), String::from("src/fail.rs")];
        let mut recorder = Recorder::default();

        let report = sync_files_with_state(
            &files,
            &mut recorder,
            |recorder, path| {
                recorder.synced.push(path.to_string());
                match path {
                    "src/fail.rs" => Err(anyhow::anyhow!("projection write failed")),
                    _ => Ok(3),
                }
            },
            |recorder, path| {
                recorder.marked_synced.push(path.to_string());
                Ok(())
            },
        );

        // Both files were attempted, but only the successful one was marked.
        assert_eq!(recorder.synced, vec!["src/ok.rs", "src/fail.rs"]);
        assert_eq!(recorder.marked_synced, vec!["src/ok.rs"]);

        assert_eq!(report.status, ProjectionStatus::Degraded);
        assert!(report.degraded);
        assert_eq!(report.synced_files, 1);
        assert_eq!(report.synced_symbols, 3);

        let error_kind = report.error.as_ref().map(|error| error.kind.as_str());
        assert_eq!(error_kind, Some("sync_failed"));
    }
}