luckdb/
lib.rs

1#![allow(warnings)]
2#![doc = include_str!("../README.md")]
3
4/// A lightweight JSON document database written in Rust.
5///
6/// LuckDB provides a document-oriented interface for storing, querying, and managing JSON documents
7/// with features like indexing, aggregation, persistence, and AES-256 encryption.
8///
9/// ## Overview
10///
11/// The main components of LuckDB are:
12///
13/// - `Client`: The primary entry point for interacting with databases
14/// - `Database`: Manages collections of documents
15/// - `Collection`: Stores and manipulates documents
16/// - `Document`: A JSON document (alias for `serde_json::Value`)
17/// - `Query`: Used to filter and find documents
18/// - `Index`: Improves query performance
19/// - `config::DatabaseConfig`: Configuration management for storage, encryption, and authentication
20/// - `encryption::AesEncryption`: AES-256 encryption for data protection
21///
22/// ## Basic Usage
23///
24/// ```no_run
25/// use luckdb::{Client, config::DatabaseConfig};
26/// use serde_json::json;
27///
28/// // Create a configuration with storage and encryption
29/// let config = DatabaseConfig::with_storage_path("./data")
30///     .with_encryption("my_secure_password");
31///
32/// let mut client = Client::with_config(config).unwrap();
33///
34/// // Access a database
35/// let db = client.db("mydb");
36///
37/// // Access a collection
38/// let coll = db.collection("mycollection");
39///
40/// // Insert a document
41/// let doc = json!({"name": "John", "age": 30});
42/// let id = coll.insert(doc).unwrap();
43///
44/// // Find documents
45/// let query = luckdb::Query::new().eq("name", "John");
46/// let results = coll.find(query, None).unwrap();
47///
48/// // Save changes to disk (encrypted)
49/// client.save().unwrap();
50/// ```
51///
52/// ## Encryption
53///
54/// LuckDB supports AES-256 encryption for data persistence:
55///
56/// ```no_run
57/// use luckdb::{Client, config::DatabaseConfig};
58///
59/// let config = DatabaseConfig::new()
60///     .with_encryption("my_password")
61///     .with_storage_path("./secure_data");
62///
63/// let client = Client::with_config(config).unwrap();
64/// // All data will be encrypted when saved to disk
65/// ```
66///
67/// ## Server Mode
68///
69/// LuckDB supports client-server mode with authentication:
70///
71/// ```no_run
72/// use luckdb::{Server, config::DatabaseConfig};
73///
74/// let config = DatabaseConfig::with_storage_path("./server_data")
75///     .with_auth("admin", "password")
76///     .with_encryption("server_secret");
77///
78/// let mut server = Server::with_config("127.0.0.1:27017".parse().unwrap(), config)?;
79/// server.start()?;
80/// ```
81///
82/// ## Configuration Files
83///
84/// LuckDB supports configuration via TOML files for easy deployment and environment management:
85///
86/// ```no_run
87/// use luckdb::{Client, config::DatabaseConfig};
88///
89/// // Load configuration from file
90/// let mut client = Client::with_config_file("config.toml")?;
91///
92/// // Or load config and modify it
93/// let mut config = DatabaseConfig::load_from_file("config.toml")?;
94/// config.encryption_enabled = true;
95/// config.encryption_password = Some("new_password".to_string());
96/// let mut client = Client::with_config(config)?;
97/// ```
98///
99/// A typical `config.toml` file looks like:
100/// ```toml
101/// [luckdb]
102/// storage_path = "./data"
103/// encryption_enabled = true
104/// encryption_password = "your_password"
105/// auth_username = "admin"
106/// auth_password = "auth_password"
107/// auto_save_interval_seconds = 300
108/// max_backup_files = 10
109/// ```
110use chrono;
111use serde::{Deserialize, Serialize};
112use serde_json::{Map, Value as Document, Value};
113use std::clone;
114use std::cmp::Ordering;
115use std::collections::{BTreeMap, HashMap, HashSet};
116use std::fs::{self, File};
117use std::io::{BufRead, BufReader, BufWriter};
118use std::io::{Read, Write};
119use std::net::{SocketAddr, TcpListener, TcpStream};
120use std::ops::Bound::{Excluded, Included, Unbounded};
121use std::path::{Path, PathBuf};
122use std::sync::{Arc, RwLock};
123use std::thread;
124use thiserror::Error;
125use uuid::Uuid;
126
127// Import new modules
128pub mod config;
129pub mod encryption;
130
131// --- Error Types ---
/// Error types returned by database operations.
///
/// The `Display` text of every variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum DbError {
    /// A requested document could not be found.
    #[error("Document not found")]
    NotFound,
    /// A query was malformed or contained invalid parameters.
    #[error("Invalid query: {0}")]
    InvalidQuery(String),
    /// Error during JSON serialization or deserialization.
    ///
    /// Converted automatically from `serde_json::Error` (via `#[from]`), so `?` can be
    /// used on serde_json results inside functions returning [`Result`].
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    /// File system or IO-related error.
    ///
    /// Converted automatically from `std::io::Error` (via `#[from]`).
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
    /// Generic error not covered by other categories.
    #[error("Other error: {0}")]
    Other(String),
    /// Error during index creation or management.
    #[error("Index error: {0}")]
    IndexError(String),
    /// Error during document update operation.
    #[error("Update error: {0}")]
    UpdateError(String),
    /// Data validation error.
    #[error("Validation error: {0}")]
    ValidationError(String),
    /// Error establishing or maintaining a connection.
    #[error("Connection error: {0}")]
    ConnectionError(String),
    /// Error in the communication protocol.
    #[error("Protocol error: {0}")]
    ProtocolError(String),
    /// Error during encryption/decryption operations.
    #[error("Encryption error: {0}")]
    EncryptionError(String),
    /// Error in encryption key management.
    #[error("Key management error: {0}")]
    KeyManagementError(String),
    /// Error in database configuration.
    #[error("Configuration error: {0}")]
    ConfigurationError(String),
}
/// Convenience alias for results whose error type is [`DbError`].
pub type Result<T> = std::result::Result<T, DbError>;
176
177// --- Document ID ---
178#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
179pub struct DocId(String);
180impl DocId {
181    pub fn new() -> Self {
182        Self(Uuid::new_v4().to_string())
183    }
184
185    pub fn from_str(s: &str) -> Self {
186        Self(s.to_string())
187    }
188
189    pub fn as_str(&self) -> &str {
190        &self.0
191    }
192}
193impl std::fmt::Display for DocId {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(f, "{}", self.0)
196    }
197}
198
199// --- Query Operators ---
/// A single test applied to the value found at a condition's field path.
///
/// The per-variant notes describe how `Query::matches_condition` evaluates the
/// operator. The variant order is part of the derived serde representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOperator {
    /// Field is present and equal to the given value.
    Eq(Value),
    /// Field is absent, or present and different from the given value.
    Ne(Value),
    /// Numeric "greater than"; both sides must be JSON numbers (compared as `f64`).
    Gt(Value),
    /// Numeric "greater than or equal"; both sides must be JSON numbers.
    Gte(Value),
    /// Numeric "less than"; both sides must be JSON numbers.
    Lt(Value),
    /// Numeric "less than or equal"; both sides must be JSON numbers.
    Lte(Value),
    /// Field is present and equal to one of the listed values.
    In(Vec<Value>),
    /// Field is absent, or present and equal to none of the listed values.
    Nin(Vec<Value>),
    /// Field presence must equal the flag (`true` = must exist, `false` = must not).
    Exists(bool),
    /// Pattern test on string fields.
    ///
    /// NOTE: the matcher currently performs a plain substring search
    /// (`str::contains`), not regular-expression evaluation.
    Regex(String),
    /// Nested sub-queries. `Query::matches_condition` does not evaluate this variant
    /// itself and treats it as satisfied; query-level logic lives in [`LogicalOp`].
    And(Vec<Query>),
    /// Nested sub-queries; treated like [`QueryOperator::And`] by `matches_condition`.
    Or(Vec<Query>),
    /// Nested sub-queries; treated like [`QueryOperator::And`] by `matches_condition`.
    Nor(Vec<Query>),
    /// Wraps another operator to express its negation.
    Not(Box<QueryOperator>),
    // Array operators
    /// Field is an array containing every listed value.
    All(Vec<Value>),
    /// Field is an array with at least one element matching the nested query.
    ElemMatch(Query),
    /// Field is an array of exactly this length.
    Size(usize),
    // Geospatial operators (simplified)
    /// Field is an array whose first two elements are numbers, read as a point.
    /// The point matches when its Euclidean distance to `point` is at most
    /// `max_distance`; with `None`, any such point matches.
    Near {
        point: (f64, f64),
        max_distance: Option<f64>,
    },
    /// Point-in-shape test of the field's point against `shape`.
    Within {
        shape: GeoShape,
    },
    /// Evaluated with the same point-in-shape test as [`QueryOperator::Within`].
    Intersects {
        shape: GeoShape,
    },
}
232
/// Planar region used by the geospatial operators.
///
/// Coordinates are `(x, y)` pairs in the same order as a document's point array
/// (first element, second element). The `Query::near` docs describe points as
/// (longitude, latitude) — presumably the same convention applies here; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GeoShape {
    /// Axis-aligned rectangle; the matcher tests
    /// `bottom_left <= point <= top_right` on both axes, edges included.
    Box {
        bottom_left: (f64, f64),
        top_right: (f64, f64),
    },
    /// Polygon given by its vertices.
    Polygon {
        points: Vec<(f64, f64)>,
    },
    /// Circle given by its center and radius.
    Center {
        center: (f64, f64),
        radius: f64,
    },
}
247
248// --- Query Condition ---
/// One field-level test: an operator applied to the value at a field path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCondition {
    /// Path of the field to test; `.` separates nested keys (e.g. `address.city`).
    field: String,
    /// The test applied to the value found at `field`.
    operator: QueryOperator,
}
254
255// --- Query ---
/// Represents a query used to filter documents in a collection.
///
/// A `Query` is built with method chaining. It holds field-level conditions
/// (equality, comparisons, array and geospatial tests) plus logical clauses that
/// combine nested queries. A document matches when every condition and every
/// logical clause is satisfied.
///
/// # Example
///
/// ```
/// use luckdb::Query;
/// use serde_json::json;
///
/// // Create a query for documents where age > 18 and status is "active"
/// let query = Query::new()
///     .gt("age", json!(18))
///     .eq("status", json!("active"));
///
/// // Create a complex query with logical operators
/// let complex_query = Query::new()
///     .eq("department", json!("engineering"))
///     .and(vec![
///         Query::new().gte("salary", json!(50000)),
///         Query::new().lt("years_experience", json!(5)),
///     ]);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Query {
    /// The list of conditions that documents must satisfy.
    conditions: Vec<QueryCondition>,
    /// The list of logical operations to apply to nested queries.
    logical_ops: Vec<(LogicalOp, Vec<Query>)>,
}
287
/// Logical combinator applied to a group of nested queries (see `Query::matches`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicalOp {
    /// Every nested query must match.
    And,
    /// At least one nested query must match.
    Or,
    /// No nested query may match.
    Nor,
    /// No nested query may match (evaluated the same way as `Nor`).
    Not,
}
295// Add this helper function after the Query implementation
296impl Query {
297    /// Creates a query from a Value object, typically used for deserialization.
298    ///
299    /// This method converts a Value (usually an Object) into a Query structure
300    /// by parsing operator syntax like `{ "field": { "$gt": 10 } }`.
301    ///
302    /// # Arguments
303    ///
304    /// * `value` - The Value to convert into a Query. Must be an Object.
305    ///
306    /// # Returns
307    ///
308    /// A Result containing the parsed Query or an InvalidQuery error.
309    pub fn from_value(value: Value) -> Result<Self> {
310        match value {
311            Value::Object(map) => {
312                let mut query = Query::new();
313
314                for (field, op_value) in map {
315                    match op_value {
316                        Value::Object(op_map) => {
317                            for (op, val) in op_map {
318                                match op.as_str() {
319                                    "$eq" => query = query.eq(&field, val.clone()),
320                                    "$ne" => query = query.ne(&field, val.clone()),
321                                    "$gt" => query = query.gt(&field, val.clone()),
322                                    "$gte" => query = query.gte(&field, val.clone()),
323                                    "$lt" => query = query.lt(&field, val.clone()),
324                                    "$lte" => query = query.lte(&field, val.clone()),
325                                    "$in" => {
326                                        if let Value::Array(arr) = val {
327                                            query = query.in_(&field, arr.clone());
328                                        }
329                                    }
330                                    "$nin" => {
331                                        if let Value::Array(arr) = val {
332                                            query = query.nin(&field, arr.clone());
333                                        }
334                                    }
335                                    "$exists" => {
336                                        if let Value::Bool(exists) = val {
337                                            query = query.exists(&field, exists);
338                                        }
339                                    }
340                                    "$regex" => {
341                                        if let Value::String(pattern) = val {
342                                            query = query.regex(&field, &pattern);
343                                        }
344                                    }
345                                    _ => {
346                                        return Err(DbError::InvalidQuery(format!(
347                                            "Unknown operator: {}",
348                                            op
349                                        )))
350                                    }
351                                }
352                            }
353                        }
354                        _ => query = query.eq(&field, op_value.clone()),
355                    }
356                }
357
358                Ok(query)
359            }
360            _ => Err(DbError::InvalidQuery("Query must be an object".to_string())),
361        }
362    }
363}
364
365impl Query {
366    /// Creates a new empty query.
367    ///
368    /// This is typically used as the starting point for building complex queries using method chaining.
369    pub fn new() -> Self {
370        Self {
371            conditions: Vec::new(),
372            logical_ops: Vec::new(),
373        }
374    }
375
376    /// Adds an equality condition to the query.
377    ///
378    /// # Arguments
379    ///
380    /// * `key` - The field to check for equality.
381    /// * `value` - The value to compare against.
382    ///
383    /// # Returns
384    ///
385    /// The modified query for method chaining.
386    pub fn eq(mut self, key: &str, value: Value) -> Self {
387        self.conditions.push(QueryCondition {
388            field: key.to_string(),
389            operator: QueryOperator::Eq(value),
390        });
391        self
392    }
393
394    /// Adds a not equal condition to the query.
395    ///
396    /// # Arguments
397    ///
398    /// * `key` - The field to check for inequality.
399    /// * `value` - The value to compare against.
400    ///
401    /// # Returns
402    ///
403    /// The modified query for method chaining.
404    pub fn ne(mut self, key: &str, value: Value) -> Self {
405        self.conditions.push(QueryCondition {
406            field: key.to_string(),
407            operator: QueryOperator::Ne(value),
408        });
409        self
410    }
411
412    /// Adds a greater than condition to the query.
413    ///
414    /// # Arguments
415    ///
416    /// * `key` - The field to check.
417    /// * `value` - The value to compare against.
418    ///
419    /// # Returns
420    ///
421    /// The modified query for method chaining.
422    pub fn gt(mut self, key: &str, value: Value) -> Self {
423        self.conditions.push(QueryCondition {
424            field: key.to_string(),
425            operator: QueryOperator::Gt(value),
426        });
427        self
428    }
429
430    /// Adds a greater than or equal condition to the query.
431    ///
432    /// # Arguments
433    ///
434    /// * `key` - The field to check.
435    /// * `value` - The value to compare against.
436    ///
437    /// # Returns
438    ///
439    /// The modified query for method chaining.
440    pub fn gte(mut self, key: &str, value: Value) -> Self {
441        self.conditions.push(QueryCondition {
442            field: key.to_string(),
443            operator: QueryOperator::Gte(value),
444        });
445        self
446    }
447
448    /// Adds a less than condition to the query.
449    ///
450    /// # Arguments
451    ///
452    /// * `key` - The field to check.
453    /// * `value` - The value to compare against.
454    ///
455    /// # Returns
456    ///
457    /// The modified query for method chaining.
458    pub fn lt(mut self, key: &str, value: Value) -> Self {
459        self.conditions.push(QueryCondition {
460            field: key.to_string(),
461            operator: QueryOperator::Lt(value),
462        });
463        self
464    }
465
466    /// Adds a less than or equal condition to the query.
467    ///
468    /// # Arguments
469    ///
470    /// * `key` - The field to check.
471    /// * `value` - The value to compare against.
472    ///
473    /// # Returns
474    ///
475    /// The modified query for method chaining.
476    pub fn lte(mut self, key: &str, value: Value) -> Self {
477        self.conditions.push(QueryCondition {
478            field: key.to_string(),
479            operator: QueryOperator::Lte(value),
480        });
481        self
482    }
483
484    /// Adds an "in" condition to the query.
485    ///
486    /// Matches documents where the field's value is in the specified list of values.
487    ///
488    /// # Arguments
489    ///
490    /// * `key` - The field to check.
491    /// * `values` - The list of values to check against.
492    ///
493    /// # Returns
494    ///
495    /// The modified query for method chaining.
496    pub fn in_(mut self, key: &str, values: Vec<Value>) -> Self {
497        self.conditions.push(QueryCondition {
498            field: key.to_string(),
499            operator: QueryOperator::In(values),
500        });
501        self
502    }
503
504    /// Adds a "not in" condition to the query.
505    ///
506    /// Matches documents where the field's value is not in the specified list of values.
507    ///
508    /// # Arguments
509    ///
510    /// * `key` - The field to check.
511    /// * `values` - The list of values to check against.
512    ///
513    /// # Returns
514    ///
515    /// The modified query for method chaining.
516    pub fn nin(mut self, key: &str, values: Vec<Value>) -> Self {
517        self.conditions.push(QueryCondition {
518            field: key.to_string(),
519            operator: QueryOperator::Nin(values),
520        });
521        self
522    }
523
524    /// Adds an "exists" condition to the query.
525    ///
526    /// Matches documents where the field exists or doesn't exist as specified.
527    ///
528    /// # Arguments
529    ///
530    /// * `key` - The field to check for existence.
531    /// * `exists` - Whether the field should exist (true) or not (false).
532    ///
533    /// # Returns
534    ///
535    /// The modified query for method chaining.
536    pub fn exists(mut self, key: &str, exists: bool) -> Self {
537        self.conditions.push(QueryCondition {
538            field: key.to_string(),
539            operator: QueryOperator::Exists(exists),
540        });
541        self
542    }
543
544    /// Adds a regular expression condition to the query.
545    ///
546    /// Matches documents where the field's value matches the specified regular expression.
547    ///
548    /// # Arguments
549    ///
550    /// * `key` - The field to check.
551    /// * `pattern` - The regular expression pattern to match against.
552    ///
553    /// # Returns
554    ///
555    /// The modified query for method chaining.
556    pub fn regex(mut self, key: &str, pattern: &str) -> Self {
557        self.conditions.push(QueryCondition {
558            field: key.to_string(),
559            operator: QueryOperator::Regex(pattern.to_string()),
560        });
561        self
562    }
563
564    /// Adds a logical AND operation to the query.
565    ///
566    /// All specified sub-queries must match for the document to be included.
567    ///
568    /// # Arguments
569    ///
570    /// * `queries` - A list of queries that must all match.
571    ///
572    /// # Returns
573    ///
574    /// The modified query for method chaining.
575    pub fn and(mut self, queries: Vec<Query>) -> Self {
576        self.logical_ops.push((LogicalOp::And, queries));
577        self
578    }
579
580    /// Adds a logical OR operation to the query.
581    ///
582    /// At least one of the specified sub-queries must match for the document to be included.
583    ///
584    /// # Arguments
585    ///
586    /// * `queries` - A list of queries where at least one must match.
587    ///
588    /// # Returns
589    ///
590    /// The modified query for method chaining.
591    pub fn or(mut self, queries: Vec<Query>) -> Self {
592        self.logical_ops.push((LogicalOp::Or, queries));
593        self
594    }
595
596    /// Adds a logical NOR operation to the query.
597    ///
598    /// None of the specified sub-queries must match for the document to be included.
599    ///
600    /// # Arguments
601    ///
602    /// * `queries` - A list of queries that must all not match.
603    ///
604    /// # Returns
605    ///
606    /// The modified query for method chaining.
607    pub fn nor(mut self, queries: Vec<Query>) -> Self {
608        self.logical_ops.push((LogicalOp::Nor, queries));
609        self
610    }
611
612    /// Adds a logical NOT operation to the query.
613    ///
614    /// The specified condition must not match for the document to be included.
615    /// Note: This implementation only works with simple single-condition queries.
616    ///
617    /// # Arguments
618    ///
619    /// * `query` - The query that must not match.
620    ///
621    /// # Returns
622    ///
623    /// The modified query for method chaining.
624    pub fn not(mut self, query: Query) -> Self {
625        // Convert the query to a single condition for Not operator
626        if let Some(cond) = query.conditions.first() {
627            self.conditions.push(QueryCondition {
628                field: cond.field.clone(),
629                operator: QueryOperator::Not(Box::new(cond.operator.clone())),
630            });
631        }
632        self
633    }
634
635    /// Array operator: Matches arrays that contain all elements specified in the query.
636    ///
637    /// # Arguments
638    ///
639    /// * `key` - The field to check (must be an array).
640    /// * `values` - The list of values that the array must contain.
641    ///
642    /// # Returns
643    ///
644    /// The modified query for method chaining.
645    pub fn all(mut self, key: &str, values: Vec<Value>) -> Self {
646        self.conditions.push(QueryCondition {
647            field: key.to_string(),
648            operator: QueryOperator::All(values),
649        });
650        self
651    }
652
653    /// Array operator: Matches arrays that contain at least one element that matches the specified query.
654    ///
655    /// # Arguments
656    ///
657    /// * `key` - The field to check (must be an array).
658    /// * `query` - The query that at least one array element must match.
659    ///
660    /// # Returns
661    ///
662    /// The modified query for method chaining.
663    pub fn elem_match(mut self, key: &str, query: Query) -> Self {
664        self.conditions.push(QueryCondition {
665            field: key.to_string(),
666            operator: QueryOperator::ElemMatch(query),
667        });
668        self
669    }
670
671    /// Array operator: Matches arrays that have the specified size.
672    ///
673    /// # Arguments
674    ///
675    /// * `key` - The field to check (must be an array).
676    /// * `size` - The exact size the array must have.
677    ///
678    /// # Returns
679    ///
680    /// The modified query for method chaining.
681    pub fn size(mut self, key: &str, size: usize) -> Self {
682        self.conditions.push(QueryCondition {
683            field: key.to_string(),
684            operator: QueryOperator::Size(size),
685        });
686        self
687    }
688
689    /// Geospatial operator: Matches geospatial points that are within a certain distance of a point.
690    ///
691    /// # Arguments
692    ///
693    /// * `key` - The field to check (must contain geospatial data).
694    /// * `point` - The reference point as a tuple (longitude, latitude).
695    /// * `max_distance` - Optional maximum distance from the point.
696    ///
697    /// # Returns
698    ///
699    /// The modified query for method chaining.
700    pub fn near(mut self, key: &str, point: (f64, f64), max_distance: Option<f64>) -> Self {
701        self.conditions.push(QueryCondition {
702            field: key.to_string(),
703            operator: QueryOperator::Near {
704                point,
705                max_distance,
706            },
707        });
708        self
709    }
710
711    /// Geospatial operator: Matches geospatial points that are within the specified shape.
712    ///
713    /// # Arguments
714    ///
715    /// * `key` - The field to check (must contain geospatial data).
716    /// * `shape` - The shape to check against (Box, Polygon, or Center).
717    ///
718    /// # Returns
719    ///
720    /// The modified query for method chaining.
721    pub fn within(mut self, key: &str, shape: GeoShape) -> Self {
722        self.conditions.push(QueryCondition {
723            field: key.to_string(),
724            operator: QueryOperator::Within { shape },
725        });
726        self
727    }
728
729    /// Geospatial operator: Matches geospatial shapes that intersect with the specified shape.
730    ///
731    /// # Arguments
732    ///
733    /// * `key` - The field to check (must contain geospatial data).
734    /// * `shape` - The shape to check against (Box, Polygon, or Center).
735    ///
736    /// # Returns
737    ///
738    /// The modified query for method chaining.
739    pub fn intersects(mut self, key: &str, shape: GeoShape) -> Self {
740        self.conditions.push(QueryCondition {
741            field: key.to_string(),
742            operator: QueryOperator::Intersects { shape },
743        });
744        self
745    }
746
747    /// Checks if a document matches all conditions and logical operations of the query.
748    ///
749    /// This method is used internally by the database to determine if a document should be included
750    /// in query results. It checks both the direct conditions and any nested logical operations.
751    ///
752    /// # Arguments
753    ///
754    /// * `doc` - The document to check against the query.
755    ///
756    /// # Returns
757    ///
758    /// `true` if the document matches the query, `false` otherwise.
759    pub fn matches(&self, doc: &Document) -> bool {
760        // Check all conditions
761        for cond in &self.conditions {
762            if !self.matches_condition(doc, cond) {
763                return false;
764            }
765        }
766
767        // Check logical operations
768        for (op, queries) in &self.logical_ops {
769            match op {
770                LogicalOp::And => {
771                    if !queries.iter().all(|q| q.matches(doc)) {
772                        return false;
773                    }
774                }
775                LogicalOp::Or => {
776                    if !queries.iter().any(|q| q.matches(doc)) {
777                        return false;
778                    }
779                }
780                LogicalOp::Nor => {
781                    if queries.iter().any(|q| q.matches(doc)) {
782                        return false;
783                    }
784                }
785                LogicalOp::Not => {
786                    if queries.iter().any(|q| q.matches(doc)) {
787                        return false;
788                    }
789                }
790            }
791        }
792
793        true
794    }
795
796    fn matches_condition(&self, doc: &Document, cond: &QueryCondition) -> bool {
797        let field_parts: Vec<&str> = cond.field.split('.').collect();
798        let value = self.get_nested_value(doc, &field_parts);
799
800        match &cond.operator {
801            QueryOperator::Eq(expected) => match value {
802                Some(v) => v == expected,
803                None => false,
804            },
805            QueryOperator::Ne(expected) => match value {
806                Some(v) => v != expected,
807                None => true,
808            },
809            QueryOperator::Gt(expected) => match (value, expected) {
810                (Some(Value::Number(a)), Value::Number(b)) => {
811                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
812                }
813                _ => false,
814            },
815            QueryOperator::Gte(expected) => match (value, expected) {
816                (Some(Value::Number(a)), Value::Number(b)) => {
817                    a.as_f64().unwrap_or(0.0) >= b.as_f64().unwrap_or(0.0)
818                }
819                _ => false,
820            },
821            QueryOperator::Lt(expected) => match (value, expected) {
822                (Some(Value::Number(a)), Value::Number(b)) => {
823                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
824                }
825                _ => false,
826            },
827            QueryOperator::Lte(expected) => match (value, expected) {
828                (Some(Value::Number(a)), Value::Number(b)) => {
829                    a.as_f64().unwrap_or(0.0) <= b.as_f64().unwrap_or(0.0)
830                }
831                _ => false,
832            },
833            QueryOperator::In(values) => match value {
834                Some(v) => values.contains(v),
835                None => false,
836            },
837            QueryOperator::Nin(values) => match value {
838                Some(v) => !values.contains(v),
839                None => true,
840            },
841            QueryOperator::Exists(exists) => value.is_some() == *exists,
842            QueryOperator::Regex(pattern) => {
843                match value {
844                    Some(Value::String(s)) => {
845                        // Simple regex matching (in a real implementation, use regex crate)
846                        s.contains(pattern)
847                    }
848                    _ => false,
849                }
850            }
851            QueryOperator::All(values) => match value {
852                Some(Value::Array(arr)) => values.iter().all(|v| arr.contains(v)),
853                _ => false,
854            },
855            QueryOperator::ElemMatch(query) => match value {
856                Some(Value::Array(arr)) => arr.iter().any(|elem| query.matches(elem)),
857                _ => false,
858            },
859            QueryOperator::Size(size) => match value {
860                Some(Value::Array(arr)) => arr.len() == *size,
861                _ => false,
862            },
863            QueryOperator::Near {
864                point,
865                max_distance,
866            } => {
867                // Simplified geospatial query
868                match value {
869                    Some(Value::Array(arr)) if arr.len() >= 2 => {
870                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
871                            (arr.get(0), arr.get(1))
872                        {
873                            let doc_x = x.as_f64().unwrap_or(0.0);
874                            let doc_y = y.as_f64().unwrap_or(0.0);
875
876                            // Calculate distance (simplified Euclidean distance)
877                            let distance =
878                                ((doc_x - point.0).powi(2) + (doc_y - point.1).powi(2)).sqrt();
879
880                            if let Some(max) = max_distance {
881                                distance <= *max
882                            } else {
883                                true
884                            }
885                        } else {
886                            false
887                        }
888                    }
889                    _ => false,
890                }
891            }
892            QueryOperator::Within { shape } => {
893                // Simplified implementation
894                match (value, shape) {
895                    (
896                        Some(Value::Array(arr)),
897                        GeoShape::Box {
898                            bottom_left,
899                            top_right,
900                        },
901                    ) if arr.len() >= 2 => {
902                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
903                            (arr.get(0), arr.get(1))
904                        {
905                            let doc_x = x.as_f64().unwrap_or(0.0);
906                            let doc_y = y.as_f64().unwrap_or(0.0);
907
908                            doc_x >= bottom_left.0
909                                && doc_x <= top_right.0
910                                && doc_y >= bottom_left.1
911                                && doc_y <= top_right.1
912                        } else {
913                            false
914                        }
915                    }
916                    _ => false,
917                }
918            }
919            QueryOperator::Intersects { shape } => {
920                // Simplified implementation
921                match (value, shape) {
922                    (
923                        Some(Value::Array(arr)),
924                        GeoShape::Box {
925                            bottom_left,
926                            top_right,
927                        },
928                    ) if arr.len() >= 2 => {
929                        if let (Some(Value::Number(x)), Some(Value::Number(y))) =
930                            (arr.get(0), arr.get(1))
931                        {
932                            let doc_x = x.as_f64().unwrap_or(0.0);
933                            let doc_y = y.as_f64().unwrap_or(0.0);
934
935                            doc_x >= bottom_left.0
936                                && doc_x <= top_right.0
937                                && doc_y >= bottom_left.1
938                                && doc_y <= top_right.1
939                        } else {
940                            false
941                        }
942                    }
943                    _ => false,
944                }
945            }
946            QueryOperator::And(_)
947            | QueryOperator::Or(_)
948            | QueryOperator::Nor(_)
949            | QueryOperator::Not(_) => {
950                // These are handled at the query level
951                true
952            }
953        }
954    }
955
956    fn get_nested_value<'a>(&self, doc: &'a Value, path: &[&str]) -> Option<&'a Value> {
957        if path.is_empty() {
958            return Some(doc);
959        }
960
961        let key = path[0];
962        let rest = &path[1..];
963
964        match doc {
965            Value::Object(map) => {
966                if let Some(next_value) = map.get(key) {
967                    self.get_nested_value(next_value, rest)
968                } else {
969                    None
970                }
971            }
972            _ => None,
973        }
974    }
975}
976
977// --- Update Operators ---
/// One update operation together with its payload.
///
/// The builder methods on [`UpdateDocument`] each create one variant holding a
/// single entry. How a variant mutates a document is decided by the code that
/// applies updates, not by this type.
// NOTE(review): the variant names look like MongoDB's `$set`, `$inc`, ... update
// operators; confirm the intended semantics against the code that applies them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateOperator {
    /// Field name -> value.
    Set(Map<String, Value>),
    /// Field names.
    Unset(Vec<String>),
    /// Field name -> operand.
    Inc(Map<String, Value>),
    /// Field name -> operand.
    Mul(Map<String, Value>),
    /// `(old_key, new_key)` pairs.
    Rename(Vec<(String, String)>),
    /// Field name -> value.
    SetOnInsert(Map<String, Value>),
    /// Field name -> operand.
    Min(Map<String, Value>),
    /// Field name -> operand.
    Max(Map<String, Value>),
    /// Field name -> type specification.
    CurrentDate(Map<String, Value>),
    // Array operators
    /// Field name -> value.
    Push(Map<String, Value>),
    /// Field name -> list of values.
    PushAll(BTreeMap<String, Vec<Value>>),
    /// Field name -> value.
    AddToSet(Map<String, Value>),
    /// Field name -> position argument.
    Pop(BTreeMap<String, i64>),
    /// Field name -> condition.
    Pull(Map<String, Value>),
    /// Field name -> list of values.
    PullAll(BTreeMap<String, Vec<Value>>),
    /// Field name -> operation description.
    Bit(Map<String, Value>),
}
998
999// --- Update Document ---
/// An ordered list of [`UpdateOperator`]s assembled through chained builder calls.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateDocument {
    /// Operators in the order the builder methods were called.
    operators: Vec<UpdateOperator>,
}
1004
1005impl UpdateDocument {
1006    pub fn new() -> Self {
1007        Self {
1008            operators: Vec::new(),
1009        }
1010    }
1011
1012    pub fn set(mut self, key: &str, value: Value) -> Self {
1013        let mut map = Map::new();
1014        map.insert(key.to_string(), value);
1015        self.operators.push(UpdateOperator::Set(map));
1016        self
1017    }
1018
1019    pub fn unset(mut self, key: &str) -> Self {
1020        self.operators
1021            .push(UpdateOperator::Unset(vec![key.to_string()]));
1022        self
1023    }
1024
1025    pub fn inc(mut self, key: &str, value: Value) -> Self {
1026        let mut map = Map::new();
1027        map.insert(key.to_string(), value);
1028        self.operators.push(UpdateOperator::Inc(map));
1029        self
1030    }
1031
1032    pub fn mul(mut self, key: &str, value: Value) -> Self {
1033        let mut map = Map::new();
1034        map.insert(key.to_string(), value);
1035        self.operators.push(UpdateOperator::Mul(map));
1036        self
1037    }
1038
1039    pub fn rename(mut self, old_key: &str, new_key: &str) -> Self {
1040        self.operators.push(UpdateOperator::Rename(vec![(
1041            old_key.to_string(),
1042            new_key.to_string(),
1043        )]));
1044        self
1045    }
1046
1047    pub fn set_on_insert(mut self, key: &str, value: Value) -> Self {
1048        let mut map = Map::new();
1049        map.insert(key.to_string(), value);
1050        self.operators.push(UpdateOperator::SetOnInsert(map));
1051        self
1052    }
1053
1054    pub fn min(mut self, key: &str, value: Value) -> Self {
1055        let mut map = Map::new();
1056        map.insert(key.to_string(), value);
1057        self.operators.push(UpdateOperator::Min(map));
1058        self
1059    }
1060
1061    pub fn max(mut self, key: &str, value: Value) -> Self {
1062        let mut map = Map::new();
1063        map.insert(key.to_string(), value);
1064        self.operators.push(UpdateOperator::Max(map));
1065        self
1066    }
1067
1068    pub fn current_date(mut self, key: &str, type_spec: Value) -> Self {
1069        let mut map = Map::new();
1070        map.insert(key.to_string(), type_spec);
1071        self.operators.push(UpdateOperator::CurrentDate(map));
1072        self
1073    }
1074
1075    // Array operators
1076    pub fn push(mut self, key: &str, value: Value) -> Self {
1077        let mut map = Map::new();
1078        map.insert(key.to_string(), value);
1079        self.operators.push(UpdateOperator::Push(map));
1080        self
1081    }
1082
1083    pub fn push_all(mut self, key: &str, values: Vec<Value>) -> Self {
1084        let mut btree_map = BTreeMap::new();
1085        btree_map.insert(key.to_string(), values);
1086        self.operators.push(UpdateOperator::PushAll(btree_map));
1087        self
1088    }
1089
1090    pub fn add_to_set(mut self, key: &str, value: Value) -> Self {
1091        let mut map = Map::new();
1092        map.insert(key.to_string(), value);
1093        self.operators.push(UpdateOperator::AddToSet(map));
1094        self
1095    }
1096
1097    pub fn pop(mut self, key: &str, pos: i64) -> Self {
1098        let mut btree_map = BTreeMap::new();
1099        btree_map.insert(key.to_string(), pos);
1100        self.operators.push(UpdateOperator::Pop(btree_map));
1101        self
1102    }
1103
1104    pub fn pull(mut self, key: &str, condition: Value) -> Self {
1105        let mut map = Map::new();
1106        map.insert(key.to_string(), condition);
1107        self.operators.push(UpdateOperator::Pull(map));
1108        self
1109    }
1110
1111    pub fn pull_all(mut self, key: &str, values: Vec<Value>) -> Self {
1112        let mut btree_map = BTreeMap::new();
1113        btree_map.insert(key.to_string(), values);
1114        self.operators.push(UpdateOperator::PullAll(btree_map));
1115        self
1116    }
1117
1118    // Bitwise operator
1119    pub fn bit(mut self, key: &str, operation: Value) -> Self {
1120        let mut map = Map::new();
1121        map.insert(key.to_string(), operation);
1122        self.operators.push(UpdateOperator::Bit(map));
1123        self
1124    }
1125}
1126
1127// --- Index ---
/// The kind of index requested for one key of an [`Index`] definition.
// NOTE(review): how each kind affects index maintenance is not visible in this
// part of the file; confirm which variants are actually honoured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    Ascending,
    Descending,
    Geospatial,
    Text,
    Hashed,
}
1136
/// Definition of an index: its name, the indexed keys, and option flags.
///
/// Built with [`Index::new`] and the chained `unique` / `sparse` / `background`
/// setters; all three flags start out `false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
    /// Name of the index.
    name: String,
    /// Indexed field names, each paired with its [`IndexType`].
    key: Vec<(String, IndexType)>,
    /// Flag set through [`Index::unique`].
    unique: bool,
    /// Flag set through [`Index::sparse`].
    sparse: bool,
    /// Flag set through [`Index::background`].
    background: bool,
}
1145
1146impl Index {
1147    pub fn new(name: String, key: Vec<(String, IndexType)>) -> Self {
1148        Self {
1149            name,
1150            key,
1151            unique: false,
1152            sparse: false,
1153            background: false,
1154        }
1155    }
1156
1157    pub fn unique(mut self, unique: bool) -> Self {
1158        self.unique = unique;
1159        self
1160    }
1161
1162    pub fn sparse(mut self, sparse: bool) -> Self {
1163        self.sparse = sparse;
1164        self
1165    }
1166
1167    pub fn background(mut self, background: bool) -> Self {
1168        self.background = background;
1169        self
1170    }
1171}
1172
1173// --- Index Entry ---
/// One posting in an index: the ID of a document stored under an indexed value.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct IndexEntry {
    /// ID of the document this entry points at.
    doc_id: DocId,
}
1178
/// Newtype around a JSON [`Value`] that carries the `Eq`/`Ord` impls needed to
/// use values as `BTreeMap` keys in [`IndexData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrdValue(Value);
1181
1182impl PartialEq for OrdValue {
1183    fn eq(&self, other: &Self) -> bool {
1184        self.0 == other.0
1185    }
1186}
1187
// Marker impl: `Ord` requires `Eq`; equality itself comes from `PartialEq` above.
impl Eq for OrdValue {}
1189
1190impl PartialOrd for OrdValue {
1191    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1192        Some(self.cmp(other))
1193    }
1194}
1195
1196impl Ord for OrdValue {
1197    fn cmp(&self, other: &Self) -> Ordering {
1198        // Simple string-based comparison; adjust as needed
1199        format!("{:?}", self.0).cmp(&format!("{:?}", other.0))
1200    }
1201}
1202
1203// --- Index Data Structure ---
/// The stored contents of one index: indexed value -> entries for the documents
/// holding that value.
#[derive(Debug, Serialize, Clone, Deserialize)]
struct IndexData {
    /// Entries grouped by indexed value, kept in `OrdValue` order.
    entries: BTreeMap<OrdValue, Vec<IndexEntry>>,
    /// When true, `insert` rejects a value that already has an entry.
    unique: bool,
    /// Stored from the constructor; not consulted by `IndexData`'s own methods.
    sparse: bool,
}
1210
1211impl IndexData {
1212    fn new(unique: bool, sparse: bool) -> Self {
1213        Self {
1214            entries: BTreeMap::new(),
1215            unique,
1216            sparse,
1217        }
1218    }
1219
1220    fn insert(&mut self, value: Value, doc_id: DocId) -> Result<()> {
1221        let ord_value = OrdValue(value.clone());
1222
1223        if self.unique && self.entries.contains_key(&ord_value) {
1224            return Err(DbError::IndexError(format!(
1225                "Duplicate key error: {:?}",
1226                value
1227            )));
1228        }
1229
1230        self.entries
1231            .entry(ord_value)
1232            .or_insert_with(Vec::new)
1233            .push(IndexEntry { doc_id });
1234        Ok(())
1235    }
1236
1237    fn remove(&mut self, value: &Value, doc_id: &DocId) -> Result<()> {
1238        let ord_value = OrdValue(value.clone());
1239
1240        if let Some(entries) = self.entries.get_mut(&ord_value) {
1241            entries.retain(|entry| entry.doc_id != *doc_id);
1242            if entries.is_empty() {
1243                self.entries.remove(&ord_value);
1244            }
1245        }
1246        Ok(())
1247    }
1248
1249    fn find(&self, value: &Value) -> Vec<&IndexEntry> {
1250        let ord_value = OrdValue(value.clone());
1251        self.entries
1252            .get(&ord_value)
1253            .map_or(Vec::new(), |entries| entries.iter().collect())
1254    }
1255
1256    fn find_range(&self, min: &Value, max: &Value) -> Vec<&IndexEntry> {
1257        let mut result = Vec::new();
1258        let ord_min = OrdValue(min.clone());
1259        let ord_max = OrdValue(max.clone());
1260
1261        for (key, entries) in self.entries.range((Included(&ord_min), Included(&ord_max))) {
1262            result.extend(entries.iter());
1263        }
1264        result
1265    }
1266}
1267
1268// --- Find Options ---
/// Optional post-processing for `Collection::find`.
///
/// `find` applies the options in this order: sort, skip, limit, projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FindOptions {
    /// Projection specification handed to the collection's projection step.
    pub projection: Option<Map<String, Value>>,
    /// Sort keys as `(field, order)` pairs.
    pub sort: Option<Vec<(String, SortOrder)>>,
    /// Number of leading results to drop (after sorting).
    pub skip: Option<usize>,
    /// Maximum number of results to keep (after skipping).
    pub limit: Option<usize>,
    /// Not read by `Collection::find`.
    // NOTE(review): presumably a time budget in milliseconds; confirm whether
    // anything else consumes it.
    pub max_time_ms: Option<u64>,
    /// Not read by `Collection::find`.
    pub allow_partial_results: bool,
}
1278
/// Direction attached to a sort key in [`FindOptions::sort`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SortOrder {
    Ascending,
    Descending,
}
1284
1285impl Default for FindOptions {
1286    fn default() -> Self {
1287        Self {
1288            projection: None,
1289            sort: None,
1290            skip: None,
1291            limit: None,
1292            max_time_ms: None,
1293            allow_partial_results: false,
1294        }
1295    }
1296}
1297
1298// --- Collection ---
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A collection of documents in the database.
///
/// Collections are analogous to tables in relational databases and store JSON documents.
/// They provide methods for inserting, querying, updating, and deleting documents.
///
/// Every document stored through `insert` carries an `_id` field holding the
/// string form of its [`DocId`].
///
/// # Example
///
/// ```no_run
/// use luckdb::{Client, Document};
/// use serde_json::json;
///
/// let mut client = Client::new("mongodb://localhost");
/// let db = client.db("mydb");
/// let coll = db.collection("users");
///
/// // Insert a document
/// let user = json!({"name": "Alice", "email": "alice@example.com"});
/// coll.insert(user).unwrap();
/// ```
pub struct Collection {
    /// The name of the collection.
    name: String,
    /// The documents stored in the collection, keyed by their document ID.
    docs: HashMap<DocId, Document>,
    /// Index contents (value -> document entries), one `IndexData` per map key.
    indexes: HashMap<String, IndexData>,
    /// Definitions of the indexes created on the collection.
    index_definitions: HashMap<String, Index>,
}
1329
1330// --- Persistent Storage ---
// --- Persistent Storage ---
/// Serializable mirror of [`Collection`]: the same four fields, converted
/// losslessly in both directions through the `From` impls below.
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistentCollection {
    /// Collection name.
    name: String,
    /// Stored documents keyed by document ID.
    docs: HashMap<DocId, Document>,
    /// Index contents, one `IndexData` per map key.
    indexes: HashMap<String, IndexData>,
    /// Index definitions.
    index_definitions: HashMap<String, Index>,
}
1338
1339impl From<Collection> for PersistentCollection {
1340    fn from(collection: Collection) -> Self {
1341        Self {
1342            name: collection.name,
1343            docs: collection.docs,
1344            indexes: collection.indexes,
1345            index_definitions: collection.index_definitions,
1346        }
1347    }
1348}
1349
1350impl From<PersistentCollection> for Collection {
1351    fn from(persistent: PersistentCollection) -> Self {
1352        Self {
1353            name: persistent.name,
1354            docs: persistent.docs,
1355            indexes: persistent.indexes,
1356            index_definitions: persistent.index_definitions,
1357        }
1358    }
1359}
1360
/// Serializable form of one database: its name and its collections.
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistentDatabase {
    /// Database name.
    name: String,
    /// Persistent collections held by this database.
    // NOTE(review): presumably keyed by collection name; confirm at the call site.
    collections: HashMap<String, PersistentCollection>,
}
1366
/// Root of the on-disk format: every database, serialized as one JSON document
/// by `save_to_file` and read back by `load_from_file`.
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistentStorage {
    /// Persistent databases held by this storage.
    // NOTE(review): presumably keyed by database name; confirm at the call site.
    databases: HashMap<String, PersistentDatabase>,
}
1371
1372impl PersistentStorage {
1373    pub fn new() -> Self {
1374        Self {
1375            databases: HashMap::new(),
1376        }
1377    }
1378
1379    pub fn save_to_file(
1380        &self,
1381        path: &Path,
1382        encryption_key: Option<&crate::encryption::EncryptionKey>,
1383    ) -> Result<()> {
1384        let data = serde_json::to_string_pretty(self)?;
1385
1386        let final_data = if let Some(key) = encryption_key {
1387            crate::encryption::AesEncryption::encrypt_string(&data, key)?
1388        } else {
1389            data
1390        };
1391
1392        std::fs::write(path, final_data)?;
1393        Ok(())
1394    }
1395
1396    pub fn load_from_file(
1397        path: &Path,
1398        encryption_key: Option<&crate::encryption::EncryptionKey>,
1399    ) -> Result<Self> {
1400        if !path.exists() {
1401            return Ok(Self::new());
1402        }
1403
1404        let mut file = File::open(path)?;
1405        let mut contents = String::new();
1406        file.read_to_string(&mut contents)?;
1407
1408        let final_data = if let Some(key) = encryption_key {
1409            // Try to decrypt as encrypted data first
1410            match crate::encryption::AesEncryption::decrypt_string(&contents, key) {
1411                Ok(decrypted) => decrypted,
1412                Err(_) => {
1413                    // If decryption fails, try loading as plain text (for backward compatibility)
1414                    contents
1415                }
1416            }
1417        } else {
1418            contents
1419        };
1420
1421        let storage = serde_json::from_str(&final_data)?;
1422        Ok(storage)
1423    }
1424}
1425
1426// --- Collection ---
1427impl Collection {
1428    /// Creates a new empty collection with the given name.
1429    ///
1430    /// # Arguments
1431    ///
1432    /// * `name` - The name of the new collection.
1433    pub fn new(name: String) -> Self {
1434        Self {
1435            name,
1436            docs: HashMap::new(),
1437            indexes: HashMap::new(),
1438            index_definitions: HashMap::new(),
1439        }
1440    }
1441
1442    /// Inserts a single document into the collection, auto-generating an `_id` field if not present.
1443    ///
1444    /// # Arguments
1445    ///
1446    /// * `doc` - The document to insert. Must be a JSON object.
1447    ///
1448    /// # Returns
1449    ///
1450    /// The ID of the inserted document.
1451    ///
1452    /// # Errors
1453    ///
1454    /// Returns an error if the document is not a JSON object.
1455    pub fn insert(&mut self, mut doc: Document) -> Result<DocId> {
1456        let id = DocId::new();
1457
1458        // Inject _id field
1459        if let Some(obj) = doc.as_object_mut() {
1460            obj.insert("_id".to_string(), Value::String(id.to_string()));
1461        } else {
1462            return Err(DbError::Other("Document must be an object".into()));
1463        }
1464
1465        self.docs.insert(id.clone(), doc.clone());
1466
1467        // Update indexes
1468        self.update_indexes_on_insert(&id, &doc)?;
1469
1470        Ok(id)
1471    }
1472
1473    /// Inserts multiple documents into the collection, auto-generating `_id` fields for each.
1474    ///
1475    /// # Arguments
1476    ///
1477    /// * `docs` - A list of documents to insert. Each must be a JSON object.
1478    ///
1479    /// # Returns
1480    ///
1481    /// A list of IDs of the inserted documents.
1482    ///
1483    /// # Errors
1484    ///
1485    /// Returns an error if any document is not a JSON object.
1486    pub fn insert_many(&mut self, docs: Vec<Document>) -> Result<Vec<DocId>> {
1487        let mut ids = Vec::new();
1488
1489        for mut doc in docs {
1490            let id = DocId::new();
1491
1492            // Inject _id field
1493            if let Some(obj) = doc.as_object_mut() {
1494                obj.insert("_id".to_string(), Value::String(id.to_string()));
1495            } else {
1496                return Err(DbError::Other("Document must be an object".into()));
1497            }
1498
1499            self.docs.insert(id.clone(), doc.clone());
1500            ids.push(id.clone());
1501
1502            // Update indexes
1503            self.update_indexes_on_insert(&id, &doc)?;
1504        }
1505
1506        Ok(ids)
1507    }
1508
1509    /// Finds all documents matching the given query.
1510    ///
1511    /// # Arguments
1512    ///
1513    /// * `query` - The query to filter documents.
1514    /// * `options` - Optional parameters for sorting, pagination, and projection.
1515    ///
1516    /// # Returns
1517    ///
1518    /// A list of tuples containing document IDs and the matching documents.
1519    pub fn find(
1520        &self,
1521        query: Query,
1522        options: Option<FindOptions>,
1523    ) -> Result<Vec<(DocId, Document)>> {
1524        let mut results = Vec::new();
1525
1526        // Try to use indexes if possible
1527        let indexed_results = self.try_indexed_query(&query)?;
1528
1529        if !indexed_results.is_empty() {
1530            // Use indexed results
1531            for (id, doc) in indexed_results {
1532                if query.matches(&doc) {
1533                    results.push((id, doc));
1534                }
1535            }
1536        } else {
1537            // Full collection scan
1538            for (id, doc) in self.docs.iter() {
1539                if query.matches(doc) {
1540                    results.push((id.clone(), doc.clone()));
1541                }
1542            }
1543        }
1544
1545        // Apply options
1546        if let Some(opts) = options {
1547            // Sort
1548            if let Some(sort_spec) = opts.sort {
1549                self.sort_results(&mut results, &sort_spec);
1550            }
1551
1552            // Skip
1553            if let Some(skip) = opts.skip {
1554                if skip < results.len() {
1555                    results.drain(0..skip);
1556                } else {
1557                    results.clear();
1558                }
1559            }
1560
1561            // Limit
1562            if let Some(limit) = opts.limit {
1563                results.truncate(limit);
1564            }
1565
1566            // Projection
1567            if let Some(proj) = opts.projection {
1568                self.apply_projection(&mut results, &proj);
1569            }
1570        }
1571
1572        Ok(results)
1573    }
1574
1575    /// Finds the first document matching the given query.
1576    ///
1577    /// # Arguments
1578    ///
1579    /// * `query` - The query to filter documents.
1580    /// * `options` - Optional parameters for sorting and projection.
1581    ///
1582    /// # Returns
1583    ///
1584    /// A tuple containing the document ID and the matching document.
1585    ///
1586    /// # Errors
1587    ///
1588    /// Returns `DbError::NotFound` if no document matches the query.
1589    pub fn find_one(
1590        &self,
1591        query: Query,
1592        options: Option<FindOptions>,
1593    ) -> Result<(DocId, Document)> {
1594        let mut opts = options.unwrap_or_default();
1595        opts.limit = Some(1);
1596
1597        let mut results = self.find(query, Some(opts))?;
1598        results.pop().ok_or(DbError::NotFound)
1599    }
1600
1601    /// Finds a document, updates it, and returns the document (either before or after the update).
1602    ///
1603    /// # Arguments
1604    ///
1605    /// * `query` - The query to filter documents.
1606    /// * `update` - The update operations to apply.
1607    /// * `options` - Optional parameters including upsert behavior and which document version to return.
1608    ///
1609    /// # Returns
1610    ///
1611    /// The document before or after the update, depending on options.
1612    pub fn find_one_and_update(
1613        &mut self,
1614        query: Query,
1615        update: UpdateDocument,
1616        options: Option<FindOneAndUpdateOptions>,
1617    ) -> Result<Option<(DocId, Document)>> {
1618        let opts = options.unwrap_or_default();
1619
1620        // Find the document
1621        let result = self.find_one(query.clone(), None);
1622
1623        match result {
1624            Ok((id, mut doc)) => {
1625                let original_doc = doc.clone();
1626
1627                // Apply update
1628                self.apply_update(&mut doc, &update, opts.upsert)?;
1629
1630                // Update the document
1631                self.docs.insert(id.clone(), doc.clone());
1632
1633                // Update indexes
1634                self.update_indexes_on_update(&id, &original_doc, &doc)?;
1635
1636                if opts.return_document == ReturnDocument::After {
1637                    Ok(Some((id, doc)))
1638                } else {
1639                    Ok(Some((id, original_doc)))
1640                }
1641            }
1642            Err(DbError::NotFound) if opts.upsert => {
1643                // Upsert: create a new document
1644                let mut new_doc = Map::new();
1645
1646                // Extract fields from query
1647                for cond in &query.conditions {
1648                    if let QueryOperator::Eq(value) = &cond.operator {
1649                        new_doc.insert(cond.field.clone(), value.clone());
1650                    }
1651                }
1652
1653                // Apply update
1654                let mut doc = Value::Object(new_doc);
1655                self.apply_update(&mut doc, &update, true)?;
1656
1657                // Insert the document
1658                let id = self.insert(doc)?;
1659
1660                // Return the new document if requested
1661                if opts.return_document == ReturnDocument::After {
1662                    self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1663                        .map(Some)
1664                } else {
1665                    Ok(None)
1666                }
1667            }
1668            Err(_) => Ok(None),
1669        }
1670    }
1671
1672    // Find and replace a document
1673    pub fn find_one_and_replace(
1674        &mut self,
1675        query: Query,
1676        replacement: Document,
1677        options: Option<FindOneAndReplaceOptions>,
1678    ) -> Result<Option<(DocId, Document)>> {
1679        let opts = options.unwrap_or_default();
1680
1681        // Find the document
1682        let result = self.find_one(query.clone(), None);
1683
1684        match result {
1685            Ok((id, original_doc)) => {
1686                // Ensure replacement has _id
1687                let mut replacement = replacement;
1688                if let Some(obj) = replacement.as_object_mut() {
1689                    obj.insert("_id".to_string(), Value::String(id.to_string()));
1690                } else {
1691                    return Err(DbError::Other("Replacement must be an object".into()));
1692                }
1693
1694                // Update the document
1695                self.docs.insert(id.clone(), replacement.clone());
1696
1697                // Update indexes
1698                self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1699
1700                if opts.return_document == ReturnDocument::After {
1701                    Ok(Some((id, replacement)))
1702                } else {
1703                    Ok(Some((id, original_doc)))
1704                }
1705            }
1706            Err(DbError::NotFound) if opts.upsert => {
1707                // Upsert: create a new document
1708                let id = self.insert(replacement)?;
1709
1710                // Return the new document if requested
1711                if opts.return_document == ReturnDocument::After {
1712                    self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1713                        .map(Some)
1714                } else {
1715                    Ok(None)
1716                }
1717            }
1718            Err(_) => Ok(None),
1719        }
1720    }
1721
1722    // Find and delete a document
1723    pub fn find_one_and_delete(
1724        &mut self,
1725        query: Query,
1726        options: Option<FindOneAndDeleteOptions>,
1727    ) -> Result<Option<(DocId, Document)>> {
1728        let opts = options.unwrap_or_default();
1729
1730        // Find the document
1731        let result = self.find_one(query.clone(), None);
1732
1733        match result {
1734            Ok((id, doc)) => {
1735                // Delete the document
1736                self.docs.remove(&id);
1737
1738                // Update indexes
1739                self.update_indexes_on_delete(&id, &doc)?;
1740
1741                Ok(Some((id, doc)))
1742            }
1743            Err(_) => Ok(None),
1744        }
1745    }
1746
1747    /// Updates the first document matching the query.
1748    ///
1749    /// # Arguments
1750    ///
1751    /// * `query` - The query to filter documents.
1752    /// * `update` - The update operations to apply.
1753    /// * `upsert` - If true, creates a new document if no match is found.
1754    ///
1755    /// # Returns
1756    ///
1757    /// The number of documents updated (either 0 or 1).
1758    pub fn update_one(
1759        &mut self,
1760        query: Query,
1761        update: UpdateDocument,
1762        upsert: bool,
1763    ) -> Result<usize> {
1764        let result = self.find_one(query.clone(), None);
1765
1766        match result {
1767            Ok((id, mut doc)) => {
1768                let original_doc = doc.clone();
1769
1770                // Apply update
1771                self.apply_update(&mut doc, &update, upsert)?;
1772
1773                // Update the document
1774                self.docs.insert(id.clone(), doc.clone());
1775
1776                // Update indexes
1777                self.update_indexes_on_update(&id, &original_doc, &doc)?;
1778
1779                Ok(1)
1780            }
1781            Err(DbError::NotFound) if upsert => {
1782                // Upsert: create a new document
1783                let mut new_doc = Map::new();
1784
1785                // Extract fields from query
1786                for cond in &query.conditions {
1787                    if let QueryOperator::Eq(value) = &cond.operator {
1788                        new_doc.insert(cond.field.clone(), value.clone());
1789                    }
1790                }
1791
1792                // Apply update
1793                let mut doc = Value::Object(new_doc);
1794                self.apply_update(&mut doc, &update, true)?;
1795
1796                // Insert the document
1797                self.insert(doc)?;
1798
1799                Ok(1)
1800            }
1801            Err(_) => Ok(0),
1802        }
1803    }
1804
1805    /// Updates all documents matching the query.
1806    ///
1807    /// # Arguments
1808    ///
1809    /// * `query` - The query to filter documents.
1810    /// * `update` - The update operations to apply.
1811    ///
1812    /// # Returns
1813    ///
1814    /// The number of documents updated.
1815    pub fn update_many(&mut self, query: Query, update: UpdateDocument) -> Result<usize> {
1816        let docs = self.find(query, None)?;
1817        let mut count = 0;
1818
1819        for (id, doc) in docs {
1820            let mut doc = doc;
1821            let original_doc = doc.clone();
1822
1823            // Apply update
1824            self.apply_update(&mut doc, &update, false)?;
1825
1826            // Update the document
1827            self.docs.insert(id.clone(), doc.clone());
1828
1829            // Update indexes
1830            self.update_indexes_on_update(&id, &original_doc, &doc)?;
1831
1832            count += 1;
1833        }
1834
1835        Ok(count)
1836    }
1837
1838    // Replace the first matching document
1839    pub fn replace_one(
1840        &mut self,
1841        query: Query,
1842        replacement: Document,
1843        upsert: bool,
1844    ) -> Result<usize> {
1845        let result = self.find_one(query.clone(), None);
1846
1847        match result {
1848            Ok((id, original_doc)) => {
1849                // Ensure replacement has _id
1850                let mut replacement = replacement;
1851                if let Some(obj) = replacement.as_object_mut() {
1852                    obj.insert("_id".to_string(), Value::String(id.to_string()));
1853                } else {
1854                    return Err(DbError::Other("Replacement must be an object".into()));
1855                }
1856
1857                // Update the document
1858                self.docs.insert(id.clone(), replacement.clone());
1859
1860                // Update indexes
1861                self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1862
1863                Ok(1)
1864            }
1865            Err(DbError::NotFound) if upsert => {
1866                // Upsert: create a new document
1867                self.insert(replacement)?;
1868                Ok(1)
1869            }
1870            Err(_) => Ok(0),
1871        }
1872    }
1873
1874    /// Deletes the first document matching the query.
1875    ///
1876    /// # Arguments
1877    ///
1878    /// * `query` - The query to filter documents.
1879    ///
1880    /// # Returns
1881    ///
1882    /// The number of documents deleted (either 0 or 1).
1883    pub fn delete_one(&mut self, query: Query) -> Result<usize> {
1884        let result = self.find_one(query, None);
1885
1886        match result {
1887            Ok((id, doc)) => {
1888                // Delete the document
1889                self.docs.remove(&id);
1890
1891                // Update indexes
1892                self.update_indexes_on_delete(&id, &doc)?;
1893
1894                Ok(1)
1895            }
1896            Err(_) => Ok(0),
1897        }
1898    }
1899
1900    /// Deletes all documents matching the query.
1901    ///
1902    /// # Arguments
1903    ///
1904    /// * `query` - The query to filter documents.
1905    ///
1906    /// # Returns
1907    ///
1908    /// The number of documents deleted.
1909    pub fn delete_many(&mut self, query: Query) -> Result<usize> {
1910        let docs = self.find(query, None)?;
1911        let mut count = 0;
1912
1913        for (id, doc) in docs {
1914            // Delete the document
1915            self.docs.remove(&id);
1916
1917            // Update indexes
1918            self.update_indexes_on_delete(&id, &doc)?;
1919
1920            count += 1;
1921        }
1922
1923        Ok(count)
1924    }
1925
1926    /// Counts the number of documents matching the query.
1927    ///
1928    /// # Arguments
1929    ///
1930    /// * `query` - The query to filter documents.
1931    ///
1932    /// # Returns
1933    ///
1934    /// The number of matching documents.
1935    pub fn count_documents(&self, query: Query) -> Result<usize> {
1936        // Try to use indexes if possible
1937        let indexed_results = self.try_indexed_query(&query)?;
1938
1939        if !indexed_results.is_empty() {
1940            // Use indexed results
1941            let mut count = 0;
1942            for (_, doc) in indexed_results {
1943                if query.matches(&doc) {
1944                    count += 1;
1945                }
1946            }
1947            Ok(count)
1948        } else {
1949            // Full collection scan
1950            let mut count = 0;
1951            for doc in self.docs.values() {
1952                if query.matches(doc) {
1953                    count += 1;
1954                }
1955            }
1956            Ok(count)
1957        }
1958    }
1959
1960    /// Returns the estimated count of documents in the collection.
1961    ///
1962    /// # Returns
1963    ///
1964    /// The total number of documents in the collection.
1965    pub fn estimated_document_count(&self) -> Result<usize> {
1966        Ok(self.docs.len())
1967    }
1968
1969    // Create an index
1970    /// Creates an index on the collection.
1971    ///
1972    /// # Arguments
1973    ///
1974    /// * `index` - The index definition containing field, type, uniqueness, and other properties.
1975    ///
1976    /// # Errors
1977    ///
1978    /// Returns an error if an index with the same name already exists or if there's a problem creating the index.
1979    pub fn create_index(&mut self, index: Index) -> Result<()> {
1980        let index_name = index.name.clone();
1981
1982        // Check if index already exists
1983        if self.index_definitions.contains_key(&index_name) {
1984            return Err(DbError::IndexError(format!(
1985                "Index {} already exists",
1986                index_name
1987            )));
1988        }
1989
1990        // Create index data
1991        let index_data = IndexData::new(index.unique, index.sparse);
1992
1993        // Add to indexes
1994        self.indexes.insert(index_name.clone(), index_data);
1995
1996        // Add to definitions
1997        self.index_definitions
1998            .insert(index_name.clone(), index.clone());
1999
2000        // Collect document IDs and contents for indexing
2001        let docs_to_index: Vec<(DocId, Document)> = self
2002            .docs
2003            .iter()
2004            .map(|(id, doc)| (id.clone(), doc.clone()))
2005            .collect();
2006
2007        // Build index from existing documents
2008        for (id, doc) in docs_to_index {
2009            self.add_to_indexes(&index_name, &id, &doc)?;
2010        }
2011
2012        Ok(())
2013    }
2014
2015    /// Drops an existing index from the collection.
2016    ///
2017    /// # Arguments
2018    ///
2019    /// * `name` - The name of the index to drop.
2020    pub fn drop_index(&mut self, name: &str) -> Result<()> {
2021        self.indexes.remove(name);
2022        self.index_definitions.remove(name);
2023        Ok(())
2024    }
2025
2026    /// Lists all indexes created on the collection.
2027    ///
2028    /// # Returns
2029    ///
2030    /// A list of index definitions.
2031    pub fn list_indexes(&self) -> Result<Vec<Index>> {
2032        Ok(self.index_definitions.values().cloned().collect())
2033    }
2034
2035    /// Drops all indexes from the collection.
2036    pub fn drop_indexes(&mut self) -> Result<()> {
2037        self.indexes.clear();
2038        self.index_definitions.clear();
2039        Ok(())
2040    }
2041
2042    // Get collection statistics
2043    pub fn stats(&self) -> Result<CollectionStats> {
2044        Ok(CollectionStats {
2045            count: self.docs.len(),
2046            size: self.calculate_size(),
2047            avg_obj_size: if self.docs.is_empty() {
2048                0
2049            } else {
2050                self.calculate_size() / self.docs.len()
2051            },
2052            index_count: self.index_definitions.len(),
2053            index_size: self.calculate_index_size(),
2054        })
2055    }
2056
2057    // Aggregate pipeline
2058    pub fn aggregate(&self, pipeline: Vec<AggregationStage>) -> Result<Vec<Document>> {
2059        let mut results = Vec::new();
2060
2061        // Start with all documents
2062        let mut docs: Vec<Document> = self.docs.values().cloned().collect();
2063
2064        // Apply pipeline stages
2065        for stage in pipeline {
2066            docs = self.apply_aggregation_stage(docs, stage)?;
2067        }
2068
2069        results.extend(docs);
2070        Ok(results)
2071    }
2072
2073    // Distinct values for a field
2074    pub fn distinct(&self, field: &str, query: Option<Query>) -> Result<Vec<Value>> {
2075        let mut values = HashSet::new();
2076
2077        for doc in self.docs.values() {
2078            if let Some(q) = &query {
2079                if !q.matches(doc) {
2080                    continue;
2081                }
2082            }
2083
2084            let field_parts: Vec<&str> = field.split('.').collect();
2085            if let Some(value) = self.get_nested_value(doc, &field_parts) {
2086                values.insert(value.clone());
2087            }
2088        }
2089
2090        Ok(values.into_iter().collect())
2091    }
2092
    /// Returns the number of documents currently stored in the collection.
    pub fn count(&self) -> usize {
        self.docs.len()
    }
2097
2098    // --- Helper methods ---
2099
2100    fn apply_update(
2101        &self,
2102        doc: &mut Document,
2103        update: &UpdateDocument,
2104        is_upsert: bool,
2105    ) -> Result<()> {
2106        for op in &update.operators {
2107            match op {
2108                UpdateOperator::Set(fields) => {
2109                    if let Some(obj) = doc.as_object_mut() {
2110                        for (key, value) in fields {
2111                            obj.insert(key.clone(), value.clone());
2112                        }
2113                    }
2114                }
2115                UpdateOperator::Unset(fields) => {
2116                    if let Some(obj) = doc.as_object_mut() {
2117                        for key in fields {
2118                            obj.remove(key);
2119                        }
2120                    }
2121                }
2122                UpdateOperator::Inc(fields) => {
2123                    if let Some(obj) = doc.as_object_mut() {
2124                        for (key, value) in fields {
2125                            if let Some(Value::Number(n)) = obj.get(key) {
2126                                if let (Some(current), Some(inc)) = (n.as_f64(), value.as_f64()) {
2127                                    obj.insert(
2128                                        key.clone(),
2129                                        Value::Number(
2130                                            serde_json::Number::from_f64(current + inc).unwrap(),
2131                                        ),
2132                                    );
2133                                }
2134                            } else {
2135                                // Field doesn't exist or isn't a number
2136                                if let Some(inc) = value.as_f64() {
2137                                    obj.insert(
2138                                        key.clone(),
2139                                        Value::Number(serde_json::Number::from_f64(inc).unwrap()),
2140                                    );
2141                                }
2142                            }
2143                        }
2144                    }
2145                }
2146                UpdateOperator::Mul(fields) => {
2147                    if let Some(obj) = doc.as_object_mut() {
2148                        for (key, value) in fields {
2149                            if let Some(Value::Number(n)) = obj.get(key) {
2150                                if let (Some(current), Some(mul)) = (n.as_f64(), value.as_f64()) {
2151                                    obj.insert(
2152                                        key.clone(),
2153                                        Value::Number(
2154                                            serde_json::Number::from_f64(current * mul).unwrap(),
2155                                        ),
2156                                    );
2157                                }
2158                            }
2159                        }
2160                    }
2161                }
2162                UpdateOperator::Rename(fields) => {
2163                    if let Some(obj) = doc.as_object_mut() {
2164                        for (old_key, new_key) in fields {
2165                            if let Some(value) = obj.remove(old_key) {
2166                                obj.insert(new_key.clone(), value);
2167                            }
2168                        }
2169                    }
2170                }
2171                UpdateOperator::SetOnInsert(fields) => {
2172                    if is_upsert {
2173                        if let Some(obj) = doc.as_object_mut() {
2174                            for (key, value) in fields {
2175                                obj.insert(key.clone(), value.clone());
2176                            }
2177                        }
2178                    }
2179                }
2180                UpdateOperator::Min(fields) => {
2181                    if let Some(obj) = doc.as_object_mut() {
2182                        for (key, value) in fields {
2183                            if let Some(current) = obj.get(key) {
2184                                // Compare values (simplified)
2185                                if self.compare_values(value, current) == Ordering::Less {
2186                                    obj.insert(key.clone(), value.clone());
2187                                }
2188                            } else {
2189                                obj.insert(key.clone(), value.clone());
2190                            }
2191                        }
2192                    }
2193                }
2194                UpdateOperator::Max(fields) => {
2195                    if let Some(obj) = doc.as_object_mut() {
2196                        for (key, value) in fields {
2197                            if let Some(current) = obj.get(key) {
2198                                // Compare values (simplified)
2199                                if self.compare_values(value, current) == Ordering::Greater {
2200                                    obj.insert(key.clone(), value.clone());
2201                                }
2202                            } else {
2203                                obj.insert(key.clone(), value.clone());
2204                            }
2205                        }
2206                    }
2207                }
2208                UpdateOperator::CurrentDate(fields) => {
2209                    if let Some(obj) = doc.as_object_mut() {
2210                        for (key, type_spec) in fields {
2211                            let now = chrono::Utc::now();
2212
2213                            match type_spec {
2214                                Value::String(type_str) if type_str == "date" => {
2215                                    obj.insert(key.clone(), Value::String(now.to_rfc3339()));
2216                                }
2217                                Value::String(type_str) if type_str == "timestamp" => {
2218                                    obj.insert(
2219                                        key.clone(),
2220                                        Value::Number(serde_json::Number::from(now.timestamp())),
2221                                    );
2222                                }
2223                                Value::Object(spec) => {
2224                                    if let Some(Value::String(type_str)) = spec.get("$type") {
2225                                        match type_str.as_str() {
2226                                            "date" => {
2227                                                obj.insert(
2228                                                    key.clone(),
2229                                                    Value::String(now.to_rfc3339()),
2230                                                );
2231                                            }
2232                                            "timestamp" => {
2233                                                obj.insert(
2234                                                    key.clone(),
2235                                                    Value::Number(serde_json::Number::from(
2236                                                        now.timestamp(),
2237                                                    )),
2238                                                );
2239                                            }
2240                                            _ => {
2241                                                return Err(DbError::UpdateError(format!(
2242                                                    "Invalid date type specification: {}",
2243                                                    type_str
2244                                                )));
2245                                            }
2246                                        }
2247                                    }
2248                                }
2249                                _ => {
2250                                    return Err(DbError::UpdateError(
2251                                        "Invalid date type specification".into(),
2252                                    ));
2253                                }
2254                            }
2255                        }
2256                    }
2257                }
2258                UpdateOperator::Push(fields) => {
2259                    if let Some(obj) = doc.as_object_mut() {
2260                        for (key, value) in fields {
2261                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2262                                arr.push(value.clone());
2263                            } else {
2264                                // Field doesn't exist or isn't an array
2265                                obj.insert(key.clone(), Value::Array(vec![value.clone()]));
2266                            }
2267                        }
2268                    }
2269                }
2270                UpdateOperator::PushAll(fields) => {
2271                    if let Some(obj) = doc.as_object_mut() {
2272                        for (key, values) in fields {
2273                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2274                                arr.extend(values.iter().cloned());
2275                            } else {
2276                                // Field doesn't exist or isn't an array
2277                                obj.insert(key.clone(), Value::Array(values.clone()));
2278                            }
2279                        }
2280                    }
2281                }
2282                UpdateOperator::AddToSet(fields) => {
2283                    if let Some(obj) = doc.as_object_mut() {
2284                        for (key, value) in fields {
2285                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2286                                if !arr.contains(value) {
2287                                    arr.push(value.clone());
2288                                }
2289                            } else {
2290                                // Field doesn't exist or isn't an array
2291                                obj.insert(key.clone(), Value::Array(vec![value.clone()]));
2292                            }
2293                        }
2294                    }
2295                }
2296                UpdateOperator::Pop(fields) => {
2297                    if let Some(obj) = doc.as_object_mut() {
2298                        for (key, pos) in fields {
2299                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2300                                if *pos == 1 {
2301                                    arr.pop();
2302                                } else if *pos == -1 {
2303                                    arr.remove(0);
2304                                }
2305                            }
2306                        }
2307                    }
2308                }
2309                UpdateOperator::Pull(fields) => {
2310                    if let Some(obj) = doc.as_object_mut() {
2311                        for (key, condition) in fields {
2312                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2313                                // Simplified: remove all elements matching the condition
2314                                arr.retain(|elem| {
2315                                    match condition {
2316                                        Value::Object(cond) => {
2317                                            // Create a query from the condition
2318                                            let query = Query::new();
2319                                            // This is a simplified version
2320                                            !query.matches(elem)
2321                                        }
2322                                        _ => elem != condition,
2323                                    }
2324                                });
2325                            }
2326                        }
2327                    }
2328                }
2329                UpdateOperator::PullAll(fields) => {
2330                    if let Some(obj) = doc.as_object_mut() {
2331                        for (key, values) in fields {
2332                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
2333                                arr.retain(|elem| !values.contains(elem));
2334                            }
2335                        }
2336                    }
2337                }
2338                UpdateOperator::Bit(fields) => {
2339                    if let Some(obj) = doc.as_object_mut() {
2340                        for (key, operation) in fields {
2341                            if let Value::Object(op) = operation {
2342                                if let Some(Value::Number(current)) = obj.get(key) {
2343                                    let mut current_num = current.as_i64().unwrap_or(0);
2344
2345                                    if let Some(Value::Number(and)) = op.get("and") {
2346                                        current_num &= and.as_i64().unwrap_or(0);
2347                                    }
2348                                    if let Some(Value::Number(or)) = op.get("or") {
2349                                        current_num |= or.as_i64().unwrap_or(0);
2350                                    }
2351                                    if let Some(Value::Number(xor)) = op.get("xor") {
2352                                        current_num ^= xor.as_i64().unwrap_or(0);
2353                                    }
2354
2355                                    obj.insert(
2356                                        key.clone(),
2357                                        Value::Number(serde_json::Number::from(current_num)),
2358                                    );
2359                                }
2360                            }
2361                        }
2362                    }
2363                }
2364            }
2365        }
2366
2367        Ok(())
2368    }
2369
2370    fn compare_values(&self, a: &Value, b: &Value) -> Ordering {
2371        match (a, b) {
2372            (Value::Number(a_num), Value::Number(b_num)) => a_num
2373                .as_f64()
2374                .unwrap_or(0.0)
2375                .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2376                .unwrap_or(Ordering::Equal),
2377            (Value::String(a_str), Value::String(b_str)) => a_str.cmp(b_str),
2378            (Value::Bool(a_bool), Value::Bool(b_bool)) => a_bool.cmp(b_bool),
2379            (Value::Array(a_arr), Value::Array(b_arr)) => a_arr.len().cmp(&b_arr.len()),
2380            (Value::Object(_), Value::Object(_)) => {
2381                Ordering::Equal // Simplified
2382            }
2383            _ => Ordering::Equal,
2384        }
2385    }
2386
2387    fn update_indexes_on_insert(&mut self, id: &DocId, doc: &Document) -> Result<()> {
2388        for (name, index_def) in self.index_definitions.iter() {
2389            // Pre-calculate all field values before mutable borrow
2390            let mut field_values = Vec::new();
2391            for (field, _) in &index_def.key {
2392                let field_parts: Vec<&str> = field.split('.').collect();
2393                let value = self.get_nested_value(doc, &field_parts);
2394                field_values.push((field, value));
2395            }
2396
2397            if let Some(index_data) = self.indexes.get_mut(name) {
2398                for (field, value) in field_values {
2399                    if let Some(value) = value {
2400                        index_data.insert(value.clone(), id.clone())?;
2401                    } else if index_data.sparse {
2402                        // Skip missing fields for sparse indexes
2403                        continue;
2404                    } else {
2405                        return Err(DbError::IndexError(format!(
2406                            "Field {} not found in document for non-sparse index",
2407                            field
2408                        )));
2409                    }
2410                }
2411            }
2412        }
2413
2414        Ok(())
2415    }
2416
2417    fn update_indexes_on_update(
2418        &mut self,
2419        id: &DocId,
2420        old_doc: &Document,
2421        new_doc: &Document,
2422    ) -> Result<()> {
2423        // Remove from indexes
2424        self.update_indexes_on_delete(id, old_doc)?;
2425
2426        // Add to indexes
2427        self.update_indexes_on_insert(id, new_doc)?;
2428
2429        Ok(())
2430    }
2431    // Fix update_indexes_on_delete method
2432    fn update_indexes_on_delete(&mut self, id: &DocId, doc: &Document) -> Result<()> {
2433        let index_names: Vec<String> = self.index_definitions.keys().cloned().collect();
2434
2435        for name in index_names {
2436            if let Some(index_def) = self.index_definitions.get(&name) {
2437                // Extract all field values first
2438                let mut field_values = Vec::new();
2439                for (field, _) in &index_def.key {
2440                    let field_parts: Vec<&str> = field.split('.').collect();
2441                    let value = self.get_nested_value(doc, &field_parts);
2442                    field_values.push(value.cloned());
2443                }
2444
2445                // Now get mutable borrow and process
2446                if let Some(index_data) = self.indexes.get_mut(&name) {
2447                    for (i, (_, _)) in index_def.key.iter().enumerate() {
2448                        if let Some(val) = &field_values[i] {
2449                            index_data.remove(val, id)?;
2450                        }
2451                    }
2452                }
2453            }
2454        }
2455
2456        Ok(())
2457    }
2458
2459    // Fix add_to_indexes method
2460    fn add_to_indexes(&mut self, index_name: &str, id: &DocId, doc: &Document) -> Result<()> {
2461        if let Some(index_def) = self.index_definitions.get(index_name).cloned() {
2462            // Extract all field values first
2463            let mut field_values = Vec::new();
2464            let field_names: Vec<String> = index_def
2465                .key
2466                .iter()
2467                .map(|(field, _)| field.clone())
2468                .collect();
2469
2470            for field in &field_names {
2471                let field_parts: Vec<&str> = field.split('.').collect();
2472                let value = self.get_nested_value(doc, &field_parts);
2473                field_values.push(value.cloned());
2474            }
2475
2476            // Now get mutable borrow and process
2477            if let Some(index_data) = self.indexes.get_mut(index_name) {
2478                for (i, field) in field_names.iter().enumerate() {
2479                    if let Some(val) = &field_values[i] {
2480                        index_data.insert(val.clone(), id.clone())?;
2481                    } else if index_data.sparse {
2482                        // Skip missing fields for sparse indexes
2483                        continue;
2484                    } else {
2485                        return Err(DbError::IndexError(format!(
2486                            "Field {} not found in document for non-sparse index",
2487                            field
2488                        )));
2489                    }
2490                }
2491            }
2492        }
2493
2494        Ok(())
2495    }
2496
2497    fn try_indexed_query(&self, query: &Query) -> Result<Vec<(DocId, Document)>> {
2498        // Try to find an index that can be used for the query
2499        for (index_name, index_def) in self.index_definitions.iter() {
2500            // Check if the first field in the index is used in the query
2501            if let Some((field, _)) = index_def.key.first() {
2502                for cond in &query.conditions {
2503                    if cond.field == *field {
2504                        // We can use this index
2505                        let mut results = Vec::new();
2506
2507                        if let Some(index_data) = self.indexes.get(index_name) {
2508                            match &cond.operator {
2509                                QueryOperator::Eq(value) => {
2510                                    let entries = index_data.find(value);
2511                                    for entry in entries {
2512                                        if let Some(doc) = self.docs.get(&entry.doc_id) {
2513                                            results.push((entry.doc_id.clone(), doc.clone()));
2514                                        }
2515                                    }
2516                                }
2517                                QueryOperator::Gt(value) => {
2518                                    // Find all entries greater than the value
2519                                    let ord_value = OrdValue(value.clone());
2520                                    for (key, entries) in
2521                                        index_data.entries.range((Excluded(&ord_value), Unbounded))
2522                                    {
2523                                        for entry in entries {
2524                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
2525                                                results.push((entry.doc_id.clone(), doc.clone()));
2526                                            }
2527                                        }
2528                                    }
2529                                }
2530                                QueryOperator::Gte(value) => {
2531                                    // Find all entries greater than or equal to the value
2532                                    let ord_value = OrdValue(value.clone());
2533                                    for (key, entries) in
2534                                        index_data.entries.range((Included(&ord_value), Unbounded))
2535                                    {
2536                                        for entry in entries {
2537                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
2538                                                results.push((entry.doc_id.clone(), doc.clone()));
2539                                            }
2540                                        }
2541                                    }
2542                                }
2543                                QueryOperator::Lt(value) => {
2544                                    // Find all entries less than the value
2545                                    let ord_value = OrdValue(value.clone());
2546                                    for (key, entries) in
2547                                        index_data.entries.range((Unbounded, Excluded(&ord_value)))
2548                                    {
2549                                        for entry in entries {
2550                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
2551                                                results.push((entry.doc_id.clone(), doc.clone()));
2552                                            }
2553                                        }
2554                                    }
2555                                }
2556                                QueryOperator::Lte(value) => {
2557                                    // Find all entries less than or equal to the value
2558                                    let ord_value = OrdValue(value.clone());
2559                                    for (key, entries) in
2560                                        index_data.entries.range((Unbounded, Included(&ord_value)))
2561                                    {
2562                                        for entry in entries {
2563                                            if let Some(doc) = self.docs.get(&entry.doc_id) {
2564                                                results.push((entry.doc_id.clone(), doc.clone()));
2565                                            }
2566                                        }
2567                                    }
2568                                }
2569                                _ => {
2570                                    // Can't use index for this operator
2571                                    continue;
2572                                }
2573                            }
2574                        }
2575
2576                        return Ok(results);
2577                    }
2578                }
2579            }
2580        }
2581
2582        // No suitable index found
2583        Ok(Vec::new())
2584    }
2585
2586    fn sort_results(
2587        &self,
2588        results: &mut Vec<(DocId, Document)>,
2589        sort_spec: &[(String, SortOrder)],
2590    ) {
2591        results.sort_by(|a, b| {
2592            for (field, order) in sort_spec {
2593                let field_parts: Vec<&str> = field.split('.').collect();
2594                let a_val = self.get_nested_value(&a.1, &field_parts);
2595                let b_val = self.get_nested_value(&b.1, &field_parts);
2596
2597                let cmp = match (a_val, b_val) {
2598                    (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2599                        .as_f64()
2600                        .unwrap_or(0.0)
2601                        .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2602                        .unwrap_or(Ordering::Equal),
2603                    (Some(Value::String(a_str)), Some(Value::String(b_str))) => a_str.cmp(b_str),
2604                    (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => a_bool.cmp(b_bool),
2605                    (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2606                        a_arr.len().cmp(&b_arr.len())
2607                    }
2608                    (Some(_), None) => Ordering::Greater,
2609                    (None, Some(_)) => Ordering::Less,
2610                    (None, None) => Ordering::Equal,
2611                    _ => Ordering::Equal,
2612                };
2613
2614                if cmp != Ordering::Equal {
2615                    return match order {
2616                        SortOrder::Ascending => cmp,
2617                        SortOrder::Descending => cmp.reverse(),
2618                    };
2619                }
2620            }
2621
2622            Ordering::Equal
2623        });
2624    }
2625
2626    fn apply_projection(
2627        &self,
2628        results: &mut Vec<(DocId, Document)>,
2629        projection: &Map<String, Value>,
2630    ) {
2631        let is_inclusive = projection.values().any(|v| match v {
2632            Value::Bool(b) => *b,
2633            Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
2634            _ => false,
2635        });
2636
2637        for (_, doc) in results.iter_mut() {
2638            if let Some(obj) = doc.as_object_mut() {
2639                let mut new_obj = Map::new();
2640
2641                if is_inclusive {
2642                    // Include only specified fields
2643                    for (key, value) in projection {
2644                        if obj.contains_key(key) {
2645                            new_obj.insert(key.clone(), obj.get(key).unwrap().clone());
2646                        }
2647                    }
2648
2649                    // Always include _id
2650                    if obj.contains_key("_id") && !projection.contains_key("_id") {
2651                        new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2652                    }
2653                } else {
2654                    // Exclude specified fields
2655                    for (key, value) in obj {
2656                        if !projection.contains_key(key) || key == "_id" {
2657                            new_obj.insert(key.clone(), value.clone());
2658                        }
2659                    }
2660                }
2661
2662                *doc = Value::Object(new_obj);
2663            }
2664        }
2665    }
2666
2667    fn get_nested_value<'a>(&self, doc: &'a Document, path: &[&str]) -> Option<&'a Value> {
2668        if path.is_empty() {
2669            return Some(doc);
2670        }
2671
2672        let current = path[0];
2673        let rest = &path[1..];
2674
2675        match doc.get(current) {
2676            Some(value) => {
2677                if rest.is_empty() {
2678                    Some(value)
2679                } else {
2680                    match value {
2681                        Value::Object(obj) => self.get_nested_value(value, rest),
2682                        _ => None,
2683                    }
2684                }
2685            }
2686            None => None,
2687        }
2688    }
2689
2690    fn calculate_size(&self) -> usize {
2691        // Simplified size calculation
2692        self.docs
2693            .values()
2694            .map(|doc| serde_json::to_string(doc).unwrap_or_default().len())
2695            .sum()
2696    }
2697
2698    fn calculate_index_size(&self) -> usize {
2699        // Simplified index size calculation
2700        self.indexes
2701            .values()
2702            .map(|index_data| {
2703                index_data.entries.len() * 16 // Approximate size per entry
2704            })
2705            .sum()
2706    }
2707
2708    fn apply_aggregation_stage(
2709        &self,
2710        docs: Vec<Document>,
2711        stage: AggregationStage,
2712    ) -> Result<Vec<Document>> {
2713        match stage {
2714            AggregationStage::Match(query) => {
2715                let mut result = Vec::new();
2716                for doc in docs {
2717                    if query.matches(&doc) {
2718                        result.push(doc);
2719                    }
2720                }
2721                Ok(result)
2722            }
2723            AggregationStage::Project(projection) => {
2724                let mut result = Vec::new();
2725                for doc in docs {
2726                    if let Some(obj) = doc.as_object() {
2727                        let mut new_obj = Map::new();
2728
2729                        for (key, value) in &projection {
2730                            if value.as_object().and_then(|o| o.get("$")).is_some() {
2731                                // Field projection with expression
2732                                if let Some(expr) = value.as_object().and_then(|o| o.get("$")) {
2733                                    // Simplified expression evaluation
2734                                    if let Some(field_name) = expr.as_str() {
2735                                        if let Some(field_value) =
2736                                            obj.get(field_name.trim_start_matches('$'))
2737                                        {
2738                                            new_obj.insert(key.clone(), field_value.clone());
2739                                        }
2740                                    }
2741                                }
2742                            } else if value.as_object().and_then(|o| o.get("$concat")).is_some() {
2743                                // String concatenation
2744                                if let Some(Value::Array(fields)) =
2745                                    value.as_object().and_then(|o| o.get("$concat"))
2746                                {
2747                                    let mut result_str = String::new();
2748                                    for field in fields {
2749                                        if let Some(field_name) = field.as_str() {
2750                                            if let Some(field_value) =
2751                                                obj.get(field_name.trim_start_matches('$'))
2752                                            {
2753                                                if let Some(s) = field_value.as_str() {
2754                                                    result_str.push_str(s);
2755                                                }
2756                                            }
2757                                        }
2758                                    }
2759                                    new_obj.insert(key.clone(), Value::String(result_str));
2760                                }
2761                            } else {
2762                                // Simple field inclusion/exclusion
2763                                let include = match value {
2764                                    Value::Bool(b) => b,
2765                                    Value::Number(n) => &(n.as_i64().unwrap_or(0) == 1),
2766                                    _ => &false,
2767                                };
2768
2769                                if *include && obj.contains_key(key.as_str()) {
2770                                    new_obj.insert(
2771                                        key.clone(),
2772                                        obj.get(key.as_str()).unwrap().clone(),
2773                                    );
2774                                } else if !include && !new_obj.contains_key(key.as_str()) {
2775                                    new_obj.insert(
2776                                        key.clone(),
2777                                        obj.get(key.as_str()).unwrap().clone(),
2778                                    );
2779                                }
2780                            }
2781                        }
2782
2783                        // Always include _id if not explicitly excluded
2784                        if obj.contains_key("_id") && !projection.contains_key("_id") {
2785                            new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2786                        }
2787
2788                        result.push(Value::Object(new_obj));
2789                    }
2790                }
2791                Ok(result)
2792            }
2793            AggregationStage::Sort(sort_spec) => {
2794                let mut result = docs;
2795
2796                result.sort_by(|a, b| {
2797                    for (field, order) in &sort_spec {
2798                        let field_parts: Vec<&str> = field.split('.').collect();
2799                        let a_val = self.get_nested_value(a, &field_parts);
2800                        let b_val = self.get_nested_value(b, &field_parts);
2801
2802                        let cmp = match (a_val, b_val) {
2803                            (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2804                                .as_f64()
2805                                .unwrap_or(0.0)
2806                                .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2807                                .unwrap_or(Ordering::Equal),
2808                            (Some(Value::String(a_str)), Some(Value::String(b_str))) => {
2809                                a_str.cmp(b_str)
2810                            }
2811                            (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => {
2812                                a_bool.cmp(b_bool)
2813                            }
2814                            (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2815                                a_arr.len().cmp(&b_arr.len())
2816                            }
2817                            (Some(_), None) => Ordering::Greater,
2818                            (None, Some(_)) => Ordering::Less,
2819                            (None, None) => Ordering::Equal,
2820                            _ => Ordering::Equal,
2821                        };
2822
2823                        if cmp != Ordering::Equal {
2824                            return match order {
2825                                SortOrder::Ascending => cmp,
2826                                SortOrder::Descending => cmp.reverse(),
2827                            };
2828                        }
2829                    }
2830
2831                    Ordering::Equal
2832                });
2833
2834                Ok(result)
2835            }
2836            AggregationStage::Skip(n) => {
2837                let mut result = docs;
2838                if n < result.len() {
2839                    result.drain(0..n);
2840                } else {
2841                    result.clear();
2842                }
2843                Ok(result)
2844            }
2845            AggregationStage::Limit(n) => {
2846                let mut result = docs;
2847                result.truncate(n);
2848                Ok(result)
2849            }
2850            AggregationStage::Group(group_spec) => {
2851                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
2852
2853                // Group documents
2854                for doc in docs {
2855                    let group_key = self.calculate_group_key(&doc, &group_spec.id)?;
2856                    groups.entry(group_key).or_insert_with(Vec::new).push(doc);
2857                }
2858
2859                // Apply group operations
2860                let mut result = Vec::new();
2861                for (key, group_docs) in groups {
2862                    let mut group_obj = Map::new();
2863
2864                    // Add _id field
2865                    group_obj.insert("_id".to_string(), Value::String(key));
2866
2867                    // Apply group operations
2868                    for (field, op) in &group_spec.operations {
2869                        match op {
2870                            GroupOperation::Sum(expr) => {
2871                                let sum = self.calculate_sum(&group_docs, expr)?;
2872                                group_obj.insert(field.clone(), sum);
2873                            }
2874                            GroupOperation::Avg(expr) => {
2875                                let avg = self.calculate_avg(&group_docs, expr)?;
2876                                group_obj.insert(field.clone(), avg);
2877                            }
2878                            GroupOperation::Min(expr) => {
2879                                let min = self.calculate_min(&group_docs, expr)?;
2880                                group_obj.insert(field.clone(), min);
2881                            }
2882                            GroupOperation::Max(expr) => {
2883                                let max = self.calculate_max(&group_docs, expr)?;
2884                                group_obj.insert(field.clone(), max);
2885                            }
2886                            GroupOperation::First(expr) => {
2887                                if let Some(doc) = group_docs.first() {
2888                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2889                                        group_obj.insert(field.clone(), value);
2890                                    }
2891                                }
2892                            }
2893                            GroupOperation::Last(expr) => {
2894                                if let Some(doc) = group_docs.last() {
2895                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2896                                        group_obj.insert(field.clone(), value);
2897                                    }
2898                                }
2899                            }
2900                            GroupOperation::Push(expr) => {
2901                                let mut values = Vec::new();
2902                                for doc in &group_docs {
2903                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2904                                        values.push(value);
2905                                    }
2906                                }
2907                                group_obj.insert(field.clone(), Value::Array(values));
2908                            }
2909                            GroupOperation::AddToSet(expr) => {
2910                                let mut values = HashSet::new();
2911                                for doc in &group_docs {
2912                                    if let Some(value) = self.evaluate_expression(doc, expr) {
2913                                        values.insert(value);
2914                                    }
2915                                }
2916                                group_obj.insert(
2917                                    field.clone(),
2918                                    Value::Array(values.into_iter().collect()),
2919                                );
2920                            }
2921                            GroupOperation::StdDevPop(expr) => {
2922                                let std_dev = self.calculate_std_dev_pop(&group_docs, expr)?;
2923                                group_obj.insert(field.clone(), std_dev);
2924                            }
2925                            GroupOperation::StdDevSamp(expr) => {
2926                                let std_dev = self.calculate_std_dev_samp(&group_docs, expr)?;
2927                                group_obj.insert(field.clone(), std_dev);
2928                            }
2929                        }
2930                    }
2931
2932                    result.push(Value::Object(group_obj));
2933                }
2934
2935                Ok(result)
2936            }
2937            AggregationStage::Unwind(field) => {
2938                let mut result = Vec::new();
2939
2940                for doc in docs {
2941                    if let Some(obj) = doc.as_object() {
2942                        if let Some(value) = obj.get(&field) {
2943                            if let Value::Array(arr) = value {
2944                                for item in arr {
2945                                    let mut new_obj = obj.clone();
2946                                    new_obj.insert(field.clone(), item.clone());
2947                                    result.push(Value::Object(new_obj));
2948                                }
2949                            } else {
2950                                // Not an array, just include the document as is
2951                                result.push(doc.clone());
2952                            }
2953                        } else {
2954                            // Field doesn't exist, include the document as is
2955                            result.push(doc.clone());
2956                        }
2957                    }
2958                }
2959
2960                Ok(result)
2961            }
2962            AggregationStage::Lookup(lookup_spec) => {
2963                // Simplified implementation of $lookup
2964                let mut result = Vec::new();
2965
2966                for doc in docs {
2967                    if let Some(obj) = doc.as_object() {
2968                        let mut new_obj = obj.clone();
2969
2970                        // Get the local field value
2971                        let local_field_value = obj.get(&lookup_spec.local_field);
2972
2973                        if let Some(local_value) = local_field_value {
2974                            // Find matching documents in the foreign collection
2975                            let query =
2976                                Query::new().eq(&lookup_spec.foreign_field, local_value.clone());
2977
2978                            // This is a simplified implementation - in a real DB, we would access the foreign collection
2979                            new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
2980                        } else {
2981                            // Local field doesn't exist
2982                            new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
2983                        }
2984
2985                        result.push(Value::Object(new_obj));
2986                    }
2987                }
2988
2989                Ok(result)
2990            }
2991            AggregationStage::Out(collection_name) => {
2992                // Simplified implementation of $out
2993                // In a real DB, this would write to another collection
2994                Ok(docs)
2995            }
2996        }
2997    }
2998
2999    fn calculate_group_key(&self, doc: &Document, group_id: &GroupId) -> Result<String> {
3000        match group_id {
3001            GroupId::Field(field) => {
3002                let field_parts: Vec<&str> = field.split('.').collect();
3003                if let Some(value) = self.get_nested_value(doc, &field_parts) {
3004                    Ok(value.to_string())
3005                } else {
3006                    Ok("null".to_string())
3007                }
3008            }
3009            GroupId::Expression(expr) => {
3010                if let Some(value) = self.evaluate_expression(doc, expr) {
3011                    Ok(value.to_string())
3012                } else {
3013                    Ok("null".to_string())
3014                }
3015            }
3016            GroupId::Null => Ok("null".to_string()),
3017        }
3018    }
3019
3020    fn evaluate_expression(&self, doc: &Document, expr: &Value) -> Option<Value> {
3021        match expr {
3022            Value::String(s) if s.starts_with('$') => {
3023                let field = &s[1..];
3024                let field_parts: Vec<&str> = field.split('.').collect();
3025                self.get_nested_value(doc, &field_parts).cloned()
3026            }
3027            Value::Object(obj) => {
3028                // Handle expression operators
3029                if let Some(sum_expr) = obj.get("$sum") {
3030                    if let Some(field) = sum_expr.as_str() {
3031                        if field.starts_with('$') {
3032                            let field_name = &field[1..];
3033                            let field_parts: Vec<&str> = field_name.split('.').collect();
3034                            return self.get_nested_value(doc, &field_parts).cloned();
3035                        }
3036                    }
3037                }
3038                None
3039            }
3040            _ => Some(expr.clone()),
3041        }
3042    }
3043
3044    fn calculate_sum(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3045        let mut sum = 0.0;
3046
3047        for doc in docs {
3048            if let Some(value) = self.evaluate_expression(doc, expr) {
3049                if let Some(num) = value.as_f64() {
3050                    sum += num;
3051                }
3052            }
3053        }
3054
3055        Ok(Value::Number(serde_json::Number::from_f64(sum).unwrap()))
3056    }
3057
3058    fn calculate_avg(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3059        if docs.is_empty() {
3060            return Ok(Value::Number(serde_json::Number::from(0)));
3061        }
3062
3063        let sum = self.calculate_sum(docs, expr)?;
3064        if let Some(sum_num) = sum.as_f64() {
3065            let avg = sum_num / docs.len() as f64;
3066            Ok(Value::Number(serde_json::Number::from_f64(avg).unwrap()))
3067        } else {
3068            Ok(Value::Number(serde_json::Number::from(0)))
3069        }
3070    }
3071
3072    fn calculate_min(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3073        let mut min_value: Option<Value> = None;
3074
3075        for doc in docs {
3076            if let Some(value) = self.evaluate_expression(doc, expr) {
3077                if let Some(ref mut min) = min_value {
3078                    if self.compare_values(&value, min) == Ordering::Less {
3079                        *min = value;
3080                    }
3081                } else {
3082                    min_value = Some(value);
3083                }
3084            }
3085        }
3086
3087        Ok(min_value.unwrap_or(Value::Null))
3088    }
3089
3090    fn calculate_max(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3091        let mut max_value: Option<Value> = None;
3092
3093        for doc in docs {
3094            if let Some(value) = self.evaluate_expression(doc, expr) {
3095                if let Some(ref mut max) = max_value {
3096                    if self.compare_values(&value, max) == Ordering::Greater {
3097                        *max = value;
3098                    }
3099                } else {
3100                    max_value = Some(value);
3101                }
3102            }
3103        }
3104
3105        Ok(max_value.unwrap_or(Value::Null))
3106    }
3107
3108    fn calculate_std_dev_pop(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3109        if docs.is_empty() {
3110            return Ok(Value::Number(serde_json::Number::from(0)));
3111        }
3112
3113        let avg = self.calculate_avg(docs, expr)?;
3114        let avg_num = avg.as_f64().unwrap_or(0.0);
3115
3116        let mut sum_sq_diff = 0.0;
3117        for doc in docs {
3118            if let Some(value) = self.evaluate_expression(doc, expr) {
3119                if let Some(num) = value.as_f64() {
3120                    let diff = num - avg_num;
3121                    sum_sq_diff += diff * diff;
3122                }
3123            }
3124        }
3125
3126        let variance = sum_sq_diff / docs.len() as f64;
3127        let std_dev = variance.sqrt();
3128
3129        Ok(Value::Number(
3130            serde_json::Number::from_f64(std_dev).unwrap(),
3131        ))
3132    }
3133
3134    fn calculate_std_dev_samp(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3135        if docs.len() <= 1 {
3136            return Ok(Value::Number(serde_json::Number::from(0)));
3137        }
3138
3139        let avg = self.calculate_avg(docs, expr)?;
3140        let avg_num = avg.as_f64().unwrap_or(0.0);
3141
3142        let mut sum_sq_diff = 0.0;
3143        for doc in docs {
3144            if let Some(value) = self.evaluate_expression(doc, expr) {
3145                if let Some(num) = value.as_f64() {
3146                    let diff = num - avg_num;
3147                    sum_sq_diff += diff * diff;
3148                }
3149            }
3150        }
3151
3152        let variance = sum_sq_diff / (docs.len() - 1) as f64;
3153        let std_dev = variance.sqrt();
3154
3155        Ok(Value::Number(
3156            serde_json::Number::from_f64(std_dev).unwrap(),
3157        ))
3158    }
3159}
3160
3161// --- Aggregation Stages ---
3162#[derive(Debug, Clone, Serialize, Deserialize)]
3163pub enum AggregationStage {
3164    Match(Query),
3165    Project(Map<String, Value>),
3166    Sort(Vec<(String, SortOrder)>),
3167    Skip(usize),
3168    Limit(usize),
3169    Group(GroupSpecification),
3170    Unwind(String),
3171    Lookup(LookupSpecification),
3172    Out(String),
3173}
3174
3175#[derive(Debug, Clone, Serialize, Deserialize)]
3176pub struct GroupSpecification {
3177    pub id: GroupId,
3178    pub operations: HashMap<String, GroupOperation>,
3179}
3180
3181#[derive(Debug, Clone, Serialize, Deserialize)]
3182pub enum GroupId {
3183    Field(String),
3184    Expression(Value),
3185    Null,
3186}
3187
3188#[derive(Debug, Clone, Serialize, Deserialize)]
3189pub enum GroupOperation {
3190    Sum(Value),
3191    Avg(Value),
3192    Min(Value),
3193    Max(Value),
3194    First(Value),
3195    Last(Value),
3196    Push(Value),
3197    AddToSet(Value),
3198    StdDevPop(Value),
3199    StdDevSamp(Value),
3200}
3201
3202#[derive(Debug, Clone, Serialize, Deserialize)]
3203pub struct LookupSpecification {
3204    pub from: String,
3205    pub local_field: String,
3206    pub foreign_field: String,
3207    pub as_field: String, // Renamed from 'as' to avoid using a reserved keyword
3208}
3209
3210// --- Find and Modify Options ---
3211#[derive(Debug, Clone, Serialize, Deserialize)]
3212pub struct FindOneAndUpdateOptions {
3213    pub upsert: bool,
3214    pub return_document: ReturnDocument,
3215    pub projection: Option<Map<String, Value>>,
3216    pub sort: Option<Vec<(String, SortOrder)>>,
3217    pub max_time_ms: Option<u64>,
3218}
3219
3220impl Default for FindOneAndUpdateOptions {
3221    fn default() -> Self {
3222        Self {
3223            upsert: false,
3224            return_document: ReturnDocument::Before,
3225            projection: None,
3226            sort: None,
3227            max_time_ms: None,
3228        }
3229    }
3230}
3231
3232#[derive(Debug, Clone, Serialize, Deserialize)]
3233pub struct FindOneAndReplaceOptions {
3234    pub upsert: bool,
3235    pub return_document: ReturnDocument,
3236    pub projection: Option<Map<String, Value>>,
3237    pub sort: Option<Vec<(String, SortOrder)>>,
3238    pub max_time_ms: Option<u64>,
3239}
3240
3241impl Default for FindOneAndReplaceOptions {
3242    fn default() -> Self {
3243        Self {
3244            upsert: false,
3245            return_document: ReturnDocument::Before,
3246            projection: None,
3247            sort: None,
3248            max_time_ms: None,
3249        }
3250    }
3251}
3252
3253#[derive(Debug, Clone, Serialize, Deserialize)]
3254pub struct FindOneAndDeleteOptions {
3255    pub projection: Option<Map<String, Value>>,
3256    pub sort: Option<Vec<(String, SortOrder)>>,
3257    pub max_time_ms: Option<u64>,
3258}
3259
3260impl Default for FindOneAndDeleteOptions {
3261    fn default() -> Self {
3262        Self {
3263            projection: None,
3264            sort: None,
3265            max_time_ms: None,
3266        }
3267    }
3268}
3269
3270#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
3271pub enum ReturnDocument {
3272    Before,
3273    After,
3274}
3275
3276// --- Collection Statistics ---
3277#[derive(Debug, Clone, Serialize, Deserialize)]
3278pub struct CollectionStats {
3279    pub count: usize,
3280    pub size: usize,
3281    pub avg_obj_size: usize,
3282    pub index_count: usize,
3283    pub index_size: usize,
3284}
3285
3286// --- Database ---
3287#[derive(Debug, Clone, Serialize, Deserialize)]
3288/// A database containing collections of documents.
3289///
3290/// A database is a container for collections, which in turn contain documents.
3291/// It provides methods for creating, accessing, and managing collections.
3292///
3293/// # Example
3294///
3295/// ```no_run
3296/// use luckdb::Client;
3297///
3298/// let mut client = Client::new("mongodb://localhost");
3299/// let db = client.db("mydatabase");
3300///
3301/// // Access or create a collection
3302/// let coll = db.collection("mycollection");
3303/// ```
3304pub struct Database {
3305    /// The name of the database.
3306    name: String,
3307    /// The collections contained in the database, indexed by name.
3308    collections: HashMap<String, Collection>,
3309}
3310
3311impl Database {
3312    /// Creates a new empty database with the given name.
3313    ///
3314    /// # Arguments
3315    ///
3316    /// * `name` - The name of the new database.
3317    pub fn new(name: String) -> Self {
3318        Self {
3319            name,
3320            collections: HashMap::new(),
3321        }
3322    }
3323
3324    /// Gets a collection by name, creating it if it doesn't exist.
3325    ///
3326    /// # Arguments
3327    ///
3328    /// * `name` - The name of the collection to access.
3329    ///
3330    /// # Returns
3331    ///
3332    /// A mutable reference to the collection.
3333    pub fn collection(&mut self, name: &str) -> &mut Collection {
3334        self.collections
3335            .entry(name.to_string())
3336            .or_insert_with(|| Collection::new(name.to_string()))
3337    }
3338
3339    /// Lists the names of all collections in the database.
3340    ///
3341    /// # Returns
3342    ///
3343    /// A list of collection names.
3344    pub fn list_collection_names(&self) -> Vec<String> {
3345        self.collections.keys().cloned().collect()
3346    }
3347
3348    /// Creates a new collection with the specified options.
3349    ///
3350    /// # Arguments
3351    ///
3352    /// * `name` - The name of the new collection.
3353    /// * `options` - Optional parameters for creating the collection, including indexes.
3354    ///
3355    /// # Errors
3356    ///
3357    /// Returns an error if a collection with the same name already exists.
3358    pub fn create_collection(
3359        &mut self,
3360        name: &str,
3361        options: Option<CreateCollectionOptions>,
3362    ) -> Result<()> {
3363        if self.collections.contains_key(name) {
3364            return Err(DbError::Other(format!(
3365                "Collection {} already exists",
3366                name
3367            )));
3368        }
3369
3370        let mut collection = Collection::new(name.to_string());
3371
3372        // Apply options if provided
3373        if let Some(opts) = options {
3374            // Create indexes if specified
3375            for index in opts.indexes {
3376                collection.create_index(index)?;
3377            }
3378        }
3379
3380        self.collections.insert(name.to_string(), collection);
3381        Ok(())
3382    }
3383
3384    /// Drops a collection from the database.
3385    ///
3386    /// # Arguments
3387    ///
3388    /// * `name` - The name of the collection to drop.
3389    ///
3390    /// # Errors
3391    ///
3392    /// Returns an error if the collection does not exist.
3393    pub fn drop_collection(&mut self, name: &str) -> Result<()> {
3394        if self.collections.remove(name).is_none() {
3395            return Err(DbError::Other(format!(
3396                "Collection {} does not exist",
3397                name
3398            )));
3399        }
3400
3401        Ok(())
3402    }
3403
3404    /// Gets statistics about the database.
3405    ///
3406    /// # Returns
3407    ///
3408    /// Database statistics including collection counts and sizes.
3409    pub fn stats(&self) -> Result<DatabaseStats> {
3410        let mut collections = Vec::new();
3411        let mut total_size = 0;
3412        let mut total_index_size = 0;
3413
3414        for (name, collection) in self.collections.iter() {
3415            let coll_stats = collection.stats()?;
3416            collections.push(CollectionStatsInfo {
3417                name: name.clone(),
3418                count: coll_stats.count,
3419                size: coll_stats.size,
3420                index_count: coll_stats.index_count,
3421                index_size: coll_stats.index_size,
3422            });
3423
3424            total_size += coll_stats.size;
3425            total_index_size += coll_stats.index_size;
3426        }
3427
3428        Ok(DatabaseStats {
3429            collections,
3430            total_size,
3431            total_index_size,
3432        })
3433    }
3434
3435    /// Runs a database command.
3436    ///
3437    /// Supports commands like "create", "drop", "listCollections", and "dbStats".
3438    ///
3439    /// # Arguments
3440    ///
3441    /// * `command` - The command to execute, represented as a document.
3442    ///
3443    /// # Returns
3444    ///
3445    /// The result of the command execution.
3446    pub fn run_command(&mut self, command: &Document) -> Result<Document> {
3447        if let Some(obj) = command.as_object() {
3448            if let Some(cmd_name) = obj.keys().next() {
3449                match cmd_name.as_str() {
3450                    "create" => {
3451                        if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
3452                            self.create_collection(coll_name, None)?;
3453                            Ok(serde_json::json!({ "ok": 1 }))
3454                        } else {
3455                            Err(DbError::Other("Invalid create command".into()))
3456                        }
3457                    }
3458                    "drop" => {
3459                        if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
3460                            self.drop_collection(coll_name)?;
3461                            Ok(serde_json::json!({ "ok": 1 }))
3462                        } else {
3463                            Err(DbError::Other("Invalid drop command".into()))
3464                        }
3465                    }
3466                    "listCollections" => {
3467                        let coll_names = self.list_collection_names();
3468                        let collections: Vec<_> = coll_names
3469                            .into_iter()
3470                            .map(|name| {
3471                                serde_json::json!({
3472                                    "name": name,
3473                                    "type": "collection"
3474                                })
3475                            })
3476                            .collect();
3477
3478                        Ok(serde_json::json!({
3479                            "cursor": {
3480                                "id": 0,
3481                                "ns": format!("{}.collections", self.name),
3482                                "firstBatch": collections
3483                            },
3484                            "ok": 1
3485                        }))
3486                    }
3487                    "dbStats" => {
3488                        let stats = self.stats()?;
3489                        Ok(serde_json::json!({
3490                            "db": self.name,
3491                            "collections": stats.collections.len(),
3492                            "objects": stats.collections.iter().map(|c| c.count).sum::<usize>(),
3493                            "avgObjSize": if stats.collections.iter().map(|c| c.count).sum::<usize>() > 0 {
3494                                stats.total_size / stats.collections.iter().map(|c| c.count).sum::<usize>()
3495                            } else {
3496                                0
3497                            },
3498                            "dataSize": stats.total_size,
3499                            "indexSize": stats.total_index_size,
3500                            "ok": 1
3501                        }))
3502                    }
3503                    _ => Err(DbError::Other(format!("Unknown command: {}", cmd_name))),
3504                }
3505            } else {
3506                Err(DbError::Other("Empty command".into()))
3507            }
3508        } else {
3509            Err(DbError::Other("Command must be an object".into()))
3510        }
3511    }
3512}
3513
3514// --- Database Statistics ---
3515#[derive(Debug, Clone, Serialize, Deserialize)]
3516pub struct DatabaseStats {
3517    pub collections: Vec<CollectionStatsInfo>,
3518    pub total_size: usize,
3519    pub total_index_size: usize,
3520}
3521
3522#[derive(Debug, Clone, Serialize, Deserialize)]
3523pub struct CollectionStatsInfo {
3524    pub name: String,
3525    pub count: usize,
3526    pub size: usize,
3527    pub index_count: usize,
3528    pub index_size: usize,
3529}
3530
3531// --- Create Collection Options ---
3532#[derive(Debug, Clone, Serialize, Deserialize)]
3533pub struct CreateCollectionOptions {
3534    pub capped: bool,
3535    pub size: Option<usize>,
3536    pub max: Option<usize>,
3537    pub storage_engine: Option<Map<String, Value>>,
3538    pub validator: Option<Document>,
3539    pub validation_level: Option<String>,
3540    pub validation_action: Option<String>,
3541    pub index_option_defaults: Option<Map<String, Value>>,
3542    pub view_on: Option<String>,
3543    pub pipeline: Option<Vec<Document>>,
3544    pub collation: Option<Map<String, Value>>,
3545    pub write_concern: Option<Map<String, Value>>,
3546    pub indexes: Vec<Index>,
3547}
3548
3549impl Default for CreateCollectionOptions {
3550    fn default() -> Self {
3551        Self {
3552            capped: false,
3553            size: None,
3554            max: None,
3555            storage_engine: None,
3556            validator: None,
3557            validation_level: None,
3558            validation_action: None,
3559            index_option_defaults: None,
3560            view_on: None,
3561            pipeline: None,
3562            collation: None,
3563            write_concern: None,
3564            indexes: Vec::new(),
3565        }
3566    }
3567}
3568
3569// --- Client ---
3570#[derive(Debug, Clone)]
3571/// The main entry point for interacting with LuckDB databases.
3572///
3573/// The Client struct is responsible for managing databases and providing access to them.
3574/// It also handles persistence operations like saving and loading data to disk.
3575///
3576/// # Example
3577///
3578/// ```no_run
3579/// use luckdb::Client;
3580///
3581/// // Create a new client with a storage path
3582/// let mut client = Client::with_storage_path("mongodb://localhost", "/path/to/data");
3583///
3584/// // Access or create a database
3585/// let db = client.db("mydatabase");
3586///
3587/// // Save changes to disk
3588/// client.save().unwrap();
3589/// ```
3590pub struct Client {
3591    /// The databases managed by this client, indexed by name.
3592    pub databases: HashMap<String, Database>,
3593    /// The connection URI for the database server.
3594    pub uri: String,
3595    /// Database configuration
3596    pub config: config::DatabaseConfig,
3597    /// Encryption key for data protection
3598    pub encryption_key: Option<encryption::EncryptionKey>,
3599}
3600
3601impl Client {
3602    /// Creates a new client with default settings.
3603    ///
3604    /// Uses the default configuration with in-memory storage and no encryption.
3605    /// This is suitable for testing and temporary data storage.
3606    ///
3607    /// # Examples
3608    ///
3609    /// ```no_run
3610    /// use luckdb::Client;
3611    ///
3612    /// let mut client = Client::new();
3613    /// let db = client.db("test");
3614    /// ```
3615    ///
3616    /// # Note
3617    ///
3618    /// Data stored in-memory will be lost when the client is dropped.
3619    /// Use `with_config()` or `with_storage_path()` for persistent storage.
3620    pub fn new() -> Self {
3621        let config = config::DatabaseConfig::default();
3622        Self {
3623            databases: HashMap::new(),
3624            uri: "luckdb://localhost:27017".to_string(), // Updated from mongodb://
3625            encryption_key: None,
3626            config,
3627        }
3628    }
3629
3630    /// Creates a new client with a custom connection URI.
3631    ///
3632    /// # Arguments
3633    ///
3634    /// * `uri` - The connection URI for the database server.
3635    pub fn with_uri(uri: &str) -> Self {
3636        let config = config::DatabaseConfig::default();
3637        Self {
3638            databases: HashMap::new(),
3639            uri: uri.to_string(),
3640            encryption_key: None,
3641            config,
3642        }
3643    }
3644
3645    /// Creates a new client with a custom connection URI and storage path.
3646    ///
3647    /// # Arguments
3648    ///
3649    /// * `uri` - The connection URI for the database server.
3650    /// * `path` - The file path where data will be persisted to disk.
3651    pub fn with_storage_path<P: AsRef<Path>>(uri: &str, path: P) -> Self {
3652        let mut config = config::DatabaseConfig::default();
3653        config.storage_path = Some(path.as_ref().to_path_buf());
3654        Self {
3655            databases: HashMap::new(),
3656            uri: uri.to_string(),
3657            encryption_key: None,
3658            config,
3659        }
3660    }
3661
3662    /// Creates a new client with the specified configuration.
3663    ///
3664    /// This method allows you to configure storage, encryption, authentication,
3665    /// and other settings through a `DatabaseConfig` object.
3666    ///
3667    /// # Arguments
3668    ///
3669    /// * `config` - Database configuration object
3670    ///
3671    /// # Returns
3672    ///
3673    /// Returns a `Result<Client>` which will be an error if the configuration
3674    /// validation fails.
3675    ///
3676    /// # Examples
3677    ///
3678    /// ```no_run
3679    /// use luckdb::{Client, config::DatabaseConfig};
3680    ///
3681    /// // Create configuration with storage and encryption
3682    /// let config = DatabaseConfig::with_storage_path("./data")
3683    ///     .with_encryption("secure_password");
3684    ///
3685    /// let mut client = Client::with_config(config)?;
3686    /// let db = client.db("mydb");
3687    /// ```
3688    ///
3689    /// # Errors
3690    ///
3691    /// Returns `DbError::ValidationError` if the configuration is invalid.
3692    pub fn with_config(config: config::DatabaseConfig) -> Result<Self> {
3693        config.validate()?;
3694
3695        let encryption_key = if config.is_encryption_configured() {
3696            Some(encryption::EncryptionKey::from_password(
3697                config.encryption_password.as_ref().unwrap(),
3698                b"luckdb_salt_2024", // Default salt for key derivation
3699            ))
3700        } else {
3701            None
3702        };
3703
3704        Ok(Self {
3705            databases: HashMap::new(),
3706            uri: "luckdb://localhost:27017".to_string(),
3707            config,
3708            encryption_key,
3709        })
3710    }
3711
3712    /// Creates a new client by loading configuration from a TOML file.
3713    ///
3714    /// This is a convenience method that loads configuration from a TOML file
3715    /// and creates a client with that configuration. The file must contain
3716    /// a `[luckdb]` section with valid configuration parameters.
3717    ///
3718    /// # Arguments
3719    ///
3720    /// * `config_path` - Path to the TOML configuration file
3721    ///
3722    /// # Returns
3723    ///
3724    /// Returns a `Result<Client>` which will be an error if the file cannot be
3725    /// read, is invalid TOML, or contains invalid configuration.
3726    ///
3727    /// # Examples
3728    ///
3729    /// ```no_run
3730    /// use luckdb::Client;
3731    ///
3732    /// // Load from config.toml file
3733    /// let mut client = Client::with_config_file("config.toml")?;
3734    /// let db = client.db("mydb");
3735    /// ```
3736    ///
3737    /// # Configuration File Format
3738    ///
3739    /// ```toml
3740    /// [luckdb]
3741    /// storage_path = "./data"
3742    /// encryption_enabled = true
3743    /// encryption_password = "your_password"
3744    /// auth_username = "admin"
3745    /// auth_password = "auth_password"
3746    /// ```
3747    ///
3748    /// # Errors
3749    ///
3750    /// - `DbError::IoError` - If the file cannot be read
3751    /// - `DbError::TomlError` - If the file contains invalid TOML
3752    /// - `DbError::ValidationError` - If the configuration is invalid
3753    pub fn with_config_file<P: AsRef<Path>>(config_path: P) -> Result<Self> {
3754        let config = config::DatabaseConfig::load_from_file(config_path)?;
3755        Self::with_config(config)
3756    }
3757
3758    /// Enable encryption with a password.
3759    ///
3760    /// # Arguments
3761    ///
3762    /// * `password` - Password for encryption key derivation.
3763    pub fn with_encryption<S: Into<String>>(mut self, password: S) -> Result<Self> {
3764        self.config.encryption_enabled = true;
3765        self.config.encryption_password = Some(password.into());
3766
3767        let key = encryption::EncryptionKey::from_password(
3768            self.config.encryption_password.as_ref().unwrap(),
3769            b"luckdb_salt_2024",
3770        );
3771
3772        self.encryption_key = Some(key);
3773        self.config.validate()?;
3774        Ok(self)
3775    }
3776
3777    /// Gets a database by name, creating it if it doesn't exist.
3778    ///
3779    /// # Arguments
3780    ///
3781    /// * `name` - The name of the database to access.
3782    ///
3783    /// # Returns
3784    ///
3785    /// A mutable reference to the database.
3786    pub fn db(&mut self, name: &str) -> &mut Database {
3787        self.databases
3788            .entry(name.to_string())
3789            .or_insert_with(|| Database::new(name.to_string()))
3790    }
3791
3792    /// Lists the names of all databases managed by this client.
3793    ///
3794    /// # Returns
3795    ///
3796    /// A list of database names.
3797    pub fn list_database_names(&self) -> Vec<String> {
3798        self.databases.keys().cloned().collect()
3799    }
3800
3801    /// Drops a database managed by this client.
3802    ///
3803    /// # Arguments
3804    ///
3805    /// * `name` - The name of the database to drop.
3806    ///
3807    /// # Errors
3808    ///
3809    /// Returns an error if the database does not exist.
3810    pub fn drop_database(&mut self, name: &str) -> Result<()> {
3811        if self.databases.remove(name).is_none() {
3812            return Err(DbError::Other(format!("Database {} does not exist", name)));
3813        }
3814
3815        Ok(())
3816    }
3817
3818    /// Gets the connection URI used by this client.
3819    ///
3820    /// # Returns
3821    ///
3822    /// A reference to the connection URI string.
3823    pub fn uri(&self) -> &str {
3824        &self.uri
3825    }
3826
3827    /// Saves all data to disk at the configured storage path.
3828    ///
3829    /// # Errors
3830    ///
3831    /// Returns an error if no storage path is configured or if there's an IO error.
3832    pub fn save(&self) -> Result<()> {
3833        let storage_path = self.config.get_or_create_storage_path()?;
3834
3835        // Create directory if it doesn't exist
3836        if let Some(parent) = storage_path.parent() {
3837            fs::create_dir_all(parent)?;
3838        }
3839
3840        let mut persistent_storage = PersistentStorage::new();
3841
3842        for (name, db) in &self.databases {
3843            let mut persistent_db = PersistentDatabase {
3844                name: name.clone(),
3845                collections: HashMap::new(),
3846            };
3847
3848            for (coll_name, coll) in &db.collections {
3849                let persistent_coll = PersistentCollection::from(coll.clone());
3850                persistent_db
3851                    .collections
3852                    .insert(coll_name.clone(), persistent_coll);
3853            }
3854
3855            persistent_storage
3856                .databases
3857                .insert(name.clone(), persistent_db);
3858        }
3859
3860        // Save with encryption if configured
3861        persistent_storage.save_to_file(&storage_path, self.encryption_key.as_ref())?;
3862        Ok(())
3863    }
3864
3865    /// Loads all data from disk at the configured storage path.
3866    ///
3867    /// # Errors
3868    ///
3869    /// Returns an error if no storage path is configured or if there's an IO or deserialization error.
3870    pub fn load(&mut self) -> Result<()> {
3871        let storage_path = self.config.get_or_create_storage_path()?;
3872
3873        if storage_path.exists() {
3874            let persistent_storage =
3875                PersistentStorage::load_from_file(&storage_path, self.encryption_key.as_ref())?;
3876
3877            for (name, persistent_db) in persistent_storage.databases {
3878                let mut db = Database::new(name.clone());
3879
3880                for (coll_name, persistent_coll) in persistent_db.collections {
3881                    let coll = Collection::from(persistent_coll);
3882                    db.collections.insert(coll_name, coll);
3883                }
3884
3885                self.databases.insert(name, db);
3886            }
3887        }
3888        Ok(())
3889    }
3890
3891    /// Get configuration reference
3892    pub fn config(&self) -> &config::DatabaseConfig {
3893        &self.config
3894    }
3895
3896    /// Update configuration
3897    pub fn update_config(&mut self, config: config::DatabaseConfig) -> Result<()> {
3898        config.validate()?;
3899        self.config = config;
3900
3901        // Update encryption key if needed
3902        if self.config.is_encryption_configured() {
3903            self.encryption_key = Some(encryption::EncryptionKey::from_password(
3904                self.config.encryption_password.as_ref().unwrap(),
3905                b"luckdb_salt_2024",
3906            ));
3907        } else {
3908            self.encryption_key = None;
3909        }
3910
3911        Ok(())
3912    }
3913}
3914
3915// --- Bulk Write Operations ---
3916#[derive(Debug, Clone, Serialize, Deserialize)]
3917pub enum BulkWriteOperation {
3918    InsertOne {
3919        document: Document,
3920    },
3921    UpdateOne {
3922        filter: Query,
3923        update: UpdateDocument,
3924        upsert: bool,
3925    },
3926    UpdateMany {
3927        filter: Query,
3928        update: UpdateDocument,
3929        upsert: bool,
3930    },
3931    ReplaceOne {
3932        filter: Query,
3933        replacement: Document,
3934        upsert: bool,
3935    },
3936    DeleteOne {
3937        filter: Query,
3938    },
3939    DeleteMany {
3940        filter: Query,
3941    },
3942}
3943
3944#[derive(Debug, Clone, Serialize, Deserialize)]
3945pub struct BulkWriteOptions {
3946    pub ordered: bool,
3947    pub bypass_document_validation: bool,
3948    pub write_concern: Option<Map<String, Value>>,
3949}
3950
3951impl Default for BulkWriteOptions {
3952    fn default() -> Self {
3953        Self {
3954            ordered: true,
3955            bypass_document_validation: false,
3956            write_concern: None,
3957        }
3958    }
3959}
3960
3961#[derive(Debug, Clone, Serialize, Deserialize)]
3962pub struct BulkWriteResult {
3963    pub inserted_count: usize,
3964    pub matched_count: usize,
3965    pub modified_count: usize,
3966    pub deleted_count: usize,
3967    pub upserted_count: usize,
3968    pub upserted_ids: HashMap<usize, DocId>,
3969}
3970
3971impl Collection {
3972    // Execute bulk write operations
3973    pub fn bulk_write(
3974        &mut self,
3975        operations: Vec<BulkWriteOperation>,
3976        options: Option<BulkWriteOptions>,
3977    ) -> Result<BulkWriteResult> {
3978        let opts = options.unwrap_or_default();
3979        let mut result = BulkWriteResult {
3980            inserted_count: 0,
3981            matched_count: 0,
3982            modified_count: 0,
3983            deleted_count: 0,
3984            upserted_count: 0,
3985            upserted_ids: HashMap::new(),
3986        };
3987
3988        let mut index = 0;
3989
3990        for op in operations {
3991            match op {
3992                BulkWriteOperation::InsertOne { document } => {
3993                    let id = self.insert(document)?;
3994                    result.inserted_count += 1;
3995                    result.upserted_ids.insert(index, id);
3996                }
3997                BulkWriteOperation::UpdateOne {
3998                    filter,
3999                    update,
4000                    upsert,
4001                } => {
4002                    let count = self.update_one(filter, update, upsert)?;
4003                    if count > 0 {
4004                        result.matched_count += 1;
4005                        result.modified_count += 1;
4006                    }
4007                    if upsert && count > 0 {
4008                        result.upserted_count += 1;
4009                    }
4010                }
4011                BulkWriteOperation::UpdateMany {
4012                    filter,
4013                    update,
4014                    upsert,
4015                } => {
4016                    let count = self.update_many(filter, update)?;
4017                    result.matched_count += count;
4018                    result.modified_count += count;
4019                    if upsert && count > 0 {
4020                        result.upserted_count += count;
4021                    }
4022                }
4023                BulkWriteOperation::ReplaceOne {
4024                    filter,
4025                    replacement,
4026                    upsert,
4027                } => {
4028                    let count = self.replace_one(filter, replacement, upsert)?;
4029                    if count > 0 {
4030                        result.matched_count += 1;
4031                        result.modified_count += 1;
4032                    }
4033                    if upsert && count > 0 {
4034                        result.upserted_count += 1;
4035                    }
4036                }
4037                BulkWriteOperation::DeleteOne { filter } => {
4038                    let count = self.delete_one(filter)?;
4039                    result.deleted_count += count;
4040                }
4041                BulkWriteOperation::DeleteMany { filter } => {
4042                    let count = self.delete_many(filter)?;
4043                    result.deleted_count += count;
4044                }
4045            }
4046
4047            index += 1;
4048
4049            // If ordered and an error occurs, we would stop here
4050            // For simplicity, we're not handling errors in this example
4051        }
4052
4053        Ok(result)
4054    }
4055}
4056
4057// --- Remote Client ---
/// Handle for reaching a LuckDB server over TCP.
///
/// Holds only the server address; a socket is opened by `connect`.
#[derive(Debug, Clone)]
pub struct RemoteClient {
    /// Address of the server to connect to.
    addr: SocketAddr,
}
4062
4063impl RemoteClient {
4064    pub fn new(addr: SocketAddr) -> Self {
4065        Self { addr }
4066    }
4067
4068    pub fn connect(&self) -> Result<RemoteConnection> {
4069        let stream = TcpStream::connect(self.addr)?;
4070        Ok(RemoteConnection { stream })
4071    }
4072}
4073
/// An open connection to a LuckDB server, created by `RemoteClient::connect`.
#[derive(Debug)]
pub struct RemoteConnection {
    /// The underlying socket; it is closed when the connection is dropped.
    stream: TcpStream,
}
4078
4079impl RemoteConnection {
4080    pub fn send_command(&mut self, command: &str) -> Result<String> {
4081        let mut writer = BufWriter::new(&self.stream);
4082        writer.write_all(command.as_bytes())?;
4083        writer.write_all(b"\n")?;
4084        writer.flush()?;
4085
4086        let mut reader = BufReader::new(&self.stream);
4087        let mut response = String::new();
4088        reader.read_line(&mut response)?;
4089
4090        Ok(response)
4091    }
4092
4093    pub fn close(self) -> Result<()> {
4094        // The connection will be closed when RemoteConnection is dropped
4095        Ok(())
4096    }
4097}
4098
4099// --- Server ---
4100#[derive(Debug)]
4101pub struct Server {
4102    addr: SocketAddr,
4103    client: Client,
4104}
4105
4106impl Server {
4107    /// Creates a new server with the given address and storage configuration
4108    pub fn new(addr: SocketAddr, storage_path: Option<PathBuf>) -> Self {
4109        let mut config = config::DatabaseConfig::default();
4110        config.storage_path = storage_path;
4111
4112        let client = Client::with_config(config).unwrap();
4113        Self { addr, client }
4114    }
4115
4116    /// Creates a new server with custom configuration
4117    pub fn with_config(addr: SocketAddr, config: config::DatabaseConfig) -> Result<Self> {
4118        let client = Client::with_config(config)?;
4119        Ok(Self { addr, client })
4120    }
4121
4122    /// Creates a new server with configuration from file
4123    pub fn with_config_file<P: AsRef<Path>>(addr: SocketAddr, config_path: P) -> Result<Self> {
4124        let client = Client::with_config_file(config_path)?;
4125        Ok(Self { addr, client })
4126    }
4127
4128    /// Enable authentication (deprecated - use config instead)
4129    #[deprecated(note = "Use with_config() instead")]
4130    pub fn with_auth(mut self, username: String, password: String) -> Self {
4131        let mut config = self.client.config().clone();
4132        config.auth_username = Some(username);
4133        config.auth_password = Some(password);
4134        let _ = self.client.update_config(config);
4135        self
4136    }
4137
4138    /// Enable encryption on the server
4139    pub fn with_encryption(mut self, password: String) -> Result<Self> {
4140        self.client = self.client.with_encryption(password)?;
4141        Ok(self)
4142    }
4143
4144    pub fn start(&mut self) -> Result<()> {
4145        // Load data from disk if storage path is configured
4146        if self.client.config().storage_path.is_some() {
4147            self.client.load()?;
4148        }
4149        let listener = TcpListener::bind(self.addr)?;
4150        println!("Server listening on {}", self.addr);
4151
4152        for stream in listener.incoming() {
4153            match stream {
4154                Ok(stream) => {
4155                    let config = self.client.config().clone();
4156                    let encryption_key = self.client.encryption_key.clone();
4157                    thread::spawn(move || {
4158                        if let Err(e) = Self::handle_client(stream, config, encryption_key) {
4159                            eprintln!("Error handling client: {}", e);
4160                        }
4161                    });
4162                }
4163                Err(e) => {
4164                    eprintln!("Failed to accept connection: {}", e);
4165                }
4166            }
4167        }
4168        Ok(())
4169    }
4170
4171    fn handle_client(
4172        mut stream: TcpStream,
4173        config: config::DatabaseConfig,
4174        encryption_key: Option<encryption::EncryptionKey>,
4175    ) -> Result<()> {
4176        let mut client = Client::with_config(config)?;
4177        client.encryption_key = encryption_key;
4178
4179        let mut reader = BufReader::new(&stream);
4180        let mut writer = BufWriter::new(&stream);
4181        let mut authenticated = !client.config().is_auth_configured(); // If no auth is configured, client is authenticated by default
4182
4183        loop {
4184            let mut command = String::new();
4185            match reader.read_line(&mut command) {
4186                Ok(0) => break, // Connection closed
4187                Ok(_) => {
4188                    // Trim whitespace and newline
4189                    let command = command.trim();
4190                    if command.is_empty() {
4191                        continue;
4192                    }
4193
4194                    // Handle authentication if required
4195                    if !authenticated {
4196                        if command.starts_with("AUTH") {
4197                            let parts: Vec<&str> = command.split_whitespace().collect();
4198                            if parts.len() != 3 {
4199                                let response =
4200                                    "ERROR: Usage: AUTH <username> <password>".to_string();
4201                                writer.write_all(response.as_bytes())?;
4202                                writer.write_all(b"\n")?;
4203                                writer.flush()?;
4204                                continue;
4205                            }
4206
4207                            let username = parts[1];
4208                            let password = parts[2];
4209
4210                            if let (Some(auth_user), Some(auth_pass)) = (
4211                                &client.config().auth_username,
4212                                &client.config().auth_password,
4213                            ) {
4214                                if username == auth_user && password == auth_pass {
4215                                    authenticated = true;
4216                                    let response = "OK: Authenticated".to_string();
4217                                    writer.write_all(response.as_bytes())?;
4218                                    writer.write_all(b"\n")?;
4219                                    writer.flush()?;
4220                                    continue;
4221                                }
4222                            }
4223
4224                            let response = "ERROR: Authentication failed".to_string();
4225                            writer.write_all(response.as_bytes())?;
4226                            writer.write_all(b"\n")?;
4227                            writer.flush()?;
4228                            continue;
4229                        } else {
4230                            let response =
4231                                "ERROR: Not authenticated. Use AUTH <username> <password>"
4232                                    .to_string();
4233                            writer.write_all(response.as_bytes())?;
4234                            writer.write_all(b"\n")?;
4235                            writer.flush()?;
4236                            continue;
4237                        }
4238                    }
4239
4240                    // Parse and execute the command
4241                    let response = match Self::process_command(command, &mut client) {
4242                        Ok(response) => response,
4243                        Err(e) => format!("ERROR: {}", e),
4244                    };
4245
4246                    // Send response
4247                    writer.write_all(response.as_bytes())?;
4248                    writer.write_all(b"\n")?;
4249                    writer.flush()?;
4250
4251                    // If the command was EXIT, break the loop
4252                    if command == "EXIT" {
4253                        break;
4254                    }
4255                }
4256                Err(e) => {
4257                    eprintln!("Error reading from client: {}", e);
4258                    break;
4259                }
4260            }
4261        }
4262        Ok(())
4263    }
4264
4265    fn process_command(command: &str, client: &mut Client) -> Result<String> {
4266        let parts: Vec<&str> = command.split_whitespace().collect();
4267        if parts.is_empty() {
4268            return Ok("ERROR: Empty command".to_string());
4269        }
4270        match parts[0] {
4271            "INSERT" => {
4272                if parts.len() < 3 {
4273                    return Ok(
4274                        "ERROR: Usage: INSERT <database> <collection> <document>".to_string()
4275                    );
4276                }
4277                let db_name = parts[1];
4278                let coll_name = parts[2];
4279                let doc_json = parts[3..].join(" ");
4280                let doc: Document = serde_json::from_str(&doc_json)?;
4281                let db = client.db(db_name);
4282                let coll = db.collection(coll_name);
4283                let id = coll.insert(doc)?;
4284                Ok(format!("OK: Document inserted with ID: {}", id))
4285            }
4286            "FIND" => {
4287                if parts.len() < 3 {
4288                    return Ok("ERROR: Usage: FIND <database> <collection> [query]".to_string());
4289                }
4290                let db_name = parts[1];
4291                let coll_name = parts[2];
4292                let query = if parts.len() > 3 {
4293                    let query_json = parts[3..].join(" ");
4294                    let query_value: Value = serde_json::from_str(&query_json)?;
4295                    Query::from_value(query_value)?
4296                } else {
4297                    Query::new()
4298                };
4299                let db = client.db(db_name);
4300                let coll = db.collection(coll_name);
4301                let results = coll.find(query, None)?;
4302                let results_json = serde_json::to_string(&results)?;
4303                Ok(format!("OK: {}", results_json))
4304            }
4305            "UPDATE" => {
4306                if parts.len() < 4 {
4307                    return Ok(
4308                        "ERROR: Usage: UPDATE <database> <collection> <query> <update>".to_string(),
4309                    );
4310                }
4311                let db_name = parts[1];
4312                let coll_name = parts[2];
4313                // Find the end of the query
4314                let mut query_end = 3;
4315                let mut brace_count = 0;
4316                for (i, c) in command.char_indices() {
4317                    if i < parts[0].len() + parts[1].len() + parts[2].len() + 2 {
4318                        continue;
4319                    }
4320                    if c == '{' {
4321                        brace_count += 1;
4322                    } else if c == '}' {
4323                        brace_count -= 1;
4324                        if brace_count == 0 {
4325                            query_end = i + 1;
4326                            break;
4327                        }
4328                    }
4329                }
4330                let query_json =
4331                    command[parts[0].len() + parts[1].len() + parts[2].len() + 3..query_end].trim();
4332                let update_json = command[query_end..].trim();
4333                let query: Query = serde_json::from_str(query_json)?;
4334                let update: UpdateDocument = serde_json::from_str(update_json)?;
4335                let db = client.db(db_name);
4336                let coll = db.collection(coll_name);
4337                let count = coll.update_one(query, update, false)?;
4338                Ok(format!("OK: Updated {} documents", count))
4339            }
4340            "DELETE" => {
4341                if parts.len() < 3 {
4342                    return Ok("ERROR: Usage: DELETE <database> <collection> [query]".to_string());
4343                }
4344                let db_name = parts[1];
4345                let coll_name = parts[2];
4346                let query = if parts.len() > 3 {
4347                    let query_json = parts[3..].join(" ");
4348                    let query_value: Value = serde_json::from_str(&query_json)?;
4349                    Query::from_value(query_value)?
4350                } else {
4351                    Query::new()
4352                };
4353                let db = client.db(db_name);
4354                let coll = db.collection(coll_name);
4355                let count = coll.delete_one(query)?;
4356                Ok(format!("OK: Deleted {} documents", count))
4357            }
4358            "SAVE" => {
4359                client.save()?;
4360                Ok("OK: Data saved to disk".to_string())
4361            }
4362            "LOAD" => {
4363                client.load()?;
4364                Ok("OK: Data loaded from disk".to_string())
4365            }
4366            "LIST_DB" => {
4367                let dbs = client.list_database_names();
4368                Ok(format!("OK: {}", serde_json::to_string(&dbs)?))
4369            }
4370            "LIST_COLL" => {
4371                if parts.len() < 2 {
4372                    return Ok("ERROR: Usage: LIST_COLL <database>".to_string());
4373                }
4374                let db_name = parts[1];
4375                let db = client.db(db_name);
4376                let colls = db.list_collection_names();
4377                Ok(format!("OK: {}", serde_json::to_string(&colls)?))
4378            }
4379            "STATS" => {
4380                if parts.len() < 2 {
4381                    return Ok("ERROR: Usage: STATS <database>".to_string());
4382                }
4383                let db_name = parts[1];
4384                let db = client.db(db_name);
4385                let stats = db.stats()?;
4386                Ok(format!("OK: {}", serde_json::to_string(&stats)?))
4387            }
4388            "EXIT" => {
4389                // Save data before exiting
4390                if client.config().storage_path.is_some() {
4391                    if let Err(e) = client.save() {
4392                        return Ok(format!("ERROR: Failed to save data: {}", e));
4393                    }
4394                }
4395                Ok("OK: Connection closing".to_string())
4396            }
4397            _ => Ok("ERROR: Unknown command".to_string()),
4398        }
4399    }
4400}