Skip to main content

thread_flow/incremental/
storage.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Storage trait definitions for persisting dependency graphs and fingerprints.
5//!
6//! This module defines the abstract storage interface that enables the
7//! incremental update system to persist state across sessions. Concrete
8//! implementations are provided for:
9//!
10//! - **Postgres** (CLI deployment): Full-featured SQL backend
11//! - **D1** (Edge deployment): Cloudflare Workers-compatible storage
12//!
13//! ## Design Pattern
14//!
15//! Adapted from ReCoco's `build_import_op_exec_ctx` persistence
16//! (exec_ctx.rs:55-134) and setup state management.
17
18use super::graph::{DependencyGraph, GraphError};
19use super::types::{AnalysisDefFingerprint, DependencyEdge};
20use async_trait::async_trait;
21use metrics::{counter, histogram};
22use std::path::{Path, PathBuf};
23use tracing::{debug, instrument};
24
25/// Errors that can occur during storage operations.
26#[derive(Debug)]
27pub enum StorageError {
28    /// The requested item was not found in storage.
29    NotFound(String),
30
31    /// A database or I/O error occurred.
32    Backend(String),
33
34    /// The stored data is corrupted or invalid.
35    Corruption(String),
36
37    /// A graph-level error propagated from graph operations.
38    Graph(GraphError),
39}
40
41impl std::fmt::Display for StorageError {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            StorageError::NotFound(msg) => write!(f, "Storage item not found: {msg}"),
45            StorageError::Backend(msg) => write!(f, "Storage backend error: {msg}"),
46            StorageError::Corruption(msg) => write!(f, "Storage data corruption: {msg}"),
47            StorageError::Graph(err) => write!(f, "Graph error: {err}"),
48        }
49    }
50}
51
52impl std::error::Error for StorageError {
53    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
54        match self {
55            StorageError::Graph(err) => Some(err),
56            _ => None,
57        }
58    }
59}
60
61impl From<GraphError> for StorageError {
62    fn from(err: GraphError) -> Self {
63        StorageError::Graph(err)
64    }
65}
66
67/// Abstract storage backend for the incremental update system.
68///
69/// Provides async persistence for fingerprints and dependency edges.
70/// Implementations must support both read and write operations, as well
71/// as transactional consistency for batch updates.
72///
73/// # Implementors
74///
75/// - `PostgresStorage` (Phase 2): Full Postgres backend for CLI deployment
76/// - `D1Storage` (Phase 2): Cloudflare D1 backend for edge deployment
77///
78/// # Examples
79///
80/// ```rust,ignore
81/// # // This example requires a concrete implementation
82/// use thread_flow::incremental::storage::StorageBackend;
83///
84/// async fn example(storage: &dyn StorageBackend) {
85///     let fp = storage.load_fingerprint(Path::new("src/main.rs")).await;
86/// }
87/// ```
88#[async_trait]
89pub trait StorageBackend: Send + Sync + std::fmt::Debug {
90    /// Persists a fingerprint for the given file path.
91    ///
92    /// Uses upsert semantics: creates a new entry or updates an existing one.
93    ///
94    /// # Arguments
95    ///
96    /// * `file_path` - The file this fingerprint belongs to.
97    /// * `fingerprint` - The fingerprint data to persist.
98    async fn save_fingerprint(
99        &self,
100        file_path: &Path,
101        fingerprint: &AnalysisDefFingerprint,
102    ) -> Result<(), StorageError>;
103
104    /// Loads the fingerprint for a file, if one exists.
105    ///
106    /// # Arguments
107    ///
108    /// * `file_path` - The file to load the fingerprint for.
109    ///
110    /// # Returns
111    ///
112    /// `Ok(Some(fp))` if a fingerprint exists, `Ok(None)` if not found.
113    async fn load_fingerprint(
114        &self,
115        file_path: &Path,
116    ) -> Result<Option<AnalysisDefFingerprint>, StorageError>;
117
118    /// Deletes the fingerprint for a file.
119    ///
120    /// Returns `Ok(true)` if a fingerprint was deleted, `Ok(false)` if
121    /// no fingerprint existed for the path.
122    async fn delete_fingerprint(&self, file_path: &Path) -> Result<bool, StorageError>;
123
124    /// Persists a dependency edge.
125    ///
126    /// Uses upsert semantics based on the composite key
127    /// (from, to, from_symbol, to_symbol, dep_type).
128    async fn save_edge(&self, edge: &DependencyEdge) -> Result<(), StorageError>;
129
130    /// Loads all dependency edges originating from a file.
131    async fn load_edges_from(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError>;
132
133    /// Loads all dependency edges targeting a file.
134    async fn load_edges_to(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError>;
135
136    /// Deletes all dependency edges involving a file (as source or target).
137    async fn delete_edges_for(&self, file_path: &Path) -> Result<usize, StorageError>;
138
139    /// Loads the complete dependency graph from storage.
140    ///
141    /// This is used during initialization to restore the graph state
142    /// from the previous session.
143    async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError>;
144
145    /// Persists the complete dependency graph to storage.
146    ///
147    /// This performs a full replacement of the stored graph.
148    /// Used after graph rebuilds or major updates.
149    async fn save_full_graph(&self, graph: &DependencyGraph) -> Result<(), StorageError>;
150
151    /// Returns the name of this storage backend for observability.
152    ///
153    /// Used in tracing spans and metrics to identify the storage implementation.
154    fn name(&self) -> &'static str;
155}
156
157/// In-memory storage backend for testing purposes.
158///
159/// Stores all data in memory with no persistence. Useful for unit tests
160/// and development scenarios.
161///
162/// # Examples
163///
164/// ```rust
165/// use thread_flow::incremental::storage::InMemoryStorage;
166///
167/// let storage = InMemoryStorage::new();
168/// ```
169#[derive(Debug)]
170pub struct InMemoryStorage {
171    fingerprints: tokio::sync::RwLock<thread_utilities::RapidMap<PathBuf, AnalysisDefFingerprint>>,
172    edges: tokio::sync::RwLock<Vec<DependencyEdge>>,
173}
174
175impl InMemoryStorage {
176    /// Creates a new empty in-memory storage backend.
177    pub fn new() -> Self {
178        Self {
179            fingerprints: tokio::sync::RwLock::new(thread_utilities::get_map()),
180            edges: tokio::sync::RwLock::new(Vec::new()),
181        }
182    }
183}
184
185impl Default for InMemoryStorage {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191#[async_trait]
192impl StorageBackend for InMemoryStorage {
193    #[instrument(skip(self, fingerprint), fields(backend = "inmemory"))]
194    async fn save_fingerprint(
195        &self,
196        file_path: &Path,
197        fingerprint: &AnalysisDefFingerprint,
198    ) -> Result<(), StorageError> {
199        debug!(file_path = ?file_path, "saving fingerprint");
200        let start = std::time::Instant::now();
201        let mut fps = self.fingerprints.write().await;
202        fps.insert(file_path.to_path_buf(), fingerprint.clone());
203        histogram!("storage_write_latency_ms").record(start.elapsed().as_micros() as f64 / 1000.0);
204        counter!("storage_writes_total", "backend" => "inmemory").increment(1);
205        Ok(())
206    }
207
208    #[instrument(skip(self), fields(backend = "inmemory"))]
209    async fn load_fingerprint(
210        &self,
211        file_path: &Path,
212    ) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
213        debug!(file_path = ?file_path, "loading fingerprint");
214        let start = std::time::Instant::now();
215        let fps = self.fingerprints.read().await;
216        let result = fps.get(file_path).cloned();
217        histogram!("storage_read_latency_ms").record(start.elapsed().as_micros() as f64 / 1000.0);
218        counter!("storage_reads_total", "backend" => "inmemory").increment(1);
219        Ok(result)
220    }
221
222    async fn delete_fingerprint(&self, file_path: &Path) -> Result<bool, StorageError> {
223        let mut fps = self.fingerprints.write().await;
224        Ok(fps.remove(file_path).is_some())
225    }
226
227    async fn save_edge(&self, edge: &DependencyEdge) -> Result<(), StorageError> {
228        let mut edges = self.edges.write().await;
229        edges.push(edge.clone());
230        Ok(())
231    }
232
233    async fn load_edges_from(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
234        let edges = self.edges.read().await;
235        Ok(edges
236            .iter()
237            .filter(|e| e.from == file_path)
238            .cloned()
239            .collect())
240    }
241
242    async fn load_edges_to(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
243        let edges = self.edges.read().await;
244        Ok(edges
245            .iter()
246            .filter(|e| e.to == file_path)
247            .cloned()
248            .collect())
249    }
250
251    async fn delete_edges_for(&self, file_path: &Path) -> Result<usize, StorageError> {
252        let mut edges = self.edges.write().await;
253        let before = edges.len();
254        edges.retain(|e| e.from != file_path && e.to != file_path);
255        Ok(before - edges.len())
256    }
257
258    async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
259        let edges = self.edges.read().await;
260        let fps = self.fingerprints.read().await;
261
262        let mut graph = DependencyGraph::new();
263
264        // Restore fingerprint nodes
265        for (path, fp) in fps.iter() {
266            graph.nodes.insert(path.clone(), fp.clone());
267        }
268
269        // Restore edges
270        for edge in edges.iter() {
271            graph.add_edge(edge.clone());
272        }
273
274        Ok(graph)
275    }
276
277    async fn save_full_graph(&self, graph: &DependencyGraph) -> Result<(), StorageError> {
278        let mut fps = self.fingerprints.write().await;
279        let mut edges = self.edges.write().await;
280
281        fps.clear();
282        for (path, fp) in &graph.nodes {
283            fps.insert(path.clone(), fp.clone());
284        }
285
286        edges.clear();
287        edges.extend(graph.edges.iter().cloned());
288
289        Ok(())
290    }
291
292    fn name(&self) -> &'static str {
293        "inmemory"
294    }
295}
296
// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::types::DependencyType;

    /// Builds an `Import` edge from `from` to `to`.
    fn import_edge(from: &str, to: &str) -> DependencyEdge {
        DependencyEdge::new(
            PathBuf::from(from),
            PathBuf::from(to),
            DependencyType::Import,
        )
    }

    #[tokio::test]
    async fn test_in_memory_storage_save_and_load_fingerprint() {
        let storage = InMemoryStorage::new();
        let path = Path::new("src/main.rs");

        storage
            .save_fingerprint(path, &AnalysisDefFingerprint::new(b"test content"))
            .await
            .unwrap();

        let Some(loaded) = storage.load_fingerprint(path).await.unwrap() else {
            panic!("fingerprint should exist after save");
        };
        assert!(loaded.content_matches(b"test content"));
    }

    #[tokio::test]
    async fn test_in_memory_storage_load_nonexistent_fingerprint() {
        let storage = InMemoryStorage::new();
        let result = storage.load_fingerprint(Path::new("nonexistent.rs")).await;
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_in_memory_storage_delete_fingerprint() {
        let storage = InMemoryStorage::new();
        let path = Path::new("a.rs");

        storage
            .save_fingerprint(path, &AnalysisDefFingerprint::new(b"content"))
            .await
            .unwrap();

        assert!(storage.delete_fingerprint(path).await.unwrap());
        assert!(storage.load_fingerprint(path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_in_memory_storage_delete_nonexistent_fingerprint() {
        let storage = InMemoryStorage::new();
        let removed = storage
            .delete_fingerprint(Path::new("none.rs"))
            .await
            .unwrap();
        assert!(!removed);
    }

    #[tokio::test]
    async fn test_in_memory_storage_save_and_load_edges() {
        let storage = InMemoryStorage::new();
        storage
            .save_edge(&import_edge("main.rs", "utils.rs"))
            .await
            .unwrap();

        let outgoing = storage.load_edges_from(Path::new("main.rs")).await.unwrap();
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].to, PathBuf::from("utils.rs"));

        let incoming = storage.load_edges_to(Path::new("utils.rs")).await.unwrap();
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].from, PathBuf::from("main.rs"));
    }

    #[tokio::test]
    async fn test_in_memory_storage_delete_edges() {
        let storage = InMemoryStorage::new();
        for (from, to) in [("a.rs", "b.rs"), ("c.rs", "a.rs"), ("d.rs", "e.rs")] {
            storage.save_edge(&import_edge(from, to)).await.unwrap();
        }

        // a.rs is the source of one edge and the target of another.
        let removed = storage.delete_edges_for(Path::new("a.rs")).await.unwrap();
        assert_eq!(removed, 2);

        // The unrelated d.rs -> e.rs edge survives.
        let survivors = storage.load_edges_from(Path::new("d.rs")).await.unwrap();
        assert_eq!(survivors.len(), 1);
    }

    #[tokio::test]
    async fn test_in_memory_storage_full_graph_roundtrip() {
        let storage = InMemoryStorage::new();

        let mut graph = DependencyGraph::new();
        graph.add_edge(import_edge("a.rs", "b.rs"));
        graph.add_edge(import_edge("b.rs", "c.rs"));
        storage.save_full_graph(&graph).await.unwrap();

        let restored = storage.load_full_graph().await.unwrap();
        assert_eq!(restored.edge_count(), 2);
        for name in ["a.rs", "b.rs", "c.rs"] {
            assert!(restored.contains_node(Path::new(name)), "missing node {name}");
        }
    }

    #[tokio::test]
    async fn test_in_memory_storage_upsert_fingerprint() {
        let storage = InMemoryStorage::new();
        let path = Path::new("file.rs");

        for version in [b"version 1", b"version 2"] {
            storage
                .save_fingerprint(path, &AnalysisDefFingerprint::new(version))
                .await
                .unwrap();
        }

        let latest = storage
            .load_fingerprint(path)
            .await
            .unwrap()
            .expect("fingerprint was saved above");

        assert!(latest.content_matches(b"version 2"));
        assert!(!latest.content_matches(b"version 1"));
    }

    // ── StorageError Tests ───────────────────────────────────────────────

    #[test]
    fn test_storage_error_display() {
        let cases = [
            (StorageError::NotFound("file.rs".to_string()), "file.rs"),
            (
                StorageError::Backend("connection refused".to_string()),
                "connection refused",
            ),
            (
                StorageError::Corruption("invalid checksum".to_string()),
                "invalid checksum",
            ),
        ];
        for (err, needle) in cases {
            assert!(err.to_string().contains(needle));
        }
    }

    #[test]
    fn test_storage_error_from_graph_error() {
        let storage_err =
            StorageError::from(GraphError::CyclicDependency(PathBuf::from("a.rs")));
        assert!(
            matches!(storage_err, StorageError::Graph(_)),
            "Expected StorageError::Graph"
        );
    }
}
493}