// Path: llm_memory_graph/storage/mod.rs

1//! Storage backend for persisting graph data
2
3mod async_sled_backend;
4mod cache;
5mod pooled_backend;
6mod serialization;
7mod sled_backend;
8
9pub use async_sled_backend::AsyncSledBackend;
10pub use cache::{CacheStats, StorageCache};
11pub use pooled_backend::{PoolConfig, PoolMetrics, PoolMetricsSnapshot, PooledAsyncBackend};
12pub use serialization::{SerializationFormat, Serializer};
13pub use sled_backend::SledBackend;
14
15use crate::error::Result;
16use crate::types::{Edge, EdgeId, Node, NodeId, SessionId};
17use async_trait::async_trait;
18
/// Trait defining synchronous storage backend operations
///
/// Implementors persist graph nodes and edges keyed by their IDs and answer
/// session-scoped and adjacency queries. All methods take `&self`, so
/// implementations are expected to use interior mutability and be safe to
/// share across threads (the trait requires `Send + Sync`).
///
/// This trait is mirrored method-for-method by [`AsyncStorageBackend`];
/// keep the two in sync when adding operations.
pub trait StorageBackend: Send + Sync {
    /// Store a node in the backend
    ///
    /// # Errors
    /// Returns an error if serialization or the underlying write fails.
    fn store_node(&self, node: &Node) -> Result<()>;

    /// Retrieve a node by ID
    ///
    /// Returns `Ok(None)` when no node with this ID exists.
    fn get_node(&self, id: &NodeId) -> Result<Option<Node>>;

    /// Delete a node
    ///
    /// NOTE(review): whether deleting a node also removes its incident
    /// edges is backend-defined — not visible from this trait; confirm in
    /// the concrete implementations.
    fn delete_node(&self, id: &NodeId) -> Result<()>;

    /// Store an edge
    ///
    /// # Errors
    /// Returns an error if serialization or the underlying write fails.
    fn store_edge(&self, edge: &Edge) -> Result<()>;

    /// Retrieve an edge by ID
    ///
    /// Returns `Ok(None)` when no edge with this ID exists.
    fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>;

    /// Delete an edge
    fn delete_edge(&self, id: &EdgeId) -> Result<()>;

    /// Get all nodes in a session
    ///
    /// Loads every matching node into memory; for large sessions the async
    /// trait offers a streaming alternative.
    fn get_session_nodes(&self, session_id: &SessionId) -> Result<Vec<Node>>;

    /// Get all edges from a node (edges whose source is `node_id`)
    fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Get all edges to a node (edges whose target is `node_id`)
    fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Flush any pending writes
    ///
    /// # Errors
    /// Returns an error if the backend fails to persist buffered data.
    fn flush(&self) -> Result<()>;

    /// Get storage statistics
    fn stats(&self) -> Result<StorageStats>;
}
54
/// Statistics about storage usage
///
/// A plain snapshot of backend-wide totals (not per-session). The struct is
/// four `u64` fields, so it derives `Copy` for cheap pass-by-value, `Default`
/// for an all-zero snapshot, and `PartialEq`/`Eq` so callers and tests can
/// compare snapshots directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StorageStats {
    /// Total number of nodes
    pub node_count: u64,
    /// Total number of edges
    pub edge_count: u64,
    /// Total storage size in bytes
    pub storage_bytes: u64,
    /// Number of sessions
    pub session_count: u64,
}
67
/// Async trait defining storage backend operations
///
/// This trait provides async versions of all storage operations for use with Tokio runtime.
/// It enables high-performance concurrent operations and non-blocking I/O.
///
/// Mirrors [`StorageBackend`] method-for-method; keep the two traits in sync
/// when adding operations. The batch, streaming, and counting helpers have
/// default implementations built on the core methods, so implementors only
/// need to provide the required operations (and may override the defaults
/// with more efficient native versions).
#[async_trait]
pub trait AsyncStorageBackend: Send + Sync {
    /// Store a node in the backend asynchronously
    ///
    /// # Errors
    /// Returns an error if serialization or the underlying write fails.
    async fn store_node(&self, node: &Node) -> Result<()>;

    /// Retrieve a node by ID asynchronously
    ///
    /// Returns `Ok(None)` when no node with this ID exists.
    async fn get_node(&self, id: &NodeId) -> Result<Option<Node>>;

    /// Delete a node asynchronously
    async fn delete_node(&self, id: &NodeId) -> Result<()>;

    /// Store an edge asynchronously
    ///
    /// # Errors
    /// Returns an error if serialization or the underlying write fails.
    async fn store_edge(&self, edge: &Edge) -> Result<()>;

    /// Retrieve an edge by ID asynchronously
    ///
    /// Returns `Ok(None)` when no edge with this ID exists.
    async fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>;

    /// Delete an edge asynchronously
    async fn delete_edge(&self, id: &EdgeId) -> Result<()>;

    /// Get all nodes in a session asynchronously
    ///
    /// Loads every matching node into memory at once; prefer
    /// [`get_session_nodes_stream`](Self::get_session_nodes_stream) for very
    /// large sessions.
    async fn get_session_nodes(&self, session_id: &SessionId) -> Result<Vec<Node>>;

    /// Get all edges from a node asynchronously (edges whose source is `node_id`)
    async fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Get all edges to a node asynchronously (edges whose target is `node_id`)
    async fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Flush any pending writes asynchronously
    async fn flush(&self) -> Result<()>;

    /// Get storage statistics asynchronously
    async fn stats(&self) -> Result<StorageStats>;

    /// Batch store multiple nodes asynchronously for improved performance
    ///
    /// The default implementation stores nodes one at a time in input order
    /// and returns their IDs in the same order. It is NOT atomic: if a store
    /// fails midway, nodes stored before the failure remain persisted and the
    /// error is returned. Backends may override this with a native batch write.
    async fn store_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
        let mut ids = Vec::with_capacity(nodes.len());
        for node in nodes {
            self.store_node(node).await?;
            // NOTE(review): nodes expose an accessor `id()` while edges below use
            // a public field `id` — presumably both return a cheap/Copy ID type;
            // confirm against the `types` module and consider unifying the API.
            ids.push(node.id());
        }
        Ok(ids)
    }

    /// Batch store multiple edges asynchronously for improved performance
    ///
    /// Same contract as [`store_nodes_batch`](Self::store_nodes_batch):
    /// sequential, order-preserving, and not atomic on failure.
    async fn store_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
        let mut ids = Vec::with_capacity(edges.len());
        for edge in edges {
            self.store_edge(edge).await?;
            ids.push(edge.id);
        }
        Ok(ids)
    }

    /// Stream nodes from a session for memory-efficient iteration over large result sets
    ///
    /// This method returns a stream that yields nodes one at a time, avoiding the need
    /// to load all nodes into memory at once. This is particularly useful for sessions
    /// with thousands of nodes.
    ///
    /// The default implementation is NOT truly streaming: it calls
    /// [`get_session_nodes`](Self::get_session_nodes) (loading everything into
    /// memory) and then yields the nodes one by one. On failure it yields a
    /// single `Err` and ends. Backends should override this to stream directly
    /// from storage.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::storage::AsyncSledBackend;
    /// use llm_memory_graph::storage::AsyncStorageBackend;
    /// use llm_memory_graph::types::SessionId;
    /// use futures::stream::StreamExt;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = AsyncSledBackend::open("./data/graph.db").await?;
    /// let session_id = SessionId::new();
    ///
    /// let mut stream = backend.get_session_nodes_stream(&session_id);
    /// let mut count = 0;
    /// while let Some(node) = stream.next().await {
    ///     let node = node?;
    ///     // Process node without loading all into memory
    ///     count += 1;
    /// }
    /// println!("Processed {} nodes", count);
    /// # Ok(())
    /// # }
    /// ```
    fn get_session_nodes_stream(
        &self,
        session_id: &SessionId,
    ) -> std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Node>> + Send + '_>> {
        // Default implementation: load all and convert to stream
        // Backends should override for true streaming from storage
        // Copy the ID by value so the returned stream borrows only `self`
        // (the `'_` lifetime), not the `session_id` argument. This deref-copy
        // requires `SessionId: Copy`.
        let session_id = *session_id;
        Box::pin(async_stream::stream! {
            match self.get_session_nodes(&session_id).await {
                Ok(nodes) => {
                    for node in nodes {
                        yield Ok(node);
                    }
                }
                Err(e) => yield Err(e),
            }
        })
    }

    /// Count nodes in a session without loading them into memory
    ///
    /// This is more efficient than `get_session_nodes().await?.len()` for large sessions
    /// as it only counts without deserializing node data.
    ///
    /// Note: the DEFAULT implementation below does not yet deliver on that —
    /// it loads and deserializes all nodes, then counts them (O(n) time and
    /// memory). Backends should override it with a key-only scan or a
    /// maintained counter to get the advertised efficiency.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::storage::AsyncSledBackend;
    /// use llm_memory_graph::storage::AsyncStorageBackend;
    /// use llm_memory_graph::types::SessionId;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = AsyncSledBackend::open("./data/graph.db").await?;
    /// let session_id = SessionId::new();
    ///
    /// let count = backend.count_session_nodes(&session_id).await?;
    /// println!("Session has {} nodes", count);
    /// # Ok(())
    /// # }
    /// ```
    async fn count_session_nodes(&self, session_id: &SessionId) -> Result<usize> {
        // Default implementation: load and count
        // Backends should override for O(1) counting if possible
        let nodes = self.get_session_nodes(session_id).await?;
        Ok(nodes.len())
    }
}