//! Storage backend for persistence.

use crate::core::KnowledgeEntry;
use crate::error::{Error, Result};

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::RwLock;
use uuid::Uuid;

/// Storage backend for persisting knowledge entries and embeddings.
///
/// Data lives in three sibling files derived from `path` by swapping the
/// extension: `.index` (entry metadata), `.data` (serialized entries), and
/// `.embeddings` (per-entry embedding vectors), all encoded with `bincode`.
pub struct StorageBackend {
    /// Base path; concrete files are `path.with_extension("index"/"data"/"embeddings")`.
    path: PathBuf,

    /// In-memory index, loaded at `open` and rewritten on every flush.
    index: RwLock<StorageIndex>,

    /// Dirty flag: set by mutating operations, cleared after a successful flush.
    /// NOTE(review): nothing in this file reads it to skip a flush — confirm
    /// whether it is consumed elsewhere.
    dirty: RwLock<bool>,
}
25
/// Index of stored entries, persisted with `bincode` to the `.index` file.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct StorageIndex {
    /// Entry metadata keyed by entry id.
    entries: HashMap<Uuid, EntryMetadata>,
    /// Storage format version. NOTE(review): never incremented anywhere in
    /// this file — confirm versioning is handled elsewhere.
    version: u32,
}
32
/// Metadata for a stored entry, kept in the index.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EntryMetadata {
    /// Entry id; duplicates the `StorageIndex::entries` map key.
    id: Uuid,
    /// Entry title, copied from the entry at save time.
    title: String,
    /// Offset into the data file. Always written as 0 in this file —
    /// presumably reserved for future incremental reads; confirm before relying on it.
    file_offset: u64,
}
40
41impl StorageBackend {
42    /// Open or create storage at the given path.
43    pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
44        let path = path.as_ref().to_path_buf();
45
46        // Create directory if needed
47        if let Some(parent) = path.parent() {
48            fs::create_dir_all(parent).await?;
49        }
50
51        // Load or create index
52        let index = Self::load_index(&path).await.unwrap_or_default();
53
54        Ok(Self {
55            path,
56            index: RwLock::new(index),
57            dirty: RwLock::new(false),
58        })
59    }
60
61    /// Load index from disk.
62    async fn load_index(path: &Path) -> Result<StorageIndex> {
63        let index_path = Self::index_path(path);
64
65        if !index_path.exists() {
66            return Ok(StorageIndex::default());
67        }
68
69        let mut file = fs::File::open(&index_path).await?;
70        let mut data = Vec::new();
71        file.read_to_end(&mut data).await?;
72
73        bincode::deserialize(&data).map_err(|e| Error::storage(e.to_string()))
74    }
75
76    /// Get index file path.
77    fn index_path(base: &Path) -> PathBuf {
78        base.with_extension("index")
79    }
80
81    /// Get data file path.
82    fn data_path(base: &Path) -> PathBuf {
83        base.with_extension("data")
84    }
85
86    /// Get embeddings file path.
87    fn embeddings_path(base: &Path) -> PathBuf {
88        base.with_extension("embeddings")
89    }
90
91    /// Load all entries and embeddings.
92    pub async fn load_all(&self) -> Result<Vec<(KnowledgeEntry, Vec<f32>)>> {
93        let data_path = Self::data_path(&self.path);
94        let embeddings_path = Self::embeddings_path(&self.path);
95
96        if !data_path.exists() {
97            return Ok(Vec::new());
98        }
99
100        // Load entries
101        let mut data_file = fs::File::open(&data_path).await?;
102        let mut data = Vec::new();
103        data_file.read_to_end(&mut data).await?;
104
105        let entries: Vec<KnowledgeEntry> =
106            bincode::deserialize(&data).map_err(|e| Error::storage(e.to_string()))?;
107
108        // Load embeddings
109        let embeddings: Vec<Vec<f32>> = if embeddings_path.exists() {
110            let mut emb_file = fs::File::open(&embeddings_path).await?;
111            let mut emb_data = Vec::new();
112            emb_file.read_to_end(&mut emb_data).await?;
113            bincode::deserialize(&emb_data).map_err(|e| Error::storage(e.to_string()))?
114        } else {
115            vec![Vec::new(); entries.len()]
116        };
117
118        Ok(entries.into_iter().zip(embeddings).collect())
119    }
120
121    /// Save a single entry with its embedding.
122    pub async fn save_entry(&self, entry: &KnowledgeEntry, embedding: &[f32]) -> Result<()> {
123        // Update index
124        {
125            let mut index = self.index.write().await;
126            index.entries.insert(
127                entry.id,
128                EntryMetadata {
129                    id: entry.id,
130                    title: entry.title.clone(),
131                    file_offset: 0,
132                },
133            );
134        }
135
136        *self.dirty.write().await = true;
137
138        self.flush_internal(Some((entry.clone(), embedding.to_vec())))
139            .await
140    }
141
142    /// Save multiple entries in batch.
143    pub async fn save_batch(&self, batch: &[(KnowledgeEntry, Vec<f32>)]) -> Result<()> {
144        {
145            let mut index = self.index.write().await;
146            for (entry, _) in batch {
147                index.entries.insert(
148                    entry.id,
149                    EntryMetadata {
150                        id: entry.id,
151                        title: entry.title.clone(),
152                        file_offset: 0,
153                    },
154                );
155            }
156        }
157
158        *self.dirty.write().await = true;
159        self.flush().await
160    }
161
162    /// Delete an entry.
163    pub async fn delete_entry(&self, id: Uuid) -> Result<()> {
164        {
165            let mut index = self.index.write().await;
166            index.entries.remove(&id);
167        }
168
169        *self.dirty.write().await = true;
170        self.flush().await
171    }
172
173    /// Flush pending writes to disk.
174    pub async fn flush(&self) -> Result<()> {
175        self.flush_internal(None).await
176    }
177
178    /// Internal flush with optional new entry.
179    async fn flush_internal(&self, new_entry: Option<(KnowledgeEntry, Vec<f32>)>) -> Result<()> {
180        // Load existing data
181        let mut all_data = self.load_all().await.unwrap_or_default();
182
183        // Add or update new entry
184        if let Some((entry, embedding)) = new_entry {
185            if let Some(pos) = all_data.iter().position(|(e, _)| e.id == entry.id) {
186                all_data[pos] = (entry, embedding);
187            } else {
188                all_data.push((entry, embedding));
189            }
190        }
191
192        // Filter by current index - clone the index to avoid holding lock across await
193        let index_snapshot = {
194            let index = self.index.read().await;
195            index.clone()
196        };
197        all_data.retain(|(e, _)| index_snapshot.entries.contains_key(&e.id));
198
199        // Separate entries and embeddings
200        let entries: Vec<_> = all_data.iter().map(|(e, _)| e.clone()).collect();
201        let embeddings: Vec<_> = all_data.iter().map(|(_, emb)| emb.clone()).collect();
202
203        // Write data file
204        let data_path = Self::data_path(&self.path);
205        let data = bincode::serialize(&entries).map_err(|e| Error::storage(e.to_string()))?;
206        let mut file = fs::File::create(&data_path).await?;
207        file.write_all(&data).await?;
208        file.sync_all().await?;
209
210        // Write embeddings file
211        let embeddings_path = Self::embeddings_path(&self.path);
212        let emb_data =
213            bincode::serialize(&embeddings).map_err(|e| Error::storage(e.to_string()))?;
214        let mut emb_file = fs::File::create(&embeddings_path).await?;
215        emb_file.write_all(&emb_data).await?;
216        emb_file.sync_all().await?;
217
218        // Write index - serialize before async write
219        let index_path = Self::index_path(&self.path);
220        let index_data = {
221            let index = self.index.read().await;
222            bincode::serialize(&*index).map_err(|e| Error::storage(e.to_string()))?
223        };
224
225        let mut index_file = fs::File::create(&index_path).await?;
226        index_file.write_all(&index_data).await?;
227        index_file.sync_all().await?;
228
229        *self.dirty.write().await = false;
230        Ok(())
231    }
232
233    /// Get storage statistics.
234    pub async fn stats(&self) -> StorageStats {
235        let index = self.index.read().await;
236        StorageStats {
237            entry_count: index.entries.len(),
238            version: index.version,
239        }
240    }
241}
242
/// Storage statistics.
///
/// Snapshot returned by `StorageBackend::stats`; values reflect the
/// in-memory index at the time of the call, not necessarily what has been
/// flushed to disk.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Number of knowledge entries currently stored.
    pub entry_count: usize,
    /// Storage format version.
    pub version: u32,
}
251
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A freshly opened backend starts with an empty index.
    #[tokio::test]
    async fn test_storage_open() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let backend = StorageBackend::open(&db_path).await.unwrap();
        let stats = backend.stats().await;
        assert_eq!(stats.entry_count, 0);
    }

    /// An entry saved through one backend is visible after reopening.
    #[tokio::test]
    async fn test_storage_save_load() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let backend = StorageBackend::open(&db_path).await.unwrap();
        let entry = KnowledgeEntry::new("Test", "Content");
        backend.save_entry(&entry, &[0.1, 0.2, 0.3]).await.unwrap();

        // A brand-new backend instance must see the persisted entry.
        let reopened = StorageBackend::open(&db_path).await.unwrap();
        let loaded = reopened.load_all().await.unwrap();

        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].0.title, "Test");
    }
}
285}