1use crate::index::{DocumentIndex, IndexType};
9use crate::query::{Filter, Query, QueryResult};
10use crate::types::{Document, DocumentId};
11use crate::validation::{Schema, ValidationResult};
12use std::collections::{HashMap, HashSet};
13use std::sync::RwLock;
14
15pub struct Collection {
21 name: String,
22 documents: RwLock<HashMap<DocumentId, Document>>,
23 indexes: RwLock<Vec<DocumentIndex>>,
24 schema: Option<Schema>,
25}
26
27impl Collection {
28 pub fn new(name: impl Into<String>) -> Self {
30 Self {
31 name: name.into(),
32 documents: RwLock::new(HashMap::new()),
33 indexes: RwLock::new(Vec::new()),
34 schema: None,
35 }
36 }
37
38 pub fn with_schema(name: impl Into<String>, schema: Schema) -> Self {
40 Self {
41 name: name.into(),
42 documents: RwLock::new(HashMap::new()),
43 indexes: RwLock::new(Vec::new()),
44 schema: Some(schema),
45 }
46 }
47
48 pub fn name(&self) -> &str {
50 &self.name
51 }
52
53 pub fn insert(&self, doc: Document) -> Result<DocumentId, CollectionError> {
59 if let Some(ref schema) = self.schema {
60 let result = schema.validate(&doc);
61 if !result.is_valid {
62 return Err(CollectionError::ValidationFailed(result.errors));
63 }
64 }
65
66 let id = doc.id.clone();
67
68 {
69 let mut docs = self.documents.write().expect("documents RwLock poisoned");
70 if docs.contains_key(&id) {
71 return Err(CollectionError::DuplicateId(id));
72 }
73 docs.insert(id.clone(), doc.clone());
74 }
75
76 self.index_document(&doc);
77
78 Ok(id)
79 }
80
81 pub fn insert_many(&self, docs: Vec<Document>) -> Result<Vec<DocumentId>, CollectionError> {
83 let mut ids = Vec::with_capacity(docs.len());
84
85 for doc in docs {
86 let id = self.insert(doc)?;
87 ids.push(id);
88 }
89
90 Ok(ids)
91 }
92
93 pub fn get(&self, id: &DocumentId) -> Option<Document> {
95 let docs = self.documents.read().expect("documents RwLock poisoned");
96 docs.get(id).cloned()
97 }
98
99 pub fn update(&self, id: &DocumentId, doc: Document) -> Result<(), CollectionError> {
101 if let Some(ref schema) = self.schema {
102 let result = schema.validate(&doc);
103 if !result.is_valid {
104 return Err(CollectionError::ValidationFailed(result.errors));
105 }
106 }
107
108 {
109 let mut docs = self.documents.write().expect("documents RwLock poisoned");
110 if !docs.contains_key(id) {
111 return Err(CollectionError::NotFound(id.clone()));
112 }
113
114 if let Some(old_doc) = docs.get(id) {
115 self.unindex_document(old_doc);
116 }
117
118 docs.insert(id.clone(), doc.clone());
119 }
120
121 self.index_document(&doc);
122
123 Ok(())
124 }
125
126 pub fn delete(&self, id: &DocumentId) -> Result<Document, CollectionError> {
128 let mut docs = self.documents.write().expect("documents RwLock poisoned");
129
130 match docs.remove(id) {
131 Some(doc) => {
132 self.unindex_document(&doc);
133 Ok(doc)
134 }
135 None => Err(CollectionError::NotFound(id.clone())),
136 }
137 }
138
139 pub fn contains(&self, id: &DocumentId) -> bool {
141 let docs = self.documents.read().expect("documents RwLock poisoned");
142 docs.contains_key(id)
143 }
144
145 pub fn count(&self) -> usize {
147 let docs = self.documents.read().expect("documents RwLock poisoned");
148 docs.len()
149 }
150
151 pub fn ids(&self) -> Vec<DocumentId> {
153 let docs = self.documents.read().expect("documents RwLock poisoned");
154 docs.keys().cloned().collect()
155 }
156
157 pub fn all(&self) -> Vec<Document> {
159 let docs = self.documents.read().expect("documents RwLock poisoned");
160 docs.values().cloned().collect()
161 }
162
163 pub fn clear(&self) {
165 let mut docs = self.documents.write().expect("documents RwLock poisoned");
166 docs.clear();
167
168 let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
169 for index in indexes.iter_mut() {
170 index.clear();
171 }
172 }
173
174 pub fn find(&self, query: &Query) -> QueryResult {
184 let docs = self.documents.read().expect("documents RwLock poisoned");
185 let indexes = self.indexes.read().expect("indexes RwLock poisoned");
186 let start = std::time::Instant::now();
187
188 let candidate_ids = self.find_indexed_candidates(&query.filters, &indexes);
191
192 let (mut matching, total_scanned) = if let Some(ids) = candidate_ids {
193 let scanned = ids.len();
195 let results: Vec<Document> = ids
196 .iter()
197 .filter_map(|id| docs.get(id))
198 .filter(|doc| query.matches(doc))
199 .cloned()
200 .collect();
201 (results, scanned)
202 } else {
203 let results: Vec<Document> = docs
205 .values()
206 .filter(|doc| query.matches(doc))
207 .cloned()
208 .collect();
209 (results, docs.len())
210 };
211
212 if let Some(ref sort) = query.sort {
214 matching.sort_by(|a, b| {
215 let va = a.get(&sort.field);
216 let vb = b.get(&sort.field);
217 let ord = crate::types::compare_values(va, vb);
218 if sort.ascending {
219 ord
220 } else {
221 ord.reverse()
222 }
223 });
224 }
225
226 if let Some(skip) = query.skip {
228 if skip < matching.len() {
229 matching = matching.split_off(skip);
230 } else {
231 matching.clear();
232 }
233 }
234
235 if let Some(limit) = query.limit {
237 matching.truncate(limit);
238 }
239
240 if let Some(ref fields) = query.projection {
242 for doc in &mut matching {
243 doc.data.retain(|key, _| fields.contains(key));
244 }
245 }
246
247 QueryResult {
248 documents: matching,
249 total_scanned,
250 execution_time_ms: start.elapsed().as_millis() as u64,
251 }
252 }
253
254 pub fn find_one(&self, query: &Query) -> Option<Document> {
256 let docs = self.documents.read().expect("documents RwLock poisoned");
257 docs.values().find(|doc| query.matches(doc)).cloned()
258 }
259
260 pub fn count_matching(&self, query: &Query) -> usize {
262 let docs = self.documents.read().expect("documents RwLock poisoned");
263 docs.values().filter(|doc| query.matches(doc)).count()
264 }
265
266 fn find_indexed_candidates(
275 &self,
276 filters: &[Filter],
277 indexes: &[DocumentIndex],
278 ) -> Option<HashSet<DocumentId>> {
279 let mut result: Option<HashSet<DocumentId>> = None;
280
281 for filter in filters {
282 if let Filter::Eq { field, value } = filter {
283 if let Some(index) = indexes.iter().find(|idx| {
285 idx.field() == field.as_str()
286 && matches!(
287 idx.index_type(),
288 IndexType::Hash | IndexType::Unique | IndexType::BTree
289 )
290 }) {
291 let ids: HashSet<DocumentId> = index.find_eq(value).into_iter().collect();
292 result = Some(match result {
293 Some(current) => current.intersection(&ids).cloned().collect(),
294 None => ids,
295 });
296 }
297 }
298 }
299
300 result
301 }
302
303 pub fn create_index(&self, field: impl Into<String>, index_type: IndexType) {
309 let field = field.into();
310 let mut index = DocumentIndex::new(field.clone(), index_type);
311
312 let docs = self.documents.read().expect("documents RwLock poisoned");
313 for doc in docs.values() {
314 index.index_document(doc);
315 }
316
317 let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
318 indexes.push(index);
319 }
320
321 pub fn drop_index(&self, field: &str) {
323 let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
324 indexes.retain(|idx| idx.field() != field);
325 }
326
327 pub fn index_names(&self) -> Vec<String> {
329 let indexes = self.indexes.read().expect("indexes RwLock poisoned");
330 indexes.iter().map(|idx| idx.field().to_string()).collect()
331 }
332
333 fn index_document(&self, doc: &Document) {
334 let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
335 for index in indexes.iter_mut() {
336 index.index_document(doc);
337 }
338 }
339
340 fn unindex_document(&self, doc: &Document) {
341 let mut indexes = self.indexes.write().expect("indexes RwLock poisoned");
342 for index in indexes.iter_mut() {
343 index.unindex_document(doc);
344 }
345 }
346
347 pub fn set_schema(&mut self, schema: Schema) {
353 self.schema = Some(schema);
354 }
355
356 pub fn schema(&self) -> Option<&Schema> {
358 self.schema.as_ref()
359 }
360
361 pub fn validate_all(&self) -> Vec<(DocumentId, ValidationResult)> {
363 let Some(ref schema) = self.schema else {
364 return Vec::new();
365 };
366
367 let docs = self.documents.read().expect("documents RwLock poisoned");
368 docs.iter()
369 .map(|(id, doc)| (id.clone(), schema.validate(doc)))
370 .filter(|(_, result)| !result.is_valid)
371 .collect()
372 }
373}
374
375#[derive(Debug, Clone)]
381pub enum CollectionError {
382 DuplicateId(DocumentId),
383 NotFound(DocumentId),
384 ValidationFailed(Vec<String>),
385 IndexError(String),
386}
387
388impl std::fmt::Display for CollectionError {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 match self {
391 Self::DuplicateId(id) => write!(f, "Document with ID {} already exists", id),
392 Self::NotFound(id) => write!(f, "Document with ID {} not found", id),
393 Self::ValidationFailed(errors) => {
394 write!(f, "Validation failed: {}", errors.join(", "))
395 }
396 Self::IndexError(msg) => write!(f, "Index error: {}", msg),
397 }
398 }
399}
400
401impl std::error::Error for CollectionError {}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::QueryBuilder;

    #[test]
    fn test_collection_creation() {
        let collection = Collection::new("users");
        assert_eq!(collection.name(), "users");
        assert_eq!(collection.count(), 0);
    }

    #[test]
    fn test_insert_and_get() {
        let collection = Collection::new("test");

        let mut doc = Document::with_id("doc1");
        doc.set("name", "Alice");

        let id = collection.insert(doc).unwrap();
        assert_eq!(id.as_str(), "doc1");

        let retrieved = collection.get(&id).unwrap();
        assert_eq!(
            retrieved.get("name").and_then(|v| v.as_str()),
            Some("Alice")
        );
    }

    #[test]
    fn test_duplicate_id() {
        let collection = Collection::new("test");

        let doc1 = Document::with_id("same-id");
        let doc2 = Document::with_id("same-id");

        collection.insert(doc1).unwrap();
        let result = collection.insert(doc2);

        assert!(matches!(result, Err(CollectionError::DuplicateId(_))));
    }

    #[test]
    fn test_missing_id_errors() {
        let collection = Collection::new("test");
        let missing = DocumentId::new("missing");

        assert!(matches!(
            collection.update(&missing, Document::with_id("missing")),
            Err(CollectionError::NotFound(_))
        ));
        assert!(matches!(
            collection.delete(&missing),
            Err(CollectionError::NotFound(_))
        ));
        assert!(collection.get(&missing).is_none());
    }

    #[test]
    fn test_update() {
        let collection = Collection::new("test");

        let mut doc = Document::with_id("doc1");
        doc.set("count", 1i64);
        collection.insert(doc).unwrap();

        let mut updated = Document::with_id("doc1");
        updated.set("count", 2i64);
        collection
            .update(&DocumentId::new("doc1"), updated)
            .unwrap();

        let retrieved = collection.get(&DocumentId::new("doc1")).unwrap();
        assert_eq!(retrieved.get("count").and_then(|v| v.as_i64()), Some(2));
    }

    #[test]
    fn test_update_refreshes_index() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        let mut doc = Document::with_id("doc1");
        doc.set("status", "active");
        collection.insert(doc).unwrap();

        let mut updated = Document::with_id("doc1");
        updated.set("status", "inactive");
        collection
            .update(&DocumentId::new("doc1"), updated)
            .unwrap();

        let active = QueryBuilder::new().eq("status", "active").build();
        let inactive = QueryBuilder::new().eq("status", "inactive").build();
        assert!(collection.find(&active).documents.is_empty());
        assert_eq!(collection.find(&inactive).documents.len(), 1);
    }

    #[test]
    fn test_delete() {
        let collection = Collection::new("test");

        let doc = Document::with_id("doc1");
        collection.insert(doc).unwrap();

        assert!(collection.contains(&DocumentId::new("doc1")));

        collection.delete(&DocumentId::new("doc1")).unwrap();
        assert!(!collection.contains(&DocumentId::new("doc1")));
    }

    #[test]
    fn test_delete_removes_from_index() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        let mut doc = Document::with_id("doc1");
        doc.set("status", "active");
        collection.insert(doc).unwrap();
        collection.delete(&DocumentId::new("doc1")).unwrap();

        let query = QueryBuilder::new().eq("status", "active").build();
        assert!(collection.find(&query).documents.is_empty());
        assert_eq!(collection.count_matching(&query), 0);
        assert!(collection.find_one(&query).is_none());
    }

    #[test]
    fn test_clear_empties_documents_and_indexes() {
        let collection = Collection::new("test");
        collection.create_index("status", IndexType::Hash);

        let mut doc = Document::with_id("doc1");
        doc.set("status", "active");
        collection.insert(doc).unwrap();

        collection.clear();

        let query = QueryBuilder::new().eq("status", "active").build();
        assert_eq!(collection.count(), 0);
        assert!(collection.find(&query).documents.is_empty());
        // The index itself survives `clear` and keeps working for new inserts.
        assert_eq!(collection.index_names(), vec!["status".to_string()]);

        let mut again = Document::with_id("doc2");
        again.set("status", "active");
        collection.insert(again).unwrap();
        assert_eq!(collection.find(&query).documents.len(), 1);
    }

    #[test]
    fn test_find() {
        let collection = Collection::new("test");

        for i in 0..10 {
            let mut doc = Document::new();
            doc.set("value", i as i64);
            doc.set("even", i % 2 == 0);
            collection.insert(doc).unwrap();
        }

        let query = QueryBuilder::new().eq("even", true).build();
        let result = collection.find(&query);

        assert_eq!(result.documents.len(), 5);
    }

    #[test]
    fn test_find_skip_and_limit() {
        let collection = Collection::new("test");

        for i in 0..10 {
            let mut doc = Document::new();
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        // Matches all ten documents.
        let mut query = QueryBuilder::new().gt("value", -1i64).build();

        query.skip = Some(2);
        query.limit = Some(3);
        assert_eq!(collection.find(&query).documents.len(), 3);

        // Fewer than `limit` remain after skipping.
        query.skip = Some(8);
        assert_eq!(collection.find(&query).documents.len(), 2);

        // Skipping past the end yields nothing.
        query.skip = Some(20);
        assert!(collection.find(&query).documents.is_empty());
    }

    #[test]
    fn test_find_uses_index() {
        let collection = Collection::new("test");

        // Every tenth document is "active": 10 of 100.
        for i in 0..100 {
            let mut doc = Document::new();
            doc.set("status", if i % 10 == 0 { "active" } else { "inactive" });
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        let query = QueryBuilder::new().eq("status", "active").build();

        // Without an index the whole collection is scanned.
        let result_no_index = collection.find(&query);
        assert_eq!(result_no_index.documents.len(), 10);
        assert_eq!(result_no_index.total_scanned, 100);

        collection.create_index("status", IndexType::Hash);

        // With an index only the indexed candidates are examined.
        let result_with_index = collection.find(&query);
        assert_eq!(result_with_index.documents.len(), 10);
        assert_eq!(
            result_with_index.total_scanned, 10,
            "expected the index to narrow the scan to the 10 active docs"
        );
    }

    #[test]
    fn test_find_index_with_additional_filters() {
        let collection = Collection::new("test");

        // Values 0..50 are "active", 50..100 are "inactive".
        for i in 0..100 {
            let mut doc = Document::new();
            doc.set("status", if i < 50 { "active" } else { "inactive" });
            doc.set("value", i as i64);
            collection.insert(doc).unwrap();
        }

        collection.create_index("status", IndexType::Hash);

        // The indexed equality narrows to 50 candidates; the non-indexed `gt`
        // filter is then applied to each of them (values 26..=49 survive).
        let query = QueryBuilder::new()
            .eq("status", "active")
            .gt("value", 25i64)
            .build();

        let result = collection.find(&query);

        assert_eq!(result.documents.len(), 24);
        assert_eq!(result.total_scanned, 50);
    }

    #[test]
    fn test_find_one_and_count_matching_with_index() {
        let collection = Collection::new("test");

        for i in 0..20 {
            let mut doc = Document::new();
            doc.set("group", if i < 5 { "a" } else { "b" });
            collection.insert(doc).unwrap();
        }
        collection.create_index("group", IndexType::Hash);

        let query = QueryBuilder::new().eq("group", "a").build();
        assert_eq!(collection.count_matching(&query), 5);

        let found = collection
            .find_one(&query)
            .expect("five documents are in group a");
        assert_eq!(found.get("group").and_then(|v| v.as_str()), Some("a"));
    }

    #[test]
    fn test_find_no_matching_index_falls_back_to_scan() {
        let collection = Collection::new("test");

        for i in 0..20 {
            let mut doc = Document::new();
            doc.set("x", i as i64);
            collection.insert(doc).unwrap();
        }

        // An index on an unrelated field must not be used for a filter on `x`.
        collection.create_index("y", IndexType::Hash);

        let query = QueryBuilder::new().eq("x", 5i64).build();
        let result = collection.find(&query);

        assert_eq!(result.documents.len(), 1);
        assert_eq!(result.total_scanned, 20);
    }

    #[test]
    fn test_index_names_and_drop_index() {
        let collection = Collection::new("test");
        collection.create_index("a", IndexType::Hash);
        collection.create_index("b", IndexType::BTree);
        assert_eq!(
            collection.index_names(),
            vec!["a".to_string(), "b".to_string()]
        );

        collection.drop_index("a");
        assert_eq!(collection.index_names(), vec!["b".to_string()]);

        // Dropping an unknown field is a no-op.
        collection.drop_index("missing");
        assert_eq!(collection.index_names(), vec!["b".to_string()]);
    }

    #[test]
    fn test_validate_all_without_schema_is_empty() {
        let collection = Collection::new("test");
        collection.insert(Document::with_id("doc1")).unwrap();
        assert!(collection.schema().is_none());
        assert!(collection.validate_all().is_empty());
    }

    #[test]
    fn test_error_display() {
        let invalid = CollectionError::ValidationFailed(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(invalid.to_string(), "Validation failed: a, b");

        let index = CollectionError::IndexError("bad".to_string());
        assert_eq!(index.to_string(), "Index error: bad");
    }
}