// llm_memory_graph/storage/mod.rs
1//! Storage backend for persisting graph data
2
3mod async_sled_backend;
4mod cache;
5mod pooled_backend;
6mod serialization;
7mod sled_backend;
8
9pub use async_sled_backend::AsyncSledBackend;
10pub use cache::{CacheStats, StorageCache};
11pub use pooled_backend::{PoolConfig, PoolMetrics, PoolMetricsSnapshot, PooledAsyncBackend};
12pub use serialization::{SerializationFormat, Serializer};
13pub use sled_backend::SledBackend;
14
15use crate::error::Result;
16use crate::types::{Edge, EdgeId, Node, NodeId, SessionId};
17use async_trait::async_trait;
18
/// Trait defining storage backend operations
///
/// Synchronous counterpart to [`AsyncStorageBackend`]. Implementors must be
/// `Send + Sync` so one backend instance can be shared across threads (e.g.
/// behind an `Arc`); all methods take `&self`, so implementations manage any
/// interior mutability themselves.
///
/// # Errors
///
/// Every method returns the crate's [`Result`], through which implementations
/// surface backend I/O or serialization failures.
pub trait StorageBackend: Send + Sync {
    /// Store a node in the backend
    fn store_node(&self, node: &Node) -> Result<()>;

    /// Retrieve a node by ID
    ///
    /// Returns `Ok(None)` when no node with `id` exists.
    fn get_node(&self, id: &NodeId) -> Result<Option<Node>>;

    /// Delete a node
    ///
    /// NOTE(review): whether deleting a missing node is a no-op or an error is
    /// backend-defined — this trait does not specify it.
    fn delete_node(&self, id: &NodeId) -> Result<()>;

    /// Store an edge
    fn store_edge(&self, edge: &Edge) -> Result<()>;

    /// Retrieve an edge by ID
    ///
    /// Returns `Ok(None)` when no edge with `id` exists.
    fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>;

    /// Delete an edge
    ///
    /// NOTE(review): behavior for a missing edge (no-op vs error) is
    /// backend-defined — this trait does not specify it.
    fn delete_edge(&self, id: &EdgeId) -> Result<()>;

    /// Get all nodes in a session
    ///
    /// Loads every matching node into memory at once.
    fn get_session_nodes(&self, session_id: &SessionId) -> Result<Vec<Node>>;

    /// Get all edges from a node
    fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Get all edges to a node
    fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Flush any pending writes
    fn flush(&self) -> Result<()>;

    /// Get storage statistics
    fn stats(&self) -> Result<StorageStats>;
}
54
/// Statistics about storage usage
///
/// A plain-data snapshot of counters reported by a storage backend's `stats()`
/// method. All fields are `u64` counters, so the struct is cheap to copy and
/// `Default` yields an all-zero snapshot (an empty store).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct StorageStats {
    /// Total number of nodes
    pub node_count: u64,
    /// Total number of edges
    pub edge_count: u64,
    /// Total storage size in bytes
    pub storage_bytes: u64,
    /// Number of sessions
    pub session_count: u64,
}
67
/// Async trait defining storage backend operations
///
/// This trait provides async versions of all storage operations for use with Tokio runtime.
/// It enables high-performance concurrent operations and non-blocking I/O.
///
/// It mirrors [`StorageBackend`] method-for-method, and additionally offers
/// batch helpers ([`store_nodes_batch`](Self::store_nodes_batch),
/// [`store_edges_batch`](Self::store_edges_batch)), a streaming accessor
/// ([`get_session_nodes_stream`](Self::get_session_nodes_stream)), and a
/// counting helper ([`count_session_nodes`](Self::count_session_nodes)) —
/// each with a default implementation that backends may override with a more
/// efficient native version.
#[async_trait]
pub trait AsyncStorageBackend: Send + Sync {
    /// Store a node in the backend asynchronously
    async fn store_node(&self, node: &Node) -> Result<()>;

    /// Retrieve a node by ID asynchronously
    ///
    /// Returns `Ok(None)` when no node with `id` exists.
    async fn get_node(&self, id: &NodeId) -> Result<Option<Node>>;

    /// Delete a node asynchronously
    async fn delete_node(&self, id: &NodeId) -> Result<()>;

    /// Store an edge asynchronously
    async fn store_edge(&self, edge: &Edge) -> Result<()>;

    /// Retrieve an edge by ID asynchronously
    ///
    /// Returns `Ok(None)` when no edge with `id` exists.
    async fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>;

    /// Delete an edge asynchronously
    async fn delete_edge(&self, id: &EdgeId) -> Result<()>;

    /// Get all nodes in a session asynchronously
    ///
    /// Loads every matching node into memory at once; prefer
    /// [`get_session_nodes_stream`](Self::get_session_nodes_stream) for very
    /// large sessions.
    async fn get_session_nodes(&self, session_id: &SessionId) -> Result<Vec<Node>>;

    /// Get all edges from a node asynchronously
    async fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Get all edges to a node asynchronously
    async fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>;

    /// Flush any pending writes asynchronously
    async fn flush(&self) -> Result<()>;

    /// Get storage statistics asynchronously
    async fn stats(&self) -> Result<StorageStats>;

    /// Batch store multiple nodes asynchronously for improved performance
    ///
    /// Returns the stored nodes' IDs in input order.
    ///
    /// The default implementation writes nodes one at a time and aborts on the
    /// first failure, so on `Err` some earlier nodes may already have been
    /// stored (no rollback). Backends can override this with an atomic or
    /// natively batched write.
    async fn store_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
        let mut ids = Vec::with_capacity(nodes.len());
        for node in nodes {
            // `?` aborts the whole batch at the first failed write.
            self.store_node(node).await?;
            ids.push(node.id());
        }
        Ok(ids)
    }

    /// Batch store multiple edges asynchronously for improved performance
    ///
    /// Returns the stored edges' IDs in input order.
    ///
    /// Same error semantics as [`store_nodes_batch`](Self::store_nodes_batch):
    /// aborts on the first failure with no rollback of earlier writes.
    async fn store_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
        let mut ids = Vec::with_capacity(edges.len());
        for edge in edges {
            self.store_edge(edge).await?;
            ids.push(edge.id);
        }
        Ok(ids)
    }

    /// Stream nodes from a session for memory-efficient iteration over large result sets
    ///
    /// This method returns a stream that yields nodes one at a time, avoiding the need
    /// to load all nodes into memory at once. This is particularly useful for sessions
    /// with thousands of nodes.
    ///
    /// Note: the *default* implementation still materializes the full node list
    /// via [`get_session_nodes`](Self::get_session_nodes) and only then yields
    /// it item by item — backends should override it to stream directly from
    /// storage to get the memory benefit.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::storage::AsyncSledBackend;
    /// use llm_memory_graph::storage::AsyncStorageBackend;
    /// use llm_memory_graph::types::SessionId;
    /// use futures::stream::StreamExt;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = AsyncSledBackend::open("./data/graph.db").await?;
    /// let session_id = SessionId::new();
    ///
    /// let mut stream = backend.get_session_nodes_stream(&session_id);
    /// let mut count = 0;
    /// while let Some(node) = stream.next().await {
    ///     let node = node?;
    ///     // Process node without loading all into memory
    ///     count += 1;
    /// }
    /// println!("Processed {} nodes", count);
    /// # Ok(())
    /// # }
    /// ```
    fn get_session_nodes_stream(
        &self,
        session_id: &SessionId,
    ) -> std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<Node>> + Send + '_>> {
        // Default implementation: load all and convert to stream
        // Backends should override for true streaming from storage
        //
        // Copy the id by value so the returned stream does not borrow the
        // caller's `&SessionId` (the deref requires `SessionId: Copy`).
        let session_id = *session_id;
        Box::pin(async_stream::stream! {
            match self.get_session_nodes(&session_id).await {
                Ok(nodes) => {
                    // Yield the already-loaded nodes one at a time.
                    for node in nodes {
                        yield Ok(node);
                    }
                }
                // A load failure surfaces as a single Err item, then the
                // stream ends.
                Err(e) => yield Err(e),
            }
        })
    }

    /// Count nodes in a session without loading them into memory
    ///
    /// This is more efficient than `get_session_nodes().await?.len()` for large sessions
    /// as it only counts without deserializing node data.
    ///
    /// Note: that claim holds only for backends that override this method; the
    /// *default* implementation below does load and deserialize all nodes.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::storage::AsyncSledBackend;
    /// use llm_memory_graph::storage::AsyncStorageBackend;
    /// use llm_memory_graph::types::SessionId;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = AsyncSledBackend::open("./data/graph.db").await?;
    /// let session_id = SessionId::new();
    ///
    /// let count = backend.count_session_nodes(&session_id).await?;
    /// println!("Session has {} nodes", count);
    /// # Ok(())
    /// # }
    /// ```
    async fn count_session_nodes(&self, session_id: &SessionId) -> Result<usize> {
        // Default implementation: load and count
        // Backends should override for O(1) counting if possible
        let nodes = self.get_session_nodes(session_id).await?;
        Ok(nodes.len())
    }
}