luckdb/
lib.rs

1#![feature(slice_as_array)]
2#![allow(warnings)]
3use chrono;
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value as Document, Value};
6use std::clone;
7use std::cmp::Ordering;
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::fs::{self, File};
10use std::io::{BufRead, BufReader, BufWriter};
11use std::io::{Read, Write};
12use std::net::{SocketAddr, TcpListener, TcpStream};
13use std::ops::Bound::{Excluded, Included, Unbounded};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16use std::thread;
17use thiserror::Error;
18use uuid::Uuid;
19
20// --- Error Types ---
21#[derive(Error, Debug)]
22pub enum DbError {
23    #[error("Document not found")]
24    NotFound,
25    #[error("Invalid query: {0}")]
26    InvalidQuery(String),
27    #[error("Serialization error: {0}")]
28    Serialization(#[from] serde_json::Error),
29    #[error("IO error: {0}")]
30    IoError(#[from] std::io::Error),
31    #[error("Other error: {0}")]
32    Other(String),
33    #[error("Index error: {0}")]
34    IndexError(String),
35    #[error("Update error: {0}")]
36    UpdateError(String),
37    #[error("Validation error: {0}")]
38    ValidationError(String),
39    #[error("Connection error: {0}")]
40    ConnectionError(String),
41    #[error("Protocol error: {0}")]
42    ProtocolError(String),
43}
44pub type Result<T> = std::result::Result<T, DbError>;
45
46// --- Document ID ---
47#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
48pub struct DocId(String);
49impl DocId {
50    pub fn new() -> Self {
51        Self(Uuid::new_v4().to_string())
52    }
53
54    pub fn from_str(s: &str) -> Self {
55        Self(s.to_string())
56    }
57
58    pub fn as_str(&self) -> &str {
59        &self.0
60    }
61}
62impl std::fmt::Display for DocId {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(f, "{}", self.0)
65    }
66}
67
68// --- Query Operators ---
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub enum QueryOperator {
71    Eq(Value),
72    Ne(Value),
73    Gt(Value),
74    Gte(Value),
75    Lt(Value),
76    Lte(Value),
77    In(Vec<Value>),
78    Nin(Vec<Value>),
79    Exists(bool),
80    Regex(String),
81    And(Vec<Query>),
82    Or(Vec<Query>),
83    Nor(Vec<Query>),
84    Not(Box<QueryOperator>),
85    // Array operators
86    All(Vec<Value>),
87    ElemMatch(Query),
88    Size(usize),
89    // Geospatial operators (simplified)
90    Near {
91        point: (f64, f64),
92        max_distance: Option<f64>,
93    },
94    Within {
95        shape: GeoShape,
96    },
97    Intersects {
98        shape: GeoShape,
99    },
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum GeoShape {
104    Box {
105        bottom_left: (f64, f64),
106        top_right: (f64, f64),
107    },
108    Polygon {
109        points: Vec<(f64, f64)>,
110    },
111    Center {
112        center: (f64, f64),
113        radius: f64,
114    },
115}
116
117// --- Query Condition ---
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct QueryCondition {
120    field: String,
121    operator: QueryOperator,
122}
123
124// --- Query ---
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct Query {
127    conditions: Vec<QueryCondition>,
128    logical_ops: Vec<(LogicalOp, Vec<Query>)>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub enum LogicalOp {
133    And,
134    Or,
135    Nor,
136    Not,
137}
138// Add this helper function after the Query implementation
139impl Query {
140    pub fn from_value(value: Value) -> Result<Self> {
141        match value {
142            Value::Object(map) => {
143                let mut query = Query::new();
144
145                for (field, op_value) in map {
146                    match op_value {
147                        Value::Object(op_map) => {
148                            for (op, val) in op_map {
149                                match op.as_str() {
150                                    "$eq" => query = query.eq(&field, val.clone()),
151                                    "$ne" => query = query.ne(&field, val.clone()),
152                                    "$gt" => query = query.gt(&field, val.clone()),
153                                    "$gte" => query = query.gte(&field, val.clone()),
154                                    "$lt" => query = query.lt(&field, val.clone()),
155                                    "$lte" => query = query.lte(&field, val.clone()),
156                                    "$in" => {
157                                        if let Value::Array(arr) = val {
158                                            query = query.in_(&field, arr.clone());
159                                        }
160                                    }
161                                    "$nin" => {
162                                        if let Value::Array(arr) = val {
163                                            query = query.nin(&field, arr.clone());
164                                        }
165                                    }
166                                    "$exists" => {
167                                        if let Value::Bool(exists) = val {
168                                            query = query.exists(&field, exists);
169                                        }
170                                    }
171                                    "$regex" => {
172                                        if let Value::String(pattern) = val {
173                                            query = query.regex(&field, &pattern);
174                                        }
175                                    }
176                                    _ => {
177                                        return Err(DbError::InvalidQuery(format!(
178                                            "Unknown operator: {}",
179                                            op
180                                        )))
181                                    }
182                                }
183                            }
184                        }
185                        _ => query = query.eq(&field, op_value.clone()),
186                    }
187                }
188
189                Ok(query)
190            }
191            _ => Err(DbError::InvalidQuery("Query must be an object".to_string())),
192        }
193    }
194}
195
196impl Query {
197    pub fn new() -> Self {
198        Self {
199            conditions: Vec::new(),
200            logical_ops: Vec::new(),
201        }
202    }
203
204    pub fn eq(mut self, key: &str, value: Value) -> Self {
205        self.conditions.push(QueryCondition {
206            field: key.to_string(),
207            operator: QueryOperator::Eq(value),
208        });
209        self
210    }
211
212    pub fn ne(mut self, key: &str, value: Value) -> Self {
213        self.conditions.push(QueryCondition {
214            field: key.to_string(),
215            operator: QueryOperator::Ne(value),
216        });
217        self
218    }
219
220    pub fn gt(mut self, key: &str, value: Value) -> Self {
221        self.conditions.push(QueryCondition {
222            field: key.to_string(),
223            operator: QueryOperator::Gt(value),
224        });
225        self
226    }
227
228    pub fn gte(mut self, key: &str, value: Value) -> Self {
229        self.conditions.push(QueryCondition {
230            field: key.to_string(),
231            operator: QueryOperator::Gte(value),
232        });
233        self
234    }
235
236    pub fn lt(mut self, key: &str, value: Value) -> Self {
237        self.conditions.push(QueryCondition {
238            field: key.to_string(),
239            operator: QueryOperator::Lt(value),
240        });
241        self
242    }
243
244    pub fn lte(mut self, key: &str, value: Value) -> Self {
245        self.conditions.push(QueryCondition {
246            field: key.to_string(),
247            operator: QueryOperator::Lte(value),
248        });
249        self
250    }
251
252    pub fn in_(mut self, key: &str, values: Vec<Value>) -> Self {
253        self.conditions.push(QueryCondition {
254            field: key.to_string(),
255            operator: QueryOperator::In(values),
256        });
257        self
258    }
259
260    pub fn nin(mut self, key: &str, values: Vec<Value>) -> Self {
261        self.conditions.push(QueryCondition {
262            field: key.to_string(),
263            operator: QueryOperator::Nin(values),
264        });
265        self
266    }
267
268    pub fn exists(mut self, key: &str, exists: bool) -> Self {
269        self.conditions.push(QueryCondition {
270            field: key.to_string(),
271            operator: QueryOperator::Exists(exists),
272        });
273        self
274    }
275
276    pub fn regex(mut self, key: &str, pattern: &str) -> Self {
277        self.conditions.push(QueryCondition {
278            field: key.to_string(),
279            operator: QueryOperator::Regex(pattern.to_string()),
280        });
281        self
282    }
283
284    pub fn and(mut self, queries: Vec<Query>) -> Self {
285        self.logical_ops.push((LogicalOp::And, queries));
286        self
287    }
288
289    pub fn or(mut self, queries: Vec<Query>) -> Self {
290        self.logical_ops.push((LogicalOp::Or, queries));
291        self
292    }
293
294    pub fn nor(mut self, queries: Vec<Query>) -> Self {
295        self.logical_ops.push((LogicalOp::Nor, queries));
296        self
297    }
298
299    pub fn not(mut self, query: Query) -> Self {
300        // Convert the query to a single condition for Not operator
301        if let Some(cond) = query.conditions.first() {
302            self.conditions.push(QueryCondition {
303                field: cond.field.clone(),
304                operator: QueryOperator::Not(Box::new(cond.operator.clone())),
305            });
306        }
307        self
308    }
309
310    // Array operators
311    pub fn all(mut self, key: &str, values: Vec<Value>) -> Self {
312        self.conditions.push(QueryCondition {
313            field: key.to_string(),
314            operator: QueryOperator::All(values),
315        });
316        self
317    }
318
319    pub fn elem_match(mut self, key: &str, query: Query) -> Self {
320        self.conditions.push(QueryCondition {
321            field: key.to_string(),
322            operator: QueryOperator::ElemMatch(query),
323        });
324        self
325    }
326
327    pub fn size(mut self, key: &str, size: usize) -> Self {
328        self.conditions.push(QueryCondition {
329            field: key.to_string(),
330            operator: QueryOperator::Size(size),
331        });
332        self
333    }
334
335    // Geospatial operators
336    pub fn near(mut self, key: &str, point: (f64, f64), max_distance: Option<f64>) -> Self {
337        self.conditions.push(QueryCondition {
338            field: key.to_string(),
339            operator: QueryOperator::Near {
340                point,
341                max_distance,
342            },
343        });
344        self
345    }
346
347    pub fn within(mut self, key: &str, shape: GeoShape) -> Self {
348        self.conditions.push(QueryCondition {
349            field: key.to_string(),
350            operator: QueryOperator::Within { shape },
351        });
352        self
353    }
354
355    pub fn intersects(mut self, key: &str, shape: GeoShape) -> Self {
356        self.conditions.push(QueryCondition {
357            field: key.to_string(),
358            operator: QueryOperator::Intersects { shape },
359        });
360        self
361    }
362
363    // Check if a document matches all conditions
364    pub fn matches(&self, doc: &Document) -> bool {
365        // Check all conditions
366        for cond in &self.conditions {
367            if !self.matches_condition(doc, cond) {
368                return false;
369            }
370        }
371
372        // Check logical operations
373        for (op, queries) in &self.logical_ops {
374            match op {
375                LogicalOp::And => {
376                    if !queries.iter().all(|q| q.matches(doc)) {
377                        return false;
378                    }
379                }
380                LogicalOp::Or => {
381                    if !queries.iter().any(|q| q.matches(doc)) {
382                        return false;
383                    }
384                }
385                LogicalOp::Nor => {
386                    if queries.iter().any(|q| q.matches(doc)) {
387                        return false;
388                    }
389                }
390                LogicalOp::Not => {
391                    if queries.iter().any(|q| q.matches(doc)) {
392                        return false;
393                    }
394                }
395            }
396        }
397
398        true
399    }
400
401    fn matches_condition(&self, doc: &Document, cond: &QueryCondition) -> bool {
402        let field_parts: Vec<&str> = cond.field.split('.').collect();
403        let value = self.get_nested_value(doc, &field_parts);
404
405        match &cond.operator {
406            QueryOperator::Eq(expected) => match value {
407                Some(v) => v == expected,
408                None => false,
409            },
410            QueryOperator::Ne(expected) => match value {
411                Some(v) => v != expected,
412                None => true,
413            },
414            QueryOperator::Gt(expected) => match (value, expected) {
415                (Some(Value::Number(a)), Value::Number(b)) => {
416                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
417                }
418                _ => false,
419            },
420            QueryOperator::Gte(expected) => match (value, expected) {
421                (Some(Value::Number(a)), Value::Number(b)) => {
422                    a.as_f64().unwrap_or(0.0) >= b.as_f64().unwrap_or(0.0)
423                }
424                _ => false,
425            },
426            QueryOperator::Lt(expected) => match (value, expected) {
427                (Some(Value::Number(a)), Value::Number(b)) => {
428                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
429                }
430                _ => false,
431            },
432            QueryOperator::Lte(expected) => match (value, expected) {
433                (Some(Value::Number(a)), Value::Number(b)) => {
434                    a.as_f64().unwrap_or(0.0) <= b.as_f64().unwrap_or(0.0)
435                }
436                _ => false,
437            },
438            QueryOperator::In(values) => match value {
439                Some(v) => values.contains(v),
440                None => false,
441            },
442            QueryOperator::Nin(values) => match value {
443                Some(v) => !values.contains(v),
444                None => true,
445            },
446            QueryOperator::Exists(exists) => value.is_some() == *exists,
447            QueryOperator::Regex(pattern) => {
448                match value {
449                    Some(Value::String(s)) => {
450                        // Simple regex matching (in a real implementation, use regex crate)
451                        s.contains(pattern)
452                    }
453                    _ => false,
454                }
455            }
456            QueryOperator::All(values) => match value {
457                Some(Value::Array(arr)) => values.iter().all(|v| arr.contains(v)),
458                _ => false,
459            },
460            QueryOperator::ElemMatch(query) => match value {
461                Some(Value::Array(arr)) => arr.iter().any(|elem| query.matches(elem)),
462                _ => false,
463            },
464            QueryOperator::Size(size) => match value {
465                Some(Value::Array(arr)) => arr.len() == *size,
466                _ => false,
467            },
468            QueryOperator::Near {
469                point,
470                max_distance,
471            } => {
472                // Simplified geospatial query
473                match value {
474                    Some(Value::Array(arr)) if arr.len() >= 2 => {
475                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
476                            (arr.get(0), arr.get(1))
477                        {
478                            let doc_x = x.as_f64().unwrap_or(0.0);
479                            let doc_y = y.as_f64().unwrap_or(0.0);
480
481                            // Calculate distance (simplified Euclidean distance)
482                            let distance =
483                                ((doc_x - point.0).powi(2) + (doc_y - point.1).powi(2)).sqrt();
484
485                            if let Some(max) = max_distance {
486                                distance <= *max
487                            } else {
488                                true
489                            }
490                        } else {
491                            false
492                        }
493                    }
494                    _ => false,
495                }
496            }
497            QueryOperator::Within { shape } => {
498                // Simplified implementation
499                match (value, shape) {
500                    (
501                        Some(Value::Array(arr)),
502                        GeoShape::Box {
503                            bottom_left,
504                            top_right,
505                        },
506                    ) if arr.len() >= 2 => {
507                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
508                            (arr.get(0), arr.get(1))
509                        {
510                            let doc_x = x.as_f64().unwrap_or(0.0);
511                            let doc_y = y.as_f64().unwrap_or(0.0);
512
513                            doc_x >= bottom_left.0
514                                && doc_x <= top_right.0
515                                && doc_y >= bottom_left.1
516                                && doc_y <= top_right.1
517                        } else {
518                            false
519                        }
520                    }
521                    _ => false,
522                }
523            }
524            QueryOperator::Intersects { shape } => {
525                // Simplified implementation
526                match (value, shape) {
527                    (
528                        Some(Value::Array(arr)),
529                        GeoShape::Box {
530                            bottom_left,
531                            top_right,
532                        },
533                    ) if arr.len() >= 2 => {
534                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
535                            (arr.get(0), arr.get(1))
536                        {
537                            let doc_x = x.as_f64().unwrap_or(0.0);
538                            let doc_y = y.as_f64().unwrap_or(0.0);
539
540                            doc_x >= bottom_left.0
541                                && doc_x <= top_right.0
542                                && doc_y >= bottom_left.1
543                                && doc_y <= top_right.1
544                        } else {
545                            false
546                        }
547                    }
548                    _ => false,
549                }
550            }
551            QueryOperator::And(_)
552            | QueryOperator::Or(_)
553            | QueryOperator::Nor(_)
554            | QueryOperator::Not(_) => {
555                // These are handled at the query level
556                true
557            }
558        }
559    }
560
561    fn get_nested_value<'a>(&self, doc: &'a Value, path: &[&str]) -> Option<&'a Value> {
562        if path.is_empty() {
563            return Some(doc);
564        }
565
566        let key = path[0];
567        let rest = &path[1..];
568
569        match doc {
570            Value::Object(map) => {
571                if let Some(next_value) = map.get(key) {
572                    self.get_nested_value(next_value, rest)
573                } else {
574                    None
575                }
576            }
577            _ => None,
578        }
579    }
580}
581
582// --- Update Operators ---
583#[derive(Debug, Clone, Serialize, Deserialize)]
584pub enum UpdateOperator {
585    Set(Map<String, Value>),
586    Unset(Vec<String>),
587    Inc(Map<String, Value>),
588    Mul(Map<String, Value>),
589    Rename(Vec<(String, String)>),
590    SetOnInsert(Map<String, Value>),
591    Min(Map<String, Value>),
592    Max(Map<String, Value>),
593    CurrentDate(Map<String, Value>),
594    // Array operators
595    Push(Map<String, Value>),
596    PushAll(BTreeMap<String, Vec<Value>>),
597    AddToSet(Map<String, Value>),
598    Pop(BTreeMap<String, i64>),
599    Pull(Map<String, Value>),
600    PullAll(BTreeMap<String, Vec<Value>>),
601    Bit(Map<String, Value>),
602}
603
604// --- Update Document ---
605#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct UpdateDocument {
607    operators: Vec<UpdateOperator>,
608}
609
610impl UpdateDocument {
611    pub fn new() -> Self {
612        Self {
613            operators: Vec::new(),
614        }
615    }
616
617    pub fn set(mut self, key: &str, value: Value) -> Self {
618        let mut map = Map::new();
619        map.insert(key.to_string(), value);
620        self.operators.push(UpdateOperator::Set(map));
621        self
622    }
623
624    pub fn unset(mut self, key: &str) -> Self {
625        self.operators
626            .push(UpdateOperator::Unset(vec![key.to_string()]));
627        self
628    }
629
630    pub fn inc(mut self, key: &str, value: Value) -> Self {
631        let mut map = Map::new();
632        map.insert(key.to_string(), value);
633        self.operators.push(UpdateOperator::Inc(map));
634        self
635    }
636
637    pub fn mul(mut self, key: &str, value: Value) -> Self {
638        let mut map = Map::new();
639        map.insert(key.to_string(), value);
640        self.operators.push(UpdateOperator::Mul(map));
641        self
642    }
643
644    pub fn rename(mut self, old_key: &str, new_key: &str) -> Self {
645        self.operators.push(UpdateOperator::Rename(vec![(
646            old_key.to_string(),
647            new_key.to_string(),
648        )]));
649        self
650    }
651
652    pub fn set_on_insert(mut self, key: &str, value: Value) -> Self {
653        let mut map = Map::new();
654        map.insert(key.to_string(), value);
655        self.operators.push(UpdateOperator::SetOnInsert(map));
656        self
657    }
658
659    pub fn min(mut self, key: &str, value: Value) -> Self {
660        let mut map = Map::new();
661        map.insert(key.to_string(), value);
662        self.operators.push(UpdateOperator::Min(map));
663        self
664    }
665
666    pub fn max(mut self, key: &str, value: Value) -> Self {
667        let mut map = Map::new();
668        map.insert(key.to_string(), value);
669        self.operators.push(UpdateOperator::Max(map));
670        self
671    }
672
673    pub fn current_date(mut self, key: &str, type_spec: Value) -> Self {
674        let mut map = Map::new();
675        map.insert(key.to_string(), type_spec);
676        self.operators.push(UpdateOperator::CurrentDate(map));
677        self
678    }
679
680    // Array operators
681    pub fn push(mut self, key: &str, value: Value) -> Self {
682        let mut map = Map::new();
683        map.insert(key.to_string(), value);
684        self.operators.push(UpdateOperator::Push(map));
685        self
686    }
687
688    pub fn push_all(mut self, key: &str, values: Vec<Value>) -> Self {
689        let mut btree_map = BTreeMap::new();
690        btree_map.insert(key.to_string(), values);
691        self.operators.push(UpdateOperator::PushAll(btree_map));
692        self
693    }
694
695    pub fn add_to_set(mut self, key: &str, value: Value) -> Self {
696        let mut map = Map::new();
697        map.insert(key.to_string(), value);
698        self.operators.push(UpdateOperator::AddToSet(map));
699        self
700    }
701
702    pub fn pop(mut self, key: &str, pos: i64) -> Self {
703        let mut btree_map = BTreeMap::new();
704        btree_map.insert(key.to_string(), pos);
705        self.operators.push(UpdateOperator::Pop(btree_map));
706        self
707    }
708
709    pub fn pull(mut self, key: &str, condition: Value) -> Self {
710        let mut map = Map::new();
711        map.insert(key.to_string(), condition);
712        self.operators.push(UpdateOperator::Pull(map));
713        self
714    }
715
716    pub fn pull_all(mut self, key: &str, values: Vec<Value>) -> Self {
717        let mut btree_map = BTreeMap::new();
718        btree_map.insert(key.to_string(), values);
719        self.operators.push(UpdateOperator::PullAll(btree_map));
720        self
721    }
722
723    // Bitwise operator
724    pub fn bit(mut self, key: &str, operation: Value) -> Self {
725        let mut map = Map::new();
726        map.insert(key.to_string(), operation);
727        self.operators.push(UpdateOperator::Bit(map));
728        self
729    }
730}
731
732// --- Index ---
733#[derive(Debug, Clone, Serialize, Deserialize)]
734pub enum IndexType {
735    Ascending,
736    Descending,
737    Geospatial,
738    Text,
739    Hashed,
740}
741
742#[derive(Debug, Clone, Serialize, Deserialize)]
743pub struct Index {
744    name: String,
745    key: Vec<(String, IndexType)>,
746    unique: bool,
747    sparse: bool,
748    background: bool,
749}
750
751impl Index {
752    pub fn new(name: String, key: Vec<(String, IndexType)>) -> Self {
753        Self {
754            name,
755            key,
756            unique: false,
757            sparse: false,
758            background: false,
759        }
760    }
761
762    pub fn unique(mut self, unique: bool) -> Self {
763        self.unique = unique;
764        self
765    }
766
767    pub fn sparse(mut self, sparse: bool) -> Self {
768        self.sparse = sparse;
769        self
770    }
771
772    pub fn background(mut self, background: bool) -> Self {
773        self.background = background;
774        self
775    }
776}
777
778// --- Index Entry ---
779#[derive(Debug, Clone, Serialize, Deserialize)]
780struct IndexEntry {
781    doc_id: DocId,
782}
783
784#[derive(Debug, Clone, Serialize, Deserialize)]
785struct OrdValue(Value);
786
787impl PartialEq for OrdValue {
788    fn eq(&self, other: &Self) -> bool {
789        self.0 == other.0
790    }
791}
792
793impl Eq for OrdValue {}
794
795impl PartialOrd for OrdValue {
796    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
797        Some(self.cmp(other))
798    }
799}
800
801impl Ord for OrdValue {
802    fn cmp(&self, other: &Self) -> Ordering {
803        // Simple string-based comparison; adjust as needed
804        format!("{:?}", self.0).cmp(&format!("{:?}", other.0))
805    }
806}
807
808// --- Index Data Structure ---
809#[derive(Debug, Serialize, Clone, Deserialize)]
810struct IndexData {
811    entries: BTreeMap<OrdValue, Vec<IndexEntry>>,
812    unique: bool,
813    sparse: bool,
814}
815
816impl IndexData {
817    fn new(unique: bool, sparse: bool) -> Self {
818        Self {
819            entries: BTreeMap::new(),
820            unique,
821            sparse,
822        }
823    }
824
825    fn insert(&mut self, value: Value, doc_id: DocId) -> Result<()> {
826        let ord_value = OrdValue(value.clone());
827
828        if self.unique && self.entries.contains_key(&ord_value) {
829            return Err(DbError::IndexError(format!(
830                "Duplicate key error: {:?}",
831                value
832            )));
833        }
834
835        self.entries
836            .entry(ord_value)
837            .or_insert_with(Vec::new)
838            .push(IndexEntry { doc_id });
839        Ok(())
840    }
841
842    fn remove(&mut self, value: &Value, doc_id: &DocId) -> Result<()> {
843        let ord_value = OrdValue(value.clone());
844
845        if let Some(entries) = self.entries.get_mut(&ord_value) {
846            entries.retain(|entry| entry.doc_id != *doc_id);
847            if entries.is_empty() {
848                self.entries.remove(&ord_value);
849            }
850        }
851        Ok(())
852    }
853
854    fn find(&self, value: &Value) -> Vec<&IndexEntry> {
855        let ord_value = OrdValue(value.clone());
856        self.entries
857            .get(&ord_value)
858            .map_or(Vec::new(), |entries| entries.iter().collect())
859    }
860
861    fn find_range(&self, min: &Value, max: &Value) -> Vec<&IndexEntry> {
862        let mut result = Vec::new();
863        let ord_min = OrdValue(min.clone());
864        let ord_max = OrdValue(max.clone());
865
866        for (key, entries) in self.entries.range((Included(&ord_min), Included(&ord_max))) {
867            result.extend(entries.iter());
868        }
869        result
870    }
871}
872
873// --- Find Options ---
874#[derive(Debug, Clone, Serialize, Deserialize)]
875pub struct FindOptions {
876    pub projection: Option<Map<String, Value>>,
877    pub sort: Option<Vec<(String, SortOrder)>>,
878    pub skip: Option<usize>,
879    pub limit: Option<usize>,
880    pub max_time_ms: Option<u64>,
881    pub allow_partial_results: bool,
882}
883
884#[derive(Debug, Clone, Serialize, Deserialize)]
885pub enum SortOrder {
886    Ascending,
887    Descending,
888}
889
890impl Default for FindOptions {
891    fn default() -> Self {
892        Self {
893            projection: None,
894            sort: None,
895            skip: None,
896            limit: None,
897            max_time_ms: None,
898            allow_partial_results: false,
899        }
900    }
901}
902
903// --- Collection ---
904#[derive(Debug, Clone, Serialize, Deserialize)]
905pub struct Collection {
906    name: String,
907    docs: HashMap<DocId, Document>,
908    indexes: HashMap<String, IndexData>,
909    index_definitions: HashMap<String, Index>,
910}
911
912// --- Persistent Storage ---
913#[derive(Debug, Serialize, Deserialize)]
914pub struct PersistentCollection {
915    name: String,
916    docs: HashMap<DocId, Document>,
917    indexes: HashMap<String, IndexData>,
918    index_definitions: HashMap<String, Index>,
919}
920
921impl From<Collection> for PersistentCollection {
922    fn from(collection: Collection) -> Self {
923        Self {
924            name: collection.name,
925            docs: collection.docs,
926            indexes: collection.indexes,
927            index_definitions: collection.index_definitions,
928        }
929    }
930}
931
932impl From<PersistentCollection> for Collection {
933    fn from(persistent: PersistentCollection) -> Self {
934        Self {
935            name: persistent.name,
936            docs: persistent.docs,
937            indexes: persistent.indexes,
938            index_definitions: persistent.index_definitions,
939        }
940    }
941}
942
943#[derive(Debug, Serialize, Deserialize)]
944pub struct PersistentDatabase {
945    name: String,
946    collections: HashMap<String, PersistentCollection>,
947}
948
949#[derive(Debug, Serialize, Deserialize)]
950pub struct PersistentStorage {
951    databases: HashMap<String, PersistentDatabase>,
952}
953
954impl PersistentStorage {
955    pub fn new() -> Self {
956        Self {
957            databases: HashMap::new(),
958        }
959    }
960
961    pub fn save_to_file(&self, path: &Path) -> Result<()> {
962        let data = serde_json::to_string_pretty(self)?;
963        let mut file = File::create(path)?;
964        file.write_all(data.as_bytes())?;
965        Ok(())
966    }
967
968    pub fn load_from_file(path: &Path) -> Result<Self> {
969        if !path.exists() {
970            return Ok(Self::new());
971        }
972
973        let mut file = File::open(path)?;
974        let mut contents = String::new();
975        file.read_to_string(&mut contents)?;
976        let storage = serde_json::from_str(&contents)?;
977        Ok(storage)
978    }
979}
980
981// --- Collection ---
982impl Collection {
983    pub fn new(name: String) -> Self {
984        Self {
985            name,
986            docs: HashMap::new(),
987            indexes: HashMap::new(),
988            index_definitions: HashMap::new(),
989        }
990    }
991
992    // Insert a document, auto-generate _id
993    pub fn insert(&mut self, mut doc: Document) -> Result<DocId> {
994        let id = DocId::new();
995
996        // Inject _id field
997        if let Some(obj) = doc.as_object_mut() {
998            obj.insert("_id".to_string(), Value::String(id.to_string()));
999        } else {
1000            return Err(DbError::Other("Document must be an object".into()));
1001        }
1002
1003        self.docs.insert(id.clone(), doc.clone());
1004
1005        // Update indexes
1006        self.update_indexes_on_insert(&id, &doc)?;
1007
1008        Ok(id)
1009    }
1010
1011    // Insert multiple documents
1012    pub fn insert_many(&mut self, docs: Vec<Document>) -> Result<Vec<DocId>> {
1013        let mut ids = Vec::new();
1014
1015        for mut doc in docs {
1016            let id = DocId::new();
1017
1018            // Inject _id field
1019            if let Some(obj) = doc.as_object_mut() {
1020                obj.insert("_id".to_string(), Value::String(id.to_string()));
1021            } else {
1022                return Err(DbError::Other("Document must be an object".into()));
1023            }
1024
1025            self.docs.insert(id.clone(), doc.clone());
1026            ids.push(id.clone());
1027
1028            // Update indexes
1029            self.update_indexes_on_insert(&id, &doc)?;
1030        }
1031
1032        Ok(ids)
1033    }
1034
1035    // Find all matching documents
1036    pub fn find(
1037        &self,
1038        query: Query,
1039        options: Option<FindOptions>,
1040    ) -> Result<Vec<(DocId, Document)>> {
1041        let mut results = Vec::new();
1042
1043        // Try to use indexes if possible
1044        let indexed_results = self.try_indexed_query(&query)?;
1045
1046        if !indexed_results.is_empty() {
1047            // Use indexed results
1048            for (id, doc) in indexed_results {
1049                if query.matches(&doc) {
1050                    results.push((id, doc));
1051                }
1052            }
1053        } else {
1054            // Full collection scan
1055            for (id, doc) in self.docs.iter() {
1056                if query.matches(doc) {
1057                    results.push((id.clone(), doc.clone()));
1058                }
1059            }
1060        }
1061
1062        // Apply options
1063        if let Some(opts) = options {
1064            // Sort
1065            if let Some(sort_spec) = opts.sort {
1066                self.sort_results(&mut results, &sort_spec);
1067            }
1068
1069            // Skip
1070            if let Some(skip) = opts.skip {
1071                if skip < results.len() {
1072                    results.drain(0..skip);
1073                } else {
1074                    results.clear();
1075                }
1076            }
1077
1078            // Limit
1079            if let Some(limit) = opts.limit {
1080                results.truncate(limit);
1081            }
1082
1083            // Projection
1084            if let Some(proj) = opts.projection {
1085                self.apply_projection(&mut results, &proj);
1086            }
1087        }
1088
1089        Ok(results)
1090    }
1091
1092    // Find a single document
1093    pub fn find_one(
1094        &self,
1095        query: Query,
1096        options: Option<FindOptions>,
1097    ) -> Result<(DocId, Document)> {
1098        let mut opts = options.unwrap_or_default();
1099        opts.limit = Some(1);
1100
1101        let mut results = self.find(query, Some(opts))?;
1102        results.pop().ok_or(DbError::NotFound)
1103    }
1104
1105    // Find and modify a document
1106    pub fn find_one_and_update(
1107        &mut self,
1108        query: Query,
1109        update: UpdateDocument,
1110        options: Option<FindOneAndUpdateOptions>,
1111    ) -> Result<Option<(DocId, Document)>> {
1112        let opts = options.unwrap_or_default();
1113
1114        // Find the document
1115        let result = self.find_one(query.clone(), None);
1116
1117        match result {
1118            Ok((id, mut doc)) => {
1119                let original_doc = doc.clone();
1120
1121                // Apply update
1122                self.apply_update(&mut doc, &update, opts.upsert)?;
1123
1124                // Update the document
1125                self.docs.insert(id.clone(), doc.clone());
1126
1127                // Update indexes
1128                self.update_indexes_on_update(&id, &original_doc, &doc)?;
1129
1130                if opts.return_document == ReturnDocument::After {
1131                    Ok(Some((id, doc)))
1132                } else {
1133                    Ok(Some((id, original_doc)))
1134                }
1135            }
1136            Err(DbError::NotFound) if opts.upsert => {
1137                // Upsert: create a new document
1138                let mut new_doc = Map::new();
1139
1140                // Extract fields from query
1141                for cond in &query.conditions {
1142                    if let QueryOperator::Eq(value) = &cond.operator {
1143                        new_doc.insert(cond.field.clone(), value.clone());
1144                    }
1145                }
1146
1147                // Apply update
1148                let mut doc = Value::Object(new_doc);
1149                self.apply_update(&mut doc, &update, true)?;
1150
1151                // Insert the document
1152                let id = self.insert(doc)?;
1153
1154                // Return the new document if requested
1155                if opts.return_document == ReturnDocument::After {
1156                    self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1157                        .map(Some)
1158                } else {
1159                    Ok(None)
1160                }
1161            }
1162            Err(_) => Ok(None),
1163        }
1164    }
1165
1166    // Find and replace a document
1167    pub fn find_one_and_replace(
1168        &mut self,
1169        query: Query,
1170        replacement: Document,
1171        options: Option<FindOneAndReplaceOptions>,
1172    ) -> Result<Option<(DocId, Document)>> {
1173        let opts = options.unwrap_or_default();
1174
1175        // Find the document
1176        let result = self.find_one(query.clone(), None);
1177
1178        match result {
1179            Ok((id, original_doc)) => {
1180                // Ensure replacement has _id
1181                let mut replacement = replacement;
1182                if let Some(obj) = replacement.as_object_mut() {
1183                    obj.insert("_id".to_string(), Value::String(id.to_string()));
1184                } else {
1185                    return Err(DbError::Other("Replacement must be an object".into()));
1186                }
1187
1188                // Update the document
1189                self.docs.insert(id.clone(), replacement.clone());
1190
1191                // Update indexes
1192                self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1193
1194                if opts.return_document == ReturnDocument::After {
1195                    Ok(Some((id, replacement)))
1196                } else {
1197                    Ok(Some((id, original_doc)))
1198                }
1199            }
1200            Err(DbError::NotFound) if opts.upsert => {
1201                // Upsert: create a new document
1202                let id = self.insert(replacement)?;
1203
1204                // Return the new document if requested
1205                if opts.return_document == ReturnDocument::After {
1206                    self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1207                        .map(Some)
1208                } else {
1209                    Ok(None)
1210                }
1211            }
1212            Err(_) => Ok(None),
1213        }
1214    }
1215
1216    // Find and delete a document
1217    pub fn find_one_and_delete(
1218        &mut self,
1219        query: Query,
1220        options: Option<FindOneAndDeleteOptions>,
1221    ) -> Result<Option<(DocId, Document)>> {
1222        let opts = options.unwrap_or_default();
1223
1224        // Find the document
1225        let result = self.find_one(query.clone(), None);
1226
1227        match result {
1228            Ok((id, doc)) => {
1229                // Delete the document
1230                self.docs.remove(&id);
1231
1232                // Update indexes
1233                self.update_indexes_on_delete(&id, &doc)?;
1234
1235                Ok(Some((id, doc)))
1236            }
1237            Err(_) => Ok(None),
1238        }
1239    }
1240
1241    // Update the first matching document
1242    pub fn update_one(
1243        &mut self,
1244        query: Query,
1245        update: UpdateDocument,
1246        upsert: bool,
1247    ) -> Result<usize> {
1248        let result = self.find_one(query.clone(), None);
1249
1250        match result {
1251            Ok((id, mut doc)) => {
1252                let original_doc = doc.clone();
1253
1254                // Apply update
1255                self.apply_update(&mut doc, &update, upsert)?;
1256
1257                // Update the document
1258                self.docs.insert(id.clone(), doc.clone());
1259
1260                // Update indexes
1261                self.update_indexes_on_update(&id, &original_doc, &doc)?;
1262
1263                Ok(1)
1264            }
1265            Err(DbError::NotFound) if upsert => {
1266                // Upsert: create a new document
1267                let mut new_doc = Map::new();
1268
1269                // Extract fields from query
1270                for cond in &query.conditions {
1271                    if let QueryOperator::Eq(value) = &cond.operator {
1272                        new_doc.insert(cond.field.clone(), value.clone());
1273                    }
1274                }
1275
1276                // Apply update
1277                let mut doc = Value::Object(new_doc);
1278                self.apply_update(&mut doc, &update, true)?;
1279
1280                // Insert the document
1281                self.insert(doc)?;
1282
1283                Ok(1)
1284            }
1285            Err(_) => Ok(0),
1286        }
1287    }
1288
1289    // Update all matching documents
1290    pub fn update_many(&mut self, query: Query, update: UpdateDocument) -> Result<usize> {
1291        let docs = self.find(query, None)?;
1292        let mut count = 0;
1293
1294        for (id, doc) in docs {
1295            let mut doc = doc;
1296            let original_doc = doc.clone();
1297
1298            // Apply update
1299            self.apply_update(&mut doc, &update, false)?;
1300
1301            // Update the document
1302            self.docs.insert(id.clone(), doc.clone());
1303
1304            // Update indexes
1305            self.update_indexes_on_update(&id, &original_doc, &doc)?;
1306
1307            count += 1;
1308        }
1309
1310        Ok(count)
1311    }
1312
1313    // Replace the first matching document
1314    pub fn replace_one(
1315        &mut self,
1316        query: Query,
1317        replacement: Document,
1318        upsert: bool,
1319    ) -> Result<usize> {
1320        let result = self.find_one(query.clone(), None);
1321
1322        match result {
1323            Ok((id, original_doc)) => {
1324                // Ensure replacement has _id
1325                let mut replacement = replacement;
1326                if let Some(obj) = replacement.as_object_mut() {
1327                    obj.insert("_id".to_string(), Value::String(id.to_string()));
1328                } else {
1329                    return Err(DbError::Other("Replacement must be an object".into()));
1330                }
1331
1332                // Update the document
1333                self.docs.insert(id.clone(), replacement.clone());
1334
1335                // Update indexes
1336                self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1337
1338                Ok(1)
1339            }
1340            Err(DbError::NotFound) if upsert => {
1341                // Upsert: create a new document
1342                self.insert(replacement)?;
1343                Ok(1)
1344            }
1345            Err(_) => Ok(0),
1346        }
1347    }
1348
1349    // Delete the first matching document
1350    pub fn delete_one(&mut self, query: Query) -> Result<usize> {
1351        let result = self.find_one(query, None);
1352
1353        match result {
1354            Ok((id, doc)) => {
1355                // Delete the document
1356                self.docs.remove(&id);
1357
1358                // Update indexes
1359                self.update_indexes_on_delete(&id, &doc)?;
1360
1361                Ok(1)
1362            }
1363            Err(_) => Ok(0),
1364        }
1365    }
1366
1367    // Delete all matching documents
1368    pub fn delete_many(&mut self, query: Query) -> Result<usize> {
1369        let docs = self.find(query, None)?;
1370        let mut count = 0;
1371
1372        for (id, doc) in docs {
1373            // Delete the document
1374            self.docs.remove(&id);
1375
1376            // Update indexes
1377            self.update_indexes_on_delete(&id, &doc)?;
1378
1379            count += 1;
1380        }
1381
1382        Ok(count)
1383    }
1384
1385    // Count documents matching the query
1386    pub fn count_documents(&self, query: Query) -> Result<usize> {
1387        // Try to use indexes if possible
1388        let indexed_results = self.try_indexed_query(&query)?;
1389
1390        if !indexed_results.is_empty() {
1391            // Use indexed results
1392            let mut count = 0;
1393            for (_, doc) in indexed_results {
1394                if query.matches(&doc) {
1395                    count += 1;
1396                }
1397            }
1398            Ok(count)
1399        } else {
1400            // Full collection scan
1401            let mut count = 0;
1402            for doc in self.docs.values() {
1403                if query.matches(doc) {
1404                    count += 1;
1405                }
1406            }
1407            Ok(count)
1408        }
1409    }
1410
1411    // Estimated count of documents in the collection
1412    pub fn estimated_document_count(&self) -> Result<usize> {
1413        Ok(self.docs.len())
1414    }
1415
1416    // Create an index
1417    // Create index
1418    pub fn create_index(&mut self, index: Index) -> Result<()> {
1419        let index_name = index.name.clone();
1420
1421        // Check if index already exists
1422        if self.index_definitions.contains_key(&index_name) {
1423            return Err(DbError::IndexError(format!(
1424                "Index {} already exists",
1425                index_name
1426            )));
1427        }
1428
1429        // Create index data
1430        let index_data = IndexData::new(index.unique, index.sparse);
1431
1432        // Add to indexes
1433        self.indexes.insert(index_name.clone(), index_data);
1434
1435        // Add to definitions
1436        self.index_definitions
1437            .insert(index_name.clone(), index.clone());
1438
1439        // Collect document IDs and contents for indexing
1440        let docs_to_index: Vec<(DocId, Document)> = self
1441            .docs
1442            .iter()
1443            .map(|(id, doc)| (id.clone(), doc.clone()))
1444            .collect();
1445
1446        // Build index from existing documents
1447        for (id, doc) in docs_to_index {
1448            self.add_to_indexes(&index_name, &id, &doc)?;
1449        }
1450
1451        Ok(())
1452    }
1453
1454    // Drop an index
1455    pub fn drop_index(&mut self, name: &str) -> Result<()> {
1456        self.indexes.remove(name);
1457        self.index_definitions.remove(name);
1458        Ok(())
1459    }
1460
1461    // List all indexes
1462    pub fn list_indexes(&self) -> Result<Vec<Index>> {
1463        Ok(self.index_definitions.values().cloned().collect())
1464    }
1465
1466    // Drop all indexes
1467    pub fn drop_indexes(&mut self) -> Result<()> {
1468        self.indexes.clear();
1469        self.index_definitions.clear();
1470        Ok(())
1471    }
1472
1473    // Get collection statistics
1474    pub fn stats(&self) -> Result<CollectionStats> {
1475        Ok(CollectionStats {
1476            count: self.docs.len(),
1477            size: self.calculate_size(),
1478            avg_obj_size: if self.docs.is_empty() {
1479                0
1480            } else {
1481                self.calculate_size() / self.docs.len()
1482            },
1483            index_count: self.index_definitions.len(),
1484            index_size: self.calculate_index_size(),
1485        })
1486    }
1487
1488    // Aggregate pipeline
1489    pub fn aggregate(&self, pipeline: Vec<AggregationStage>) -> Result<Vec<Document>> {
1490        let mut results = Vec::new();
1491
1492        // Start with all documents
1493        let mut docs: Vec<Document> = self.docs.values().cloned().collect();
1494
1495        // Apply pipeline stages
1496        for stage in pipeline {
1497            docs = self.apply_aggregation_stage(docs, stage)?;
1498        }
1499
1500        results.extend(docs);
1501        Ok(results)
1502    }
1503
1504    // Distinct values for a field
1505    pub fn distinct(&self, field: &str, query: Option<Query>) -> Result<Vec<Value>> {
1506        let mut values = HashSet::new();
1507
1508        for doc in self.docs.values() {
1509            if let Some(q) = &query {
1510                if !q.matches(doc) {
1511                    continue;
1512                }
1513            }
1514
1515            let field_parts: Vec<&str> = field.split('.').collect();
1516            if let Some(value) = self.get_nested_value(doc, &field_parts) {
1517                values.insert(value.clone());
1518            }
1519        }
1520
1521        Ok(values.into_iter().collect())
1522    }
1523
1524    // Get the number of documents in the collection
1525    pub fn count(&self) -> usize {
1526        self.docs.len()
1527    }
1528
1529    // --- Helper methods ---
1530
1531    fn apply_update(
1532        &self,
1533        doc: &mut Document,
1534        update: &UpdateDocument,
1535        is_upsert: bool,
1536    ) -> Result<()> {
1537        for op in &update.operators {
1538            match op {
1539                UpdateOperator::Set(fields) => {
1540                    if let Some(obj) = doc.as_object_mut() {
1541                        for (key, value) in fields {
1542                            obj.insert(key.clone(), value.clone());
1543                        }
1544                    }
1545                }
1546                UpdateOperator::Unset(fields) => {
1547                    if let Some(obj) = doc.as_object_mut() {
1548                        for key in fields {
1549                            obj.remove(key);
1550                        }
1551                    }
1552                }
1553                UpdateOperator::Inc(fields) => {
1554                    if let Some(obj) = doc.as_object_mut() {
1555                        for (key, value) in fields {
1556                            if let Some(Value::Number(n)) = obj.get(key) {
1557                                if let (Some(current), Some(inc)) = (n.as_f64(), value.as_f64()) {
1558                                    obj.insert(
1559                                        key.clone(),
1560                                        Value::Number(
1561                                            serde_json::Number::from_f64(current + inc).unwrap(),
1562                                        ),
1563                                    );
1564                                }
1565                            } else {
1566                                // Field doesn't exist or isn't a number
1567                                if let Some(inc) = value.as_f64() {
1568                                    obj.insert(
1569                                        key.clone(),
1570                                        Value::Number(serde_json::Number::from_f64(inc).unwrap()),
1571                                    );
1572                                }
1573                            }
1574                        }
1575                    }
1576                }
1577                UpdateOperator::Mul(fields) => {
1578                    if let Some(obj) = doc.as_object_mut() {
1579                        for (key, value) in fields {
1580                            if let Some(Value::Number(n)) = obj.get(key) {
1581                                if let (Some(current), Some(mul)) = (n.as_f64(), value.as_f64()) {
1582                                    obj.insert(
1583                                        key.clone(),
1584                                        Value::Number(
1585                                            serde_json::Number::from_f64(current * mul).unwrap(),
1586                                        ),
1587                                    );
1588                                }
1589                            }
1590                        }
1591                    }
1592                }
1593                UpdateOperator::Rename(fields) => {
1594                    if let Some(obj) = doc.as_object_mut() {
1595                        for (old_key, new_key) in fields {
1596                            if let Some(value) = obj.remove(old_key) {
1597                                obj.insert(new_key.clone(), value);
1598                            }
1599                        }
1600                    }
1601                }
1602                UpdateOperator::SetOnInsert(fields) => {
1603                    if is_upsert {
1604                        if let Some(obj) = doc.as_object_mut() {
1605                            for (key, value) in fields {
1606                                obj.insert(key.clone(), value.clone());
1607                            }
1608                        }
1609                    }
1610                }
1611                UpdateOperator::Min(fields) => {
1612                    if let Some(obj) = doc.as_object_mut() {
1613                        for (key, value) in fields {
1614                            if let Some(current) = obj.get(key) {
1615                                // Compare values (simplified)
1616                                if self.compare_values(value, current) == Ordering::Less {
1617                                    obj.insert(key.clone(), value.clone());
1618                                }
1619                            } else {
1620                                obj.insert(key.clone(), value.clone());
1621                            }
1622                        }
1623                    }
1624                }
1625                UpdateOperator::Max(fields) => {
1626                    if let Some(obj) = doc.as_object_mut() {
1627                        for (key, value) in fields {
1628                            if let Some(current) = obj.get(key) {
1629                                // Compare values (simplified)
1630                                if self.compare_values(value, current) == Ordering::Greater {
1631                                    obj.insert(key.clone(), value.clone());
1632                                }
1633                            } else {
1634                                obj.insert(key.clone(), value.clone());
1635                            }
1636                        }
1637                    }
1638                }
1639                UpdateOperator::CurrentDate(fields) => {
1640                    if let Some(obj) = doc.as_object_mut() {
1641                        for (key, type_spec) in fields {
1642                            let now = chrono::Utc::now();
1643
1644                            match type_spec {
1645                                Value::String(type_str) if type_str == "date" => {
1646                                    obj.insert(key.clone(), Value::String(now.to_rfc3339()));
1647                                }
1648                                Value::String(type_str) if type_str == "timestamp" => {
1649                                    obj.insert(
1650                                        key.clone(),
1651                                        Value::Number(serde_json::Number::from(now.timestamp())),
1652                                    );
1653                                }
1654                                Value::Object(spec) => {
1655                                    if let Some(Value::String(type_str)) = spec.get("$type") {
1656                                        match type_str.as_str() {
1657                                            "date" => {
1658                                                obj.insert(
1659                                                    key.clone(),
1660                                                    Value::String(now.to_rfc3339()),
1661                                                );
1662                                            }
1663                                            "timestamp" => {
1664                                                obj.insert(
1665                                                    key.clone(),
1666                                                    Value::Number(serde_json::Number::from(
1667                                                        now.timestamp(),
1668                                                    )),
1669                                                );
1670                                            }
1671                                            _ => {
1672                                                return Err(DbError::UpdateError(format!(
1673                                                    "Invalid date type specification: {}",
1674                                                    type_str
1675                                                )));
1676                                            }
1677                                        }
1678                                    }
1679                                }
1680                                _ => {
1681                                    return Err(DbError::UpdateError(
1682                                        "Invalid date type specification".into(),
1683                                    ));
1684                                }
1685                            }
1686                        }
1687                    }
1688                }
1689                UpdateOperator::Push(fields) => {
1690                    if let Some(obj) = doc.as_object_mut() {
1691                        for (key, value) in fields {
1692                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1693                                arr.push(value.clone());
1694                            } else {
1695                                // Field doesn't exist or isn't an array
1696                                obj.insert(key.clone(), Value::Array(vec![value.clone()]));
1697                            }
1698                        }
1699                    }
1700                }
1701                UpdateOperator::PushAll(fields) => {
1702                    if let Some(obj) = doc.as_object_mut() {
1703                        for (key, values) in fields {
1704                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1705                                arr.extend(values.iter().cloned());
1706                            } else {
1707                                // Field doesn't exist or isn't an array
1708                                obj.insert(key.clone(), Value::Array(values.clone()));
1709                            }
1710                        }
1711                    }
1712                }
1713                UpdateOperator::AddToSet(fields) => {
1714                    if let Some(obj) = doc.as_object_mut() {
1715                        for (key, value) in fields {
1716                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1717                                if !arr.contains(value) {
1718                                    arr.push(value.clone());
1719                                }
1720                            } else {
1721                                // Field doesn't exist or isn't an array
1722                                obj.insert(key.clone(), Value::Array(vec![value.clone()]));
1723                            }
1724                        }
1725                    }
1726                }
1727                UpdateOperator::Pop(fields) => {
1728                    if let Some(obj) = doc.as_object_mut() {
1729                        for (key, pos) in fields {
1730                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1731                                if *pos == 1 {
1732                                    arr.pop();
1733                                } else if *pos == -1 {
1734                                    arr.remove(0);
1735                                }
1736                            }
1737                        }
1738                    }
1739                }
1740                UpdateOperator::Pull(fields) => {
1741                    if let Some(obj) = doc.as_object_mut() {
1742                        for (key, condition) in fields {
1743                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1744                                // Simplified: remove all elements matching the condition
1745                                arr.retain(|elem| {
1746                                    match condition {
1747                                        Value::Object(cond) => {
1748                                            // Create a query from the condition
1749                                            let query = Query::new();
1750                                            // This is a simplified version
1751                                            !query.matches(elem)
1752                                        }
1753                                        _ => elem != condition,
1754                                    }
1755                                });
1756                            }
1757                        }
1758                    }
1759                }
1760                UpdateOperator::PullAll(fields) => {
1761                    if let Some(obj) = doc.as_object_mut() {
1762                        for (key, values) in fields {
1763                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
1764                                arr.retain(|elem| !values.contains(elem));
1765                            }
1766                        }
1767                    }
1768                }
1769                UpdateOperator::Bit(fields) => {
1770                    if let Some(obj) = doc.as_object_mut() {
1771                        for (key, operation) in fields {
1772                            if let Value::Object(op) = operation {
1773                                if let Some(Value::Number(current)) = obj.get(key) {
1774                                    let mut current_num = current.as_i64().unwrap_or(0);
1775
1776                                    if let Some(Value::Number(and)) = op.get("and") {
1777                                        current_num &= and.as_i64().unwrap_or(0);
1778                                    }
1779                                    if let Some(Value::Number(or)) = op.get("or") {
1780                                        current_num |= or.as_i64().unwrap_or(0);
1781                                    }
1782                                    if let Some(Value::Number(xor)) = op.get("xor") {
1783                                        current_num ^= xor.as_i64().unwrap_or(0);
1784                                    }
1785
1786                                    obj.insert(
1787                                        key.clone(),
1788                                        Value::Number(serde_json::Number::from(current_num)),
1789                                    );
1790                                }
1791                            }
1792                        }
1793                    }
1794                }
1795            }
1796        }
1797
1798        Ok(())
1799    }
1800
1801    fn compare_values(&self, a: &Value, b: &Value) -> Ordering {
1802        match (a, b) {
1803            (Value::Number(a_num), Value::Number(b_num)) => a_num
1804                .as_f64()
1805                .unwrap_or(0.0)
1806                .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
1807                .unwrap_or(Ordering::Equal),
1808            (Value::String(a_str), Value::String(b_str)) => a_str.cmp(b_str),
1809            (Value::Bool(a_bool), Value::Bool(b_bool)) => a_bool.cmp(b_bool),
1810            (Value::Array(a_arr), Value::Array(b_arr)) => a_arr.len().cmp(&b_arr.len()),
1811            (Value::Object(_), Value::Object(_)) => {
1812                Ordering::Equal // Simplified
1813            }
1814            _ => Ordering::Equal,
1815        }
1816    }
1817
1818    fn update_indexes_on_insert(&mut self, id: &DocId, doc: &Document) -> Result<()> {
1819        for (name, index_def) in self.index_definitions.iter() {
1820            // Pre-calculate all field values before mutable borrow
1821            let mut field_values = Vec::new();
1822            for (field, _) in &index_def.key {
1823                let field_parts: Vec<&str> = field.split('.').collect();
1824                let value = self.get_nested_value(doc, &field_parts);
1825                field_values.push((field, value));
1826            }
1827
1828            if let Some(index_data) = self.indexes.get_mut(name) {
1829                for (field, value) in field_values {
1830                    if let Some(value) = value {
1831                        index_data.insert(value.clone(), id.clone())?;
1832                    } else if index_data.sparse {
1833                        // Skip missing fields for sparse indexes
1834                        continue;
1835                    } else {
1836                        return Err(DbError::IndexError(format!(
1837                            "Field {} not found in document for non-sparse index",
1838                            field
1839                        )));
1840                    }
1841                }
1842            }
1843        }
1844
1845        Ok(())
1846    }
1847
1848    fn update_indexes_on_update(
1849        &mut self,
1850        id: &DocId,
1851        old_doc: &Document,
1852        new_doc: &Document,
1853    ) -> Result<()> {
1854        // Remove from indexes
1855        self.update_indexes_on_delete(id, old_doc)?;
1856
1857        // Add to indexes
1858        self.update_indexes_on_insert(id, new_doc)?;
1859
1860        Ok(())
1861    }
1862    // Fix update_indexes_on_delete method
1863    fn update_indexes_on_delete(&mut self, id: &DocId, doc: &Document) -> Result<()> {
1864        let index_names: Vec<String> = self.index_definitions.keys().cloned().collect();
1865
1866        for name in index_names {
1867            if let Some(index_def) = self.index_definitions.get(&name) {
1868                // Extract all field values first
1869                let mut field_values = Vec::new();
1870                for (field, _) in &index_def.key {
1871                    let field_parts: Vec<&str> = field.split('.').collect();
1872                    let value = self.get_nested_value(doc, &field_parts);
1873                    field_values.push(value.cloned());
1874                }
1875
1876                // Now get mutable borrow and process
1877                if let Some(index_data) = self.indexes.get_mut(&name) {
1878                    for (i, (_, _)) in index_def.key.iter().enumerate() {
1879                        if let Some(val) = &field_values[i] {
1880                            index_data.remove(val, id)?;
1881                        }
1882                    }
1883                }
1884            }
1885        }
1886
1887        Ok(())
1888    }
1889
1890    // Fix add_to_indexes method
1891    fn add_to_indexes(&mut self, index_name: &str, id: &DocId, doc: &Document) -> Result<()> {
1892        if let Some(index_def) = self.index_definitions.get(index_name).cloned() {
1893            // Extract all field values first
1894            let mut field_values = Vec::new();
1895            let field_names: Vec<String> = index_def
1896                .key
1897                .iter()
1898                .map(|(field, _)| field.clone())
1899                .collect();
1900
1901            for field in &field_names {
1902                let field_parts: Vec<&str> = field.split('.').collect();
1903                let value = self.get_nested_value(doc, &field_parts);
1904                field_values.push(value.cloned());
1905            }
1906
1907            // Now get mutable borrow and process
1908            if let Some(index_data) = self.indexes.get_mut(index_name) {
1909                for (i, field) in field_names.iter().enumerate() {
1910                    if let Some(val) = &field_values[i] {
1911                        index_data.insert(val.clone(), id.clone())?;
1912                    } else if index_data.sparse {
1913                        // Skip missing fields for sparse indexes
1914                        continue;
1915                    } else {
1916                        return Err(DbError::IndexError(format!(
1917                            "Field {} not found in document for non-sparse index",
1918                            field
1919                        )));
1920                    }
1921                }
1922            }
1923        }
1924
1925        Ok(())
1926    }
1927
1928    fn try_indexed_query(&self, query: &Query) -> Result<Vec<(DocId, Document)>> {
1929        // Try to find an index that can be used for the query
1930        for (index_name, index_def) in self.index_definitions.iter() {
1931            // Check if the first field in the index is used in the query
1932            if let Some((field, _)) = index_def.key.first() {
1933                for cond in &query.conditions {
1934                    if cond.field == *field {
1935                        // We can use this index
1936                        let mut results = Vec::new();
1937
1938                        if let Some(index_data) = self.indexes.get(index_name) {
1939                            match &cond.operator {
1940                                QueryOperator::Eq(value) => {
1941                                    let entries = index_data.find(value);
1942                                    for entry in entries {
1943                                        if let Some(doc) = self.docs.get(&entry.doc_id) {
1944                                            results.push((entry.doc_id.clone(), doc.clone()));
1945                                        }
1946                                    }
1947                                }
1948                                QueryOperator::Gt(value) => {
1949                                    // Find all entries greater than the value
1950                                    let ord_value = OrdValue(value.clone());
1951                                    for (key, entries) in
1952                                        index_data.entries.range((Excluded(&ord_value), Unbounded))
1953                                    {
1954                                        for entry in entries {
1955                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
1956                                                results.push((entry.doc_id.clone(), doc.clone()));
1957                                            }
1958                                        }
1959                                    }
1960                                }
1961                                QueryOperator::Gte(value) => {
1962                                    // Find all entries greater than or equal to the value
1963                                    let ord_value = OrdValue(value.clone());
1964                                    for (key, entries) in
1965                                        index_data.entries.range((Included(&ord_value), Unbounded))
1966                                    {
1967                                        for entry in entries {
1968                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
1969                                                results.push((entry.doc_id.clone(), doc.clone()));
1970                                            }
1971                                        }
1972                                    }
1973                                }
1974                                QueryOperator::Lt(value) => {
1975                                    // Find all entries less than the value
1976                                    let ord_value = OrdValue(value.clone());
1977                                    for (key, entries) in
1978                                        index_data.entries.range((Unbounded, Excluded(&ord_value)))
1979                                    {
1980                                        for entry in entries {
1981                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
1982                                                results.push((entry.doc_id.clone(), doc.clone()));
1983                                            }
1984                                        }
1985                                    }
1986                                }
1987                                QueryOperator::Lte(value) => {
1988                                    // Find all entries less than or equal to the value
1989                                    let ord_value = OrdValue(value.clone());
1990                                    for (key, entries) in
1991                                        index_data.entries.range((Unbounded, Included(&ord_value)))
1992                                    {
1993                                        for entry in entries {
1994                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
1995                                                results.push((entry.doc_id.clone(), doc.clone()));
1996                                            }
1997                                        }
1998                                    }
1999                                }
2000                                _ => {
2001                                    // Can't use index for this operator
2002                                    continue;
2003                                }
2004                            }
2005                        }
2006
2007                        return Ok(results);
2008                    }
2009                }
2010            }
2011        }
2012
2013        // No suitable index found
2014        Ok(Vec::new())
2015    }
2016
2017    fn sort_results(
2018        &self,
2019        results: &mut Vec<(DocId, Document)>,
2020        sort_spec: &[(String, SortOrder)],
2021    ) {
2022        results.sort_by(|a, b| {
2023            for (field, order) in sort_spec {
2024                let field_parts: Vec<&str> = field.split('.').collect();
2025                let a_val = self.get_nested_value(&a.1, &field_parts);
2026                let b_val = self.get_nested_value(&b.1, &field_parts);
2027
2028                let cmp = match (a_val, b_val) {
2029                    (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2030                        .as_f64()
2031                        .unwrap_or(0.0)
2032                        .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2033                        .unwrap_or(Ordering::Equal),
2034                    (Some(Value::String(a_str)), Some(Value::String(b_str))) => a_str.cmp(b_str),
2035                    (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => a_bool.cmp(b_bool),
2036                    (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2037                        a_arr.len().cmp(&b_arr.len())
2038                    }
2039                    (Some(_), None) => Ordering::Greater,
2040                    (None, Some(_)) => Ordering::Less,
2041                    (None, None) => Ordering::Equal,
2042                    _ => Ordering::Equal,
2043                };
2044
2045                if cmp != Ordering::Equal {
2046                    return match order {
2047                        SortOrder::Ascending => cmp,
2048                        SortOrder::Descending => cmp.reverse(),
2049                    };
2050                }
2051            }
2052
2053            Ordering::Equal
2054        });
2055    }
2056
2057    fn apply_projection(
2058        &self,
2059        results: &mut Vec<(DocId, Document)>,
2060        projection: &Map<String, Value>,
2061    ) {
2062        let is_inclusive = projection.values().any(|v| match v {
2063            Value::Bool(b) => *b,
2064            Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
2065            _ => false,
2066        });
2067
2068        for (_, doc) in results.iter_mut() {
2069            if let Some(obj) = doc.as_object_mut() {
2070                let mut new_obj = Map::new();
2071
2072                if is_inclusive {
2073                    // Include only specified fields
2074                    for (key, value) in projection {
2075                        if obj.contains_key(key) {
2076                            new_obj.insert(key.clone(), obj.get(key).unwrap().clone());
2077                        }
2078                    }
2079
2080                    // Always include _id
2081                    if obj.contains_key("_id") && !projection.contains_key("_id") {
2082                        new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2083                    }
2084                } else {
2085                    // Exclude specified fields
2086                    for (key, value) in obj {
2087                        if !projection.contains_key(key) || key == "_id" {
2088                            new_obj.insert(key.clone(), value.clone());
2089                        }
2090                    }
2091                }
2092
2093                *doc = Value::Object(new_obj);
2094            }
2095        }
2096    }
2097
2098    fn get_nested_value<'a>(&self, doc: &'a Document, path: &[&str]) -> Option<&'a Value> {
2099        if path.is_empty() {
2100            return Some(doc);
2101        }
2102
2103        let current = path[0];
2104        let rest = &path[1..];
2105
2106        match doc.get(current) {
2107            Some(value) => {
2108                if rest.is_empty() {
2109                    Some(value)
2110                } else {
2111                    match value {
2112                        Value::Object(obj) => self.get_nested_value(value, rest),
2113                        _ => None,
2114                    }
2115                }
2116            }
2117            None => None,
2118        }
2119    }
2120
2121    fn calculate_size(&self) -> usize {
2122        // Simplified size calculation
2123        self.docs
2124            .values()
2125            .map(|doc| serde_json::to_string(doc).unwrap_or_default().len())
2126            .sum()
2127    }
2128
2129    fn calculate_index_size(&self) -> usize {
2130        // Simplified index size calculation
2131        self.indexes
2132            .values()
2133            .map(|index_data| {
2134                index_data.entries.len() * 16 // Approximate size per entry
2135            })
2136            .sum()
2137    }
2138
2139    fn apply_aggregation_stage(
2140        &self,
2141        docs: Vec<Document>,
2142        stage: AggregationStage,
2143    ) -> Result<Vec<Document>> {
2144        match stage {
2145            AggregationStage::Match(query) => {
2146                let mut result = Vec::new();
2147                for doc in docs {
2148                    if query.matches(&doc) {
2149                        result.push(doc);
2150                    }
2151                }
2152                Ok(result)
2153            }
2154            AggregationStage::Project(projection) => {
2155                let mut result = Vec::new();
2156                for doc in docs {
2157                    if let Some(obj) = doc.as_object() {
2158                        let mut new_obj = Map::new();
2159
2160                        for (key, value) in &projection {
2161                            if value.as_object().and_then(|o| o.get("$")).is_some() {
2162                                // Field projection with expression
2163                                if let Some(expr) = value.as_object().and_then(|o| o.get("$")) {
2164                                    // Simplified expression evaluation
2165                                    if let Some(field_name) = expr.as_str() {
2166                                        if let Some(field_value) =
2167                                            obj.get(field_name.trim_start_matches('$'))
2168                                        {
2169                                            new_obj.insert(key.clone(), field_value.clone());
2170                                        }
2171                                    }
2172                                }
2173                            } else if value.as_object().and_then(|o| o.get("$concat")).is_some() {
2174                                // String concatenation
2175                                if let Some(Value::Array(fields)) =
2176                                    value.as_object().and_then(|o| o.get("$concat"))
2177                                {
2178                                    let mut result_str = String::new();
2179                                    for field in fields {
2180                                        if let Some(field_name) = field.as_str() {
2181                                            if let Some(field_value) =
2182                                                obj.get(field_name.trim_start_matches('$'))
2183                                            {
2184                                                if let Some(s) = field_value.as_str() {
2185                                                    result_str.push_str(s);
2186                                                }
2187                                            }
2188                                        }
2189                                    }
2190                                    new_obj.insert(key.clone(), Value::String(result_str));
2191                                }
2192                            } else {
2193                                // Simple field inclusion/exclusion
2194                                let include = match value {
2195                                    Value::Bool(b) => b,
2196                                    Value::Number(n) => &(n.as_i64().unwrap_or(0) == 1),
2197                                    _ => &false,
2198                                };
2199
2200                                if *include && obj.contains_key(key.as_str()) {
2201                                    new_obj.insert(
2202                                        key.clone(),
2203                                        obj.get(key.as_str()).unwrap().clone(),
2204                                    );
2205                                } else if !include && !new_obj.contains_key(key.as_str()) {
2206                                    new_obj.insert(
2207                                        key.clone(),
2208                                        obj.get(key.as_str()).unwrap().clone(),
2209                                    );
2210                                }
2211                            }
2212                        }
2213
2214                        // Always include _id if not explicitly excluded
2215                        if obj.contains_key("_id") && !projection.contains_key("_id") {
2216                            new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2217                        }
2218
2219                        result.push(Value::Object(new_obj));
2220                    }
2221                }
2222                Ok(result)
2223            }
2224            AggregationStage::Sort(sort_spec) => {
2225                let mut result = docs;
2226
2227                result.sort_by(|a, b| {
2228                    for (field, order) in &sort_spec {
2229                        let field_parts: Vec<&str> = field.split('.').collect();
2230                        let a_val = self.get_nested_value(a, &field_parts);
2231                        let b_val = self.get_nested_value(b, &field_parts);
2232
2233                        let cmp = match (a_val, b_val) {
2234                            (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2235                                .as_f64()
2236                                .unwrap_or(0.0)
2237                                .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2238                                .unwrap_or(Ordering::Equal),
2239                            (Some(Value::String(a_str)), Some(Value::String(b_str))) => {
2240                                a_str.cmp(b_str)
2241                            }
2242                            (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => {
2243                                a_bool.cmp(b_bool)
2244                            }
2245                            (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2246                                a_arr.len().cmp(&b_arr.len())
2247                            }
2248                            (Some(_), None) => Ordering::Greater,
2249                            (None, Some(_)) => Ordering::Less,
2250                            (None, None) => Ordering::Equal,
2251                            _ => Ordering::Equal,
2252                        };
2253
2254                        if cmp != Ordering::Equal {
2255                            return match order {
2256                                SortOrder::Ascending => cmp,
2257                                SortOrder::Descending => cmp.reverse(),
2258                            };
2259                        }
2260                    }
2261
2262                    Ordering::Equal
2263                });
2264
2265                Ok(result)
2266            }
2267            AggregationStage::Skip(n) => {
2268                let mut result = docs;
2269                if n < result.len() {
2270                    result.drain(0..n);
2271                } else {
2272                    result.clear();
2273                }
2274                Ok(result)
2275            }
2276            AggregationStage::Limit(n) => {
2277                let mut result = docs;
2278                result.truncate(n);
2279                Ok(result)
2280            }
2281            AggregationStage::Group(group_spec) => {
2282                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
2283
2284                // Group documents
2285                for doc in docs {
2286                    let group_key = self.calculate_group_key(&doc, &group_spec.id)?;
2287                    groups.entry(group_key).or_insert_with(Vec::new).push(doc);
2288                }
2289
2290                // Apply group operations
2291                let mut result = Vec::new();
2292                for (key, group_docs) in groups {
2293                    let mut group_obj = Map::new();
2294
2295                    // Add _id field
2296                    group_obj.insert("_id".to_string(), Value::String(key));
2297
2298                    // Apply group operations
2299                    for (field, op) in &group_spec.operations {
2300                        match op {
2301                            GroupOperation::Sum(expr) => {
2302                                let sum = self.calculate_sum(&group_docs, expr)?;
2303                                group_obj.insert(field.clone(), sum);
2304                            }
2305                            GroupOperation::Avg(expr) => {
2306                                let avg = self.calculate_avg(&group_docs, expr)?;
2307                                group_obj.insert(field.clone(), avg);
2308                            }
2309                            GroupOperation::Min(expr) => {
2310                                let min = self.calculate_min(&group_docs, expr)?;
2311                                group_obj.insert(field.clone(), min);
2312                            }
2313                            GroupOperation::Max(expr) => {
2314                                let max = self.calculate_max(&group_docs, expr)?;
2315                                group_obj.insert(field.clone(), max);
2316                            }
2317                            GroupOperation::First(expr) => {
2318                                if let Some(doc) = group_docs.first() {
2319                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2320                                        group_obj.insert(field.clone(), value);
2321                                    }
2322                                }
2323                            }
2324                            GroupOperation::Last(expr) => {
2325                                if let Some(doc) = group_docs.last() {
2326                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2327                                        group_obj.insert(field.clone(), value);
2328                                    }
2329                                }
2330                            }
2331                            GroupOperation::Push(expr) => {
2332                                let mut values = Vec::new();
2333                                for doc in &group_docs {
2334                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2335                                        values.push(value);
2336                                    }
2337                                }
2338                                group_obj.insert(field.clone(), Value::Array(values));
2339                            }
2340                            GroupOperation::AddToSet(expr) => {
2341                                let mut values = HashSet::new();
2342                                for doc in &group_docs {
2343                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2344                                        values.insert(value);
2345                                    }
2346                                }
2347                                group_obj.insert(
2348                                    field.clone(),
2349                                    Value::Array(values.into_iter().collect()),
2350                                );
2351                            }
2352                            GroupOperation::StdDevPop(expr) => {
2353                                let std_dev = self.calculate_std_dev_pop(&group_docs, expr)?;
2354                                group_obj.insert(field.clone(), std_dev);
2355                            }
2356                            GroupOperation::StdDevSamp(expr) => {
2357                                let std_dev = self.calculate_std_dev_samp(&group_docs, expr)?;
2358                                group_obj.insert(field.clone(), std_dev);
2359                            }
2360                        }
2361                    }
2362
2363                    result.push(Value::Object(group_obj));
2364                }
2365
2366                Ok(result)
2367            }
2368            AggregationStage::Unwind(field) => {
2369                let mut result = Vec::new();
2370
2371                for doc in docs {
2372                    if let Some(obj) = doc.as_object() {
2373                        if let Some(value) = obj.get(&field) {
2374                            if let Value::Array(arr) = value {
2375                                for item in arr {
2376                                    let mut new_obj = obj.clone();
2377                                    new_obj.insert(field.clone(), item.clone());
2378                                    result.push(Value::Object(new_obj));
2379                                }
2380                            } else {
2381                                // Not an array, just include the document as is
2382                                result.push(doc.clone());
2383                            }
2384                        } else {
2385                            // Field doesn't exist, include the document as is
2386                            result.push(doc.clone());
2387                        }
2388                    }
2389                }
2390
2391                Ok(result)
2392            }
2393            AggregationStage::Lookup(lookup_spec) => {
2394                // Simplified implementation of $lookup
2395                let mut result = Vec::new();
2396
2397                for doc in docs {
2398                    if let Some(obj) = doc.as_object() {
2399                        let mut new_obj = obj.clone();
2400
2401                        // Get the local field value
2402                        let local_field_value = obj.get(&lookup_spec.local_field);
2403
2404                        if let Some(local_value) = local_field_value {
2405                            // Find matching documents in the foreign collection
2406                            let query =
2407                                Query::new().eq(&lookup_spec.foreign_field, local_value.clone());
2408
2409                            // This is a simplified implementation - in a real DB, we would access the foreign collection
2410                            new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
2411                        } else {
2412                            // Local field doesn't exist
2413                            new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
2414                        }
2415
2416                        result.push(Value::Object(new_obj));
2417                    }
2418                }
2419
2420                Ok(result)
2421            }
2422            AggregationStage::Out(collection_name) => {
2423                // Simplified implementation of $out
2424                // In a real DB, this would write to another collection
2425                Ok(docs)
2426            }
2427        }
2428    }
2429
2430    fn calculate_group_key(&self, doc: &Document, group_id: &GroupId) -> Result<String> {
2431        match group_id {
2432            GroupId::Field(field) => {
2433                let field_parts: Vec<&str> = field.split('.').collect();
2434                if let Some(value) = self.get_nested_value(doc, &field_parts) {
2435                    Ok(value.to_string())
2436                } else {
2437                    Ok("null".to_string())
2438                }
2439            }
2440            GroupId::Expression(expr) => {
2441                if let Some(value) = self.evaluate_expression(doc, expr) {
2442                    Ok(value.to_string())
2443                } else {
2444                    Ok("null".to_string())
2445                }
2446            }
2447            GroupId::Null => Ok("null".to_string()),
2448        }
2449    }
2450
2451    fn evaluate_expression(&self, doc: &Document, expr: &Value) -> Option<Value> {
2452        match expr {
2453            Value::String(s) if s.starts_with('$') => {
2454                let field = &s[1..];
2455                let field_parts: Vec<&str> = field.split('.').collect();
2456                self.get_nested_value(doc, &field_parts).cloned()
2457            }
2458            Value::Object(obj) => {
2459                // Handle expression operators
2460                if let Some(sum_expr) = obj.get("$sum") {
2461                    if let Some(field) = sum_expr.as_str() {
2462                        if field.starts_with('$') {
2463                            let field_name = &field[1..];
2464                            let field_parts: Vec<&str> = field_name.split('.').collect();
2465                            return self.get_nested_value(doc, &field_parts).cloned();
2466                        }
2467                    }
2468                }
2469                None
2470            }
2471            _ => Some(expr.clone()),
2472        }
2473    }
2474
2475    fn calculate_sum(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2476        let mut sum = 0.0;
2477
2478        for doc in docs {
2479            if let Some(value) = self.evaluate_expression(doc, expr) {
2480                if let Some(num) = value.as_f64() {
2481                    sum += num;
2482                }
2483            }
2484        }
2485
2486        Ok(Value::Number(serde_json::Number::from_f64(sum).unwrap()))
2487    }
2488
2489    fn calculate_avg(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2490        if docs.is_empty() {
2491            return Ok(Value::Number(serde_json::Number::from(0)));
2492        }
2493
2494        let sum = self.calculate_sum(docs, expr)?;
2495        if let Some(sum_num) = sum.as_f64() {
2496            let avg = sum_num / docs.len() as f64;
2497            Ok(Value::Number(serde_json::Number::from_f64(avg).unwrap()))
2498        } else {
2499            Ok(Value::Number(serde_json::Number::from(0)))
2500        }
2501    }
2502
2503    fn calculate_min(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2504        let mut min_value: Option<Value> = None;
2505
2506        for doc in docs {
2507            if let Some(value) = self.evaluate_expression(doc, expr) {
2508                if let Some(ref mut min) = min_value {
2509                    if self.compare_values(&value, min) == Ordering::Less {
2510                        *min = value;
2511                    }
2512                } else {
2513                    min_value = Some(value);
2514                }
2515            }
2516        }
2517
2518        Ok(min_value.unwrap_or(Value::Null))
2519    }
2520
2521    fn calculate_max(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2522        let mut max_value: Option<Value> = None;
2523
2524        for doc in docs {
2525            if let Some(value) = self.evaluate_expression(doc, expr) {
2526                if let Some(ref mut max) = max_value {
2527                    if self.compare_values(&value, max) == Ordering::Greater {
2528                        *max = value;
2529                    }
2530                } else {
2531                    max_value = Some(value);
2532                }
2533            }
2534        }
2535
2536        Ok(max_value.unwrap_or(Value::Null))
2537    }
2538
2539    fn calculate_std_dev_pop(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2540        if docs.is_empty() {
2541            return Ok(Value::Number(serde_json::Number::from(0)));
2542        }
2543
2544        let avg = self.calculate_avg(docs, expr)?;
2545        let avg_num = avg.as_f64().unwrap_or(0.0);
2546
2547        let mut sum_sq_diff = 0.0;
2548        for doc in docs {
2549            if let Some(value) = self.evaluate_expression(doc, expr) {
2550                if let Some(num) = value.as_f64() {
2551                    let diff = num - avg_num;
2552                    sum_sq_diff += diff * diff;
2553                }
2554            }
2555        }
2556
2557        let variance = sum_sq_diff / docs.len() as f64;
2558        let std_dev = variance.sqrt();
2559
2560        Ok(Value::Number(
2561            serde_json::Number::from_f64(std_dev).unwrap(),
2562        ))
2563    }
2564
2565    fn calculate_std_dev_samp(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2566        if docs.len() <= 1 {
2567            return Ok(Value::Number(serde_json::Number::from(0)));
2568        }
2569
2570        let avg = self.calculate_avg(docs, expr)?;
2571        let avg_num = avg.as_f64().unwrap_or(0.0);
2572
2573        let mut sum_sq_diff = 0.0;
2574        for doc in docs {
2575            if let Some(value) = self.evaluate_expression(doc, expr) {
2576                if let Some(num) = value.as_f64() {
2577                    let diff = num - avg_num;
2578                    sum_sq_diff += diff * diff;
2579                }
2580            }
2581        }
2582
2583        let variance = sum_sq_diff / (docs.len() - 1) as f64;
2584        let std_dev = variance.sqrt();
2585
2586        Ok(Value::Number(
2587            serde_json::Number::from_f64(std_dev).unwrap(),
2588        ))
2589    }
2590}
2591
2592// --- Aggregation Stages ---
2593#[derive(Debug, Clone, Serialize, Deserialize)]
2594pub enum AggregationStage {
2595    Match(Query),
2596    Project(Map<String, Value>),
2597    Sort(Vec<(String, SortOrder)>),
2598    Skip(usize),
2599    Limit(usize),
2600    Group(GroupSpecification),
2601    Unwind(String),
2602    Lookup(LookupSpecification),
2603    Out(String),
2604}
2605
2606#[derive(Debug, Clone, Serialize, Deserialize)]
2607pub struct GroupSpecification {
2608    pub id: GroupId,
2609    pub operations: HashMap<String, GroupOperation>,
2610}
2611
2612#[derive(Debug, Clone, Serialize, Deserialize)]
2613pub enum GroupId {
2614    Field(String),
2615    Expression(Value),
2616    Null,
2617}
2618
2619#[derive(Debug, Clone, Serialize, Deserialize)]
2620pub enum GroupOperation {
2621    Sum(Value),
2622    Avg(Value),
2623    Min(Value),
2624    Max(Value),
2625    First(Value),
2626    Last(Value),
2627    Push(Value),
2628    AddToSet(Value),
2629    StdDevPop(Value),
2630    StdDevSamp(Value),
2631}
2632
2633#[derive(Debug, Clone, Serialize, Deserialize)]
2634pub struct LookupSpecification {
2635    pub from: String,
2636    pub local_field: String,
2637    pub foreign_field: String,
2638    pub as_field: String, // Renamed from 'as' to avoid using a reserved keyword
2639}
2640
2641// --- Find and Modify Options ---
2642#[derive(Debug, Clone, Serialize, Deserialize)]
2643pub struct FindOneAndUpdateOptions {
2644    pub upsert: bool,
2645    pub return_document: ReturnDocument,
2646    pub projection: Option<Map<String, Value>>,
2647    pub sort: Option<Vec<(String, SortOrder)>>,
2648    pub max_time_ms: Option<u64>,
2649}
2650
2651impl Default for FindOneAndUpdateOptions {
2652    fn default() -> Self {
2653        Self {
2654            upsert: false,
2655            return_document: ReturnDocument::Before,
2656            projection: None,
2657            sort: None,
2658            max_time_ms: None,
2659        }
2660    }
2661}
2662
2663#[derive(Debug, Clone, Serialize, Deserialize)]
2664pub struct FindOneAndReplaceOptions {
2665    pub upsert: bool,
2666    pub return_document: ReturnDocument,
2667    pub projection: Option<Map<String, Value>>,
2668    pub sort: Option<Vec<(String, SortOrder)>>,
2669    pub max_time_ms: Option<u64>,
2670}
2671
2672impl Default for FindOneAndReplaceOptions {
2673    fn default() -> Self {
2674        Self {
2675            upsert: false,
2676            return_document: ReturnDocument::Before,
2677            projection: None,
2678            sort: None,
2679            max_time_ms: None,
2680        }
2681    }
2682}
2683
2684#[derive(Debug, Clone, Serialize, Deserialize)]
2685pub struct FindOneAndDeleteOptions {
2686    pub projection: Option<Map<String, Value>>,
2687    pub sort: Option<Vec<(String, SortOrder)>>,
2688    pub max_time_ms: Option<u64>,
2689}
2690
2691impl Default for FindOneAndDeleteOptions {
2692    fn default() -> Self {
2693        Self {
2694            projection: None,
2695            sort: None,
2696            max_time_ms: None,
2697        }
2698    }
2699}
2700
2701#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
2702pub enum ReturnDocument {
2703    Before,
2704    After,
2705}
2706
2707// --- Collection Statistics ---
2708#[derive(Debug, Clone, Serialize, Deserialize)]
2709pub struct CollectionStats {
2710    pub count: usize,
2711    pub size: usize,
2712    pub avg_obj_size: usize,
2713    pub index_count: usize,
2714    pub index_size: usize,
2715}
2716
2717// --- Database ---
2718#[derive(Debug, Clone, Serialize, Deserialize)]
2719pub struct Database {
2720    name: String,
2721    collections: HashMap<String, Collection>,
2722}
2723
2724impl Database {
2725    pub fn new(name: String) -> Self {
2726        Self {
2727            name,
2728            collections: HashMap::new(),
2729        }
2730    }
2731
2732    pub fn collection(&mut self, name: &str) -> &mut Collection {
2733        self.collections
2734            .entry(name.to_string())
2735            .or_insert_with(|| Collection::new(name.to_string()))
2736    }
2737
2738    // List all collections in the database
2739    pub fn list_collection_names(&self) -> Vec<String> {
2740        self.collections.keys().cloned().collect()
2741    }
2742
2743    // Create a collection with options
2744    pub fn create_collection(
2745        &mut self,
2746        name: &str,
2747        options: Option<CreateCollectionOptions>,
2748    ) -> Result<()> {
2749        if self.collections.contains_key(name) {
2750            return Err(DbError::Other(format!(
2751                "Collection {} already exists",
2752                name
2753            )));
2754        }
2755
2756        let mut collection = Collection::new(name.to_string());
2757
2758        // Apply options if provided
2759        if let Some(opts) = options {
2760            // Create indexes if specified
2761            for index in opts.indexes {
2762                collection.create_index(index)?;
2763            }
2764        }
2765
2766        self.collections.insert(name.to_string(), collection);
2767        Ok(())
2768    }
2769
2770    // Drop a collection
2771    pub fn drop_collection(&mut self, name: &str) -> Result<()> {
2772        if self.collections.remove(name).is_none() {
2773            return Err(DbError::Other(format!(
2774                "Collection {} does not exist",
2775                name
2776            )));
2777        }
2778
2779        Ok(())
2780    }
2781
2782    // Get database statistics
2783    pub fn stats(&self) -> Result<DatabaseStats> {
2784        let mut collections = Vec::new();
2785        let mut total_size = 0;
2786        let mut total_index_size = 0;
2787
2788        for (name, collection) in self.collections.iter() {
2789            let coll_stats = collection.stats()?;
2790            collections.push(CollectionStatsInfo {
2791                name: name.clone(),
2792                count: coll_stats.count,
2793                size: coll_stats.size,
2794                index_count: coll_stats.index_count,
2795                index_size: coll_stats.index_size,
2796            });
2797
2798            total_size += coll_stats.size;
2799            total_index_size += coll_stats.index_size;
2800        }
2801
2802        Ok(DatabaseStats {
2803            collections,
2804            total_size,
2805            total_index_size,
2806        })
2807    }
2808
2809    // Run a command
2810    pub fn run_command(&mut self, command: &Document) -> Result<Document> {
2811        if let Some(obj) = command.as_object() {
2812            if let Some(cmd_name) = obj.keys().next() {
2813                match cmd_name.as_str() {
2814                    "create" => {
2815                        if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
2816                            self.create_collection(coll_name, None)?;
2817                            Ok(serde_json::json!({ "ok": 1 }))
2818                        } else {
2819                            Err(DbError::Other("Invalid create command".into()))
2820                        }
2821                    }
2822                    "drop" => {
2823                        if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
2824                            self.drop_collection(coll_name)?;
2825                            Ok(serde_json::json!({ "ok": 1 }))
2826                        } else {
2827                            Err(DbError::Other("Invalid drop command".into()))
2828                        }
2829                    }
2830                    "listCollections" => {
2831                        let coll_names = self.list_collection_names();
2832                        let collections: Vec<_> = coll_names
2833                            .into_iter()
2834                            .map(|name| {
2835                                serde_json::json!({
2836                                    "name": name,
2837                                    "type": "collection"
2838                                })
2839                            })
2840                            .collect();
2841
2842                        Ok(serde_json::json!({
2843                            "cursor": {
2844                                "id": 0,
2845                                "ns": format!("{}.collections", self.name),
2846                                "firstBatch": collections
2847                            },
2848                            "ok": 1
2849                        }))
2850                    }
2851                    "dbStats" => {
2852                        let stats = self.stats()?;
2853                        Ok(serde_json::json!({
2854                            "db": self.name,
2855                            "collections": stats.collections.len(),
2856                            "objects": stats.collections.iter().map(|c| c.count).sum::<usize>(),
2857                            "avgObjSize": if stats.collections.iter().map(|c| c.count).sum::<usize>() > 0 {
2858                                stats.total_size / stats.collections.iter().map(|c| c.count).sum::<usize>()
2859                            } else {
2860                                0
2861                            },
2862                            "dataSize": stats.total_size,
2863                            "indexSize": stats.total_index_size,
2864                            "ok": 1
2865                        }))
2866                    }
2867                    _ => Err(DbError::Other(format!("Unknown command: {}", cmd_name))),
2868                }
2869            } else {
2870                Err(DbError::Other("Empty command".into()))
2871            }
2872        } else {
2873            Err(DbError::Other("Command must be an object".into()))
2874        }
2875    }
2876}
2877
2878// --- Database Statistics ---
2879#[derive(Debug, Clone, Serialize, Deserialize)]
2880pub struct DatabaseStats {
2881    pub collections: Vec<CollectionStatsInfo>,
2882    pub total_size: usize,
2883    pub total_index_size: usize,
2884}
2885
2886#[derive(Debug, Clone, Serialize, Deserialize)]
2887pub struct CollectionStatsInfo {
2888    pub name: String,
2889    pub count: usize,
2890    pub size: usize,
2891    pub index_count: usize,
2892    pub index_size: usize,
2893}
2894
2895// --- Create Collection Options ---
2896#[derive(Debug, Clone, Serialize, Deserialize)]
2897pub struct CreateCollectionOptions {
2898    pub capped: bool,
2899    pub size: Option<usize>,
2900    pub max: Option<usize>,
2901    pub storage_engine: Option<Map<String, Value>>,
2902    pub validator: Option<Document>,
2903    pub validation_level: Option<String>,
2904    pub validation_action: Option<String>,
2905    pub index_option_defaults: Option<Map<String, Value>>,
2906    pub view_on: Option<String>,
2907    pub pipeline: Option<Vec<Document>>,
2908    pub collation: Option<Map<String, Value>>,
2909    pub write_concern: Option<Map<String, Value>>,
2910    pub indexes: Vec<Index>,
2911}
2912
2913impl Default for CreateCollectionOptions {
2914    fn default() -> Self {
2915        Self {
2916            capped: false,
2917            size: None,
2918            max: None,
2919            storage_engine: None,
2920            validator: None,
2921            validation_level: None,
2922            validation_action: None,
2923            index_option_defaults: None,
2924            view_on: None,
2925            pipeline: None,
2926            collation: None,
2927            write_concern: None,
2928            indexes: Vec::new(),
2929        }
2930    }
2931}
2932
2933// --- Client ---
2934#[derive(Debug, Clone)]
2935pub struct Client {
2936    databases: HashMap<String, Database>,
2937    uri: String,
2938    storage_path: Option<PathBuf>,
2939}
2940
2941impl Client {
2942    pub fn new() -> Self {
2943        Self {
2944            databases: HashMap::new(),
2945            uri: "mongodb://localhost:27017".to_string(),
2946            storage_path: None,
2947        }
2948    }
2949
2950    pub fn with_uri(uri: &str) -> Self {
2951        Self {
2952            databases: HashMap::new(),
2953            uri: uri.to_string(),
2954            storage_path: None,
2955        }
2956    }
2957
2958    pub fn with_storage_path<P: AsRef<Path>>(uri: &str, path: P) -> Self {
2959        Self {
2960            databases: HashMap::new(),
2961            uri: uri.to_string(),
2962            storage_path: Some(path.as_ref().to_path_buf()),
2963        }
2964    }
2965
2966    pub fn db(&mut self, name: &str) -> &mut Database {
2967        self.databases
2968            .entry(name.to_string())
2969            .or_insert_with(|| Database::new(name.to_string()))
2970    }
2971
2972    // List all databases
2973    pub fn list_database_names(&self) -> Vec<String> {
2974        self.databases.keys().cloned().collect()
2975    }
2976
2977    // Drop a database
2978    pub fn drop_database(&mut self, name: &str) -> Result<()> {
2979        if self.databases.remove(name).is_none() {
2980            return Err(DbError::Other(format!("Database {} does not exist", name)));
2981        }
2982
2983        Ok(())
2984    }
2985
2986    // Get the URI
2987    pub fn uri(&self) -> &str {
2988        &self.uri
2989    }
2990
2991    // Save all data to disk
2992    pub fn save(&self) -> Result<()> {
2993        if let Some(ref path) = self.storage_path {
2994            // Create directory if it doesn't exist
2995            if let Some(parent) = path.parent() {
2996                fs::create_dir_all(parent)?;
2997            }
2998
2999            let mut persistent_storage = PersistentStorage::new();
3000
3001            for (name, db) in &self.databases {
3002                let mut persistent_db = PersistentDatabase {
3003                    name: name.clone(),
3004                    collections: HashMap::new(),
3005                };
3006
3007                for (coll_name, coll) in &db.collections {
3008                    let persistent_coll = PersistentCollection::from(coll.clone());
3009                    persistent_db
3010                        .collections
3011                        .insert(coll_name.clone(), persistent_coll);
3012                }
3013
3014                persistent_storage
3015                    .databases
3016                    .insert(name.clone(), persistent_db);
3017            }
3018
3019            persistent_storage.save_to_file(path)?;
3020            Ok(())
3021        } else {
3022            Err(DbError::Other("No storage path configured".into()))
3023        }
3024    }
3025
3026    // Load all data from disk
3027    pub fn load(&mut self) -> Result<()> {
3028        if let Some(ref path) = self.storage_path {
3029            if path.exists() {
3030                let persistent_storage = PersistentStorage::load_from_file(path)?;
3031
3032                for (name, persistent_db) in persistent_storage.databases {
3033                    let mut db = Database::new(name.clone());
3034
3035                    for (coll_name, persistent_coll) in persistent_db.collections {
3036                        let coll = Collection::from(persistent_coll);
3037                        db.collections.insert(coll_name, coll);
3038                    }
3039
3040                    self.databases.insert(name, db);
3041                }
3042            }
3043            Ok(())
3044        } else {
3045            Err(DbError::Other("No storage path configured".into()))
3046        }
3047    }
3048}
3049
3050// --- Bulk Write Operations ---
3051#[derive(Debug, Clone, Serialize, Deserialize)]
3052pub enum BulkWriteOperation {
3053    InsertOne {
3054        document: Document,
3055    },
3056    UpdateOne {
3057        filter: Query,
3058        update: UpdateDocument,
3059        upsert: bool,
3060    },
3061    UpdateMany {
3062        filter: Query,
3063        update: UpdateDocument,
3064        upsert: bool,
3065    },
3066    ReplaceOne {
3067        filter: Query,
3068        replacement: Document,
3069        upsert: bool,
3070    },
3071    DeleteOne {
3072        filter: Query,
3073    },
3074    DeleteMany {
3075        filter: Query,
3076    },
3077}
3078
3079#[derive(Debug, Clone, Serialize, Deserialize)]
3080pub struct BulkWriteOptions {
3081    pub ordered: bool,
3082    pub bypass_document_validation: bool,
3083    pub write_concern: Option<Map<String, Value>>,
3084}
3085
3086impl Default for BulkWriteOptions {
3087    fn default() -> Self {
3088        Self {
3089            ordered: true,
3090            bypass_document_validation: false,
3091            write_concern: None,
3092        }
3093    }
3094}
3095
3096#[derive(Debug, Clone, Serialize, Deserialize)]
3097pub struct BulkWriteResult {
3098    pub inserted_count: usize,
3099    pub matched_count: usize,
3100    pub modified_count: usize,
3101    pub deleted_count: usize,
3102    pub upserted_count: usize,
3103    pub upserted_ids: HashMap<usize, DocId>,
3104}
3105
3106impl Collection {
3107    // Execute bulk write operations
3108    pub fn bulk_write(
3109        &mut self,
3110        operations: Vec<BulkWriteOperation>,
3111        options: Option<BulkWriteOptions>,
3112    ) -> Result<BulkWriteResult> {
3113        let opts = options.unwrap_or_default();
3114        let mut result = BulkWriteResult {
3115            inserted_count: 0,
3116            matched_count: 0,
3117            modified_count: 0,
3118            deleted_count: 0,
3119            upserted_count: 0,
3120            upserted_ids: HashMap::new(),
3121        };
3122
3123        let mut index = 0;
3124
3125        for op in operations {
3126            match op {
3127                BulkWriteOperation::InsertOne { document } => {
3128                    let id = self.insert(document)?;
3129                    result.inserted_count += 1;
3130                    result.upserted_ids.insert(index, id);
3131                }
3132                BulkWriteOperation::UpdateOne {
3133                    filter,
3134                    update,
3135                    upsert,
3136                } => {
3137                    let count = self.update_one(filter, update, upsert)?;
3138                    if count > 0 {
3139                        result.matched_count += 1;
3140                        result.modified_count += 1;
3141                    }
3142                    if upsert && count > 0 {
3143                        result.upserted_count += 1;
3144                    }
3145                }
3146                BulkWriteOperation::UpdateMany {
3147                    filter,
3148                    update,
3149                    upsert,
3150                } => {
3151                    let count = self.update_many(filter, update)?;
3152                    result.matched_count += count;
3153                    result.modified_count += count;
3154                    if upsert && count > 0 {
3155                        result.upserted_count += count;
3156                    }
3157                }
3158                BulkWriteOperation::ReplaceOne {
3159                    filter,
3160                    replacement,
3161                    upsert,
3162                } => {
3163                    let count = self.replace_one(filter, replacement, upsert)?;
3164                    if count > 0 {
3165                        result.matched_count += 1;
3166                        result.modified_count += 1;
3167                    }
3168                    if upsert && count > 0 {
3169                        result.upserted_count += 1;
3170                    }
3171                }
3172                BulkWriteOperation::DeleteOne { filter } => {
3173                    let count = self.delete_one(filter)?;
3174                    result.deleted_count += count;
3175                }
3176                BulkWriteOperation::DeleteMany { filter } => {
3177                    let count = self.delete_many(filter)?;
3178                    result.deleted_count += count;
3179                }
3180            }
3181
3182            index += 1;
3183
3184            // If ordered and an error occurs, we would stop here
3185            // For simplicity, we're not handling errors in this example
3186        }
3187
3188        Ok(result)
3189    }
3190}
3191
3192// --- Remote Client ---
3193#[derive(Debug, Clone)]
3194pub struct RemoteClient {
3195    addr: SocketAddr,
3196}
3197
3198impl RemoteClient {
3199    pub fn new(addr: SocketAddr) -> Self {
3200        Self { addr }
3201    }
3202
3203    pub fn connect(&self) -> Result<RemoteConnection> {
3204        let stream = TcpStream::connect(self.addr)?;
3205        Ok(RemoteConnection { stream })
3206    }
3207}
3208
3209#[derive(Debug)]
3210pub struct RemoteConnection {
3211    stream: TcpStream,
3212}
3213
3214impl RemoteConnection {
3215    pub fn send_command(&mut self, command: &str) -> Result<String> {
3216        let mut writer = BufWriter::new(&self.stream);
3217        writer.write_all(command.as_bytes())?;
3218        writer.write_all(b"\n")?;
3219        writer.flush()?;
3220
3221        let mut reader = BufReader::new(&self.stream);
3222        let mut response = String::new();
3223        reader.read_line(&mut response)?;
3224
3225        Ok(response)
3226    }
3227
3228    pub fn close(self) -> Result<()> {
3229        // The connection will be closed when RemoteConnection is dropped
3230        Ok(())
3231    }
3232}
3233
3234// --- Server ---
3235#[derive(Debug)]
3236pub struct Server {
3237    addr: SocketAddr,
3238    client: Client,
3239    auth_username: Option<String>,
3240    auth_password: Option<String>,
3241}
3242impl Server {
3243    pub fn new(addr: SocketAddr, storage_path: Option<PathBuf>) -> Self {
3244        let client = if let Some(path) = storage_path {
3245            Client::with_storage_path("mongodb://localhost", path)
3246        } else {
3247            Client::new()
3248        };
3249        Self { 
3250            addr, 
3251            client,
3252            auth_username: None,
3253            auth_password: None,
3254        }
3255    }
3256    
3257    pub fn with_auth(mut self, username: String, password: String) -> Self {
3258        self.auth_username = Some(username);
3259        self.auth_password = Some(password);
3260        self
3261    }
3262    
3263    pub fn start(&mut self) -> Result<()> {
3264        // Load data from disk if storage path is configured
3265        if self.client.storage_path.is_some() {
3266            self.client.load()?;
3267        }
3268        let listener = TcpListener::bind(self.addr)?;
3269        println!("Server listening on {}", self.addr);
3270        
3271        for stream in listener.incoming() {
3272            match stream {
3273                Ok(stream) => {
3274                    let client = self.client.clone();
3275                    let auth_username = self.auth_username.clone();
3276                    let auth_password = self.auth_password.clone();
3277                    thread::spawn(move || {
3278                        if let Err(e) = Self::handle_client(stream, client, auth_username, auth_password) {
3279                            eprintln!("Error handling client: {}", e);
3280                        }
3281                    });
3282                }
3283                Err(e) => {
3284                    eprintln!("Failed to accept connection: {}", e);
3285                }
3286            }
3287        }
3288        Ok(())
3289    }
3290    
3291    fn handle_client(
3292        mut stream: TcpStream, 
3293        mut client: Client, 
3294        auth_username: Option<String>, 
3295        auth_password: Option<String>
3296    ) -> Result<()> {
3297        let mut reader = BufReader::new(&stream);
3298        let mut writer = BufWriter::new(&stream);
3299        let mut authenticated = auth_username.is_none(); // If no auth is configured, client is authenticated by default
3300        
3301        loop {
3302            let mut command = String::new();
3303            match reader.read_line(&mut command) {
3304                Ok(0) => break, // Connection closed
3305                Ok(_) => {
3306                    // Trim whitespace and newline
3307                    let command = command.trim();
3308                    if command.is_empty() {
3309                        continue;
3310                    }
3311                    
3312                    // Handle authentication if required
3313                    if !authenticated {
3314                        if command.starts_with("AUTH") {
3315                            let parts: Vec<&str> = command.split_whitespace().collect();
3316                            if parts.len() != 3 {
3317                                let response = "ERROR: Usage: AUTH <username> <password>".to_string();
3318                                writer.write_all(response.as_bytes())?;
3319                                writer.write_all(b"\n")?;
3320                                writer.flush()?;
3321                                continue;
3322                            }
3323                            
3324                            let username = parts[1];
3325                            let password = parts[2];
3326                            
3327                            if let (Some(auth_user), Some(auth_pass)) = (&auth_username, &auth_password) {
3328                                if username == auth_user && password == auth_pass {
3329                                    authenticated = true;
3330                                    let response = "OK: Authenticated".to_string();
3331                                    writer.write_all(response.as_bytes())?;
3332                                    writer.write_all(b"\n")?;
3333                                    writer.flush()?;
3334                                    continue;
3335                                }
3336                            }
3337                            
3338                            let response = "ERROR: Authentication failed".to_string();
3339                            writer.write_all(response.as_bytes())?;
3340                            writer.write_all(b"\n")?;
3341                            writer.flush()?;
3342                            continue;
3343                        } else {
3344                            let response = "ERROR: Not authenticated. Use AUTH <username> <password>".to_string();
3345                            writer.write_all(response.as_bytes())?;
3346                            writer.write_all(b"\n")?;
3347                            writer.flush()?;
3348                            continue;
3349                        }
3350                    }
3351                    
3352                    // Parse and execute the command
3353                    let response = match Self::process_command(command, &mut client) {
3354                        Ok(response) => response,
3355                        Err(e) => format!("ERROR: {}", e),
3356                    };
3357                    
3358                    // Send response
3359                    writer.write_all(response.as_bytes())?;
3360                    writer.write_all(b"\n")?;
3361                    writer.flush()?;
3362                    
3363                    // If the command was EXIT, break the loop
3364                    if command == "EXIT" {
3365                        break;
3366                    }
3367                }
3368                Err(e) => {
3369                    eprintln!("Error reading from client: {}", e);
3370                    break;
3371                }
3372            }
3373        }
3374        Ok(())
3375    }
3376    
3377    fn process_command(command: &str, client: &mut Client) -> Result<String> {
3378        let parts: Vec<&str> = command.split_whitespace().collect();
3379        if parts.is_empty() {
3380            return Ok("ERROR: Empty command".to_string());
3381        }
3382        match parts[0] {
3383            "INSERT" => {
3384                if parts.len() < 3 {
3385                    return Ok(
3386                        "ERROR: Usage: INSERT <database> <collection> <document>".to_string()
3387                    );
3388                }
3389                let db_name = parts[1];
3390                let coll_name = parts[2];
3391                let doc_json = parts[3..].join(" ");
3392                let doc: Document = serde_json::from_str(&doc_json)?;
3393                let db = client.db(db_name);
3394                let coll = db.collection(coll_name);
3395                let id = coll.insert(doc)?;
3396                Ok(format!("OK: Document inserted with ID: {}", id))
3397            }
3398            "FIND" => {
3399                if parts.len() < 3 {
3400                    return Ok("ERROR: Usage: FIND <database> <collection> [query]".to_string());
3401                }
3402                let db_name = parts[1];
3403                let coll_name = parts[2];
3404                let query = if parts.len() > 3 {
3405                    let query_json = parts[3..].join(" ");
3406                    let query_value: Value = serde_json::from_str(&query_json)?;
3407                    Query::from_value(query_value)?
3408                } else {
3409                    Query::new()
3410                };
3411                let db = client.db(db_name);
3412                let coll = db.collection(coll_name);
3413                let results = coll.find(query, None)?;
3414                let results_json = serde_json::to_string(&results)?;
3415                Ok(format!("OK: {}", results_json))
3416            }
3417            "UPDATE" => {
3418                if parts.len() < 4 {
3419                    return Ok(
3420                        "ERROR: Usage: UPDATE <database> <collection> <query> <update>".to_string(),
3421                    );
3422                }
3423                let db_name = parts[1];
3424                let coll_name = parts[2];
3425                // Find the end of the query
3426                let mut query_end = 3;
3427                let mut brace_count = 0;
3428                for (i, c) in command.char_indices() {
3429                    if i < parts[0].len() + parts[1].len() + parts[2].len() + 2 {
3430                        continue;
3431                    }
3432                    if c == '{' {
3433                        brace_count += 1;
3434                    } else if c == '}' {
3435                        brace_count -= 1;
3436                        if brace_count == 0 {
3437                            query_end = i + 1;
3438                            break;
3439                        }
3440                    }
3441                }
3442                let query_json =
3443                    command[parts[0].len() + parts[1].len() + parts[2].len() + 3..query_end].trim();
3444                let update_json = command[query_end..].trim();
3445                let query: Query = serde_json::from_str(query_json)?;
3446                let update: UpdateDocument = serde_json::from_str(update_json)?;
3447                let db = client.db(db_name);
3448                let coll = db.collection(coll_name);
3449                let count = coll.update_one(query, update, false)?;
3450                Ok(format!("OK: Updated {} documents", count))
3451            }
3452            "DELETE" => {
3453                if parts.len() < 3 {
3454                    return Ok("ERROR: Usage: DELETE <database> <collection> [query]".to_string());
3455                }
3456                let db_name = parts[1];
3457                let coll_name = parts[2];
3458                let query = if parts.len() > 3 {
3459                    let query_json = parts[3..].join(" ");
3460                    let query_value: Value = serde_json::from_str(&query_json)?;
3461                    Query::from_value(query_value)?
3462                } else {
3463                    Query::new()
3464                };
3465                let db = client.db(db_name);
3466                let coll = db.collection(coll_name);
3467                let count = coll.delete_one(query)?;
3468                Ok(format!("OK: Deleted {} documents", count))
3469            }
3470            "SAVE" => {
3471                client.save()?;
3472                Ok("OK: Data saved to disk".to_string())
3473            }
3474            "LOAD" => {
3475                client.load()?;
3476                Ok("OK: Data loaded from disk".to_string())
3477            }
3478            "LIST_DB" => {
3479                let dbs = client.list_database_names();
3480                Ok(format!("OK: {}", serde_json::to_string(&dbs)?))
3481            }
3482            "LIST_COLL" => {
3483                if parts.len() < 2 {
3484                    return Ok("ERROR: Usage: LIST_COLL <database>".to_string());
3485                }
3486                let db_name = parts[1];
3487                let db = client.db(db_name);
3488                let colls = db.list_collection_names();
3489                Ok(format!("OK: {}", serde_json::to_string(&colls)?))
3490            }
3491            "STATS" => {
3492                if parts.len() < 2 {
3493                    return Ok("ERROR: Usage: STATS <database>".to_string());
3494                }
3495                let db_name = parts[1];
3496                let db = client.db(db_name);
3497                let stats = db.stats()?;
3498                Ok(format!("OK: {}", serde_json::to_string(&stats)?))
3499            }
3500            "EXIT" => {
3501                // Save data before exiting
3502                if client.storage_path.is_some() {
3503                    if let Err(e) = client.save() {
3504                        return Ok(format!("ERROR: Failed to save data: {}", e));
3505                    }
3506                }
3507                Ok("OK: Connection closing".to_string())
3508            }
3509            _ => Ok("ERROR: Unknown command".to_string()),
3510        }
3511    }
3512}