1use crate::incremental::graph::DependencyGraph;
31use crate::incremental::storage::{StorageBackend, StorageError};
32use crate::incremental::types::{
33 AnalysisDefFingerprint, DependencyEdge, DependencyStrength, DependencyType, SymbolDependency,
34 SymbolKind,
35};
36use async_trait::async_trait;
37use deadpool_postgres::{Config, Pool, Runtime};
38use recoco::utils::fingerprint::Fingerprint;
39use std::path::{Path, PathBuf};
40use thread_utilities::RapidSet;
41use tokio_postgres::NoTls;
42
/// Storage backend for the incremental analysis system that keeps its data in
/// PostgreSQL.
///
/// Every operation checks a connection out of the wrapped connection pool.
#[derive(Debug)]
pub struct PostgresIncrementalBackend {
    /// Connection pool used for all database access.
    pool: Pool,
}
61
62impl PostgresIncrementalBackend {
63 pub async fn new(database_url: &str) -> Result<Self, StorageError> {
82 let pg_config = database_url
83 .parse::<tokio_postgres::Config>()
84 .map_err(|e| StorageError::Backend(format!("Invalid database URL: {e}")))?;
85
86 let mut cfg = Config::new();
87 if let Some(hosts) = pg_config.get_hosts().first() {
89 match hosts {
90 tokio_postgres::config::Host::Tcp(h) => cfg.host = Some(h.clone()),
91 #[cfg(unix)]
92 tokio_postgres::config::Host::Unix(p) => {
93 cfg.host = Some(p.to_string_lossy().to_string());
94 }
95 }
96 }
97 if let Some(ports) = pg_config.get_ports().first() {
98 cfg.port = Some(*ports);
99 }
100 if let Some(user) = pg_config.get_user() {
101 cfg.user = Some(user.to_string());
102 }
103 if let Some(password) = pg_config.get_password() {
104 cfg.password = Some(String::from_utf8_lossy(password).to_string());
105 }
106 if let Some(dbname) = pg_config.get_dbname() {
107 cfg.dbname = Some(dbname.to_string());
108 }
109
110 let pool = cfg
111 .create_pool(Some(Runtime::Tokio1), NoTls)
112 .map_err(|e| StorageError::Backend(format!("Failed to create connection pool: {e}")))?;
113
114 let _conn = pool
116 .get()
117 .await
118 .map_err(|e| StorageError::Backend(format!("Failed to connect to database: {e}")))?;
119
120 Ok(Self { pool })
121 }
122
123 pub fn from_pool(pool: Pool) -> Self {
131 Self { pool }
132 }
133
134 pub async fn run_migrations(&self) -> Result<(), StorageError> {
143 let client = self.pool.get().await.map_err(pg_pool_error)?;
144
145 let migration_sql = include_str!("../../../migrations/incremental_system_v1.sql");
146 client
147 .batch_execute(migration_sql)
148 .await
149 .map_err(|e| StorageError::Backend(format!("Migration failed: {e}")))?;
150
151 Ok(())
152 }
153
154 pub async fn save_edges_batch(&self, edges: &[DependencyEdge]) -> Result<(), StorageError> {
168 if edges.is_empty() {
169 return Ok(());
170 }
171
172 let mut client = self.pool.get().await.map_err(pg_pool_error)?;
173
174 let txn = client.transaction().await.map_err(pg_error)?;
176
177 let stmt = txn
178 .prepare(
179 "INSERT INTO dependency_edges \
180 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
181 VALUES ($1, $2, $3, $4, $5, $6, $7) \
182 ON CONFLICT (from_path, to_path, dep_type) DO UPDATE SET \
183 symbol_from = EXCLUDED.symbol_from, \
184 symbol_to = EXCLUDED.symbol_to, \
185 symbol_kind = EXCLUDED.symbol_kind, \
186 dependency_strength = EXCLUDED.dependency_strength",
187 )
188 .await
189 .map_err(pg_error)?;
190
191 for edge in edges {
192 let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
193 Some(sym) => (
194 Some(sym.from_symbol.as_str()),
195 Some(sym.to_symbol.as_str()),
196 Some(sym.kind.to_string()),
197 Some(sym.strength.to_string()),
198 ),
199 None => (None, None, None, None),
200 };
201
202 txn.execute(
203 &stmt,
204 &[
205 &edge.from.to_string_lossy().as_ref(),
206 &edge.to.to_string_lossy().as_ref(),
207 &edge.dep_type.to_string(),
208 &sym_from,
209 &sym_to,
210 &sym_kind.as_deref(),
211 &strength.as_deref(),
212 ],
213 )
214 .await
215 .map_err(pg_error)?;
216 }
217
218 txn.commit().await.map_err(pg_error)?;
219
220 Ok(())
221 }
222}
223
224#[async_trait]
225impl StorageBackend for PostgresIncrementalBackend {
226 async fn save_fingerprint(
227 &self,
228 file_path: &Path,
229 fingerprint: &AnalysisDefFingerprint,
230 ) -> Result<(), StorageError> {
231 let mut client = self.pool.get().await.map_err(pg_pool_error)?;
232
233 let txn = client.transaction().await.map_err(pg_error)?;
234
235 let stmt = txn
237 .prepare(
238 "INSERT INTO analysis_fingerprints (file_path, content_fingerprint, last_analyzed) \
239 VALUES ($1, $2, $3) \
240 ON CONFLICT (file_path) DO UPDATE SET \
241 content_fingerprint = EXCLUDED.content_fingerprint, \
242 last_analyzed = EXCLUDED.last_analyzed",
243 )
244 .await
245 .map_err(pg_error)?;
246
247 let fp_path = file_path.to_string_lossy();
248 let fp_bytes = fingerprint.fingerprint.as_slice();
249
250 txn.execute(
251 &stmt,
252 &[&fp_path.as_ref(), &fp_bytes, &fingerprint.last_analyzed],
253 )
254 .await
255 .map_err(pg_error)?;
256
257 let del_stmt = txn
259 .prepare("DELETE FROM source_files WHERE fingerprint_path = $1")
260 .await
261 .map_err(pg_error)?;
262
263 txn.execute(&del_stmt, &[&fp_path.as_ref()])
264 .await
265 .map_err(pg_error)?;
266
267 if !fingerprint.source_files.is_empty() {
268 let ins_stmt = txn
269 .prepare("INSERT INTO source_files (fingerprint_path, source_path) VALUES ($1, $2)")
270 .await
271 .map_err(pg_error)?;
272
273 for source in &fingerprint.source_files {
274 let src_path = source.to_string_lossy();
275 txn.execute(&ins_stmt, &[&fp_path.as_ref(), &src_path.as_ref()])
276 .await
277 .map_err(pg_error)?;
278 }
279 }
280
281 txn.commit().await.map_err(pg_error)?;
282
283 Ok(())
284 }
285
286 async fn load_fingerprint(
287 &self,
288 file_path: &Path,
289 ) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
290 let client = self.pool.get().await.map_err(pg_pool_error)?;
291
292 let fp_path = file_path.to_string_lossy();
293
294 let stmt = client
296 .prepare(
297 "SELECT content_fingerprint, last_analyzed \
298 FROM analysis_fingerprints WHERE file_path = $1",
299 )
300 .await
301 .map_err(pg_error)?;
302
303 let row = client
304 .query_opt(&stmt, &[&fp_path.as_ref()])
305 .await
306 .map_err(pg_error)?;
307
308 let Some(row) = row else {
309 return Ok(None);
310 };
311
312 let fp_bytes: Vec<u8> = row.get(0);
313 let last_analyzed: Option<i64> = row.get(1);
314
315 let fingerprint = bytes_to_fingerprint(&fp_bytes)?;
316
317 let src_stmt = client
319 .prepare("SELECT source_path FROM source_files WHERE fingerprint_path = $1")
320 .await
321 .map_err(pg_error)?;
322
323 let src_rows = client
324 .query(&src_stmt, &[&fp_path.as_ref()])
325 .await
326 .map_err(pg_error)?;
327
328 let source_files: RapidSet<PathBuf> = src_rows
329 .iter()
330 .map(|r| {
331 let s: String = r.get(0);
332 PathBuf::from(s)
333 })
334 .collect();
335
336 Ok(Some(AnalysisDefFingerprint {
337 source_files,
338 fingerprint,
339 last_analyzed,
340 }))
341 }
342
343 async fn delete_fingerprint(&self, file_path: &Path) -> Result<bool, StorageError> {
344 let client = self.pool.get().await.map_err(pg_pool_error)?;
345
346 let fp_path = file_path.to_string_lossy();
347
348 let stmt = client
350 .prepare("DELETE FROM analysis_fingerprints WHERE file_path = $1")
351 .await
352 .map_err(pg_error)?;
353
354 let count = client
355 .execute(&stmt, &[&fp_path.as_ref()])
356 .await
357 .map_err(pg_error)?;
358
359 Ok(count > 0)
360 }
361
362 async fn save_edge(&self, edge: &DependencyEdge) -> Result<(), StorageError> {
363 let client = self.pool.get().await.map_err(pg_pool_error)?;
364
365 let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
366 Some(sym) => (
367 Some(sym.from_symbol.clone()),
368 Some(sym.to_symbol.clone()),
369 Some(sym.kind.to_string()),
370 Some(sym.strength.to_string()),
371 ),
372 None => (None, None, None, None),
373 };
374
375 let stmt = client
376 .prepare(
377 "INSERT INTO dependency_edges \
378 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
379 VALUES ($1, $2, $3, $4, $5, $6, $7) \
380 ON CONFLICT (from_path, to_path, dep_type) DO UPDATE SET \
381 symbol_from = EXCLUDED.symbol_from, \
382 symbol_to = EXCLUDED.symbol_to, \
383 symbol_kind = EXCLUDED.symbol_kind, \
384 dependency_strength = EXCLUDED.dependency_strength",
385 )
386 .await
387 .map_err(pg_error)?;
388
389 client
390 .execute(
391 &stmt,
392 &[
393 &edge.from.to_string_lossy().as_ref(),
394 &edge.to.to_string_lossy().as_ref(),
395 &edge.dep_type.to_string(),
396 &sym_from.as_deref(),
397 &sym_to.as_deref(),
398 &sym_kind.as_deref(),
399 &strength.as_deref(),
400 ],
401 )
402 .await
403 .map_err(pg_error)?;
404
405 Ok(())
406 }
407
408 async fn load_edges_from(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
409 let client = self.pool.get().await.map_err(pg_pool_error)?;
410
411 let stmt = client
412 .prepare(
413 "SELECT from_path, to_path, dep_type, \
414 symbol_from, symbol_to, symbol_kind, dependency_strength \
415 FROM dependency_edges WHERE from_path = $1",
416 )
417 .await
418 .map_err(pg_error)?;
419
420 let fp = file_path.to_string_lossy();
421 let rows = client
422 .query(&stmt, &[&fp.as_ref()])
423 .await
424 .map_err(pg_error)?;
425
426 rows.iter().map(row_to_edge).collect()
427 }
428
429 async fn load_edges_to(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
430 let client = self.pool.get().await.map_err(pg_pool_error)?;
431
432 let stmt = client
433 .prepare(
434 "SELECT from_path, to_path, dep_type, \
435 symbol_from, symbol_to, symbol_kind, dependency_strength \
436 FROM dependency_edges WHERE to_path = $1",
437 )
438 .await
439 .map_err(pg_error)?;
440
441 let fp = file_path.to_string_lossy();
442 let rows = client
443 .query(&stmt, &[&fp.as_ref()])
444 .await
445 .map_err(pg_error)?;
446
447 rows.iter().map(row_to_edge).collect()
448 }
449
450 async fn delete_edges_for(&self, file_path: &Path) -> Result<usize, StorageError> {
451 let client = self.pool.get().await.map_err(pg_pool_error)?;
452
453 let fp = file_path.to_string_lossy();
454
455 let stmt = client
456 .prepare("DELETE FROM dependency_edges WHERE from_path = $1 OR to_path = $1")
457 .await
458 .map_err(pg_error)?;
459
460 let count = client
461 .execute(&stmt, &[&fp.as_ref()])
462 .await
463 .map_err(pg_error)?;
464
465 Ok(count as usize)
466 }
467
468 async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
469 let client = self.pool.get().await.map_err(pg_pool_error)?;
470
471 let mut graph = DependencyGraph::new();
472
473 let fp_stmt = client
475 .prepare(
476 "SELECT f.file_path, f.content_fingerprint, f.last_analyzed \
477 FROM analysis_fingerprints f",
478 )
479 .await
480 .map_err(pg_error)?;
481
482 let fp_rows = client.query(&fp_stmt, &[]).await.map_err(pg_error)?;
483
484 let src_stmt = client
485 .prepare(
486 "SELECT fingerprint_path, source_path FROM source_files ORDER BY fingerprint_path",
487 )
488 .await
489 .map_err(pg_error)?;
490
491 let src_rows = client.query(&src_stmt, &[]).await.map_err(pg_error)?;
492
493 let mut source_map: thread_utilities::RapidMap<String, RapidSet<PathBuf>> =
495 thread_utilities::get_map();
496 for row in &src_rows {
497 let fp_path: String = row.get(0);
498 let src_path: String = row.get(1);
499 source_map
500 .entry(fp_path)
501 .or_default()
502 .insert(PathBuf::from(src_path));
503 }
504
505 for row in &fp_rows {
507 let file_path: String = row.get(0);
508 let fp_bytes: Vec<u8> = row.get(1);
509 let last_analyzed: Option<i64> = row.get(2);
510
511 let fingerprint = bytes_to_fingerprint(&fp_bytes)?;
512 let source_files = source_map.remove(&file_path).unwrap_or_default();
513
514 let fp = AnalysisDefFingerprint {
515 source_files,
516 fingerprint,
517 last_analyzed,
518 };
519
520 graph.nodes.insert(PathBuf::from(&file_path), fp);
521 }
522
523 let edge_stmt = client
525 .prepare(
526 "SELECT from_path, to_path, dep_type, \
527 symbol_from, symbol_to, symbol_kind, dependency_strength \
528 FROM dependency_edges",
529 )
530 .await
531 .map_err(pg_error)?;
532
533 let edge_rows = client.query(&edge_stmt, &[]).await.map_err(pg_error)?;
534
535 for row in &edge_rows {
536 let edge = row_to_edge(row)?;
537 graph.add_edge(edge);
538 }
539
540 Ok(graph)
541 }
542
543 async fn save_full_graph(&self, graph: &DependencyGraph) -> Result<(), StorageError> {
544 let mut client = self.pool.get().await.map_err(pg_pool_error)?;
545
546 let txn = client.transaction().await.map_err(pg_error)?;
547
548 txn.batch_execute(
550 "DELETE FROM source_files; \
551 DELETE FROM dependency_edges; \
552 DELETE FROM analysis_fingerprints;",
553 )
554 .await
555 .map_err(pg_error)?;
556
557 let fp_stmt = txn
559 .prepare(
560 "INSERT INTO analysis_fingerprints (file_path, content_fingerprint, last_analyzed) \
561 VALUES ($1, $2, $3)",
562 )
563 .await
564 .map_err(pg_error)?;
565
566 let src_stmt = txn
567 .prepare("INSERT INTO source_files (fingerprint_path, source_path) VALUES ($1, $2)")
568 .await
569 .map_err(pg_error)?;
570
571 for (path, fp) in &graph.nodes {
572 let fp_path = path.to_string_lossy();
573 let fp_bytes = fp.fingerprint.as_slice();
574
575 txn.execute(&fp_stmt, &[&fp_path.as_ref(), &fp_bytes, &fp.last_analyzed])
576 .await
577 .map_err(pg_error)?;
578
579 for source in &fp.source_files {
580 let src_path = source.to_string_lossy();
581 txn.execute(&src_stmt, &[&fp_path.as_ref(), &src_path.as_ref()])
582 .await
583 .map_err(pg_error)?;
584 }
585 }
586
587 let edge_stmt = txn
589 .prepare(
590 "INSERT INTO dependency_edges \
591 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
592 VALUES ($1, $2, $3, $4, $5, $6, $7) \
593 ON CONFLICT (from_path, to_path, dep_type) DO NOTHING",
594 )
595 .await
596 .map_err(pg_error)?;
597
598 for edge in &graph.edges {
599 let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
600 Some(sym) => (
601 Some(sym.from_symbol.clone()),
602 Some(sym.to_symbol.clone()),
603 Some(sym.kind.to_string()),
604 Some(sym.strength.to_string()),
605 ),
606 None => (None, None, None, None),
607 };
608
609 txn.execute(
610 &edge_stmt,
611 &[
612 &edge.from.to_string_lossy().as_ref(),
613 &edge.to.to_string_lossy().as_ref(),
614 &edge.dep_type.to_string(),
615 &sym_from.as_deref(),
616 &sym_to.as_deref(),
617 &sym_kind.as_deref(),
618 &strength.as_deref(),
619 ],
620 )
621 .await
622 .map_err(pg_error)?;
623 }
624
625 txn.commit().await.map_err(pg_error)?;
626
627 Ok(())
628 }
629
    /// Returns the static identifier of this backend: `"postgres"`.
    fn name(&self) -> &'static str {
        "postgres"
    }
633}
634
635fn row_to_edge(row: &tokio_postgres::Row) -> Result<DependencyEdge, StorageError> {
639 let from_path: String = row.get(0);
640 let to_path: String = row.get(1);
641 let dep_type_str: String = row.get(2);
642 let symbol_from: Option<String> = row.get(3);
643 let symbol_to: Option<String> = row.get(4);
644 let symbol_kind: Option<String> = row.get(5);
645 let strength: Option<String> = row.get(6);
646
647 let dep_type = parse_dependency_type(&dep_type_str)?;
648
649 let symbol = match (symbol_from, symbol_to, symbol_kind, strength) {
650 (Some(from), Some(to), Some(kind), Some(str_val)) => Some(SymbolDependency {
651 from_symbol: from,
652 to_symbol: to,
653 kind: parse_symbol_kind(&kind)?,
654 strength: parse_dependency_strength(&str_val)?,
655 }),
656 _ => None,
657 };
658
659 Ok(DependencyEdge {
660 from: PathBuf::from(from_path),
661 to: PathBuf::from(to_path),
662 dep_type,
663 symbol,
664 })
665}
666
667fn bytes_to_fingerprint(bytes: &[u8]) -> Result<Fingerprint, StorageError> {
669 let arr: [u8; 16] = bytes.try_into().map_err(|_| {
670 StorageError::Corruption(format!(
671 "Fingerprint has invalid length: expected 16, got {}",
672 bytes.len()
673 ))
674 })?;
675 Ok(Fingerprint(arr))
676}
677
678fn parse_dependency_type(s: &str) -> Result<DependencyType, StorageError> {
680 match s {
681 "import" | "Import" => Ok(DependencyType::Import),
682 "export" | "Export" => Ok(DependencyType::Export),
683 "macro" | "Macro" => Ok(DependencyType::Macro),
684 "type" | "Type" => Ok(DependencyType::Type),
685 "trait" | "Trait" => Ok(DependencyType::Trait),
686 other => Err(StorageError::Corruption(format!(
687 "Unknown dependency type: {other}"
688 ))),
689 }
690}
691
692fn parse_symbol_kind(s: &str) -> Result<SymbolKind, StorageError> {
694 match s {
695 "function" | "Function" => Ok(SymbolKind::Function),
696 "class" | "Class" => Ok(SymbolKind::Class),
697 "interface" | "Interface" => Ok(SymbolKind::Interface),
698 "type_alias" | "TypeAlias" => Ok(SymbolKind::TypeAlias),
699 "constant" | "Constant" => Ok(SymbolKind::Constant),
700 "enum" | "Enum" => Ok(SymbolKind::Enum),
701 "module" | "Module" => Ok(SymbolKind::Module),
702 "macro" | "Macro" => Ok(SymbolKind::Macro),
703 other => Err(StorageError::Corruption(format!(
704 "Unknown symbol kind: {other}"
705 ))),
706 }
707}
708
709fn parse_dependency_strength(s: &str) -> Result<DependencyStrength, StorageError> {
711 match s {
712 "strong" | "Strong" => Ok(DependencyStrength::Strong),
713 "weak" | "Weak" => Ok(DependencyStrength::Weak),
714 other => Err(StorageError::Corruption(format!(
715 "Unknown dependency strength: {other}"
716 ))),
717 }
718}
719
720fn pg_error(e: tokio_postgres::Error) -> StorageError {
722 StorageError::Backend(format!("Postgres error: {e}"))
723}
724
725fn pg_pool_error(e: deadpool_postgres::PoolError) -> StorageError {
727 StorageError::Backend(format!("Connection pool error: {e}"))
728}