codeprism_storage/backends.rs

//! Storage backend implementations

use crate::{
    AnalysisResult, AnalysisStorage, EdgeReference, GraphMetadata, GraphStorage, SerializableEdge,
    SerializableGraph, SerializableNode,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use rusqlite::{params, Connection};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::fs;
use tokio::sync::Mutex as AsyncMutex;

/// In-memory graph storage implementation
pub struct InMemoryGraphStorage {
    graphs: Arc<Mutex<HashMap<String, SerializableGraph>>>,
}

impl InMemoryGraphStorage {
    /// Create a new in-memory graph storage
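    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Minimal usage sketch; assumes this module is reachable as
    /// // `codeprism_storage::backends` (re-export path not verified here).
    /// use codeprism_storage::backends::InMemoryGraphStorage;
    ///
    /// let storage = InMemoryGraphStorage::new();
    /// ```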
    pub fn new() -> Self {
        Self {
            graphs: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}

impl Default for InMemoryGraphStorage {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl GraphStorage for InMemoryGraphStorage {
    async fn store_graph(&self, graph: &SerializableGraph) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        graphs.insert(graph.repo_id.clone(), graph.clone());
        Ok(())
    }

    async fn load_graph(&self, repo_id: &str) -> Result<Option<SerializableGraph>> {
        let graphs = self.graphs.lock().unwrap();
        Ok(graphs.get(repo_id).cloned())
    }

    async fn update_nodes(&self, repo_id: &str, nodes: &[SerializableNode]) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        if let Some(graph) = graphs.get_mut(repo_id) {
            // Update existing nodes or add new ones
            for new_node in nodes {
                if let Some(existing_node) = graph.nodes.iter_mut().find(|n| n.id == new_node.id) {
                    *existing_node = new_node.clone();
                } else {
                    graph.nodes.push(new_node.clone());
                }
            }
        }
        Ok(())
    }

    async fn update_edges(&self, repo_id: &str, edges: &[SerializableEdge]) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        if let Some(graph) = graphs.get_mut(repo_id) {
            // Update existing edges or add new ones
            for new_edge in edges {
                if let Some(existing_edge) = graph.edges.iter_mut().find(|e| {
                    e.source == new_edge.source
                        && e.target == new_edge.target
                        && e.kind == new_edge.kind
                }) {
                    *existing_edge = new_edge.clone();
                } else {
                    graph.edges.push(new_edge.clone());
                }
            }
        }
        Ok(())
    }

    async fn delete_nodes(&self, repo_id: &str, node_ids: &[String]) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        if let Some(graph) = graphs.get_mut(repo_id) {
            graph.nodes.retain(|n| !node_ids.contains(&n.id));
            // Also remove edges that reference deleted nodes
            graph
                .edges
                .retain(|e| !node_ids.contains(&e.source) && !node_ids.contains(&e.target));
        }
        Ok(())
    }

    async fn delete_edges(&self, repo_id: &str, edge_refs: &[EdgeReference]) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        if let Some(graph) = graphs.get_mut(repo_id) {
            graph.edges.retain(|e| {
                !edge_refs
                    .iter()
                    .any(|er| er.source == e.source && er.target == e.target && er.kind == e.kind)
            });
        }
        Ok(())
    }

    async fn get_graph_metadata(&self, repo_id: &str) -> Result<Option<GraphMetadata>> {
        let graphs = self.graphs.lock().unwrap();
        Ok(graphs.get(repo_id).map(|g| g.metadata.clone()))
    }

    async fn update_graph_metadata(&self, repo_id: &str, metadata: &GraphMetadata) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        if let Some(graph) = graphs.get_mut(repo_id) {
            graph.metadata = metadata.clone();
        }
        Ok(())
    }

    async fn list_repositories(&self) -> Result<Vec<String>> {
        let graphs = self.graphs.lock().unwrap();
        Ok(graphs.keys().cloned().collect())
    }

    async fn delete_graph(&self, repo_id: &str) -> Result<()> {
        let mut graphs = self.graphs.lock().unwrap();
        graphs.remove(repo_id);
        Ok(())
    }

    async fn graph_exists(&self, repo_id: &str) -> Result<bool> {
        let graphs = self.graphs.lock().unwrap();
        Ok(graphs.contains_key(repo_id))
    }
}

/// File-based graph storage implementation
pub struct FileGraphStorage {
    data_path: PathBuf,
}

impl FileGraphStorage {
    /// Create a new file-based graph storage
    pub async fn new(data_path: &Path) -> Result<Self> {
        let storage = Self {
            data_path: data_path.to_path_buf(),
        };

        // Ensure the data directory exists
        fs::create_dir_all(&storage.data_path)
            .await
            .context("Failed to create data directory")?;

        Ok(storage)
    }

    /// Get the file path for a repository's graph, i.e. `<data_path>/<repo_id>.graph.json`
    fn graph_file_path(&self, repo_id: &str) -> PathBuf {
        self.data_path.join(format!("{repo_id}.graph.json"))
    }

    /// Get the file path for a repository's metadata, i.e. `<data_path>/<repo_id>.metadata.json`
    fn metadata_file_path(&self, repo_id: &str) -> PathBuf {
        self.data_path.join(format!("{repo_id}.metadata.json"))
    }
}
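
// A minimal round-trip sketch for this backend (hypothetical values; `graph`
// is assumed to be a `SerializableGraph` built by the caller):
//
//     let storage = FileGraphStorage::new(Path::new("./data")).await?;
//     storage.store_graph(&graph).await?;
//     let loaded = storage.load_graph(&graph.repo_id).await?;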

#[async_trait]
impl GraphStorage for FileGraphStorage {
    async fn store_graph(&self, graph: &SerializableGraph) -> Result<()> {
        let graph_path = self.graph_file_path(&graph.repo_id);
        let metadata_path = self.metadata_file_path(&graph.repo_id);

        // Serialize and write graph
        let graph_json =
            serde_json::to_string_pretty(graph).context("Failed to serialize graph")?;
        fs::write(&graph_path, graph_json)
            .await
            .with_context(|| format!("Failed to write graph to {graph_path:?}"))?;

        // Serialize and write metadata separately for efficiency
        let metadata_json = serde_json::to_string_pretty(&graph.metadata)
            .context("Failed to serialize metadata")?;
        fs::write(&metadata_path, metadata_json)
            .await
            .with_context(|| format!("Failed to write metadata to {metadata_path:?}"))?;

        Ok(())
    }
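
    // Note: the two writes above are not atomic; a crash between them can
    // leave the graph and its metadata out of sync. A common mitigation (not
    // implemented here) is to write to a temporary file and rename it into
    // place, since renames within a directory are atomic on most platforms:
    //
    //     fs::write(&tmp_path, &graph_json).await?;
    //     fs::rename(&tmp_path, &graph_path).await?;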

    async fn load_graph(&self, repo_id: &str) -> Result<Option<SerializableGraph>> {
        let graph_path = self.graph_file_path(repo_id);

        if !graph_path.exists() {
            return Ok(None);
        }

        let graph_json = fs::read_to_string(&graph_path)
            .await
            .with_context(|| format!("Failed to read graph from {graph_path:?}"))?;

        let graph: SerializableGraph =
            serde_json::from_str(&graph_json).context("Failed to deserialize graph")?;

        Ok(Some(graph))
    }

    async fn update_nodes(&self, repo_id: &str, nodes: &[SerializableNode]) -> Result<()> {
        if let Some(mut graph) = self.load_graph(repo_id).await? {
            // Update existing nodes or add new ones
            for new_node in nodes {
                if let Some(existing_node) = graph.nodes.iter_mut().find(|n| n.id == new_node.id) {
                    *existing_node = new_node.clone();
                } else {
                    graph.nodes.push(new_node.clone());
                }
            }

            // Update timestamp
            graph.metadata.last_updated = SystemTime::now();

            self.store_graph(&graph).await?;
        }
        Ok(())
    }

    async fn update_edges(&self, repo_id: &str, edges: &[SerializableEdge]) -> Result<()> {
        if let Some(mut graph) = self.load_graph(repo_id).await? {
            // Update existing edges or add new ones
            for new_edge in edges {
                if let Some(existing_edge) = graph.edges.iter_mut().find(|e| {
                    e.source == new_edge.source
                        && e.target == new_edge.target
                        && e.kind == new_edge.kind
                }) {
                    *existing_edge = new_edge.clone();
                } else {
                    graph.edges.push(new_edge.clone());
                }
            }

            // Update timestamp
            graph.metadata.last_updated = SystemTime::now();

            self.store_graph(&graph).await?;
        }
        Ok(())
    }

    async fn delete_nodes(&self, repo_id: &str, node_ids: &[String]) -> Result<()> {
        if let Some(mut graph) = self.load_graph(repo_id).await? {
            graph.nodes.retain(|n| !node_ids.contains(&n.id));
            // Also remove edges that reference deleted nodes
            graph
                .edges
                .retain(|e| !node_ids.contains(&e.source) && !node_ids.contains(&e.target));

            // Update timestamp
            graph.metadata.last_updated = SystemTime::now();

            self.store_graph(&graph).await?;
        }
        Ok(())
    }

    async fn delete_edges(&self, repo_id: &str, edge_refs: &[EdgeReference]) -> Result<()> {
        if let Some(mut graph) = self.load_graph(repo_id).await? {
            graph.edges.retain(|e| {
                !edge_refs
                    .iter()
                    .any(|er| er.source == e.source && er.target == e.target && er.kind == e.kind)
            });

            // Update timestamp
            graph.metadata.last_updated = SystemTime::now();

            self.store_graph(&graph).await?;
        }
        Ok(())
    }

    async fn get_graph_metadata(&self, repo_id: &str) -> Result<Option<GraphMetadata>> {
        let metadata_path = self.metadata_file_path(repo_id);

        if !metadata_path.exists() {
            return Ok(None);
        }

        let metadata_json = fs::read_to_string(&metadata_path)
            .await
            .with_context(|| format!("Failed to read metadata from {metadata_path:?}"))?;

        let metadata: GraphMetadata =
            serde_json::from_str(&metadata_json).context("Failed to deserialize metadata")?;

        Ok(Some(metadata))
    }

    async fn update_graph_metadata(&self, repo_id: &str, metadata: &GraphMetadata) -> Result<()> {
        let metadata_path = self.metadata_file_path(repo_id);

        let metadata_json =
            serde_json::to_string_pretty(metadata).context("Failed to serialize metadata")?;
        fs::write(&metadata_path, metadata_json)
            .await
            .with_context(|| format!("Failed to write metadata to {metadata_path:?}"))?;

        Ok(())
    }

    async fn list_repositories(&self) -> Result<Vec<String>> {
        let mut repos = Vec::new();
        let mut entries = fs::read_dir(&self.data_path)
            .await
            .context("Failed to read data directory")?;

        while let Some(entry) = entries.next_entry().await? {
            if let Some(file_name) = entry.file_name().to_str() {
                if file_name.ends_with(".graph.json") {
                    let repo_id = file_name.strip_suffix(".graph.json").unwrap().to_string();
                    repos.push(repo_id);
                }
            }
        }

        Ok(repos)
    }

    async fn delete_graph(&self, repo_id: &str) -> Result<()> {
        let graph_path = self.graph_file_path(repo_id);
        let metadata_path = self.metadata_file_path(repo_id);

        // Remove both files if they exist
        if graph_path.exists() {
            fs::remove_file(&graph_path)
                .await
                .with_context(|| format!("Failed to remove graph file {graph_path:?}"))?;
        }

        if metadata_path.exists() {
            fs::remove_file(&metadata_path)
                .await
                .with_context(|| format!("Failed to remove metadata file {metadata_path:?}"))?;
        }

        Ok(())
    }

    async fn graph_exists(&self, repo_id: &str) -> Result<bool> {
        let graph_path = self.graph_file_path(repo_id);
        Ok(graph_path.exists())
    }
}

/// SQLite-based graph storage implementation
pub struct SqliteGraphStorage {
    #[allow(dead_code)] // Will be used for database path management
    db_path: PathBuf,
    connection: Arc<AsyncMutex<Connection>>,
}

impl SqliteGraphStorage {
    /// Create a new SQLite-based graph storage
    pub async fn new(data_path: &Path) -> Result<Self> {
        // Ensure the data directory exists
        fs::create_dir_all(data_path)
            .await
            .context("Failed to create data directory")?;

        let db_path = data_path.join("codeprism.db");
        let connection = Connection::open(&db_path)
            .with_context(|| format!("Failed to open SQLite database at {db_path:?}"))?;

        let storage = Self {
            db_path: db_path.clone(),
            connection: Arc::new(AsyncMutex::new(connection)),
        };

        // Initialize database schema
        storage.init_schema().await?;

        Ok(storage)
    }
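
    // Design note: a single connection behind an async mutex serializes all
    // database access. That keeps the implementation simple and is adequate
    // for moderate load; a connection pool would be needed for real
    // parallelism, at the cost of running per-connection PRAGMAs on each
    // pooled connection.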

    /// Initialize the database schema
    async fn init_schema(&self) -> Result<()> {
        let conn = self.connection.lock().await;

        // SQLite disables foreign key enforcement by default on each
        // connection; enable it so the ON DELETE CASCADE clauses below
        // actually remove dependent rows.
        conn.execute_batch("PRAGMA foreign_keys = ON;")?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS graphs (
                repo_id TEXT PRIMARY KEY,
                data BLOB NOT NULL,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS nodes (
                repo_id TEXT NOT NULL,
                node_id TEXT NOT NULL,
                data BLOB NOT NULL,
                updated_at INTEGER NOT NULL,
                PRIMARY KEY (repo_id, node_id),
                FOREIGN KEY (repo_id) REFERENCES graphs(repo_id) ON DELETE CASCADE
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS edges (
                repo_id TEXT NOT NULL,
                source TEXT NOT NULL,
                target TEXT NOT NULL,
                kind TEXT NOT NULL,
                data BLOB NOT NULL,
                updated_at INTEGER NOT NULL,
                PRIMARY KEY (repo_id, source, target, kind),
                FOREIGN KEY (repo_id) REFERENCES graphs(repo_id) ON DELETE CASCADE
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS metadata (
                repo_id TEXT PRIMARY KEY,
                data BLOB NOT NULL,
                updated_at INTEGER NOT NULL,
                FOREIGN KEY (repo_id) REFERENCES graphs(repo_id) ON DELETE CASCADE
            )",
            [],
        )?;

        // Create indices for better performance
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_nodes_repo_id ON nodes(repo_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_edges_repo_id ON edges(repo_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target)",
            [],
        )?;

        Ok(())
    }

    /// Convert SystemTime to Unix timestamp
    fn system_time_to_timestamp(time: SystemTime) -> i64 {
        time.duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64
    }

    /// Convert Unix timestamp to SystemTime
    #[allow(dead_code)] // Will be used for timestamp conversion utilities
    fn timestamp_to_system_time(timestamp: i64) -> SystemTime {
        std::time::UNIX_EPOCH + std::time::Duration::from_secs(timestamp as u64)
    }
}

#[async_trait]
impl GraphStorage for SqliteGraphStorage {
    async fn store_graph(&self, graph: &SerializableGraph) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        // Serialize the entire graph (the blob is kept for completeness;
        // load_graph reconstructs the graph from the per-row tables below)
        let graph_data = bincode::serialize(graph).context("Failed to serialize graph")?;

        // Store the graph
        conn.execute(
            "INSERT OR REPLACE INTO graphs (repo_id, data, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
            params![graph.repo_id, graph_data, now, now],
        )?;

        // Store metadata separately for efficient queries
        let metadata_data =
            bincode::serialize(&graph.metadata).context("Failed to serialize metadata")?;

        conn.execute(
            "INSERT OR REPLACE INTO metadata (repo_id, data, updated_at) VALUES (?1, ?2, ?3)",
            params![graph.repo_id, metadata_data, now],
        )?;

        // Store nodes individually for incremental updates
        for node in &graph.nodes {
            let node_data = bincode::serialize(node).context("Failed to serialize node")?;

            conn.execute(
                "INSERT OR REPLACE INTO nodes (repo_id, node_id, data, updated_at) VALUES (?1, ?2, ?3, ?4)",
                params![graph.repo_id, node.id, node_data, now],
            )?;
        }

        // Store edges individually for incremental updates
        for edge in &graph.edges {
            let edge_data = bincode::serialize(edge).context("Failed to serialize edge")?;

            conn.execute(
                "INSERT OR REPLACE INTO edges (repo_id, source, target, kind, data, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                params![graph.repo_id, edge.source, edge.target, edge.kind, edge_data, now],
            )?;
        }

        Ok(())
    }
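
    // Performance note: each `execute` above commits its own implicit
    // transaction. For large graphs it is much faster to batch the writes;
    // a sketch (not part of the original code):
    //
    //     let tx = conn.unchecked_transaction()?;
    //     // ...run the INSERT OR REPLACE statements through `tx`...
    //     tx.commit()?;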

    async fn load_graph(&self, repo_id: &str) -> Result<Option<SerializableGraph>> {
        let conn = self.connection.lock().await;

        // Check if graph exists
        let mut stmt = conn.prepare("SELECT 1 FROM graphs WHERE repo_id = ?1")?;
        let exists = stmt.exists([repo_id])?;

        if !exists {
            return Ok(None);
        }

        // Load metadata (the graph is reassembled from the metadata, nodes,
        // and edges tables; the serialized blob in `graphs.data` is not read)
        let mut metadata_stmt = conn.prepare("SELECT data FROM metadata WHERE repo_id = ?1")?;
        let metadata_result: Result<Vec<u8>, rusqlite::Error> =
            metadata_stmt.query_row([repo_id], |row| row.get(0));

        let metadata = match metadata_result {
            Ok(metadata_data) => {
                bincode::deserialize(&metadata_data).context("Failed to deserialize metadata")?
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
            Err(e) => return Err(e.into()),
        };

        // Load nodes
        let mut nodes = Vec::new();
        let mut node_stmt = conn.prepare("SELECT data FROM nodes WHERE repo_id = ?1")?;
        let node_rows = node_stmt.query_map([repo_id], |row| row.get::<_, Vec<u8>>(0))?;

        for row in node_rows {
            let node_data = row?;
            let node: SerializableNode =
                bincode::deserialize(&node_data).context("Failed to deserialize node")?;
            nodes.push(node);
        }

        // Load edges
        let mut edges = Vec::new();
        let mut edge_stmt = conn.prepare("SELECT data FROM edges WHERE repo_id = ?1")?;
        let edge_rows = edge_stmt.query_map([repo_id], |row| row.get::<_, Vec<u8>>(0))?;

        for row in edge_rows {
            let edge_data = row?;
            let edge: SerializableEdge =
                bincode::deserialize(&edge_data).context("Failed to deserialize edge")?;
            edges.push(edge);
        }

        // Construct graph
        let graph = SerializableGraph {
            repo_id: repo_id.to_string(),
            nodes,
            edges,
            metadata,
        };

        Ok(Some(graph))
    }

    async fn update_nodes(&self, repo_id: &str, nodes: &[SerializableNode]) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        for node in nodes {
            let node_data = bincode::serialize(node).context("Failed to serialize node")?;

            conn.execute(
                "INSERT OR REPLACE INTO nodes (repo_id, node_id, data, updated_at) VALUES (?1, ?2, ?3, ?4)",
                params![repo_id, node.id, node_data, now],
            )?;
        }

        // Update the graph's updated_at timestamp
        conn.execute(
            "UPDATE graphs SET updated_at = ?1 WHERE repo_id = ?2",
            params![now, repo_id],
        )?;

        Ok(())
    }

    async fn update_edges(&self, repo_id: &str, edges: &[SerializableEdge]) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        for edge in edges {
            let edge_data = bincode::serialize(edge).context("Failed to serialize edge")?;

            conn.execute(
                "INSERT OR REPLACE INTO edges (repo_id, source, target, kind, data, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                params![repo_id, edge.source, edge.target, edge.kind, edge_data, now],
            )?;
        }

        // Update the graph's updated_at timestamp
        conn.execute(
            "UPDATE graphs SET updated_at = ?1 WHERE repo_id = ?2",
            params![now, repo_id],
        )?;

        Ok(())
    }

    async fn delete_nodes(&self, repo_id: &str, node_ids: &[String]) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        for node_id in node_ids {
            // Delete the node
            conn.execute(
                "DELETE FROM nodes WHERE repo_id = ?1 AND node_id = ?2",
                params![repo_id, node_id],
            )?;

            // Delete edges that reference this node
            conn.execute(
                "DELETE FROM edges WHERE repo_id = ?1 AND (source = ?2 OR target = ?2)",
                params![repo_id, node_id],
            )?;
        }

        // Update the graph's updated_at timestamp
        conn.execute(
            "UPDATE graphs SET updated_at = ?1 WHERE repo_id = ?2",
            params![now, repo_id],
        )?;

        Ok(())
    }

    async fn delete_edges(&self, repo_id: &str, edge_refs: &[EdgeReference]) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        for edge_ref in edge_refs {
            conn.execute(
                "DELETE FROM edges WHERE repo_id = ?1 AND source = ?2 AND target = ?3 AND kind = ?4",
                params![repo_id, edge_ref.source, edge_ref.target, edge_ref.kind],
            )?;
        }

        // Update the graph's updated_at timestamp
        conn.execute(
            "UPDATE graphs SET updated_at = ?1 WHERE repo_id = ?2",
            params![now, repo_id],
        )?;

        Ok(())
    }

    async fn get_graph_metadata(&self, repo_id: &str) -> Result<Option<GraphMetadata>> {
        let conn = self.connection.lock().await;

        let mut stmt = conn.prepare("SELECT data FROM metadata WHERE repo_id = ?1")?;
        let metadata_result: Result<Vec<u8>, rusqlite::Error> =
            stmt.query_row([repo_id], |row| row.get::<_, Vec<u8>>(0));

        match metadata_result {
            Ok(metadata_data) => {
                let metadata: GraphMetadata = bincode::deserialize(&metadata_data)
                    .context("Failed to deserialize metadata")?;
                Ok(Some(metadata))
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn update_graph_metadata(&self, repo_id: &str, metadata: &GraphMetadata) -> Result<()> {
        let conn = self.connection.lock().await;
        let now = Self::system_time_to_timestamp(SystemTime::now());

        let metadata_data = bincode::serialize(metadata).context("Failed to serialize metadata")?;

        conn.execute(
            "INSERT OR REPLACE INTO metadata (repo_id, data, updated_at) VALUES (?1, ?2, ?3)",
            params![repo_id, metadata_data, now],
        )?;

        Ok(())
    }

    async fn list_repositories(&self) -> Result<Vec<String>> {
        let conn = self.connection.lock().await;

        let mut stmt = conn.prepare("SELECT repo_id FROM graphs ORDER BY repo_id")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;

        let mut repos = Vec::new();
        for row in rows {
            repos.push(row?);
        }

        Ok(repos)
    }

    async fn delete_graph(&self, repo_id: &str) -> Result<()> {
        let conn = self.connection.lock().await;

        // Delete from graphs; with foreign keys enabled, ON DELETE CASCADE
        // removes the related nodes, edges, and metadata rows
        conn.execute("DELETE FROM graphs WHERE repo_id = ?1", [repo_id])?;

        Ok(())
    }

    async fn graph_exists(&self, repo_id: &str) -> Result<bool> {
        let conn = self.connection.lock().await;

        let mut stmt = conn.prepare("SELECT 1 FROM graphs WHERE repo_id = ?1")?;
        let exists = stmt.exists([repo_id])?;

        Ok(exists)
    }
}

/// In-memory analysis storage
pub struct InMemoryAnalysisStorage {
    results: Arc<Mutex<HashMap<String, AnalysisResult>>>,
}

impl InMemoryAnalysisStorage {
    /// Create a new in-memory analysis storage
    pub fn new() -> Self {
        Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}

impl Default for InMemoryAnalysisStorage {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl AnalysisStorage for InMemoryAnalysisStorage {
    async fn store_analysis(&self, result: &AnalysisResult) -> Result<()> {
        let mut results = self.results.lock().unwrap();
        results.insert(result.id.clone(), result.clone());
        Ok(())
    }

    async fn load_analysis(&self, result_id: &str) -> Result<Option<AnalysisResult>> {
        let results = self.results.lock().unwrap();
        Ok(results.get(result_id).cloned())
    }

    async fn find_analysis(
        &self,
        repo_id: &str,
        analysis_type: Option<&str>,
        since: Option<SystemTime>,
    ) -> Result<Vec<AnalysisResult>> {
        let results = self.results.lock().unwrap();
        let filtered: Vec<AnalysisResult> = results
            .values()
            .filter(|r| {
                r.repo_id == repo_id
                    && analysis_type.is_none_or(|t| r.analysis_type == t)
                    && since.is_none_or(|s| r.timestamp >= s)
            })
            .cloned()
            .collect();
        Ok(filtered)
    }

    async fn delete_analysis(&self, result_id: &str) -> Result<()> {
        let mut results = self.results.lock().unwrap();
        results.remove(result_id);
        Ok(())
    }

    async fn cleanup_old_results(&self, older_than: SystemTime) -> Result<usize> {
        let mut results = self.results.lock().unwrap();
        let keys_to_remove: Vec<String> = results
            .iter()
            .filter(|(_, r)| r.timestamp < older_than)
            .map(|(k, _)| k.clone())
            .collect();

        let count = keys_to_remove.len();
        for key in keys_to_remove {
            results.remove(&key);
        }

        Ok(count)
    }
}
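
// Query sketch for this store (hypothetical `analysis_type` value): fetch all
// "complexity" results for a repo from the last hour.
//
//     use std::time::Duration;
//     let since = SystemTime::now() - Duration::from_secs(3600);
//     let recent = storage
//         .find_analysis("my_repo", Some("complexity"), Some(since))
//         .await?;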

/// File-based analysis storage (placeholder: no methods are implemented yet,
/// and each currently returns an error)
pub struct FileAnalysisStorage;

impl FileAnalysisStorage {
    /// Create a new file-based analysis storage
    pub async fn new(_data_path: &Path) -> Result<Self> {
        Ok(Self)
    }
}

#[async_trait]
impl AnalysisStorage for FileAnalysisStorage {
    async fn store_analysis(&self, _result: &AnalysisResult) -> Result<()> {
        anyhow::bail!("Not implemented")
    }

    async fn load_analysis(&self, _result_id: &str) -> Result<Option<AnalysisResult>> {
        anyhow::bail!("Not implemented")
    }

    async fn find_analysis(
        &self,
        _repo_id: &str,
        _analysis_type: Option<&str>,
        _since: Option<SystemTime>,
    ) -> Result<Vec<AnalysisResult>> {
        anyhow::bail!("Not implemented")
    }

    async fn delete_analysis(&self, _result_id: &str) -> Result<()> {
        anyhow::bail!("Not implemented")
    }

    async fn cleanup_old_results(&self, _older_than: SystemTime) -> Result<usize> {
        anyhow::bail!("Not implemented")
    }
}
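
// A real implementation could mirror `FileGraphStorage`, storing one JSON
// document per result under a data directory. Sketch (hypothetical field and
// layout; `FileAnalysisStorage` currently holds no `data_path`):
//
//     fn result_path(&self, result_id: &str) -> PathBuf {
//         self.data_path.join(format!("{result_id}.analysis.json"))
//     }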

#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::SerializableSpan;
    use std::path::PathBuf;
    use tempfile::tempdir;

    fn create_test_graph() -> SerializableGraph {
        let mut graph = SerializableGraph::new("test_repo".to_string());

        // Add test nodes
        let node1 = SerializableNode::new(
            "node1".to_string(),
            "TestFunction".to_string(),
            "function".to_string(),
            PathBuf::from("test.rs"),
            SerializableSpan {
                start_byte: 0,
                end_byte: 100,
                start_line: 1,
                end_line: 5,
                start_column: 0,
                end_column: 10,
            },
        );

        let node2 = SerializableNode::new(
            "node2".to_string(),
            "TestStruct".to_string(),
            "struct".to_string(),
            PathBuf::from("test.rs"),
            SerializableSpan {
                start_byte: 101,
                end_byte: 200,
                start_line: 6,
                end_line: 10,
                start_column: 0,
                end_column: 15,
            },
        );

        graph.add_node(node1);
        graph.add_node(node2);

        // Add a test edge
        let edge = SerializableEdge::new(
            "node1".to_string(),
            "node2".to_string(),
            "calls".to_string(),
        );
        graph.add_edge(edge);

        graph.update_metadata();
        graph
    }

    #[tokio::test]
    async fn test_in_memory_storage() {
        let storage = InMemoryGraphStorage::new();
        let graph = create_test_graph();

        // Test store and load
        storage.store_graph(&graph).await.unwrap();
        let loaded = storage.load_graph("test_repo").await.unwrap();
        assert!(loaded.is_some(), "Should have value");
        let loaded_graph = loaded.unwrap();
        assert_eq!(loaded_graph.repo_id, "test_repo");
        assert_eq!(loaded_graph.nodes.len(), 2, "Should have 2 items");
        assert_eq!(loaded_graph.edges.len(), 1, "Should have 1 item");

        // Test update nodes
        let new_node = SerializableNode::new(
            "node3".to_string(),
            "NewFunction".to_string(),
            "function".to_string(),
            PathBuf::from("new.rs"),
            SerializableSpan {
                start_byte: 0,
                end_byte: 50,
                start_line: 1,
                end_line: 3,
                start_column: 0,
                end_column: 5,
            },
        );
        storage
            .update_nodes("test_repo", &[new_node])
            .await
            .unwrap();

        let updated = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(updated.nodes.len(), 3, "Should have 3 items");

        // Test delete nodes
        storage
            .delete_nodes("test_repo", &["node1".to_string()])
            .await
            .unwrap();
        let after_delete = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(after_delete.nodes.len(), 2, "Should have 2 items");
        assert_eq!(after_delete.edges.len(), 0, "Should have 0 items"); // Edge should be deleted too

        // Test list repositories
        let repos = storage.list_repositories().await.unwrap();
        assert!(repos.contains(&"test_repo".to_string()));

        // Test graph exists
        assert!(storage.graph_exists("test_repo").await.unwrap());
        assert!(!storage.graph_exists("nonexistent").await.unwrap());

        // Test delete graph
        storage.delete_graph("test_repo").await.unwrap();
        assert!(!storage.graph_exists("test_repo").await.unwrap());
    }

    #[tokio::test]
    async fn test_file_storage() {
        let temp_dir = tempdir().unwrap();
        let storage = FileGraphStorage::new(temp_dir.path()).await.unwrap();
        let graph = create_test_graph();

        // Test store and load
        storage.store_graph(&graph).await.unwrap();
        let loaded = storage.load_graph("test_repo").await.unwrap();
        assert!(loaded.is_some(), "Should have value");
        let loaded_graph = loaded.unwrap();
        assert_eq!(loaded_graph.repo_id, "test_repo");
        assert_eq!(loaded_graph.nodes.len(), 2, "Should have 2 items");
        assert_eq!(loaded_graph.edges.len(), 1, "Should have 1 item");

        // Test metadata operations
        let metadata = storage.get_graph_metadata("test_repo").await.unwrap();
        assert!(metadata.is_some(), "Should have value");

        let mut new_metadata = metadata.unwrap();
        new_metadata.version = 42;
        storage
            .update_graph_metadata("test_repo", &new_metadata)
            .await
            .unwrap();

        let updated_metadata = storage
            .get_graph_metadata("test_repo")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated_metadata.version, 42);

        // Test incremental updates
        let new_node = SerializableNode::new(
            "node3".to_string(),
            "NewFunction".to_string(),
            "function".to_string(),
            PathBuf::from("new.rs"),
            SerializableSpan {
                start_byte: 0,
                end_byte: 50,
                start_line: 1,
                end_line: 3,
                start_column: 0,
                end_column: 5,
            },
        );
        storage
            .update_nodes("test_repo", &[new_node])
            .await
            .unwrap();

        let updated = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(updated.nodes.len(), 3, "Should have 3 items");

        // Test edge updates
        let new_edge = SerializableEdge::new(
            "node2".to_string(),
            "node3".to_string(),
            "references".to_string(),
        );
        storage
            .update_edges("test_repo", &[new_edge])
            .await
            .unwrap();

        let updated = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(updated.edges.len(), 2, "Should have 2 items");

        // Test list repositories
        let repos = storage.list_repositories().await.unwrap();
        assert!(repos.contains(&"test_repo".to_string()));

        // Test graph exists
        assert!(storage.graph_exists("test_repo").await.unwrap());
        assert!(!storage.graph_exists("nonexistent").await.unwrap());

        // Test delete operations
        let edge_refs = vec![EdgeReference {
            source: "node1".to_string(),
            target: "node2".to_string(),
            kind: "calls".to_string(),
        }];
        storage.delete_edges("test_repo", &edge_refs).await.unwrap();

        let after_edge_delete = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(after_edge_delete.edges.len(), 1, "Should have 1 item");

        // Test delete graph
        storage.delete_graph("test_repo").await.unwrap();
        assert!(!storage.graph_exists("test_repo").await.unwrap());
    }

    #[tokio::test]
    async fn test_sqlite_storage() {
        let temp_dir = tempdir().unwrap();
        let storage = SqliteGraphStorage::new(temp_dir.path()).await.unwrap();
        let graph = create_test_graph();

        // Test store and load
        storage.store_graph(&graph).await.unwrap();
        let loaded = storage.load_graph("test_repo").await.unwrap();
        assert!(loaded.is_some(), "Should have value");
        let loaded_graph = loaded.unwrap();
        assert_eq!(loaded_graph.repo_id, "test_repo");
        assert_eq!(loaded_graph.nodes.len(), 2, "Should have 2 items");
        assert_eq!(loaded_graph.edges.len(), 1, "Should have 1 item");

        // Test metadata operations
        let metadata = storage.get_graph_metadata("test_repo").await.unwrap();
        assert!(metadata.is_some(), "Should have value");

        let mut new_metadata = metadata.unwrap();
        new_metadata.version = 42;
        storage
            .update_graph_metadata("test_repo", &new_metadata)
            .await
            .unwrap();

        let updated_metadata = storage
            .get_graph_metadata("test_repo")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated_metadata.version, 42);

        // Test incremental updates
        let new_node = SerializableNode::new(
            "node3".to_string(),
            "NewFunction".to_string(),
            "function".to_string(),
            PathBuf::from("new.rs"),
            SerializableSpan {
                start_byte: 0,
                end_byte: 50,
                start_line: 1,
                end_line: 3,
                start_column: 0,
                end_column: 5,
            },
        );
        storage
            .update_nodes("test_repo", &[new_node])
            .await
            .unwrap();

        let updated = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(updated.nodes.len(), 3, "Should have 3 items");

        // Test edge updates
        let new_edge = SerializableEdge::new(
            "node2".to_string(),
            "node3".to_string(),
            "references".to_string(),
        );
        storage
            .update_edges("test_repo", &[new_edge])
            .await
            .unwrap();

        let updated = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(updated.edges.len(), 2, "Should have 2 items");

        // Test list repositories
        let repos = storage.list_repositories().await.unwrap();
        assert!(repos.contains(&"test_repo".to_string()));

        // Test graph exists
        assert!(storage.graph_exists("test_repo").await.unwrap());
        assert!(!storage.graph_exists("nonexistent").await.unwrap());

        // Test delete operations
        let edge_refs = vec![EdgeReference {
            source: "node1".to_string(),
            target: "node2".to_string(),
            kind: "calls".to_string(),
        }];
        storage.delete_edges("test_repo", &edge_refs).await.unwrap();

        let after_edge_delete = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(after_edge_delete.edges.len(), 1, "Should have 1 item");

        // Test delete nodes (should also remove related edges)
        storage
            .delete_nodes("test_repo", &["node3".to_string()])
            .await
            .unwrap();
        let after_node_delete = storage.load_graph("test_repo").await.unwrap().unwrap();
        assert_eq!(after_node_delete.nodes.len(), 2, "Should have 2 items");
        assert_eq!(after_node_delete.edges.len(), 0, "Should have 0 items"); // Edge to node3 should be deleted

        // Test delete graph
        storage.delete_graph("test_repo").await.unwrap();
        assert!(!storage.graph_exists("test_repo").await.unwrap());
    }

    #[tokio::test]
    async fn test_storage_error_handling() {
        // Test loading a non-existent graph from FileGraphStorage
        let temp_dir = tempdir().unwrap();
        let storage = FileGraphStorage::new(temp_dir.path()).await.unwrap();
        let result = storage.load_graph("nonexistent").await.unwrap();
        assert!(result.is_none(), "Should be none");

        // Test loading a non-existent graph from SqliteGraphStorage
        let temp_dir = tempdir().unwrap();
        let storage = SqliteGraphStorage::new(temp_dir.path()).await.unwrap();
        let result = storage.load_graph("nonexistent").await.unwrap();
        assert!(result.is_none(), "Should be none");

        // Test FileGraphStorage with a file as path (should fail)
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("not_a_directory.txt");
        fs::write(&file_path, "test").await.unwrap();
        let result = FileGraphStorage::new(&file_path).await;
        assert!(
            result.is_err(),
            "Should fail when trying to create directory over a file"
        );

        // Test SqliteGraphStorage with a file as path (should fail)
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("not_a_directory.txt");
        fs::write(&file_path, "test").await.unwrap();
        let result = SqliteGraphStorage::new(&file_path).await;
        assert!(
            result.is_err(),
            "Should fail when trying to create directory over a file"
        );
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        use tokio::task;

        let temp_dir = tempdir().unwrap();
        let storage = Arc::new(SqliteGraphStorage::new(temp_dir.path()).await.unwrap());

        // Test concurrent writes
        let mut handles = Vec::new();
        for i in 0..10 {
            let storage_clone = storage.clone();
            let handle = task::spawn(async move {
                let mut graph = create_test_graph();
                graph.repo_id = format!("repo_{i}");
                storage_clone.store_graph(&graph).await.unwrap();
            });
            handles.push(handle);
        }

        // Wait for all writes to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Verify all graphs were stored
        let repos = storage.list_repositories().await.unwrap();
        assert_eq!(repos.len(), 10, "Should have 10 items");

        // Test concurrent reads
        let mut handles = Vec::new();
        for i in 0..10 {
            let storage_clone = storage.clone();
            let handle = task::spawn(async move {
                let repo_id = format!("repo_{i}");
                let graph = storage_clone.load_graph(&repo_id).await.unwrap();
                assert!(graph.is_some(), "Should have value");
                assert_eq!(graph.unwrap().repo_id, repo_id);
            });
            handles.push(handle);
        }

        // Wait for all reads to complete
        for handle in handles {
            handle.await.unwrap();
        }
    }
}