Skip to main content

aegis_document/
collection.rs

1//! Aegis Document Collection
2//!
3//! Collection management for document storage.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::index::{DocumentIndex, IndexType};
9use crate::query::{Filter, Query, QueryResult};
10use crate::types::{Document, DocumentId};
11use crate::validation::{Schema, ValidationResult};
12use std::collections::{HashMap, HashSet};
13use std::sync::RwLock;
14
15// =============================================================================
16// Collection
17// =============================================================================
18
19/// A collection of documents.
20pub struct Collection {
21    name: String,
22    documents: RwLock<HashMap<DocumentId, Document>>,
23    indexes: RwLock<Vec<DocumentIndex>>,
24    schema: Option<Schema>,
25}
26
27impl Collection {
28    /// Create a new collection.
29    pub fn new(name: impl Into<String>) -> Self {
30        Self {
31            name: name.into(),
32            documents: RwLock::new(HashMap::new()),
33            indexes: RwLock::new(Vec::new()),
34            schema: None,
35        }
36    }
37
38    /// Create a collection with schema validation.
39    pub fn with_schema(name: impl Into<String>, schema: Schema) -> Self {
40        Self {
41            name: name.into(),
42            documents: RwLock::new(HashMap::new()),
43            indexes: RwLock::new(Vec::new()),
44            schema: Some(schema),
45        }
46    }
47
48    /// Get the collection name.
49    pub fn name(&self) -> &str {
50        &self.name
51    }
52
53    // -------------------------------------------------------------------------
54    // Document Operations
55    // -------------------------------------------------------------------------
56
57    /// Insert a document.
58    pub fn insert(&self, doc: Document) -> Result<DocumentId, CollectionError> {
59        if let Some(ref schema) = self.schema {
60            let result = schema.validate(&doc);
61            if !result.is_valid {
62                return Err(CollectionError::ValidationFailed(result.errors));
63            }
64        }
65
66        let id = doc.id.clone();
67
68        {
69            let mut docs = self.documents.write().expect("documents RwLock poisoned");
70            if docs.contains_key(&id) {
71                return Err(CollectionError::DuplicateId(id));
72            }
73            docs.insert(id.clone(), doc.clone());
74        }
75
76        self.index_document(&doc);
77
78        Ok(id)
79    }
80
81    /// Insert multiple documents.
82    pub fn insert_many(&self, docs: Vec<Document>) -> Result<Vec<DocumentId>, CollectionError> {
83        let mut ids = Vec::with_capacity(docs.len());
84
85        for doc in docs {
86            let id = self.insert(doc)?;
87            ids.push(id);
88        }
89
90        Ok(ids)
91    }
92
93    /// Get a document by ID.
94    pub fn get(&self, id: &DocumentId) -> Option<Document> {
95        let docs = self.documents.read().expect("documents RwLock poisoned");
96        docs.get(id).cloned()
97    }
98
99    /// Update a document.
100    pub fn update(&self, id: &DocumentId, doc: Document) -> Result<(), CollectionError> {
101        if let Some(ref schema) = self.schema {
102            let result = schema.validate(&doc);
103            if !result.is_valid {
104                return Err(CollectionError::ValidationFailed(result.errors));
105            }
106        }
107
108        {
109            let mut docs = self.documents.write().expect("documents RwLock poisoned");
110            if !docs.contains_key(id) {
111                return Err(CollectionError::NotFound(id.clone()));
112            }
113
114            if let Some(old_doc) = docs.get(id) {
115                self.unindex_document(old_doc);
116            }
117
118            docs.insert(id.clone(), doc.clone());
119        }
120
121        self.index_document(&doc);
122
123        Ok(())
124    }
125
126    /// Delete a document.
127    pub fn delete(&self, id: &DocumentId) -> Result<Document, CollectionError> {
128        let mut docs = self.documents.write().expect("documents RwLock poisoned");
129
130        match docs.remove(id) {
131            Some(doc) => {
132                self.unindex_document(&doc);
133                Ok(doc)
134            }
135            None => Err(CollectionError::NotFound(id.clone())),
136        }
137    }
138
139    /// Check if a document exists.
140    pub fn contains(&self, id: &DocumentId) -> bool {
141        let docs = self.documents.read().expect("documents RwLock poisoned");
142        docs.contains_key(id)
143    }
144
145    /// Get the number of documents.
146    pub fn count(&self) -> usize {
147        let docs = self.documents.read().expect("documents RwLock poisoned");
148        docs.len()
149    }
150
151    /// Get all document IDs.
152    pub fn ids(&self) -> Vec<DocumentId> {
153        let docs = self.documents.read().expect("documents RwLock poisoned");
154        docs.keys().cloned().collect()
155    }
156
157    /// Get all documents.
158    pub fn all(&self) -> Vec<Document> {
159        let docs = self.documents.read().expect("documents RwLock poisoned");
160        docs.values().cloned().collect()
161    }
162
163    /// Clear all documents.
164    pub fn clear(&self) {
165        let mut docs = self.documents.write().expect("documents RwLock poisoned");
166        docs.clear();
167
168        let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
169        for index in indexes.iter_mut() {
170            index.clear();
171        }
172    }
173
174    // -------------------------------------------------------------------------
175    // Query Operations
176    // -------------------------------------------------------------------------
177
178    /// Find documents matching a query.
179    ///
180    /// When possible, uses indexes on fields with `Eq` filters to narrow
181    /// the candidate set before applying the full filter chain, reducing
182    /// the number of documents scanned.
183    pub fn find(&self, query: &Query) -> QueryResult {
184        let docs = self.documents.read().expect("documents RwLock poisoned");
185        let indexes = self.indexes.read().expect("indexes RwLock poisoned");
186        let start = std::time::Instant::now();
187
188        // Try to find an Eq filter whose field has a hash/unique/btree index.
189        // We pick the first matching index we find and use it to get candidate IDs.
190        let candidate_ids = self.find_indexed_candidates(&query.filters, &indexes);
191
192        let (mut matching, total_scanned) = if let Some(ids) = candidate_ids {
193            // Index path: only scan the candidate documents
194            let scanned = ids.len();
195            let results: Vec<Document> = ids
196                .iter()
197                .filter_map(|id| docs.get(id))
198                .filter(|doc| query.matches(doc))
199                .cloned()
200                .collect();
201            (results, scanned)
202        } else {
203            // Full scan fallback
204            let results: Vec<Document> = docs
205                .values()
206                .filter(|doc| query.matches(doc))
207                .cloned()
208                .collect();
209            (results, docs.len())
210        };
211
212        // Apply sort
213        if let Some(ref sort) = query.sort {
214            matching.sort_by(|a, b| {
215                let va = a.get(&sort.field);
216                let vb = b.get(&sort.field);
217                let ord = crate::types::compare_values(va, vb);
218                if sort.ascending {
219                    ord
220                } else {
221                    ord.reverse()
222                }
223            });
224        }
225
226        // Apply skip
227        if let Some(skip) = query.skip {
228            if skip < matching.len() {
229                matching = matching.split_off(skip);
230            } else {
231                matching.clear();
232            }
233        }
234
235        // Apply limit
236        if let Some(limit) = query.limit {
237            matching.truncate(limit);
238        }
239
240        // Apply projection
241        if let Some(ref fields) = query.projection {
242            for doc in &mut matching {
243                doc.data.retain(|key, _| fields.contains(key));
244            }
245        }
246
247        QueryResult {
248            documents: matching,
249            total_scanned,
250            execution_time_ms: start.elapsed().as_millis() as u64,
251        }
252    }
253
254    /// Find one document matching a query.
255    pub fn find_one(&self, query: &Query) -> Option<Document> {
256        let docs = self.documents.read().expect("documents RwLock poisoned");
257        docs.values().find(|doc| query.matches(doc)).cloned()
258    }
259
260    /// Count documents matching a query.
261    pub fn count_matching(&self, query: &Query) -> usize {
262        let docs = self.documents.read().expect("documents RwLock poisoned");
263        docs.values().filter(|doc| query.matches(doc)).count()
264    }
265
266    // -------------------------------------------------------------------------
267    // Index-Accelerated Lookup (private)
268    // -------------------------------------------------------------------------
269
270    /// Scan filters for `Eq` conditions on indexed fields and return candidate
271    /// document IDs from the first matching index. When multiple `Eq` filters
272    /// have indexes, intersect their candidate sets for tighter filtering.
273    /// Returns `None` if no applicable index is found (triggers full scan).
274    fn find_indexed_candidates(
275        &self,
276        filters: &[Filter],
277        indexes: &[DocumentIndex],
278    ) -> Option<HashSet<DocumentId>> {
279        let mut result: Option<HashSet<DocumentId>> = None;
280
281        for filter in filters {
282            if let Filter::Eq { field, value } = filter {
283                // Find an index on this field (Hash, Unique, or BTree all use hash_index)
284                if let Some(index) = indexes.iter().find(|idx| {
285                    idx.field() == field.as_str()
286                        && matches!(
287                            idx.index_type(),
288                            IndexType::Hash | IndexType::Unique | IndexType::BTree
289                        )
290                }) {
291                    let ids: HashSet<DocumentId> = index.find_eq(value).into_iter().collect();
292                    result = Some(match result {
293                        Some(current) => current.intersection(&ids).cloned().collect(),
294                        None => ids,
295                    });
296                }
297            }
298        }
299
300        result
301    }
302
303    // -------------------------------------------------------------------------
304    // Index Operations
305    // -------------------------------------------------------------------------
306
307    /// Create an index on a field.
308    pub fn create_index(&self, field: impl Into<String>, index_type: IndexType) {
309        let field = field.into();
310        let mut index = DocumentIndex::new(field.clone(), index_type);
311
312        let docs = self.documents.read().expect("documents RwLock poisoned");
313        for doc in docs.values() {
314            index.index_document(doc);
315        }
316
317        let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
318        indexes.push(index);
319    }
320
321    /// Drop an index.
322    pub fn drop_index(&self, field: &str) {
323        let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
324        indexes.retain(|idx| idx.field() != field);
325    }
326
327    /// Get all index names.
328    pub fn index_names(&self) -> Vec<String> {
329        let indexes = self.indexes.read().expect("indexes RwLock poisoned");
330        indexes.iter().map(|idx| idx.field().to_string()).collect()
331    }
332
333    fn index_document(&self, doc: &Document) {
334        let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
335        for index in indexes.iter_mut() {
336            index.index_document(doc);
337        }
338    }
339
340    fn unindex_document(&self, doc: &Document) {
341        let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
342        for index in indexes.iter_mut() {
343            index.unindex_document(doc);
344        }
345    }
346
347    // -------------------------------------------------------------------------
348    // Schema Operations
349    // -------------------------------------------------------------------------
350
351    /// Set the collection schema.
352    pub fn set_schema(&mut self, schema: Schema) {
353        self.schema = Some(schema);
354    }
355
356    /// Get the collection schema.
357    pub fn schema(&self) -> Option<&Schema> {
358        self.schema.as_ref()
359    }
360
361    /// Validate all documents against the schema.
362    pub fn validate_all(&self) -> Vec<(DocumentId, ValidationResult)> {
363        let Some(ref schema) = self.schema else {
364            return Vec::new();
365        };
366
367        let docs = self.documents.read().expect("documents RwLock poisoned");
368        docs.iter()
369            .map(|(id, doc)| (id.clone(), schema.validate(doc)))
370            .filter(|(_, result)| !result.is_valid)
371            .collect()
372    }
373}
374
375// =============================================================================
376// Collection Error
377// =============================================================================
378
379/// Errors that can occur in collection operations.
380#[derive(Debug, Clone)]
381pub enum CollectionError {
382    DuplicateId(DocumentId),
383    NotFound(DocumentId),
384    ValidationFailed(Vec<String>),
385    IndexError(String),
386}
387
388impl std::fmt::Display for CollectionError {
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        match self {
391            Self::DuplicateId(id) => write!(f, "Document with ID {} already exists", id),
392            Self::NotFound(id) => write!(f, "Document with ID {} not found", id),
393            Self::ValidationFailed(errors) => {
394                write!(f, "Validation failed: {}", errors.join(", "))
395            }
396            Self::IndexError(msg) => write!(f, "Index error: {}", msg),
397        }
398    }
399}
400
401impl std::error::Error for CollectionError {}
402
403// =============================================================================
404// Tests
405// =============================================================================
406
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::QueryBuilder;

    #[test]
    fn test_collection_creation() {
        let collection = Collection::new("users");
        assert_eq!(collection.name(), "users");
        assert_eq!(collection.count(), 0);
    }

    #[test]
    fn test_insert_and_get() {
        let collection = Collection::new("test");

        let mut doc = Document::with_id("doc1");
        doc.set("name", "Alice");

        let id = collection.insert(doc).unwrap();
        assert_eq!(id.as_str(), "doc1");

        let retrieved = collection.get(&id).unwrap();
        assert_eq!(
            retrieved.get("name").and_then(|v| v.as_str()),
            Some("Alice")
        );
    }

    #[test]
    fn test_duplicate_id() {
        let collection = Collection::new("test");

        let doc1 = Document::with_id("same-id");
        let doc2 = Document::with_id("same-id");

        collection.insert(doc1).unwrap();
        let result = collection.insert(doc2);

        assert!(matches!(result, Err(CollectionError::DuplicateId(_))));
    }

    #[test]
    fn test_update() {
        let collection = Collection::new("test");

        let mut doc = Document::with_id("doc1");
        doc.set("count", 1i64);
        collection.insert(doc).unwrap();

        let mut updated = Document::with_id("doc1");
        updated.set("count", 2i64);
        collection
            .update(&DocumentId::new("doc1"), updated)
            .unwrap();

        let retrieved = collection.get(&DocumentId::new("doc1")).unwrap();
        assert_eq!(retrieved.get("count").and_then(|v| v.as_i64()), Some(2));
    }

    #[test]
    fn test_update_missing_returns_not_found() {
        let collection = Collection::new("test");
        let result = collection.update(&DocumentId::new("missing"), Document::with_id("missing"));
        assert!(matches!(result, Err(CollectionError::NotFound(_))));
    }

    #[test]
    fn test_update_keeps_index_in_sync() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        let mut doc = Document::with_id("doc1");
        doc.set("status", "draft");
        collection.insert(doc).unwrap();

        let mut updated = Document::with_id("doc1");
        updated.set("status", "published");
        collection
            .update(&DocumentId::new("doc1"), updated)
            .unwrap();

        let drafts = collection.find(&QueryBuilder::new().eq("status", "draft").build());
        assert!(drafts.documents.is_empty());

        let published = collection.find(&QueryBuilder::new().eq("status", "published").build());
        assert_eq!(published.documents.len(), 1);
        assert_eq!(published.total_scanned, 1);
    }

    #[test]
    fn test_delete() {
        let collection = Collection::new("test");

        let doc = Document::with_id("doc1");
        collection.insert(doc).unwrap();

        assert!(collection.contains(&DocumentId::new("doc1")));

        collection.delete(&DocumentId::new("doc1")).unwrap();
        assert!(!collection.contains(&DocumentId::new("doc1")));
    }

    #[test]
    fn test_delete_missing_returns_not_found() {
        let collection = Collection::new("test");
        let result = collection.delete(&DocumentId::new("missing"));
        assert!(matches!(result, Err(CollectionError::NotFound(_))));
    }

    #[test]
    fn test_delete_removes_from_index() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        for id in ["a", "b"] {
            let mut doc = Document::with_id(id);
            doc.set("status", "active");
            collection.insert(doc).unwrap();
        }

        let removed = collection.delete(&DocumentId::new("a")).unwrap();
        assert_eq!(removed.id.as_str(), "a");

        let result = collection.find(&QueryBuilder::new().eq("status", "active").build());
        assert_eq!(result.documents.len(), 1);
        assert_eq!(result.total_scanned, 1);
    }

    #[test]
    fn test_clear_empties_documents_and_indexes() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        for _ in 0..5 {
            let mut doc = Document::new();
            doc.set("status", "active");
            collection.insert(doc).unwrap();
        }

        collection.clear();
        assert_eq!(collection.count(), 0);

        let result = collection.find(&QueryBuilder::new().eq("status", "active").build());
        assert!(result.documents.is_empty());
        assert_eq!(result.total_scanned, 0);
    }

    #[test]
    fn test_find() {
        let collection = Collection::new("test");

        for i in 0..10 {
            let mut doc = Document::new();
            doc.set("value", i as i64);
            doc.set("even", i % 2 == 0);
            collection.insert(doc).unwrap();
        }

        let query = QueryBuilder::new().eq("even", true).build();
        let result = collection.find(&query);

        assert_eq!(result.documents.len(), 5);
    }

    #[test]
    fn test_find_skip_and_limit() {
        let collection = Collection::new("test");

        for i in 0..10 {
            let mut doc = Document::new();
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        let mut query = QueryBuilder::new().build();
        query.skip = Some(3);
        query.limit = Some(4);
        let page = collection.find(&query);
        assert_eq!(page.documents.len(), 4);
        assert_eq!(page.total_scanned, 10);

        // Skipping past the end yields an empty page rather than panicking.
        query.skip = Some(20);
        assert!(collection.find(&query).documents.is_empty());
    }

    #[test]
    fn test_find_one_and_count_matching() {
        let collection = Collection::new("test");

        for i in 0..20 {
            let mut doc = Document::new();
            doc.set("status", if i % 4 == 0 { "active" } else { "inactive" });
            collection.insert(doc).unwrap();
        }

        let active = QueryBuilder::new().eq("status", "active").build();
        let missing = QueryBuilder::new().eq("status", "archived").build();

        // The answers must be identical with and without an index on "status".
        for indexed in [false, true] {
            if indexed {
                collection.create_index("status", IndexType::Hash);
            }
            assert_eq!(collection.count_matching(&active), 5);
            assert_eq!(collection.count_matching(&missing), 0);

            let found = collection.find_one(&active).expect("an active document");
            assert_eq!(found.get("status").and_then(|v| v.as_str()), Some("active"));
            assert!(collection.find_one(&missing).is_none());
        }
    }

    #[test]
    fn test_find_uses_index() {
        let collection = Collection::new("test");

        // Insert 100 documents with a "status" field
        for i in 0..100 {
            let mut doc = Document::new();
            doc.set("status", if i % 10 == 0 { "active" } else { "inactive" });
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        // Query without index => full scan
        let query = QueryBuilder::new().eq("status", "active").build();
        let result_no_index = collection.find(&query);
        assert_eq!(result_no_index.documents.len(), 10);
        assert_eq!(result_no_index.total_scanned, 100); // full scan

        // Create index on "status"
        collection.create_index("status", IndexType::Hash);

        // Query with index => should scan far fewer documents
        let result_with_index = collection.find(&query);
        assert_eq!(result_with_index.documents.len(), 10);
        assert!(
            result_with_index.total_scanned < 100,
            "Expected index scan to examine fewer than 100 docs, got {}",
            result_with_index.total_scanned
        );
        // The index should return exactly the 10 matching documents
        assert_eq!(result_with_index.total_scanned, 10);
    }

    #[test]
    fn test_find_index_with_additional_filters() {
        let collection = Collection::new("test");

        // Insert docs with value == i: i in 0..50 are "active",
        // i in 50..100 are "inactive".
        for i in 0..100 {
            let mut doc = Document::new();
            doc.set("status", if i < 50 { "active" } else { "inactive" });
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        collection.create_index("status", IndexType::Hash);

        // Eq filter on indexed field + Gt filter on non-indexed field
        let query = QueryBuilder::new()
            .eq("status", "active")
            .gt("value", 25i64)
            .build();

        let result = collection.find(&query);

        // active docs are i=0..50, value>25 means i=26..49 => 24 docs
        assert_eq!(result.documents.len(), 24);
        // Should only scan the 50 "active" candidates from the index
        assert_eq!(result.total_scanned, 50);
    }

    #[test]
    fn test_find_no_matching_index_falls_back_to_scan() {
        let collection = Collection::new("test");

        for i in 0..20 {
            let mut doc = Document::new();
            doc.set("x", i as i64);
            collection.insert(doc).unwrap();
        }

        // Index on a different field
        collection.create_index("y", IndexType::Hash);

        let query = QueryBuilder::new().eq("x", 5i64).build();
        let result = collection.find(&query);

        assert_eq!(result.documents.len(), 1);
        // No index on "x", so full scan
        assert_eq!(result.total_scanned, 20);
    }
}