// thread_flow/incremental/backends/postgres.rs
1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! PostgreSQL storage backend for the incremental update system.
5//!
6//! Provides a full-featured SQL backend for CLI deployment with:
7//!
8//! - **Connection pooling** via `deadpool-postgres` for concurrent access
9//! - **Prepared statements** for query plan caching and performance
10//! - **Batch operations** with transactional atomicity
11//! - **Upsert semantics** for idempotent fingerprint and edge updates
12//!
13//! # Performance Targets
14//!
15//! - Single operations: <10ms p95 latency (Constitutional Principle VI)
16//! - Full graph load (1000 nodes): <50ms p95 latency
17//!
18//! # Example
19//!
20//! ```rust,ignore
21//! use thread_flow::incremental::backends::postgres::PostgresIncrementalBackend;
22//!
23//! let backend = PostgresIncrementalBackend::new("postgresql://localhost/thread")
24//!     .await
25//!     .expect("Failed to connect to Postgres");
26//!
27//! backend.run_migrations().await.expect("Migration failed");
28//! ```
29
30use crate::incremental::graph::DependencyGraph;
31use crate::incremental::storage::{StorageBackend, StorageError};
32use crate::incremental::types::{
33    AnalysisDefFingerprint, DependencyEdge, DependencyStrength, DependencyType, SymbolDependency,
34    SymbolKind,
35};
36use async_trait::async_trait;
37use deadpool_postgres::{Config, Pool, Runtime};
38use recoco::utils::fingerprint::Fingerprint;
39use std::path::{Path, PathBuf};
40use thread_utilities::RapidSet;
41use tokio_postgres::NoTls;
42
43/// PostgreSQL storage backend for the incremental update system.
44///
45/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for
46/// async query execution. All queries use prepared statements for optimal
47/// query plan caching.
48///
49/// # Connection Management
50///
51/// The backend manages a pool of connections. The default pool size is 16
52/// connections, configurable via the connection URL or pool configuration.
53///
54/// # Thread Safety
55///
56/// This type is `Send + Sync` and can be shared across async tasks.
57#[derive(Debug)]
58pub struct PostgresIncrementalBackend {
59    pool: Pool,
60}
61
62impl PostgresIncrementalBackend {
63    /// Creates a new Postgres backend connected to the given database URL.
64    ///
65    /// The URL should be a standard PostgreSQL connection string:
66    /// `postgresql://user:password@host:port/database`
67    ///
68    /// # Arguments
69    ///
70    /// * `database_url` - PostgreSQL connection string.
71    ///
72    /// # Errors
73    ///
74    /// Returns [`StorageError::Backend`] if the connection pool cannot be created.
75    ///
76    /// # Examples
77    ///
78    /// ```rust,ignore
79    /// let backend = PostgresIncrementalBackend::new("postgresql://localhost/thread").await?;
80    /// ```
81    pub async fn new(database_url: &str) -> Result<Self, StorageError> {
82        let pg_config = database_url
83            .parse::<tokio_postgres::Config>()
84            .map_err(|e| StorageError::Backend(format!("Invalid database URL: {e}")))?;
85
86        let mut cfg = Config::new();
87        // Extract config from parsed URL
88        if let Some(hosts) = pg_config.get_hosts().first() {
89            match hosts {
90                tokio_postgres::config::Host::Tcp(h) => cfg.host = Some(h.clone()),
91                #[cfg(unix)]
92                tokio_postgres::config::Host::Unix(p) => {
93                    cfg.host = Some(p.to_string_lossy().to_string());
94                }
95            }
96        }
97        if let Some(ports) = pg_config.get_ports().first() {
98            cfg.port = Some(*ports);
99        }
100        if let Some(user) = pg_config.get_user() {
101            cfg.user = Some(user.to_string());
102        }
103        if let Some(password) = pg_config.get_password() {
104            cfg.password = Some(String::from_utf8_lossy(password).to_string());
105        }
106        if let Some(dbname) = pg_config.get_dbname() {
107            cfg.dbname = Some(dbname.to_string());
108        }
109
110        let pool = cfg
111            .create_pool(Some(Runtime::Tokio1), NoTls)
112            .map_err(|e| StorageError::Backend(format!("Failed to create connection pool: {e}")))?;
113
114        // Verify connectivity by acquiring and releasing a connection
115        let _conn = pool
116            .get()
117            .await
118            .map_err(|e| StorageError::Backend(format!("Failed to connect to database: {e}")))?;
119
120        Ok(Self { pool })
121    }
122
123    /// Creates a new Postgres backend from an existing connection pool.
124    ///
125    /// Useful for testing or when you want to configure the pool externally.
126    ///
127    /// # Arguments
128    ///
129    /// * `pool` - A pre-configured `deadpool-postgres` connection pool.
130    pub fn from_pool(pool: Pool) -> Self {
131        Self { pool }
132    }
133
134    /// Runs the schema migration to create required tables and indexes.
135    ///
136    /// This is idempotent: running it multiple times has no effect if the
137    /// schema already exists (uses `CREATE TABLE IF NOT EXISTS`).
138    ///
139    /// # Errors
140    ///
141    /// Returns [`StorageError::Backend`] if the migration SQL fails to execute.
142    pub async fn run_migrations(&self) -> Result<(), StorageError> {
143        let client = self.pool.get().await.map_err(pg_pool_error)?;
144
145        let migration_sql = include_str!("../../../migrations/incremental_system_v1.sql");
146        client
147            .batch_execute(migration_sql)
148            .await
149            .map_err(|e| StorageError::Backend(format!("Migration failed: {e}")))?;
150
151        Ok(())
152    }
153
154    /// Saves multiple dependency edges in a single transaction.
155    ///
156    /// This is more efficient than calling [`save_edge`](StorageBackend::save_edge)
157    /// individually for each edge, as it reduces round-trips to the database.
158    ///
159    /// # Arguments
160    ///
161    /// * `edges` - Slice of dependency edges to persist.
162    ///
163    /// # Errors
164    ///
165    /// Returns [`StorageError::Backend`] if the transaction fails.
166    /// The transaction is rolled back on any error.
167    pub async fn save_edges_batch(&self, edges: &[DependencyEdge]) -> Result<(), StorageError> {
168        if edges.is_empty() {
169            return Ok(());
170        }
171
172        let mut client = self.pool.get().await.map_err(pg_pool_error)?;
173
174        // Execute in a transaction for atomicity
175        let txn = client.transaction().await.map_err(pg_error)?;
176
177        let stmt = txn
178            .prepare(
179                "INSERT INTO dependency_edges \
180                 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
181                 VALUES ($1, $2, $3, $4, $5, $6, $7) \
182                 ON CONFLICT (from_path, to_path, dep_type) DO UPDATE SET \
183                     symbol_from = EXCLUDED.symbol_from, \
184                     symbol_to = EXCLUDED.symbol_to, \
185                     symbol_kind = EXCLUDED.symbol_kind, \
186                     dependency_strength = EXCLUDED.dependency_strength",
187            )
188            .await
189            .map_err(pg_error)?;
190
191        for edge in edges {
192            let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
193                Some(sym) => (
194                    Some(sym.from_symbol.as_str()),
195                    Some(sym.to_symbol.as_str()),
196                    Some(sym.kind.to_string()),
197                    Some(sym.strength.to_string()),
198                ),
199                None => (None, None, None, None),
200            };
201
202            txn.execute(
203                &stmt,
204                &[
205                    &edge.from.to_string_lossy().as_ref(),
206                    &edge.to.to_string_lossy().as_ref(),
207                    &edge.dep_type.to_string(),
208                    &sym_from,
209                    &sym_to,
210                    &sym_kind.as_deref(),
211                    &strength.as_deref(),
212                ],
213            )
214            .await
215            .map_err(pg_error)?;
216        }
217
218        txn.commit().await.map_err(pg_error)?;
219
220        Ok(())
221    }
222}
223
224#[async_trait]
225impl StorageBackend for PostgresIncrementalBackend {
226    async fn save_fingerprint(
227        &self,
228        file_path: &Path,
229        fingerprint: &AnalysisDefFingerprint,
230    ) -> Result<(), StorageError> {
231        let mut client = self.pool.get().await.map_err(pg_pool_error)?;
232
233        let txn = client.transaction().await.map_err(pg_error)?;
234
235        // Upsert the fingerprint record
236        let stmt = txn
237            .prepare(
238                "INSERT INTO analysis_fingerprints (file_path, content_fingerprint, last_analyzed) \
239                 VALUES ($1, $2, $3) \
240                 ON CONFLICT (file_path) DO UPDATE SET \
241                     content_fingerprint = EXCLUDED.content_fingerprint, \
242                     last_analyzed = EXCLUDED.last_analyzed",
243            )
244            .await
245            .map_err(pg_error)?;
246
247        let fp_path = file_path.to_string_lossy();
248        let fp_bytes = fingerprint.fingerprint.as_slice();
249
250        txn.execute(
251            &stmt,
252            &[&fp_path.as_ref(), &fp_bytes, &fingerprint.last_analyzed],
253        )
254        .await
255        .map_err(pg_error)?;
256
257        // Replace source files: delete existing, then insert new
258        let del_stmt = txn
259            .prepare("DELETE FROM source_files WHERE fingerprint_path = $1")
260            .await
261            .map_err(pg_error)?;
262
263        txn.execute(&del_stmt, &[&fp_path.as_ref()])
264            .await
265            .map_err(pg_error)?;
266
267        if !fingerprint.source_files.is_empty() {
268            let ins_stmt = txn
269                .prepare("INSERT INTO source_files (fingerprint_path, source_path) VALUES ($1, $2)")
270                .await
271                .map_err(pg_error)?;
272
273            for source in &fingerprint.source_files {
274                let src_path = source.to_string_lossy();
275                txn.execute(&ins_stmt, &[&fp_path.as_ref(), &src_path.as_ref()])
276                    .await
277                    .map_err(pg_error)?;
278            }
279        }
280
281        txn.commit().await.map_err(pg_error)?;
282
283        Ok(())
284    }
285
286    async fn load_fingerprint(
287        &self,
288        file_path: &Path,
289    ) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
290        let client = self.pool.get().await.map_err(pg_pool_error)?;
291
292        let fp_path = file_path.to_string_lossy();
293
294        // Load the fingerprint record
295        let stmt = client
296            .prepare(
297                "SELECT content_fingerprint, last_analyzed \
298                 FROM analysis_fingerprints WHERE file_path = $1",
299            )
300            .await
301            .map_err(pg_error)?;
302
303        let row = client
304            .query_opt(&stmt, &[&fp_path.as_ref()])
305            .await
306            .map_err(pg_error)?;
307
308        let Some(row) = row else {
309            return Ok(None);
310        };
311
312        let fp_bytes: Vec<u8> = row.get(0);
313        let last_analyzed: Option<i64> = row.get(1);
314
315        let fingerprint = bytes_to_fingerprint(&fp_bytes)?;
316
317        // Load associated source files
318        let src_stmt = client
319            .prepare("SELECT source_path FROM source_files WHERE fingerprint_path = $1")
320            .await
321            .map_err(pg_error)?;
322
323        let src_rows = client
324            .query(&src_stmt, &[&fp_path.as_ref()])
325            .await
326            .map_err(pg_error)?;
327
328        let source_files: RapidSet<PathBuf> = src_rows
329            .iter()
330            .map(|r| {
331                let s: String = r.get(0);
332                PathBuf::from(s)
333            })
334            .collect();
335
336        Ok(Some(AnalysisDefFingerprint {
337            source_files,
338            fingerprint,
339            last_analyzed,
340        }))
341    }
342
343    async fn delete_fingerprint(&self, file_path: &Path) -> Result<bool, StorageError> {
344        let client = self.pool.get().await.map_err(pg_pool_error)?;
345
346        let fp_path = file_path.to_string_lossy();
347
348        // CASCADE will delete source_files entries automatically
349        let stmt = client
350            .prepare("DELETE FROM analysis_fingerprints WHERE file_path = $1")
351            .await
352            .map_err(pg_error)?;
353
354        let count = client
355            .execute(&stmt, &[&fp_path.as_ref()])
356            .await
357            .map_err(pg_error)?;
358
359        Ok(count > 0)
360    }
361
362    async fn save_edge(&self, edge: &DependencyEdge) -> Result<(), StorageError> {
363        let client = self.pool.get().await.map_err(pg_pool_error)?;
364
365        let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
366            Some(sym) => (
367                Some(sym.from_symbol.clone()),
368                Some(sym.to_symbol.clone()),
369                Some(sym.kind.to_string()),
370                Some(sym.strength.to_string()),
371            ),
372            None => (None, None, None, None),
373        };
374
375        let stmt = client
376            .prepare(
377                "INSERT INTO dependency_edges \
378                 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
379                 VALUES ($1, $2, $3, $4, $5, $6, $7) \
380                 ON CONFLICT (from_path, to_path, dep_type) DO UPDATE SET \
381                     symbol_from = EXCLUDED.symbol_from, \
382                     symbol_to = EXCLUDED.symbol_to, \
383                     symbol_kind = EXCLUDED.symbol_kind, \
384                     dependency_strength = EXCLUDED.dependency_strength",
385            )
386            .await
387            .map_err(pg_error)?;
388
389        client
390            .execute(
391                &stmt,
392                &[
393                    &edge.from.to_string_lossy().as_ref(),
394                    &edge.to.to_string_lossy().as_ref(),
395                    &edge.dep_type.to_string(),
396                    &sym_from.as_deref(),
397                    &sym_to.as_deref(),
398                    &sym_kind.as_deref(),
399                    &strength.as_deref(),
400                ],
401            )
402            .await
403            .map_err(pg_error)?;
404
405        Ok(())
406    }
407
408    async fn load_edges_from(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
409        let client = self.pool.get().await.map_err(pg_pool_error)?;
410
411        let stmt = client
412            .prepare(
413                "SELECT from_path, to_path, dep_type, \
414                        symbol_from, symbol_to, symbol_kind, dependency_strength \
415                 FROM dependency_edges WHERE from_path = $1",
416            )
417            .await
418            .map_err(pg_error)?;
419
420        let fp = file_path.to_string_lossy();
421        let rows = client
422            .query(&stmt, &[&fp.as_ref()])
423            .await
424            .map_err(pg_error)?;
425
426        rows.iter().map(row_to_edge).collect()
427    }
428
429    async fn load_edges_to(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
430        let client = self.pool.get().await.map_err(pg_pool_error)?;
431
432        let stmt = client
433            .prepare(
434                "SELECT from_path, to_path, dep_type, \
435                        symbol_from, symbol_to, symbol_kind, dependency_strength \
436                 FROM dependency_edges WHERE to_path = $1",
437            )
438            .await
439            .map_err(pg_error)?;
440
441        let fp = file_path.to_string_lossy();
442        let rows = client
443            .query(&stmt, &[&fp.as_ref()])
444            .await
445            .map_err(pg_error)?;
446
447        rows.iter().map(row_to_edge).collect()
448    }
449
450    async fn delete_edges_for(&self, file_path: &Path) -> Result<usize, StorageError> {
451        let client = self.pool.get().await.map_err(pg_pool_error)?;
452
453        let fp = file_path.to_string_lossy();
454
455        let stmt = client
456            .prepare("DELETE FROM dependency_edges WHERE from_path = $1 OR to_path = $1")
457            .await
458            .map_err(pg_error)?;
459
460        let count = client
461            .execute(&stmt, &[&fp.as_ref()])
462            .await
463            .map_err(pg_error)?;
464
465        Ok(count as usize)
466    }
467
468    async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
469        let client = self.pool.get().await.map_err(pg_pool_error)?;
470
471        let mut graph = DependencyGraph::new();
472
473        // Load all fingerprints with their source files
474        let fp_stmt = client
475            .prepare(
476                "SELECT f.file_path, f.content_fingerprint, f.last_analyzed \
477                 FROM analysis_fingerprints f",
478            )
479            .await
480            .map_err(pg_error)?;
481
482        let fp_rows = client.query(&fp_stmt, &[]).await.map_err(pg_error)?;
483
484        let src_stmt = client
485            .prepare(
486                "SELECT fingerprint_path, source_path FROM source_files ORDER BY fingerprint_path",
487            )
488            .await
489            .map_err(pg_error)?;
490
491        let src_rows = client.query(&src_stmt, &[]).await.map_err(pg_error)?;
492
493        // Build source files map grouped by fingerprint_path
494        let mut source_map: thread_utilities::RapidMap<String, RapidSet<PathBuf>> =
495            thread_utilities::get_map();
496        for row in &src_rows {
497            let fp_path: String = row.get(0);
498            let src_path: String = row.get(1);
499            source_map
500                .entry(fp_path)
501                .or_default()
502                .insert(PathBuf::from(src_path));
503        }
504
505        // Reconstruct fingerprint nodes
506        for row in &fp_rows {
507            let file_path: String = row.get(0);
508            let fp_bytes: Vec<u8> = row.get(1);
509            let last_analyzed: Option<i64> = row.get(2);
510
511            let fingerprint = bytes_to_fingerprint(&fp_bytes)?;
512            let source_files = source_map.remove(&file_path).unwrap_or_default();
513
514            let fp = AnalysisDefFingerprint {
515                source_files,
516                fingerprint,
517                last_analyzed,
518            };
519
520            graph.nodes.insert(PathBuf::from(&file_path), fp);
521        }
522
523        // Load all edges
524        let edge_stmt = client
525            .prepare(
526                "SELECT from_path, to_path, dep_type, \
527                        symbol_from, symbol_to, symbol_kind, dependency_strength \
528                 FROM dependency_edges",
529            )
530            .await
531            .map_err(pg_error)?;
532
533        let edge_rows = client.query(&edge_stmt, &[]).await.map_err(pg_error)?;
534
535        for row in &edge_rows {
536            let edge = row_to_edge(row)?;
537            graph.add_edge(edge);
538        }
539
540        Ok(graph)
541    }
542
543    async fn save_full_graph(&self, graph: &DependencyGraph) -> Result<(), StorageError> {
544        let mut client = self.pool.get().await.map_err(pg_pool_error)?;
545
546        let txn = client.transaction().await.map_err(pg_error)?;
547
548        // Clear existing data (order matters due to foreign keys)
549        txn.batch_execute(
550            "DELETE FROM source_files; \
551             DELETE FROM dependency_edges; \
552             DELETE FROM analysis_fingerprints;",
553        )
554        .await
555        .map_err(pg_error)?;
556
557        // Save all fingerprints
558        let fp_stmt = txn
559            .prepare(
560                "INSERT INTO analysis_fingerprints (file_path, content_fingerprint, last_analyzed) \
561                 VALUES ($1, $2, $3)",
562            )
563            .await
564            .map_err(pg_error)?;
565
566        let src_stmt = txn
567            .prepare("INSERT INTO source_files (fingerprint_path, source_path) VALUES ($1, $2)")
568            .await
569            .map_err(pg_error)?;
570
571        for (path, fp) in &graph.nodes {
572            let fp_path = path.to_string_lossy();
573            let fp_bytes = fp.fingerprint.as_slice();
574
575            txn.execute(&fp_stmt, &[&fp_path.as_ref(), &fp_bytes, &fp.last_analyzed])
576                .await
577                .map_err(pg_error)?;
578
579            for source in &fp.source_files {
580                let src_path = source.to_string_lossy();
581                txn.execute(&src_stmt, &[&fp_path.as_ref(), &src_path.as_ref()])
582                    .await
583                    .map_err(pg_error)?;
584            }
585        }
586
587        // Save all edges
588        let edge_stmt = txn
589            .prepare(
590                "INSERT INTO dependency_edges \
591                 (from_path, to_path, dep_type, symbol_from, symbol_to, symbol_kind, dependency_strength) \
592                 VALUES ($1, $2, $3, $4, $5, $6, $7) \
593                 ON CONFLICT (from_path, to_path, dep_type) DO NOTHING",
594            )
595            .await
596            .map_err(pg_error)?;
597
598        for edge in &graph.edges {
599            let (sym_from, sym_to, sym_kind, strength) = match &edge.symbol {
600                Some(sym) => (
601                    Some(sym.from_symbol.clone()),
602                    Some(sym.to_symbol.clone()),
603                    Some(sym.kind.to_string()),
604                    Some(sym.strength.to_string()),
605                ),
606                None => (None, None, None, None),
607            };
608
609            txn.execute(
610                &edge_stmt,
611                &[
612                    &edge.from.to_string_lossy().as_ref(),
613                    &edge.to.to_string_lossy().as_ref(),
614                    &edge.dep_type.to_string(),
615                    &sym_from.as_deref(),
616                    &sym_to.as_deref(),
617                    &sym_kind.as_deref(),
618                    &strength.as_deref(),
619                ],
620            )
621            .await
622            .map_err(pg_error)?;
623        }
624
625        txn.commit().await.map_err(pg_error)?;
626
627        Ok(())
628    }
629
630    fn name(&self) -> &'static str {
631        "postgres"
632    }
633}
634
635// ─── Helper Functions ───────────────────────────────────────────────────────
636
637/// Converts a database row to a [`DependencyEdge`].
638fn row_to_edge(row: &tokio_postgres::Row) -> Result<DependencyEdge, StorageError> {
639    let from_path: String = row.get(0);
640    let to_path: String = row.get(1);
641    let dep_type_str: String = row.get(2);
642    let symbol_from: Option<String> = row.get(3);
643    let symbol_to: Option<String> = row.get(4);
644    let symbol_kind: Option<String> = row.get(5);
645    let strength: Option<String> = row.get(6);
646
647    let dep_type = parse_dependency_type(&dep_type_str)?;
648
649    let symbol = match (symbol_from, symbol_to, symbol_kind, strength) {
650        (Some(from), Some(to), Some(kind), Some(str_val)) => Some(SymbolDependency {
651            from_symbol: from,
652            to_symbol: to,
653            kind: parse_symbol_kind(&kind)?,
654            strength: parse_dependency_strength(&str_val)?,
655        }),
656        _ => None,
657    };
658
659    Ok(DependencyEdge {
660        from: PathBuf::from(from_path),
661        to: PathBuf::from(to_path),
662        dep_type,
663        symbol,
664    })
665}
666
667/// Converts raw bytes from Postgres BYTEA to a [`Fingerprint`].
668fn bytes_to_fingerprint(bytes: &[u8]) -> Result<Fingerprint, StorageError> {
669    let arr: [u8; 16] = bytes.try_into().map_err(|_| {
670        StorageError::Corruption(format!(
671            "Fingerprint has invalid length: expected 16, got {}",
672            bytes.len()
673        ))
674    })?;
675    Ok(Fingerprint(arr))
676}
677
678/// Parses a string representation of [`DependencyType`].
679fn parse_dependency_type(s: &str) -> Result<DependencyType, StorageError> {
680    match s {
681        "import" | "Import" => Ok(DependencyType::Import),
682        "export" | "Export" => Ok(DependencyType::Export),
683        "macro" | "Macro" => Ok(DependencyType::Macro),
684        "type" | "Type" => Ok(DependencyType::Type),
685        "trait" | "Trait" => Ok(DependencyType::Trait),
686        other => Err(StorageError::Corruption(format!(
687            "Unknown dependency type: {other}"
688        ))),
689    }
690}
691
692/// Parses a string representation of [`SymbolKind`].
693fn parse_symbol_kind(s: &str) -> Result<SymbolKind, StorageError> {
694    match s {
695        "function" | "Function" => Ok(SymbolKind::Function),
696        "class" | "Class" => Ok(SymbolKind::Class),
697        "interface" | "Interface" => Ok(SymbolKind::Interface),
698        "type_alias" | "TypeAlias" => Ok(SymbolKind::TypeAlias),
699        "constant" | "Constant" => Ok(SymbolKind::Constant),
700        "enum" | "Enum" => Ok(SymbolKind::Enum),
701        "module" | "Module" => Ok(SymbolKind::Module),
702        "macro" | "Macro" => Ok(SymbolKind::Macro),
703        other => Err(StorageError::Corruption(format!(
704            "Unknown symbol kind: {other}"
705        ))),
706    }
707}
708
709/// Parses a string representation of [`DependencyStrength`].
710fn parse_dependency_strength(s: &str) -> Result<DependencyStrength, StorageError> {
711    match s {
712        "strong" | "Strong" => Ok(DependencyStrength::Strong),
713        "weak" | "Weak" => Ok(DependencyStrength::Weak),
714        other => Err(StorageError::Corruption(format!(
715            "Unknown dependency strength: {other}"
716        ))),
717    }
718}
719
720/// Converts a `tokio_postgres::Error` to a [`StorageError::Backend`].
721fn pg_error(e: tokio_postgres::Error) -> StorageError {
722    StorageError::Backend(format!("Postgres error: {e}"))
723}
724
725/// Converts a deadpool pool error to a [`StorageError::Backend`].
726fn pg_pool_error(e: deadpool_postgres::PoolError) -> StorageError {
727    StorageError::Backend(format!("Connection pool error: {e}"))
728}