1use crate::{Document, DocumentChunk, Embedding, RragError, RragResult};
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::fs;
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14
/// A value persisted in a [`Storage`] backend.
///
/// Each variant wraps one of the core crate data types, plus a free-form
/// JSON metadata map; the variant tag is serialized alongside the payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageEntry {
    /// A full source document.
    Document(Document),
    /// One chunk of a document.
    Chunk(DocumentChunk),
    /// A vector embedding.
    Embedding(Embedding),
    /// Arbitrary JSON metadata keyed by string.
    Metadata(HashMap<String, serde_json::Value>),
}
23
/// Addresses a single [`StorageEntry`] inside a backend.
///
/// Derives `Hash`/`Eq` so it can be used directly as a `HashMap` key, and
/// `Serialize`/`Deserialize` so keys can be persisted or transmitted.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StorageKey {
    /// What kind of entry this key addresses.
    pub entry_type: EntryType,

    /// Identifier unique within the entry type (and namespace, if any).
    pub id: String,

    /// Optional logical partition; `None` means the default namespace.
    pub namespace: Option<String>,
}
36
/// Discriminates the kind of entry a [`StorageKey`] points at.
///
/// Each variant maps to its own subdirectory in the file backend
/// ("documents", "chunks", "embeddings", "metadata").
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum EntryType {
    Document,
    Chunk,
    Embedding,
    Metadata,
}
44
45impl StorageKey {
46 pub fn document(id: impl Into<String>) -> Self {
47 Self {
48 entry_type: EntryType::Document,
49 id: id.into(),
50 namespace: None,
51 }
52 }
53
54 pub fn chunk(document_id: impl Into<String>, chunk_index: usize) -> Self {
55 Self {
56 entry_type: EntryType::Chunk,
57 id: format!("{}_{}", document_id.into(), chunk_index),
58 namespace: None,
59 }
60 }
61
62 pub fn embedding(id: impl Into<String>) -> Self {
63 Self {
64 entry_type: EntryType::Embedding,
65 id: id.into(),
66 namespace: None,
67 }
68 }
69
70 pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
71 self.namespace = Some(namespace.into());
72 self
73 }
74
75 pub fn to_path(&self) -> PathBuf {
77 let type_str = match self.entry_type {
78 EntryType::Document => "documents",
79 EntryType::Chunk => "chunks",
80 EntryType::Embedding => "embeddings",
81 EntryType::Metadata => "metadata",
82 };
83
84 let mut path = PathBuf::from(type_str);
85
86 if let Some(namespace) = &self.namespace {
87 path.push(namespace);
88 }
89
90 path.push(format!("{}.json", self.id));
91 path
92 }
93}
94
/// Filter and pagination criteria for [`Storage::list_keys`].
///
/// All filters are optional; an empty query matches every entry.
#[derive(Debug, Clone)]
pub struct StorageQuery {
    /// Restrict results to one entry type.
    pub entry_type: Option<EntryType>,

    /// Restrict results to one namespace. An empty string matches keys
    /// with no namespace (see the in-memory `matches_query` logic).
    pub namespace: Option<String>,

    /// Keep only keys whose `id` starts with this prefix.
    pub key_prefix: Option<String>,

    /// Metadata filters.
    /// NOTE(review): declared but not applied by either visible backend.
    pub metadata_filters: HashMap<String, serde_json::Value>,

    /// Maximum number of keys to return (applied after `offset`).
    pub limit: Option<usize>,

    /// Number of matching keys to skip before returning results.
    pub offset: Option<usize>,
}
116
117impl StorageQuery {
118 pub fn new() -> Self {
119 Self {
120 entry_type: None,
121 namespace: None,
122 key_prefix: None,
123 metadata_filters: HashMap::new(),
124 limit: None,
125 offset: None,
126 }
127 }
128
129 pub fn documents() -> Self {
130 Self::new().with_entry_type(EntryType::Document)
131 }
132
133 pub fn chunks() -> Self {
134 Self::new().with_entry_type(EntryType::Chunk)
135 }
136
137 pub fn embeddings() -> Self {
138 Self::new().with_entry_type(EntryType::Embedding)
139 }
140
141 pub fn with_entry_type(mut self, entry_type: EntryType) -> Self {
142 self.entry_type = Some(entry_type);
143 self
144 }
145
146 pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
147 self.namespace = Some(namespace.into());
148 self
149 }
150
151 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
152 self.key_prefix = Some(prefix.into());
153 self
154 }
155
156 pub fn with_limit(mut self, limit: usize) -> Self {
157 self.limit = Some(limit);
158 self
159 }
160
161 pub fn with_offset(mut self, offset: usize) -> Self {
162 self.offset = Some(offset);
163 self
164 }
165}
166
impl Default for StorageQuery {
    /// Equivalent to [`StorageQuery::new`]: a query matching everything.
    fn default() -> Self {
        Self::new()
    }
}
172
/// Abstract async key-value storage backend.
///
/// Implementations must be `Send + Sync` so a backend can be shared behind
/// an `Arc<dyn Storage>` across tasks.
#[async_trait]
pub trait Storage: Send + Sync {
    /// Human-readable backend name (e.g. "in_memory", "file_system").
    fn name(&self) -> &str;

    /// Stores `entry` under `key`, overwriting any existing value.
    async fn put(&self, key: &StorageKey, entry: &StorageEntry) -> RragResult<()>;

    /// Retrieves the entry stored under `key`, or `None` if absent.
    async fn get(&self, key: &StorageKey) -> RragResult<Option<StorageEntry>>;

    /// Deletes the entry under `key`; returns `true` if something was removed.
    async fn delete(&self, key: &StorageKey) -> RragResult<bool>;

    /// Returns whether an entry exists under `key`.
    async fn exists(&self, key: &StorageKey) -> RragResult<bool>;

    /// Lists keys matching `query` (type/namespace/prefix filters plus
    /// offset/limit pagination).
    async fn list_keys(&self, query: &StorageQuery) -> RragResult<Vec<StorageKey>>;

    /// Fetches several keys at once; each key is paired with its optional entry.
    async fn get_many(
        &self,
        keys: &[StorageKey],
    ) -> RragResult<Vec<(StorageKey, Option<StorageEntry>)>>;

    /// Stores several entries at once.
    async fn put_many(&self, entries: &[(StorageKey, StorageEntry)]) -> RragResult<()>;

    /// Deletes several keys at once; returns how many were actually removed.
    async fn delete_many(&self, keys: &[StorageKey]) -> RragResult<usize>;

    /// Removes every entry. The default implementation reports the operation
    /// as unsupported; backends opt in by overriding it.
    async fn clear(&self) -> RragResult<()> {
        Err(RragError::storage(
            "clear",
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Clear operation not supported",
            ),
        ))
    }

    /// Reports entry counts and size information for the backend.
    async fn stats(&self) -> RragResult<StorageStats>;

    /// Cheap liveness probe; `Ok(true)` means the backend is usable.
    async fn health_check(&self) -> RragResult<bool>;
}
223
/// Snapshot of a backend's contents, as reported by [`Storage::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStats {
    /// Total number of stored entries across all types.
    pub total_entries: usize,

    /// Entry counts keyed by type directory name ("documents", "chunks", ...).
    pub entries_by_type: HashMap<String, usize>,

    /// Total stored size in bytes (may be an estimate — see each backend).
    pub size_bytes: u64,

    /// Remaining capacity in bytes, when the backend has a known budget.
    pub available_bytes: Option<u64>,

    /// Backend identifier (matches [`Storage::name`]).
    pub backend_type: String,

    /// When this snapshot was taken (UTC).
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
245
/// In-memory storage backend: a `HashMap` guarded by a tokio `RwLock`.
///
/// Intended for tests and ephemeral use; contents are lost on drop.
pub struct InMemoryStorage {
    /// Shared key → entry map; the `Arc` lets handles share one map.
    data: Arc<tokio::sync::RwLock<HashMap<StorageKey, StorageEntry>>>,

    /// Capacity limits consulted on writes.
    config: MemoryStorageConfig,
}
254
/// Capacity limits for [`InMemoryStorage`].
#[derive(Debug, Clone)]
pub struct MemoryStorageConfig {
    /// Maximum number of entries held at once (`None` = unlimited).
    pub max_entries: Option<usize>,

    /// Memory budget in bytes (`None` = unlimited).
    /// NOTE(review): in the visible code this is only used to derive
    /// `available_bytes` in `stats`; it is not enforced on writes.
    pub max_memory_bytes: Option<u64>,
}
263
264impl Default for MemoryStorageConfig {
265 fn default() -> Self {
266 Self {
267 max_entries: Some(100_000),
268 max_memory_bytes: Some(1_000_000_000), }
270 }
271}
272
273impl InMemoryStorage {
274 pub fn new() -> Self {
275 Self {
276 data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
277 config: MemoryStorageConfig::default(),
278 }
279 }
280
281 pub fn with_config(config: MemoryStorageConfig) -> Self {
282 Self {
283 data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
284 config,
285 }
286 }
287
288 async fn check_limits(&self) -> RragResult<()> {
290 let data = self.data.read().await;
291
292 if let Some(max_entries) = self.config.max_entries {
293 if data.len() >= max_entries {
294 return Err(RragError::storage(
295 "memory_limit",
296 std::io::Error::new(
297 std::io::ErrorKind::OutOfMemory,
298 format!("Exceeded maximum entries: {}", max_entries),
299 ),
300 ));
301 }
302 }
303
304 Ok(())
305 }
306
307 fn matches_query(&self, key: &StorageKey, query: &StorageQuery) -> bool {
309 if let Some(entry_type) = &query.entry_type {
311 if key.entry_type != *entry_type {
312 return false;
313 }
314 }
315
316 if let Some(namespace) = &query.namespace {
318 match &key.namespace {
319 Some(key_ns) if key_ns == namespace => {}
320 None if namespace.is_empty() => {}
321 _ => return false,
322 }
323 }
324
325 if let Some(prefix) = &query.key_prefix {
327 if !key.id.starts_with(prefix) {
328 return false;
329 }
330 }
331
332 true
333 }
334}
335
impl Default for InMemoryStorage {
    /// Equivalent to [`InMemoryStorage::new`] (default limits).
    fn default() -> Self {
        Self::new()
    }
}
341
342#[async_trait]
343impl Storage for InMemoryStorage {
344 fn name(&self) -> &str {
345 "in_memory"
346 }
347
348 async fn put(&self, key: &StorageKey, entry: &StorageEntry) -> RragResult<()> {
349 self.check_limits().await?;
350
351 let mut data = self.data.write().await;
352 data.insert(key.clone(), entry.clone());
353 Ok(())
354 }
355
356 async fn get(&self, key: &StorageKey) -> RragResult<Option<StorageEntry>> {
357 let data = self.data.read().await;
358 Ok(data.get(key).cloned())
359 }
360
361 async fn delete(&self, key: &StorageKey) -> RragResult<bool> {
362 let mut data = self.data.write().await;
363 Ok(data.remove(key).is_some())
364 }
365
366 async fn exists(&self, key: &StorageKey) -> RragResult<bool> {
367 let data = self.data.read().await;
368 Ok(data.contains_key(key))
369 }
370
371 async fn list_keys(&self, query: &StorageQuery) -> RragResult<Vec<StorageKey>> {
372 let data = self.data.read().await;
373 let mut keys: Vec<StorageKey> = data
374 .keys()
375 .filter(|key| self.matches_query(key, query))
376 .cloned()
377 .collect();
378
379 if let Some(offset) = query.offset {
381 if offset < keys.len() {
382 keys = keys.into_iter().skip(offset).collect();
383 } else {
384 keys.clear();
385 }
386 }
387
388 if let Some(limit) = query.limit {
389 keys.truncate(limit);
390 }
391
392 Ok(keys)
393 }
394
395 async fn get_many(
396 &self,
397 keys: &[StorageKey],
398 ) -> RragResult<Vec<(StorageKey, Option<StorageEntry>)>> {
399 let data = self.data.read().await;
400 let results = keys
401 .iter()
402 .map(|key| (key.clone(), data.get(key).cloned()))
403 .collect();
404 Ok(results)
405 }
406
407 async fn put_many(&self, entries: &[(StorageKey, StorageEntry)]) -> RragResult<()> {
408 self.check_limits().await?;
409
410 let mut data = self.data.write().await;
411 for (key, entry) in entries {
412 data.insert(key.clone(), entry.clone());
413 }
414 Ok(())
415 }
416
417 async fn delete_many(&self, keys: &[StorageKey]) -> RragResult<usize> {
418 let mut data = self.data.write().await;
419 let mut deleted = 0;
420 for key in keys {
421 if data.remove(key).is_some() {
422 deleted += 1;
423 }
424 }
425 Ok(deleted)
426 }
427
428 async fn clear(&self) -> RragResult<()> {
429 let mut data = self.data.write().await;
430 data.clear();
431 Ok(())
432 }
433
434 async fn stats(&self) -> RragResult<StorageStats> {
435 let data = self.data.read().await;
436
437 let mut entries_by_type = HashMap::new();
438 for key in data.keys() {
439 let type_str = match key.entry_type {
440 EntryType::Document => "documents",
441 EntryType::Chunk => "chunks",
442 EntryType::Embedding => "embeddings",
443 EntryType::Metadata => "metadata",
444 };
445 *entries_by_type.entry(type_str.to_string()).or_insert(0) += 1;
446 }
447
448 let estimated_size = data.len() * 1024; Ok(StorageStats {
452 total_entries: data.len(),
453 entries_by_type,
454 size_bytes: estimated_size as u64,
455 available_bytes: self
456 .config
457 .max_memory_bytes
458 .map(|max| max - estimated_size as u64),
459 backend_type: "in_memory".to_string(),
460 last_updated: chrono::Utc::now(),
461 })
462 }
463
464 async fn health_check(&self) -> RragResult<bool> {
465 let _data = self.data.read().await;
467 Ok(true)
468 }
469}
470
/// File-system storage backend: one pretty-printed JSON file per entry,
/// laid out as `<base_dir>/<type>[/<namespace>]/<id>.json`
/// (see [`StorageKey::to_path`]).
pub struct FileStorage {
    /// Root directory under which all entries are stored.
    base_dir: PathBuf,

    /// Behavior options (directory creation, fsync, ...).
    config: FileStorageConfig,
}
479
/// Tuning knobs for [`FileStorage`].
#[derive(Debug, Clone)]
pub struct FileStorageConfig {
    /// Create missing directories (base dir and parents) up front.
    pub create_dirs: bool,

    /// Permission bits for created files.
    /// NOTE(review): not honored anywhere in the visible `FileStorage` code.
    pub file_permissions: Option<u32>,

    /// Compress stored payloads.
    /// NOTE(review): not honored anywhere in the visible `FileStorage` code.
    pub compress: bool,

    /// Call `sync_all` after each write for durability.
    pub sync_writes: bool,
}
494
impl Default for FileStorageConfig {
    /// Defaults: create directories, OS-default permissions, no compression,
    /// and no fsync after writes.
    fn default() -> Self {
        Self {
            create_dirs: true,
            file_permissions: None,
            compress: false,
            sync_writes: false,
        }
    }
}
505
506impl FileStorage {
507 pub async fn new(base_dir: impl AsRef<Path>) -> RragResult<Self> {
508 let base_dir = base_dir.as_ref().to_path_buf();
509
510 if !base_dir.exists() {
511 fs::create_dir_all(&base_dir)
512 .await
513 .map_err(|e| RragError::storage("create_directory", e))?;
514 }
515
516 Ok(Self {
517 base_dir,
518 config: FileStorageConfig::default(),
519 })
520 }
521
522 pub async fn with_config(
523 base_dir: impl AsRef<Path>,
524 config: FileStorageConfig,
525 ) -> RragResult<Self> {
526 let base_dir = base_dir.as_ref().to_path_buf();
527
528 if config.create_dirs && !base_dir.exists() {
529 fs::create_dir_all(&base_dir)
530 .await
531 .map_err(|e| RragError::storage("create_directory", e))?;
532 }
533
534 Ok(Self { base_dir, config })
535 }
536
537 fn get_file_path(&self, key: &StorageKey) -> PathBuf {
539 self.base_dir.join(key.to_path())
540 }
541
542 async fn ensure_parent_dir(&self, file_path: &Path) -> RragResult<()> {
544 if let Some(parent) = file_path.parent() {
545 if !parent.exists() {
546 fs::create_dir_all(parent)
547 .await
548 .map_err(|e| RragError::storage("create_parent_directory", e))?;
549 }
550 }
551 Ok(())
552 }
553}
554
555#[async_trait]
556impl Storage for FileStorage {
557 fn name(&self) -> &str {
558 "file_system"
559 }
560
561 async fn put(&self, key: &StorageKey, entry: &StorageEntry) -> RragResult<()> {
562 let file_path = self.get_file_path(key);
563 self.ensure_parent_dir(&file_path).await?;
564
565 let json_data =
566 serde_json::to_vec_pretty(entry).map_err(|e| RragError::storage("serialize", e))?;
567
568 let mut file = fs::File::create(&file_path)
569 .await
570 .map_err(|e| RragError::storage("create_file", e))?;
571
572 file.write_all(&json_data)
573 .await
574 .map_err(|e| RragError::storage("write_file", e))?;
575
576 if self.config.sync_writes {
577 file.sync_all()
578 .await
579 .map_err(|e| RragError::storage("sync_file", e))?;
580 }
581
582 Ok(())
583 }
584
585 async fn get(&self, key: &StorageKey) -> RragResult<Option<StorageEntry>> {
586 let file_path = self.get_file_path(key);
587
588 if !file_path.exists() {
589 return Ok(None);
590 }
591
592 let mut file = fs::File::open(&file_path)
593 .await
594 .map_err(|e| RragError::storage("open_file", e))?;
595
596 let mut contents = Vec::new();
597 file.read_to_end(&mut contents)
598 .await
599 .map_err(|e| RragError::storage("read_file", e))?;
600
601 let entry =
602 serde_json::from_slice(&contents).map_err(|e| RragError::storage("deserialize", e))?;
603
604 Ok(Some(entry))
605 }
606
607 async fn delete(&self, key: &StorageKey) -> RragResult<bool> {
608 let file_path = self.get_file_path(key);
609
610 if !file_path.exists() {
611 return Ok(false);
612 }
613
614 fs::remove_file(&file_path)
615 .await
616 .map_err(|e| RragError::storage("delete_file", e))?;
617
618 Ok(true)
619 }
620
621 async fn exists(&self, key: &StorageKey) -> RragResult<bool> {
622 let file_path = self.get_file_path(key);
623 Ok(file_path.exists())
624 }
625
626 async fn list_keys(&self, _query: &StorageQuery) -> RragResult<Vec<StorageKey>> {
627 let keys = Vec::new();
630
631 Ok(keys)
634 }
635
636 async fn get_many(
637 &self,
638 keys: &[StorageKey],
639 ) -> RragResult<Vec<(StorageKey, Option<StorageEntry>)>> {
640 let mut results = Vec::with_capacity(keys.len());
641
642 for key in keys {
643 let entry = self.get(key).await?;
644 results.push((key.clone(), entry));
645 }
646
647 Ok(results)
648 }
649
650 async fn put_many(&self, entries: &[(StorageKey, StorageEntry)]) -> RragResult<()> {
651 for (key, entry) in entries {
652 self.put(key, entry).await?;
653 }
654 Ok(())
655 }
656
657 async fn delete_many(&self, keys: &[StorageKey]) -> RragResult<usize> {
658 let mut deleted = 0;
659
660 for key in keys {
661 if self.delete(key).await? {
662 deleted += 1;
663 }
664 }
665
666 Ok(deleted)
667 }
668
669 async fn stats(&self) -> RragResult<StorageStats> {
670 Ok(StorageStats {
673 total_entries: 0, entries_by_type: HashMap::new(),
675 size_bytes: 0,
676 available_bytes: None,
677 backend_type: "file_system".to_string(),
678 last_updated: chrono::Utc::now(),
679 })
680 }
681
682 async fn health_check(&self) -> RragResult<bool> {
683 Ok(self.base_dir.exists() && self.base_dir.is_dir())
685 }
686}
687
/// High-level convenience facade over an injected [`Storage`] backend.
pub struct StorageService {
    /// The underlying backend (shared, dynamically dispatched).
    storage: Arc<dyn Storage>,

    /// Service-level options.
    /// NOTE(review): the batching/caching settings are stored but not yet
    /// acted upon anywhere in the visible code.
    #[allow(dead_code)]
    config: StorageServiceConfig,
}
697
/// Options for [`StorageService`].
/// NOTE(review): currently informational only — see `StorageService.config`.
#[derive(Debug, Clone)]
pub struct StorageServiceConfig {
    /// Whether to batch writes before hitting the backend.
    pub enable_batching: bool,

    /// Maximum number of operations per batch.
    pub batch_size: usize,

    /// Flush a partial batch after this many milliseconds.
    pub batch_timeout_ms: u64,

    /// Whether to cache reads.
    pub enable_caching: bool,

    /// Cache entry time-to-live in seconds.
    pub cache_ttl_seconds: u64,
}
715
impl Default for StorageServiceConfig {
    /// Defaults: batching on (100 ops / 1 s flush), caching off (5 min TTL
    /// if enabled).
    fn default() -> Self {
        Self {
            enable_batching: true,
            batch_size: 100,
            batch_timeout_ms: 1000,
            enable_caching: false,
            cache_ttl_seconds: 300,
        }
    }
}
727
728impl StorageService {
729 pub fn new(storage: Arc<dyn Storage>) -> Self {
730 Self {
731 storage,
732 config: StorageServiceConfig::default(),
733 }
734 }
735
736 pub fn with_config(storage: Arc<dyn Storage>, config: StorageServiceConfig) -> Self {
737 Self { storage, config }
738 }
739
740 pub async fn store_document(&self, document: &Document) -> RragResult<()> {
742 let key = StorageKey::document(&document.id);
743 let entry = StorageEntry::Document(document.clone());
744 self.storage.put(&key, &entry).await
745 }
746
747 pub async fn store_chunk(&self, chunk: &DocumentChunk) -> RragResult<()> {
749 let key = StorageKey::chunk(&chunk.document_id, chunk.chunk_index);
750 let entry = StorageEntry::Chunk(chunk.clone());
751 self.storage.put(&key, &entry).await
752 }
753
754 pub async fn store_embedding(&self, embedding: &Embedding) -> RragResult<()> {
756 let key = StorageKey::embedding(&embedding.source_id);
757 let entry = StorageEntry::Embedding(embedding.clone());
758 self.storage.put(&key, &entry).await
759 }
760
761 pub async fn get_document(&self, document_id: &str) -> RragResult<Option<Document>> {
763 let key = StorageKey::document(document_id);
764 match self.storage.get(&key).await? {
765 Some(StorageEntry::Document(doc)) => Ok(Some(doc)),
766 _ => Ok(None),
767 }
768 }
769
770 pub async fn get_stats(&self) -> RragResult<StorageStats> {
772 self.storage.stats().await
773 }
774
775 pub async fn health_check(&self) -> RragResult<bool> {
777 self.storage.health_check().await
778 }
779}
780
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Round-trips a document through the in-memory backend:
    /// put → get → exists → delete.
    #[tokio::test]
    async fn test_in_memory_storage() {
        let storage = InMemoryStorage::new();

        let doc = Document::new("Test document");
        let key = StorageKey::document(&doc.id);
        let entry = StorageEntry::Document(doc.clone());

        storage.put(&key, &entry).await.unwrap();

        let retrieved = storage.get(&key).await.unwrap();
        assert!(retrieved.is_some());

        if let Some(StorageEntry::Document(retrieved_doc)) = retrieved {
            assert_eq!(retrieved_doc.id, doc.id);
            assert_eq!(retrieved_doc.content_str(), doc.content_str());
        }

        assert!(storage.exists(&key).await.unwrap());

        // Delete reports whether something was actually removed.
        assert!(storage.delete(&key).await.unwrap());
        assert!(!storage.exists(&key).await.unwrap());
    }

    /// Round-trips a document through the file backend and checks the
    /// expected on-disk layout (`<base>/<type>/<id>.json`).
    #[tokio::test]
    async fn test_file_storage() {
        let temp_dir = TempDir::new().unwrap();
        let storage = FileStorage::new(temp_dir.path()).await.unwrap();

        let doc = Document::new("Test document for file storage");
        let key = StorageKey::document(&doc.id);
        let entry = StorageEntry::Document(doc.clone());

        storage.put(&key, &entry).await.unwrap();

        let retrieved = storage.get(&key).await.unwrap();
        assert!(retrieved.is_some());

        if let Some(StorageEntry::Document(retrieved_doc)) = retrieved {
            assert_eq!(retrieved_doc.id, doc.id);
        }

        // The file should live exactly where StorageKey::to_path points.
        let file_path = temp_dir.path().join(key.to_path());
        assert!(file_path.exists());
    }

    /// Checks the key constructors: type tags, chunk id format, namespacing.
    #[test]
    fn test_storage_key() {
        let doc_key = StorageKey::document("doc1");
        assert_eq!(doc_key.entry_type, EntryType::Document);
        assert_eq!(doc_key.id, "doc1");

        let chunk_key = StorageKey::chunk("doc1", 5);
        assert_eq!(chunk_key.entry_type, EntryType::Chunk);
        assert_eq!(chunk_key.id, "doc1_5");

        let ns_key = doc_key.with_namespace("test_namespace");
        assert_eq!(ns_key.namespace, Some("test_namespace".to_string()));
    }

    /// Exercises the high-level service facade over the in-memory backend.
    #[tokio::test]
    async fn test_storage_service() {
        let storage = Arc::new(InMemoryStorage::new());
        let service = StorageService::new(storage);

        let doc = Document::new("Test document for service");

        service.store_document(&doc).await.unwrap();

        let retrieved = service.get_document(&doc.id).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, doc.id);

        let stats = service.get_stats().await.unwrap();
        assert_eq!(stats.total_entries, 1);
    }
}