converge_knowledge/storage/
mod.rs1use crate::core::KnowledgeEntry;
4use crate::error::{Error, Result};
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use tokio::fs;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::sync::RwLock;
12use uuid::Uuid;
13
14pub struct StorageBackend {
16 path: PathBuf,
18
19 index: RwLock<StorageIndex>,
21
22 dirty: RwLock<bool>,
24}
25
26#[derive(Debug, Default, Clone, Serialize, Deserialize)]
28struct StorageIndex {
29 entries: HashMap<Uuid, EntryMetadata>,
30 version: u32,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35struct EntryMetadata {
36 id: Uuid,
37 title: String,
38 file_offset: u64,
39}
40
41impl StorageBackend {
42 pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
44 let path = path.as_ref().to_path_buf();
45
46 if let Some(parent) = path.parent() {
48 fs::create_dir_all(parent).await?;
49 }
50
51 let index = Self::load_index(&path).await.unwrap_or_default();
53
54 Ok(Self {
55 path,
56 index: RwLock::new(index),
57 dirty: RwLock::new(false),
58 })
59 }
60
61 async fn load_index(path: &Path) -> Result<StorageIndex> {
63 let index_path = Self::index_path(path);
64
65 if !index_path.exists() {
66 return Ok(StorageIndex::default());
67 }
68
69 let mut file = fs::File::open(&index_path).await?;
70 let mut data = Vec::new();
71 file.read_to_end(&mut data).await?;
72
73 bincode::deserialize(&data).map_err(|e| Error::storage(e.to_string()))
74 }
75
76 fn index_path(base: &Path) -> PathBuf {
78 base.with_extension("index")
79 }
80
81 fn data_path(base: &Path) -> PathBuf {
83 base.with_extension("data")
84 }
85
86 fn embeddings_path(base: &Path) -> PathBuf {
88 base.with_extension("embeddings")
89 }
90
91 pub async fn load_all(&self) -> Result<Vec<(KnowledgeEntry, Vec<f32>)>> {
93 let data_path = Self::data_path(&self.path);
94 let embeddings_path = Self::embeddings_path(&self.path);
95
96 if !data_path.exists() {
97 return Ok(Vec::new());
98 }
99
100 let mut data_file = fs::File::open(&data_path).await?;
102 let mut data = Vec::new();
103 data_file.read_to_end(&mut data).await?;
104
105 let entries: Vec<KnowledgeEntry> =
106 bincode::deserialize(&data).map_err(|e| Error::storage(e.to_string()))?;
107
108 let embeddings: Vec<Vec<f32>> = if embeddings_path.exists() {
110 let mut emb_file = fs::File::open(&embeddings_path).await?;
111 let mut emb_data = Vec::new();
112 emb_file.read_to_end(&mut emb_data).await?;
113 bincode::deserialize(&emb_data).map_err(|e| Error::storage(e.to_string()))?
114 } else {
115 vec![Vec::new(); entries.len()]
116 };
117
118 Ok(entries.into_iter().zip(embeddings).collect())
119 }
120
121 pub async fn save_entry(&self, entry: &KnowledgeEntry, embedding: &[f32]) -> Result<()> {
123 {
125 let mut index = self.index.write().await;
126 index.entries.insert(
127 entry.id,
128 EntryMetadata {
129 id: entry.id,
130 title: entry.title.clone(),
131 file_offset: 0,
132 },
133 );
134 }
135
136 *self.dirty.write().await = true;
137
138 self.flush_internal(Some((entry.clone(), embedding.to_vec())))
139 .await
140 }
141
142 pub async fn save_batch(&self, batch: &[(KnowledgeEntry, Vec<f32>)]) -> Result<()> {
144 {
145 let mut index = self.index.write().await;
146 for (entry, _) in batch {
147 index.entries.insert(
148 entry.id,
149 EntryMetadata {
150 id: entry.id,
151 title: entry.title.clone(),
152 file_offset: 0,
153 },
154 );
155 }
156 }
157
158 *self.dirty.write().await = true;
159 self.flush().await
160 }
161
162 pub async fn delete_entry(&self, id: Uuid) -> Result<()> {
164 {
165 let mut index = self.index.write().await;
166 index.entries.remove(&id);
167 }
168
169 *self.dirty.write().await = true;
170 self.flush().await
171 }
172
173 pub async fn flush(&self) -> Result<()> {
175 self.flush_internal(None).await
176 }
177
178 async fn flush_internal(&self, new_entry: Option<(KnowledgeEntry, Vec<f32>)>) -> Result<()> {
180 let mut all_data = self.load_all().await.unwrap_or_default();
182
183 if let Some((entry, embedding)) = new_entry {
185 if let Some(pos) = all_data.iter().position(|(e, _)| e.id == entry.id) {
186 all_data[pos] = (entry, embedding);
187 } else {
188 all_data.push((entry, embedding));
189 }
190 }
191
192 let index_snapshot = {
194 let index = self.index.read().await;
195 index.clone()
196 };
197 all_data.retain(|(e, _)| index_snapshot.entries.contains_key(&e.id));
198
199 let entries: Vec<_> = all_data.iter().map(|(e, _)| e.clone()).collect();
201 let embeddings: Vec<_> = all_data.iter().map(|(_, emb)| emb.clone()).collect();
202
203 let data_path = Self::data_path(&self.path);
205 let data = bincode::serialize(&entries).map_err(|e| Error::storage(e.to_string()))?;
206 let mut file = fs::File::create(&data_path).await?;
207 file.write_all(&data).await?;
208 file.sync_all().await?;
209
210 let embeddings_path = Self::embeddings_path(&self.path);
212 let emb_data =
213 bincode::serialize(&embeddings).map_err(|e| Error::storage(e.to_string()))?;
214 let mut emb_file = fs::File::create(&embeddings_path).await?;
215 emb_file.write_all(&emb_data).await?;
216 emb_file.sync_all().await?;
217
218 let index_path = Self::index_path(&self.path);
220 let index_data = {
221 let index = self.index.read().await;
222 bincode::serialize(&*index).map_err(|e| Error::storage(e.to_string()))?
223 };
224
225 let mut index_file = fs::File::create(&index_path).await?;
226 index_file.write_all(&index_data).await?;
227 index_file.sync_all().await?;
228
229 *self.dirty.write().await = false;
230 Ok(())
231 }
232
233 pub async fn stats(&self) -> StorageStats {
235 let index = self.index.read().await;
236 StorageStats {
237 entry_count: index.entries.len(),
238 version: index.version,
239 }
240 }
241}
242
/// Summary of a store's index, as returned by `StorageBackend::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of entries currently listed in the index.
    pub entry_count: usize,
    /// Version number recorded in the index.
    pub version: u32,
}
251
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A store opened on a fresh path reports zero entries.
    #[tokio::test]
    async fn test_storage_open() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let backend = StorageBackend::open(&db_path).await.unwrap();
        let stats = backend.stats().await;

        assert_eq!(stats.entry_count, 0);
    }

    /// An entry saved through one handle is visible to a second handle opened
    /// on the same path.
    #[tokio::test]
    async fn test_storage_save_load() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let entry = KnowledgeEntry::new("Test", "Content");
        let embedding = vec![0.1, 0.2, 0.3];

        let writer = StorageBackend::open(&db_path).await.unwrap();
        writer.save_entry(&entry, &embedding).await.unwrap();

        // Re-open from disk so the check covers persistence, not in-memory state.
        let reader = StorageBackend::open(&db_path).await.unwrap();
        let loaded = reader.load_all().await.unwrap();

        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].0.title, "Test");
    }
}