aurora_db/
db.rs

1//! # Aurora Database
2//!
3//! Aurora is an embedded document database with tiered storage architecture.
4//! It provides document storage, querying, indexing, and search capabilities
5//! while optimizing for both performance and durability.
6//!
7//! ## Key Features
8//!
9//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
10//! * **Document Model**: Schema-flexible JSON-like document storage
11//! * **Querying**: Rich query capabilities with filtering and sorting
12//! * **Full-text Search**: Built-in search engine with relevance ranking
13//! * **Transactions**: ACID-compliant transaction support
14//! * **Blob Storage**: Efficient storage for large binary objects
15//!
16//! ## Usage Example
17//!
18//! ```rust
19//! use aurora::Aurora;
20//!
21//! // Open a database
22//! let db = Aurora::open("my_database.db")?;
23//!
24//! // Create a collection with schema
25//! db.new_collection("users", vec![
26//!     ("name", FieldType::String, false),
27//!     ("email", FieldType::String, true),  // unique field
28//!     ("age", FieldType::Int, false),
29//! ])?;
30//!
31//! // Insert a document
32//! let user_id = db.insert_into("users", vec![
33//!     ("name", Value::String("Jane Doe".to_string())),
34//!     ("email", Value::String("jane@example.com".to_string())),
35//!     ("age", Value::Int(28)),
36//! ])?;
37//!
38//! // Query for documents
39//! let adult_users = db.query("users")
40//!     .filter(|f| f.gt("age", 18))
41//!     .order_by("name", true)
42//!     .collect()
43//!     .await?;
44//! ```
45
46use crate::error::{AuroraError, Result};
47use crate::index::{Index, IndexDefinition, IndexType};
48use crate::network::http_models::{
49    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
50};
51use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
52use crate::storage::{ColdStore, HotStore};
53use crate::types::{AuroraConfig, Collection, Document, FieldDefinition, FieldType, Value};
54use dashmap::DashMap;
55use serde_json::Value as JsonValue;
56use serde_json::from_str;
57use std::collections::HashMap;
58use std::fs::File as StdFile;
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::time::Duration;
62use tokio::fs::File;
63use tokio::fs::read_to_string;
64use tokio::io::AsyncReadExt;
65use tokio::sync::OnceCell;
66use uuid::Uuid;
67// Index types for faster lookups
68type PrimaryIndex = DashMap<String, Vec<u8>>;
69type SecondaryIndex = DashMap<String, Vec<String>>;
70
71// Move DataInfo enum outside impl block
72#[derive(Debug)]
73pub enum DataInfo {
74    Data { size: usize, preview: String },
75    Blob { size: usize },
76    Compressed { size: usize },
77}
78
79impl DataInfo {
80    pub fn size(&self) -> usize {
81        match self {
82            DataInfo::Data { size, .. } => *size,
83            DataInfo::Blob { size } => *size,
84            DataInfo::Compressed { size } => *size,
85        }
86    }
87}
88
89/// The main database engine
90///
91/// Aurora combines a tiered storage architecture with document-oriented database features:
92/// - Hot tier: In-memory cache for frequently accessed data
93/// - Cold tier: Persistent disk storage for durability
94/// - Primary indices: Fast key-based access
95/// - Secondary indices: Fast field-based queries
96///
97/// # Examples
98///
99/// ```
100/// // Open a database (creates if doesn't exist)
101/// let db = Aurora::open("my_app.db")?;
102///
103/// // Insert a document
104/// let doc_id = db.insert_into("users", vec![
105///     ("name", Value::String("Alice".to_string())),
106///     ("age", Value::Int(32)),
107/// ])?;
108///
109/// // Retrieve a document
110/// let user = db.get_document("users", &doc_id)?;
111/// ```
112pub struct Aurora {
113    hot: HotStore,
114    cold: ColdStore,
115    // Indexing
116    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
117    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
118    indices_initialized: Arc<OnceCell<()>>,
119    in_transaction: std::sync::atomic::AtomicBool,
120    transaction_ops: DashMap<String, Vec<u8>>,
121    indices: Arc<DashMap<String, Index>>,
122}
123
124impl Aurora {
125    /// Open or create a database at the specified location
126    ///
127    /// # Arguments
128    /// * `path` - Path to the database file or directory
129    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
130    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
131    ///   - Simple names (like `myapp.db`) use the current directory
132    ///
133    /// # Returns
134    /// An initialized `Aurora` database instance
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// // Use a specific location
140    /// let db = Aurora::open("./data/my_application.db")?;
141    ///
142    /// // Just use a name (creates in current directory)
143    /// let db = Aurora::open("customer_data.db")?;
144    /// ```
145    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
146        let path = Self::resolve_path(path)?;
147
148        // Create parent directory if needed
149        if let Some(parent) = path.parent() {
150            if !parent.exists() {
151                std::fs::create_dir_all(parent)?;
152            }
153        }
154
155        // Initialize hot and cold stores with the path
156        let cold = ColdStore::new(path.to_str().unwrap())?;
157        let hot = HotStore::new();
158
159        // Initialize the rest of Aurora...
160        let db = Self {
161            hot,
162            cold,
163            primary_indices: Arc::new(DashMap::new()),
164            secondary_indices: Arc::new(DashMap::new()),
165            indices_initialized: Arc::new(OnceCell::new()),
166            in_transaction: std::sync::atomic::AtomicBool::new(false),
167            transaction_ops: DashMap::new(),
168            indices: Arc::new(DashMap::new()),
169        };
170
171        // Load or initialize indices...
172        // Rest of your existing open() code...
173
174        Ok(db)
175    }
176
177    /// Helper method to resolve database path
178    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
179        let path = path.as_ref();
180
181        // If it's an absolute path, use it directly
182        if path.is_absolute() {
183            return Ok(path.to_path_buf());
184        }
185
186        // Otherwise, resolve relative to current directory
187        match std::env::current_dir() {
188            Ok(current_dir) => Ok(current_dir.join(path)),
189            Err(e) => Err(AuroraError::IoError(format!(
190                "Failed to resolve current directory: {}",
191                e
192            ))),
193        }
194    }
195
196    /// Open a database with custom configuration
197    pub fn with_config(config: AuroraConfig) -> Result<Self> {
198        let path = Self::resolve_path(&config.db_path)?;
199
200        if config.create_dirs {
201            if let Some(parent) = path.parent() {
202                if !parent.exists() {
203                    std::fs::create_dir_all(parent)?;
204                }
205            }
206        }
207
208        // Fix method calls to pass all required parameters
209        let cold = ColdStore::with_config(
210            path.to_str().unwrap(),
211            config.cold_cache_capacity_mb,
212            config.cold_flush_interval_ms,
213            config.cold_mode,
214        )?;
215
216        let hot = HotStore::with_config(
217            config.hot_cache_size_mb,
218            config.hot_cache_cleanup_interval_secs,
219        );
220
221        // Initialize the rest using the config...
222        let db = Self {
223            hot,
224            cold,
225            primary_indices: Arc::new(DashMap::new()),
226            secondary_indices: Arc::new(DashMap::new()),
227            indices_initialized: Arc::new(OnceCell::new()),
228            in_transaction: std::sync::atomic::AtomicBool::new(false),
229            transaction_ops: DashMap::new(),
230            indices: Arc::new(DashMap::new()),
231        };
232
233        // Set up auto-compaction if enabled
234        if config.auto_compact {
235            // Implementation for auto-compaction scheduling
236            // ...
237        }
238
239        Ok(db)
240    }
241
242    // Lazy index initialization
243    pub async fn ensure_indices_initialized(&self) -> Result<()> {
244        self.indices_initialized
245            .get_or_init(|| async {
246                println!("Initializing indices...");
247                if let Err(e) = self.initialize_indices() {
248                    eprintln!("Failed to initialize indices: {:?}", e);
249                }
250                println!("Indices initialized");
251                ()
252            })
253            .await;
254        Ok(())
255    }
256
257    fn initialize_indices(&self) -> Result<()> {
258        // Scan existing data and build indices
259        for result in self.cold.scan() {
260            let (key, value) = result?;
261            let key_str = std::str::from_utf8(&key.as_bytes())
262                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;
263
264            if let Some(collection_name) = key_str.split(':').next() {
265                self.index_value(collection_name, key_str, &value)?;
266            }
267        }
268        Ok(())
269    }
270
271    // Fast key-value operations with index support
272    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
273        // Check hot cache first
274        if let Some(value) = self.hot.get(key) {
275            return Ok(Some(value));
276        }
277
278        // Check primary index
279        if let Some(collection) = key.split(':').next() {
280            if let Some(index) = self.primary_indices.get(collection) {
281                if let Some(value) = index.get(key) {
282                    // Promote to hot cache
283                    self.hot.set(key.to_string(), value.clone(), None);
284                    return Ok(Some(value.clone()));
285                }
286            }
287        }
288
289        // Fallback to cold storage
290        let value = self.cold.get(key)?;
291        if let Some(v) = &value {
292            self.hot.set(key.to_string(), v.clone(), None);
293        }
294        Ok(value)
295    }
296
297    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
298        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
299
300        if value.len() > MAX_BLOB_SIZE {
301            return Err(AuroraError::InvalidOperation(format!(
302                "Blob size {} exceeds maximum allowed size of {}MB",
303                value.len() / (1024 * 1024),
304                MAX_BLOB_SIZE / (1024 * 1024)
305            )));
306        }
307
308        // Track transaction if needed
309        if self
310            .in_transaction
311            .load(std::sync::atomic::Ordering::SeqCst)
312        {
313            self.transaction_ops.insert(key.clone(), value.clone());
314            return Ok(());
315        }
316
317        // Write directly to storage (sled handles WAL internally)
318        self.cold.set(key.clone(), value.clone())?;
319        self.hot.set(key.clone(), value.clone(), ttl);
320
321        // FIX: Update the in-memory indices immediately after a successful write.
322        if let Some(collection_name) = key.split(':').next() {
323            // Don't index internal data like _collection or _index definitions
324            if !collection_name.starts_with('_') {
325                self.index_value(collection_name, &key, &value)?;
326            }
327        }
328
329        Ok(())
330    }
331
332    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
333        // Update primary index
334        self.primary_indices
335            .entry(collection.to_string())
336            .or_insert_with(DashMap::new)
337            .insert(key.to_string(), value.to_vec());
338
339        // Update secondary indices if it's a JSON document
340        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
341            for (field, value) in doc.data {
342                // Use consistent string formatting for indexing
343                let value_str = match &value {
344                    Value::String(s) => s.clone(),
345                    _ => value.to_string(),
346                };
347                self.secondary_indices
348                    .entry(format!("{}:{}", collection, field))
349                    .or_insert_with(DashMap::new)
350                    .entry(value_str)
351                    .or_insert_with(Vec::new)
352                    .push(key.to_string());
353            }
354        }
355        Ok(())
356    }
357
358    // Simplified collection scan (fallback)
359    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
360        let _prefix = format!("{}:", collection);
361        let mut documents = Vec::new();
362
363        if let Some(index) = self.primary_indices.get(collection) {
364            for entry in index.iter() {
365                if let Ok(doc) = serde_json::from_slice(entry.value()) {
366                    documents.push(doc);
367                }
368            }
369        }
370
371        Ok(documents)
372    }
373
374    // Restore missing methods
375    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
376        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
377
378        // Get file metadata to check size before reading
379        let metadata = tokio::fs::metadata(file_path).await?;
380        let file_size = metadata.len() as usize;
381
382        if file_size > MAX_FILE_SIZE {
383            return Err(AuroraError::InvalidOperation(format!(
384                "File size {} MB exceeds maximum allowed size of {} MB",
385                file_size / (1024 * 1024),
386                MAX_FILE_SIZE / (1024 * 1024)
387            )));
388        }
389
390        let mut file = File::open(file_path).await?;
391        let mut buffer = Vec::new();
392        file.read_to_end(&mut buffer).await?;
393
394        // Add BLOB: prefix to mark this as blob data
395        let mut blob_data = Vec::with_capacity(5 + buffer.len());
396        blob_data.extend_from_slice(b"BLOB:");
397        blob_data.extend_from_slice(&buffer);
398
399        self.put(key, blob_data, None)
400    }
401
402    /// Create a new collection with the given schema
403    ///
404    /// # Arguments
405    /// * `name` - Name of the collection to create
406    /// * `fields` - Schema definition as a list of field definitions:
407    ///   * Field name
408    ///   * Field type (String, Int, Float, Boolean, etc.)
409    ///   * Whether the field requires a unique value
410    ///
411    /// # Returns
412    /// Success or an error (e.g., collection already exists)
413    ///
414    /// # Examples
415    ///
416    /// ```
417    /// // Define a collection with schema
418    /// db.new_collection("products", vec![
419    ///     ("name", FieldType::String, false),
420    ///     ("price", FieldType::Float, false),
421    ///     ("sku", FieldType::String, true),  // unique field
422    ///     ("description", FieldType::String, false),
423    ///     ("in_stock", FieldType::Boolean, false),
424    /// ])?;
425    /// ```
426    pub fn new_collection(&self, name: &str, fields: Vec<(String, FieldType, bool)>) -> Result<()> {
427        let collection_key = format!("_collection:{}", name);
428
429        // Check if collection already exists
430        if self.get(&collection_key)?.is_some() {
431            return Err(AuroraError::CollectionAlreadyExists(name.to_string()));
432        }
433
434        // Create field definitions
435        let mut field_definitions = HashMap::new();
436        for (field_name, field_type, unique) in fields {
437            field_definitions.insert(
438                field_name,
439                FieldDefinition {
440                    field_type,
441                    unique,
442                    indexed: unique, // Auto-index unique fields
443                },
444            );
445        }
446
447        let collection = Collection {
448            name: name.to_string(),
449            fields: field_definitions,
450            // REMOVED: unique_fields is now derived from fields
451        };
452
453        let collection_data = serde_json::to_vec(&collection)?;
454        self.put(collection_key, collection_data, None)?;
455
456        Ok(())
457    }
458
459    /// Insert a document into a collection
460    ///
461    /// # Arguments
462    /// * `collection` - Name of the collection to insert into
463    /// * `data` - Document fields and values to insert
464    ///
465    /// # Returns
466    /// The ID of the inserted document or an error
467    ///
468    /// # Examples
469    ///
470    /// ```
471    /// // Insert a document
472    /// let doc_id = db.insert_into("users", vec![
473    ///     ("name", Value::String("John Doe".to_string())),
474    ///     ("email", Value::String("john@example.com".to_string())),
475    ///     ("active", Value::Bool(true)),
476    /// ])?;
477    /// ```
478    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
479        // Convert Vec<(&str, Value)> to HashMap<String, Value>
480        let data_map: HashMap<String, Value> =
481            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
482
483        // Validate unique constraints before inserting
484        self.validate_unique_constraints(collection, &data_map)
485            .await?;
486
487        let doc_id = Uuid::new_v4().to_string();
488        let document = Document {
489            id: doc_id.clone(),
490            data: data_map,
491        };
492
493        self.put(
494            format!("{}:{}", collection, doc_id),
495            serde_json::to_vec(&document)?,
496            None,
497        )?;
498
499        Ok(doc_id)
500    }
501
502    pub async fn insert_map(
503        &self,
504        collection: &str,
505        data: HashMap<String, Value>,
506    ) -> Result<String> {
507        // Validate unique constraints before inserting
508        self.validate_unique_constraints(collection, &data).await?;
509
510        let doc_id = Uuid::new_v4().to_string();
511        let document = Document {
512            id: doc_id.clone(),
513            data,
514        };
515
516        self.put(
517            format!("{}:{}", collection, doc_id),
518            serde_json::to_vec(&document)?,
519            None,
520        )?;
521
522        Ok(doc_id)
523    }
524
525    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
526        self.ensure_indices_initialized().await?;
527        self.scan_collection(collection)
528    }
529
530    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
531        let mut data = Vec::new();
532
533        if let Some(index) = self
534            .primary_indices
535            .get(pattern.split(':').next().unwrap_or(""))
536        {
537            for entry in index.iter() {
538                if entry.key().contains(pattern) {
539                    let value = entry.value();
540                    let info = if value.starts_with(b"BLOB:") {
541                        DataInfo::Blob { size: value.len() }
542                    } else {
543                        DataInfo::Data {
544                            size: value.len(),
545                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
546                                .into_owned(),
547                        }
548                    };
549
550                    data.push((entry.key().clone(), info));
551                }
552            }
553        }
554
555        Ok(data)
556    }
557
558    /// Begin a transaction
559    ///
560    /// All operations after beginning a transaction will be part of the transaction
561    /// until either commit_transaction() or rollback_transaction() is called.
562    ///
563    /// # Returns
564    /// Success or an error (e.g., if a transaction is already in progress)
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// // Start a transaction for atomic operations
570    /// db.begin_transaction()?;
571    ///
572    /// // Perform multiple operations
573    /// db.insert_into("accounts", vec![("user_id", Value::String(user_id)), ("balance", Value::Float(100.0))])?;
574    /// db.insert_into("audit_log", vec![("action", Value::String("account_created".to_string()))])?;
575    ///
576    /// // Commit all changes or roll back if there's an error
577    /// if all_ok {
578    ///     db.commit_transaction()?;
579    /// } else {
580    ///     db.rollback_transaction()?;
581    /// }
582    /// ```
583    pub fn begin_transaction(&self) -> Result<()> {
584        if self
585            .in_transaction
586            .load(std::sync::atomic::Ordering::SeqCst)
587        {
588            return Err(AuroraError::InvalidOperation(
589                "Transaction already in progress".into(),
590            ));
591        }
592
593        // Mark as in transaction
594        self.in_transaction
595            .store(true, std::sync::atomic::Ordering::SeqCst);
596        Ok(())
597    }
598
599    /// Commit the current transaction
600    ///
601    /// Makes all changes in the current transaction permanent.
602    ///
603    /// # Returns
604    /// Success or an error (e.g., if no transaction is active)
605    pub fn commit_transaction(&self) -> Result<()> {
606        if !self
607            .in_transaction
608            .load(std::sync::atomic::Ordering::SeqCst)
609        {
610            return Err(AuroraError::InvalidOperation(
611                "No transaction in progress".into(),
612            ));
613        }
614
615        // Apply all pending transaction operations
616        for item in self.transaction_ops.iter() {
617            self.cold.set(item.key().clone(), item.value().clone())?;
618            self.hot.set(item.key().clone(), item.value().clone(), None);
619        }
620
621        // Clear transaction data
622        self.transaction_ops.clear();
623        self.in_transaction
624            .store(false, std::sync::atomic::Ordering::SeqCst);
625
626        // Ensure durability by forcing a sync
627        self.cold.compact()?;
628
629        Ok(())
630    }
631
632    /// Roll back the current transaction
633    ///
634    /// Discards all changes made in the current transaction.
635    ///
636    /// # Returns
637    /// Success or an error (e.g., if no transaction is active)
638    pub fn rollback_transaction(&self) -> Result<()> {
639        if !self
640            .in_transaction
641            .load(std::sync::atomic::Ordering::SeqCst)
642        {
643            return Err(AuroraError::InvalidOperation(
644                "No transaction in progress".into(),
645            ));
646        }
647
648        // Simply discard the transaction operations
649        self.transaction_ops.clear();
650        self.in_transaction
651            .store(false, std::sync::atomic::Ordering::SeqCst);
652
653        Ok(())
654    }
655
656    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
657        // Check if collection exists
658        if self.get(&format!("_collection:{}", collection))?.is_none() {
659            return Err(AuroraError::CollectionNotFound(collection.to_string()));
660        }
661
662        // Generate a default index name
663        let index_name = format!("idx_{}_{}", collection, field);
664
665        // Create index definition
666        let definition = IndexDefinition {
667            name: index_name.clone(),
668            collection: collection.to_string(),
669            fields: vec![field.to_string()],
670            index_type: IndexType::BTree,
671            unique: false,
672        };
673
674        // Create the index
675        let index = Index::new(definition.clone());
676
677        // Index all existing documents in the collection
678        let prefix = format!("{}:", collection);
679        for result in self.cold.scan_prefix(&prefix) {
680            if let Ok((_, data)) = result {
681                if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
682                    let _ = index.insert(&doc);
683                }
684            }
685        }
686
687        // Store the index
688        self.indices.insert(index_name, index);
689
690        // Store the index definition for persistence
691        let index_key = format!("_index:{}:{}", collection, field);
692        self.put(index_key, serde_json::to_vec(&definition)?, None)?;
693
694        Ok(())
695    }
696
697    /// Create a query builder for advanced document queries
698    ///
699    /// # Arguments
700    /// * `collection` - Name of the collection to query
701    ///
702    /// # Returns
703    /// A `QueryBuilder` for constructing and executing queries
704    ///
705    /// # Examples
706    ///
707    /// ```
708    /// // Query for documents matching criteria
709    /// let active_premium_users = db.query("users")
710    ///     .filter(|f| f.eq("status", "active") && f.eq("plan", "premium"))
711    ///     .order_by("joined_date", false)  // newest first
712    ///     .limit(10)
713    ///     .collect()
714    ///     .await?;
715    /// ```
716    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
717        QueryBuilder::new(self, collection)
718    }
719
720    /// Create a search builder for full-text search
721    ///
722    /// # Arguments
723    /// * `collection` - Name of the collection to search
724    ///
725    /// # Returns
726    /// A `SearchBuilder` for configuring and executing searches
727    ///
728    /// # Examples
729    ///
730    /// ```
731    /// // Search for documents containing text
732    /// let search_results = db.search("articles")
733    ///     .field("content")
734    ///     .matching("quantum computing")
735    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
736    ///     .collect()
737    ///     .await?;
738    /// ```
739    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
740        SearchBuilder::new(self, collection)
741    }
742
743    /// Retrieve a document by ID
744    ///
745    /// # Arguments
746    /// * `collection` - Name of the collection to query
747    /// * `id` - ID of the document to retrieve
748    ///
749    /// # Returns
750    /// The document if found, None if not found, or an error
751    ///
752    /// # Examples
753    ///
754    /// ```
755    /// // Get a document by ID
756    /// if let Some(user) = db.get_document("users", &user_id)? {
757    ///     println!("Found user: {}", user.data.get("name").unwrap());
758    /// } else {
759    ///     println!("User not found");
760    /// }
761    /// ```
762    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
763        let key = format!("{}:{}", collection, id);
764        if let Some(data) = self.get(&key)? {
765            Ok(Some(serde_json::from_slice(&data)?))
766        } else {
767            Ok(None)
768        }
769    }
770
771    /// Delete a document by ID
772    ///
773    /// # Arguments
774    /// * `collection` - Name of the collection containing the document
775    /// * `id` - ID of the document to delete
776    ///
777    /// # Returns
778    /// Success or an error
779    ///
780    /// # Examples
781    ///
782    /// ```
783    /// // Delete a specific document
784    /// db.delete("users", &user_id)?;
785    /// ```
786    pub async fn delete(&self, key: &str) -> Result<()> {
787        // Delete in all levels
788        if self.hot.get(key).is_some() {
789            self.hot.delete(key);
790        }
791
792        self.cold.delete(key)?;
793
794        // Update indices
795        if let Some(collection) = key.split(':').next() {
796            if let Some(index) = self.primary_indices.get_mut(collection) {
797                index.remove(key);
798            }
799        }
800
801        // If in transaction, record the operation (with null value to indicate deletion)
802        if self
803            .in_transaction
804            .load(std::sync::atomic::Ordering::SeqCst)
805        {
806            self.transaction_ops.insert(key.to_string(), Vec::new());
807        }
808
809        Ok(())
810    }
811
812    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
813        let prefix = format!("{}:", collection);
814
815        // Get all keys in collection
816        let keys: Vec<String> = self
817            .cold
818            .scan()
819            .filter_map(|r| r.ok())
820            .filter(|(k, _)| k.starts_with(&prefix))
821            .map(|(k, _)| k)
822            .collect();
823
824        // Delete each key
825        for key in keys {
826            self.delete(&key).await?;
827        }
828
829        // Remove collection indices
830        self.primary_indices.remove(collection);
831        self.secondary_indices
832            .retain(|k, _| !k.starts_with(&prefix));
833
834        Ok(())
835    }
836
837    #[allow(dead_code)]
838    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
839        // Remove from primary index
840        if let Some(index) = self.primary_indices.get(collection) {
841            index.remove(&doc.id);
842        }
843
844        // Remove from secondary indices
845        for (field, value) in &doc.data {
846            let index_key = format!("{}:{}", collection, field);
847            if let Some(index) = self.secondary_indices.get(&index_key) {
848                if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
849                    doc_ids.retain(|id| id != &doc.id);
850                }
851            }
852        }
853
854        Ok(())
855    }
856
857    pub async fn search_text(
858        &self,
859        collection: &str,
860        field: &str,
861        query: &str,
862    ) -> Result<Vec<Document>> {
863        let mut results = Vec::new();
864        let docs = self.get_all_collection(collection).await?;
865
866        for doc in docs {
867            if let Some(Value::String(text)) = doc.data.get(field) {
868                if text.to_lowercase().contains(&query.to_lowercase()) {
869                    results.push(doc);
870                }
871            }
872        }
873
874        Ok(results)
875    }
876
877    /// Export a collection to a JSON file
878    ///
879    /// # Arguments
880    /// * `collection` - Name of the collection to export
881    /// * `output_path` - Path to the output JSON file
882    ///
883    /// # Returns
884    /// Success or an error
885    ///
886    /// # Examples
887    ///
888    /// ```
889    /// // Backup a collection to JSON
890    /// db.export_as_json("users", "./backups/users_2023-10-15.json")?;
891    /// ```
892    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
893        let output_path = if !output_path.ends_with(".json") {
894            format!("{}.json", output_path)
895        } else {
896            output_path.to_string()
897        };
898
899        let mut docs = Vec::new();
900
901        // Get all documents from the specified collection
902        for result in self.cold.scan() {
903            let (key, value) = result?;
904
905            // Only process documents from the specified collection
906            if let Some(key_collection) = key.split(':').next() {
907                if key_collection == collection && !key.starts_with("_collection:") {
908                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
909                        // Convert Value enum to raw JSON values
910                        let mut clean_doc = serde_json::Map::new();
911                        for (k, v) in doc.data {
912                            match v {
913                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
914                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
915                                Value::Float(f) => {
916                                    if let Some(n) = serde_json::Number::from_f64(f) {
917                                        clean_doc.insert(k, JsonValue::Number(n))
918                                    } else {
919                                        clean_doc.insert(k, JsonValue::Null)
920                                    }
921                                }
922                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
923                                Value::Array(arr) => {
924                                    let clean_arr: Vec<JsonValue> = arr
925                                        .into_iter()
926                                        .map(|v| match v {
927                                            Value::String(s) => JsonValue::String(s),
928                                            Value::Int(i) => JsonValue::Number(i.into()),
929                                            Value::Float(f) => serde_json::Number::from_f64(f)
930                                                .map(JsonValue::Number)
931                                                .unwrap_or(JsonValue::Null),
932                                            Value::Bool(b) => JsonValue::Bool(b),
933                                            Value::Null => JsonValue::Null,
934                                            _ => JsonValue::Null,
935                                        })
936                                        .collect();
937                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
938                                }
939                                Value::Uuid(u) => {
940                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
941                                }
942                                Value::Null => clean_doc.insert(k, JsonValue::Null),
943                                Value::Object(_) => None, // Handle nested objects if needed
944                            };
945                        }
946                        docs.push(JsonValue::Object(clean_doc));
947                    }
948                }
949            }
950        }
951
952        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
953            collection.to_string(),
954            JsonValue::Array(docs),
955        )]));
956
957        let mut file = StdFile::create(&output_path)?;
958        serde_json::to_writer_pretty(&mut file, &output)?;
959        println!("Exported collection '{}' to {}", collection, &output_path);
960        Ok(())
961    }
962
963    /// Export specific collection to CSV file
964    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
965        let output_path = if !filename.ends_with(".csv") {
966            format!("{}.csv", filename)
967        } else {
968            filename.to_string()
969        };
970
971        let mut writer = csv::Writer::from_path(&output_path)?;
972        let mut headers = Vec::new();
973        let mut first_doc = true;
974
975        // Get all documents from the specified collection
976        for result in self.cold.scan() {
977            let (key, value) = result?;
978
979            // Only process documents from the specified collection
980            if let Some(key_collection) = key.split(':').next() {
981                if key_collection == collection && !key.starts_with("_collection:") {
982                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
983                        // Write headers from first document
984                        if first_doc && !doc.data.is_empty() {
985                            headers = doc.data.keys().cloned().collect();
986                            writer.write_record(&headers)?;
987                            first_doc = false;
988                        }
989
990                        // Write the document values
991                        let values: Vec<String> = headers
992                            .iter()
993                            .map(|header| {
994                                doc.data
995                                    .get(header)
996                                    .map(|v| v.to_string())
997                                    .unwrap_or_default()
998                            })
999                            .collect();
1000                        writer.write_record(&values)?;
1001                    }
1002                }
1003            }
1004        }
1005
1006        writer.flush()?;
1007        println!("Exported collection '{}' to {}", collection, &output_path);
1008        Ok(())
1009    }
1010
1011    // Helper method to create filter-based queries
1012    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
1013        self.query(collection)
1014    }
1015
1016    // Convenience methods that build on top of the FilterBuilder
1017
1018    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
1019        self.query(collection)
1020            .filter(|f| f.eq("id", id))
1021            .first_one()
1022            .await
1023    }
1024
1025    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
1026    where
1027        F: Fn(&FilterBuilder) -> bool + Send + 'static,
1028    {
1029        self.query(collection).filter(filter_fn).first_one().await
1030    }
1031
1032    pub async fn find_by_field<T: Into<Value> + Clone + Send + 'static>(
1033        &self,
1034        collection: &str,
1035        field: &'static str,
1036        value: T,
1037    ) -> Result<Vec<Document>> {
1038        let value_clone = value.clone();
1039        self.query(collection)
1040            .filter(move |f| f.eq(field, value_clone.clone()))
1041            .collect()
1042            .await
1043    }
1044
1045    pub async fn find_by_fields(
1046        &self,
1047        collection: &str,
1048        fields: Vec<(&str, Value)>,
1049    ) -> Result<Vec<Document>> {
1050        let mut query = self.query(collection);
1051
1052        for (field, value) in fields {
1053            let field_owned = field.to_owned();
1054            let value_owned = value.clone();
1055            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
1056        }
1057
1058        query.collect().await
1059    }
1060
1061    // Advanced example: find documents with a field value in a specific range
1062    pub async fn find_in_range<T: Into<Value> + Clone + Send + 'static>(
1063        &self,
1064        collection: &str,
1065        field: &'static str,
1066        min: T,
1067        max: T,
1068    ) -> Result<Vec<Document>> {
1069        self.query(collection)
1070            .filter(move |f| f.between(field, min.clone(), max.clone()))
1071            .collect()
1072            .await
1073    }
1074
1075    // Complex query example: build with multiple combined filters
1076    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
1077        self.query(collection)
1078    }
1079
1080    // Create a full-text search query with added filter options
1081    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
1082        self.search(collection)
1083    }
1084
1085    // Utility methods for common operations
1086    pub async fn upsert(
1087        &self,
1088        collection: &str,
1089        id: &str,
1090        data: Vec<(&str, Value)>,
1091    ) -> Result<String> {
1092        // Convert Vec<(&str, Value)> to HashMap<String, Value>
1093        let data_map: HashMap<String, Value> =
1094            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1095
1096        // Check if document exists
1097        if let Some(mut doc) = self.get_document(collection, id)? {
1098            // Update existing document - merge new data
1099            for (key, value) in data_map {
1100                doc.data.insert(key, value);
1101            }
1102
1103            // Validate unique constraints for the updated document
1104            // We need to exclude the current document from the uniqueness check
1105            self.validate_unique_constraints_excluding(collection, &doc.data, id)
1106                .await?;
1107
1108            self.put(
1109                format!("{}:{}", collection, id),
1110                serde_json::to_vec(&doc)?,
1111                None,
1112            )?;
1113            Ok(id.to_string())
1114        } else {
1115            // Insert new document with specified ID - validate unique constraints
1116            self.validate_unique_constraints(collection, &data_map)
1117                .await?;
1118
1119            let document = Document {
1120                id: id.to_string(),
1121                data: data_map,
1122            };
1123
1124            self.put(
1125                format!("{}:{}", collection, id),
1126                serde_json::to_vec(&document)?,
1127                None,
1128            )?;
1129            Ok(id.to_string())
1130        }
1131    }
1132
1133    // Atomic increment/decrement
1134    pub async fn increment(
1135        &self,
1136        collection: &str,
1137        id: &str,
1138        field: &str,
1139        amount: i64,
1140    ) -> Result<i64> {
1141        if let Some(mut doc) = self.get_document(collection, id)? {
1142            // Get current value
1143            let current = match doc.data.get(field) {
1144                Some(Value::Int(i)) => *i,
1145                _ => 0,
1146            };
1147
1148            // Increment
1149            let new_value = current + amount;
1150            doc.data.insert(field.to_string(), Value::Int(new_value));
1151
1152            // Save changes
1153            self.put(
1154                format!("{}:{}", collection, id),
1155                serde_json::to_vec(&doc)?,
1156                None,
1157            )?;
1158
1159            Ok(new_value)
1160        } else {
1161            Err(AuroraError::NotFound(format!(
1162                "Document {}:{} not found",
1163                collection, id
1164            )))
1165        }
1166    }
1167
1168    // Batch operations
1169    pub async fn batch_insert(
1170        &self,
1171        collection: &str,
1172        docs: Vec<Vec<(&str, Value)>>,
1173    ) -> Result<Vec<String>> {
1174        // Convert each Vec<(&str, Value)> to HashMap<String, Value>
1175        let doc_maps: Vec<HashMap<String, Value>> = docs
1176            .into_iter()
1177            .map(|doc| doc.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
1178            .collect();
1179
1180        // Begin transaction
1181        self.begin_transaction()?;
1182
1183        let mut ids = Vec::with_capacity(doc_maps.len());
1184
1185        // Insert all documents
1186        for data_map in doc_maps {
1187            // Convert HashMap back to Vec for insert_into method
1188            let data_vec: Vec<(&str, Value)> = data_map
1189                .iter()
1190                .map(|(k, v)| (k.as_str(), v.clone()))
1191                .collect();
1192
1193            match self.insert_into(collection, data_vec).await {
1194                Ok(id) => ids.push(id),
1195                Err(e) => {
1196                    self.rollback_transaction()?;
1197                    return Err(e);
1198                }
1199            }
1200        }
1201
1202        // Commit transaction
1203        self.commit_transaction()?;
1204
1205        Ok(ids)
1206    }
1207
1208    // Delete documents by query
1209    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
1210    where
1211        F: Fn(&FilterBuilder) -> bool + Send + 'static,
1212    {
1213        let docs = self.query(collection).filter(filter_fn).collect().await?;
1214
1215        let mut deleted_count = 0;
1216
1217        for doc in docs {
1218            let key = format!("{}:{}", collection, doc.id);
1219            self.delete(&key).await?;
1220            deleted_count += 1;
1221        }
1222
1223        Ok(deleted_count)
1224    }
1225
1226    /// Import documents from a JSON file into a collection
1227    ///
1228    /// This method validates documents against the collection schema
1229    /// and skips documents that already exist in the database.
1230    ///
1231    /// # Arguments
1232    /// * `collection` - Name of the collection to import into
1233    /// * `filename` - Path to the JSON file containing documents
1234    ///
1235    /// # Returns
1236    /// Statistics about the import operation or an error
1237    ///
1238    /// # Examples
1239    ///
1240    /// ```
1241    /// // Import documents from JSON
1242    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
1243    /// println!("Imported: {}, Skipped: {}, Failed: {}",
1244    ///     stats.imported, stats.skipped, stats.failed);
1245    /// ```
1246    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
1247        // Validate that the collection exists
1248        let collection_def = self.get_collection_definition(collection)?;
1249
1250        // Load JSON file
1251        let json_string = read_to_string(filename)
1252            .await
1253            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
1254
1255        // Parse JSON
1256        let documents: Vec<JsonValue> = from_str(&json_string)
1257            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
1258
1259        let mut stats = ImportStats::default();
1260
1261        // Process each document
1262        for doc_json in documents {
1263            match self
1264                .import_document(collection, &collection_def, doc_json)
1265                .await
1266            {
1267                Ok(ImportResult::Imported) => stats.imported += 1,
1268                Ok(ImportResult::Skipped) => stats.skipped += 1,
1269                Err(_) => stats.failed += 1,
1270            }
1271        }
1272
1273        Ok(stats)
1274    }
1275
1276    /// Import a single document, performing schema validation and duplicate checking
1277    async fn import_document(
1278        &self,
1279        collection: &str,
1280        collection_def: &Collection,
1281        doc_json: JsonValue,
1282    ) -> Result<ImportResult> {
1283        if !doc_json.is_object() {
1284            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
1285        }
1286
1287        // Extract document ID if present
1288        let doc_id = doc_json
1289            .get("id")
1290            .and_then(|id| id.as_str())
1291            .map(|s| s.to_string())
1292            .unwrap_or_else(|| Uuid::new_v4().to_string());
1293
1294        // Check if document with this ID already exists
1295        if let Some(_) = self.get_document(collection, &doc_id)? {
1296            return Ok(ImportResult::Skipped);
1297        }
1298
1299        // Convert JSON to our document format and validate against schema
1300        let mut data_map = HashMap::new();
1301
1302        if let Some(obj) = doc_json.as_object() {
1303            for (field_name, field_def) in &collection_def.fields {
1304                if let Some(json_value) = obj.get(field_name) {
1305                    // Validate value against field type
1306                    if !self.validate_field_value(json_value, &field_def.field_type) {
1307                        return Err(AuroraError::InvalidOperation(format!(
1308                            "Field '{}' has invalid type",
1309                            field_name
1310                        )));
1311                    }
1312
1313                    // Convert JSON value to our Value type
1314                    let value = self.json_to_value(json_value)?;
1315                    data_map.insert(field_name.clone(), value);
1316                } else if field_def.unique {
1317                    // Missing required unique field
1318                    return Err(AuroraError::InvalidOperation(format!(
1319                        "Missing required unique field '{}'",
1320                        field_name
1321                    )));
1322                }
1323            }
1324        }
1325
1326        // Check for duplicates by unique fields
1327        let unique_fields = self.get_unique_fields(&collection_def);
1328        for unique_field in &unique_fields {
1329            if let Some(value) = data_map.get(unique_field) {
1330                // Query for existing documents with this unique value
1331                let query_results = self
1332                    .query(collection)
1333                    .filter(move |f| f.eq(unique_field, value.clone()))
1334                    .limit(1)
1335                    .collect()
1336                    .await?;
1337
1338                if !query_results.is_empty() {
1339                    // Found duplicate by unique field
1340                    return Ok(ImportResult::Skipped);
1341                }
1342            }
1343        }
1344
1345        // Create and insert document
1346        let document = Document {
1347            id: doc_id,
1348            data: data_map,
1349        };
1350
1351        self.put(
1352            format!("{}:{}", collection, document.id),
1353            serde_json::to_vec(&document)?,
1354            None,
1355        )?;
1356
1357        Ok(ImportResult::Imported)
1358    }
1359
1360    /// Validate that a JSON value matches the expected field type
1361    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
1362        match field_type {
1363            FieldType::String => value.is_string(),
1364            FieldType::Int => value.is_i64() || value.is_u64(),
1365            FieldType::Float => value.is_number(),
1366            FieldType::Boolean => value.is_boolean(),
1367            FieldType::Array => value.is_array(),
1368            FieldType::Object => value.is_object(),
1369            FieldType::Uuid => {
1370                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
1371            }
1372        }
1373    }
1374
1375    /// Convert a JSON value to our internal Value type
1376    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
1377        match json_value {
1378            JsonValue::Null => Ok(Value::Null),
1379            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
1380            JsonValue::Number(n) => {
1381                if let Some(i) = n.as_i64() {
1382                    Ok(Value::Int(i))
1383                } else if let Some(f) = n.as_f64() {
1384                    Ok(Value::Float(f))
1385                } else {
1386                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
1387                }
1388            }
1389            JsonValue::String(s) => {
1390                // Try parsing as UUID first
1391                if let Ok(uuid) = Uuid::parse_str(s) {
1392                    Ok(Value::Uuid(uuid))
1393                } else {
1394                    Ok(Value::String(s.clone()))
1395                }
1396            }
1397            JsonValue::Array(arr) => {
1398                let mut values = Vec::new();
1399                for item in arr {
1400                    values.push(self.json_to_value(item)?);
1401                }
1402                Ok(Value::Array(values))
1403            }
1404            JsonValue::Object(obj) => {
1405                let mut map = HashMap::new();
1406                for (k, v) in obj {
1407                    map.insert(k.clone(), self.json_to_value(v)?);
1408                }
1409                Ok(Value::Object(map))
1410            }
1411        }
1412    }
1413
1414    /// Get collection definition
1415    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
1416        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
1417            let collection_def: Collection = serde_json::from_slice(&data)?;
1418            Ok(collection_def)
1419        } else {
1420            Err(AuroraError::CollectionNotFound(collection.to_string()))
1421        }
1422    }
1423
1424    /// Get storage statistics and information about the database
1425    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
1426        let hot_stats = self.hot.get_stats();
1427        let cold_stats = self.cold.get_stats()?;
1428
1429        Ok(DatabaseStats {
1430            hot_stats,
1431            cold_stats,
1432            estimated_size: self.cold.estimated_size(),
1433            collections: self.get_collection_stats()?,
1434        })
1435    }
1436
1437    /// Get a direct reference to a value in the hot cache
1438    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
1439        self.hot.get_ref(key)
1440    }
1441
1442    /// Check if a key is currently stored in the hot cache
1443    pub fn is_in_hot_cache(&self, key: &str) -> bool {
1444        self.hot.is_hot(key)
1445    }
1446
1447    /// Start background cleanup of hot cache with specified interval
1448    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
1449        let hot_store = Arc::new(self.hot.clone());
1450        hot_store.start_cleanup_with_interval(interval_secs).await;
1451    }
1452
1453    /// Clear the hot cache (useful when memory needs to be freed)
1454    pub fn clear_hot_cache(&self) {
1455        self.hot.clear();
1456        println!(
1457            "Hot cache cleared, current hit ratio: {:.2}%",
1458            self.hot.hit_ratio() * 100.0
1459        );
1460    }
1461
1462    /// Store multiple key-value pairs efficiently in a single batch operation
1463    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
1464        self.cold.batch_set(pairs)
1465    }
1466
1467    /// Scan for keys with a specific prefix
1468    pub fn scan_with_prefix(
1469        &self,
1470        prefix: &str,
1471    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
1472        self.cold.scan_prefix(prefix)
1473    }
1474
1475    /// Get storage efficiency metrics for the database
1476    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
1477        let mut stats = HashMap::new();
1478
1479        // Scan all collections
1480        let collections: Vec<String> = self
1481            .cold
1482            .scan()
1483            .filter_map(|r| r.ok())
1484            .map(|(k, _)| k)
1485            .filter(|k| k.starts_with("_collection:"))
1486            .map(|k| k.trim_start_matches("_collection:").to_string())
1487            .collect();
1488
1489        for collection in collections {
1490            let prefix = format!("{}:", collection);
1491
1492            // Count documents
1493            let count = self.cold.scan_prefix(&prefix).count();
1494
1495            // Estimate size
1496            let size: usize = self
1497                .cold
1498                .scan_prefix(&prefix)
1499                .filter_map(|r| r.ok())
1500                .map(|(_, v)| v.len())
1501                .sum();
1502
1503            stats.insert(
1504                collection,
1505                CollectionStats {
1506                    count,
1507                    size_bytes: size,
1508                    avg_doc_size: if count > 0 { size / count } else { 0 },
1509                },
1510            );
1511        }
1512
1513        Ok(stats)
1514    }
1515
1516    /// Search for documents by exact value using an index
1517    ///
1518    /// This method performs a fast lookup using a pre-created index
1519    pub fn search_by_value(
1520        &self,
1521        collection: &str,
1522        field: &str,
1523        value: &Value,
1524    ) -> Result<Vec<Document>> {
1525        let index_key = format!("_index:{}:{}", collection, field);
1526
1527        if let Some(index_data) = self.get(&index_key)? {
1528            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1529            let index = Index::new(index_def);
1530
1531            // Use the previously unused search method
1532            if let Some(doc_ids) = index.search(value) {
1533                // Load the documents by ID
1534                let mut docs = Vec::new();
1535                for id in doc_ids {
1536                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1537                        let doc: Document = serde_json::from_slice(&doc_data)?;
1538                        docs.push(doc);
1539                    }
1540                }
1541                return Ok(docs);
1542            }
1543        }
1544
1545        // Return empty result if no index or no matches
1546        Ok(Vec::new())
1547    }
1548
1549    /// Perform a full-text search on an indexed text field
1550    ///
1551    /// This provides more advanced text search capabilities including
1552    /// relevance ranking of results
1553    pub fn full_text_search(
1554        &self,
1555        collection: &str,
1556        field: &str,
1557        query: &str,
1558    ) -> Result<Vec<Document>> {
1559        let index_key = format!("_index:{}:{}", collection, field);
1560
1561        if let Some(index_data) = self.get(&index_key)? {
1562            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1563
1564            // Ensure this is a full-text index
1565            if !matches!(index_def.index_type, IndexType::FullText) {
1566                return Err(AuroraError::InvalidOperation(format!(
1567                    "Field '{}' is not indexed as full-text",
1568                    field
1569                )));
1570            }
1571
1572            let index = Index::new(index_def);
1573
1574            // Use the previously unused search_text method
1575            if let Some(doc_id_scores) = index.search_text(query) {
1576                // Load the documents by ID, preserving score order
1577                let mut docs = Vec::new();
1578                for (id, _score) in doc_id_scores {
1579                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1580                        let doc: Document = serde_json::from_slice(&doc_data)?;
1581                        docs.push(doc);
1582                    }
1583                }
1584                return Ok(docs);
1585            }
1586        }
1587
1588        // Return empty result if no index or no matches
1589        Ok(Vec::new())
1590    }
1591
1592    /// Create a full-text search index on a text field
1593    pub fn create_text_index(
1594        &self,
1595        collection: &str,
1596        field: &str,
1597        _enable_stop_words: bool,
1598    ) -> Result<()> {
1599        // Check if collection exists
1600        if self.get(&format!("_collection:{}", collection))?.is_none() {
1601            return Err(AuroraError::CollectionNotFound(collection.to_string()));
1602        }
1603
1604        // Create index definition
1605        let index_def = IndexDefinition {
1606            name: format!("{}_{}_fulltext", collection, field),
1607            collection: collection.to_string(),
1608            fields: vec![field.to_string()],
1609            index_type: IndexType::FullText,
1610            unique: false,
1611        };
1612
1613        // Store index definition
1614        let index_key = format!("_index:{}:{}", collection, field);
1615        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;
1616
1617        // Create the actual index
1618        let index = Index::new(index_def);
1619
1620        // Index all existing documents in the collection
1621        let prefix = format!("{}:", collection);
1622        for result in self.cold.scan_prefix(&prefix) {
1623            if let Ok((_, data)) = result {
1624                let doc: Document = serde_json::from_slice(&data)?;
1625                index.insert(&doc)?;
1626            }
1627        }
1628
1629        Ok(())
1630    }
1631
1632    pub async fn execute_simple_query(
1633        &self,
1634        builder: &SimpleQueryBuilder,
1635    ) -> Result<Vec<Document>> {
1636        // Ensure indices are initialized
1637        self.ensure_indices_initialized().await?;
1638
1639        // A place to store the IDs of the documents we need to fetch
1640        let mut doc_ids_to_load: Option<Vec<String>> = None;
1641        let mut used_filter_index: Option<usize> = None;
1642
1643        // --- The "Query Planner" ---
1644        // Look for an opportunity to use an index
1645        for (filter_idx, filter) in builder.filters.iter().enumerate() {
1646            match filter {
1647                Filter::Eq(field, value) => {
1648                    let index_key = format!("{}:{}", &builder.collection, field);
1649
1650                    // Do we have a secondary index for this field?
1651                    if let Some(index) = self.secondary_indices.get(&index_key) {
1652                        // Yes! Let's use it.
1653                        if let Some(matching_ids) = index.get(&value.to_string()) {
1654                            doc_ids_to_load = Some(matching_ids.clone());
1655                            used_filter_index = Some(filter_idx);
1656                            break; // Stop searching for other indexes for now
1657                        }
1658                    }
1659                }
1660                Filter::Gt(field, value)
1661                | Filter::Gte(field, value)
1662                | Filter::Lt(field, value)
1663                | Filter::Lte(field, value) => {
1664                    let index_key = format!("{}:{}", &builder.collection, field);
1665
1666                    // Do we have a secondary index for this field?
1667                    if let Some(index) = self.secondary_indices.get(&index_key) {
1668                        // For range queries, we need to scan through the index values
1669                        let mut matching_ids = Vec::new();
1670
1671                        for entry in index.iter() {
1672                            let index_value_str = entry.key();
1673
1674                            // Try to parse the index value to compare with our filter value
1675                            if let Ok(index_value) =
1676                                self.parse_value_from_string(index_value_str, value)
1677                            {
1678                                let matches = match filter {
1679                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
1680                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
1681                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
1682                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
1683                                    _ => false,
1684                                };
1685
1686                                if matches {
1687                                    matching_ids.extend(entry.value().clone());
1688                                }
1689                            }
1690                        }
1691
1692                        if !matching_ids.is_empty() {
1693                            doc_ids_to_load = Some(matching_ids);
1694                            used_filter_index = Some(filter_idx);
1695                            break;
1696                        }
1697                    }
1698                }
1699                Filter::Contains(field, search_term) => {
1700                    let index_key = format!("{}:{}", &builder.collection, field);
1701
1702                    // Do we have a secondary index for this field?
1703                    if let Some(index) = self.secondary_indices.get(&index_key) {
1704                        // For Contains queries, we need to scan through the index keys
1705                        // to find those that contain the search term
1706                        let mut matching_ids = Vec::new();
1707
1708                        for entry in index.iter() {
1709                            let index_value_str = entry.key();
1710
1711                            // Check if this indexed value contains our search term
1712                            if index_value_str
1713                                .to_lowercase()
1714                                .contains(&search_term.to_lowercase())
1715                            {
1716                                matching_ids.extend(entry.value().clone());
1717                            }
1718                        }
1719
1720                        if !matching_ids.is_empty() {
1721                            // Remove duplicates since a document could match multiple indexed values
1722                            matching_ids.sort();
1723                            matching_ids.dedup();
1724
1725                            doc_ids_to_load = Some(matching_ids);
1726                            used_filter_index = Some(filter_idx);
1727                            break;
1728                        }
1729                    }
1730                }
1731            }
1732        }
1733
1734        let mut final_docs: Vec<Document>;
1735
1736        if let Some(ids) = doc_ids_to_load {
1737            // --- Path 1: Index-based Fetch ---
1738            // We have a specific list of IDs to load. This is fast.
1739            println!("📊 Loading {} documents via index", ids.len());
1740            final_docs = Vec::with_capacity(ids.len());
1741
1742            for id in ids {
1743                let doc_key = format!("{}:{}", &builder.collection, id);
1744                if let Some(data) = self.get(&doc_key)? {
1745                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
1746                        final_docs.push(doc);
1747                    }
1748                }
1749            }
1750        } else {
1751            // --- Path 2: Full Collection Scan (Fallback) ---
1752            // No suitable index was found, so we must do the slow scan
1753            println!(
1754                "⚠️  INDEX MISS. Falling back to full collection scan for '{}'",
1755                &builder.collection
1756            );
1757            final_docs = self.get_all_collection(&builder.collection).await?;
1758        }
1759
1760        // Now, apply the *rest* of the filters in memory
1761        // This is important for queries with multiple filters, where only one might be indexed
1762        // Skip the filter we already used for the index lookup
1763        final_docs.retain(|doc| {
1764            builder.filters.iter().enumerate().all(|(idx, filter)| {
1765                // Skip the filter we already used for index lookup
1766                if Some(idx) == used_filter_index {
1767                    return true;
1768                }
1769
1770                match filter {
1771                    Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
1772                    Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
1773                    Filter::Gte(field, value) => doc.data.get(field).map_or(false, |v| v >= value),
1774                    Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
1775                    Filter::Lte(field, value) => doc.data.get(field).map_or(false, |v| v <= value),
1776                    Filter::Contains(field, value_str) => {
1777                        doc.data.get(field).map_or(false, |v| match v {
1778                            Value::String(s) => s.contains(value_str),
1779                            Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
1780                            _ => false,
1781                        })
1782                    }
1783                }
1784            })
1785        });
1786
1787        println!(
1788            "✅ Query completed. Returning {} documents",
1789            final_docs.len()
1790        );
1791
1792        // NOTE: This implementation does not yet support ordering, limits, or offsets.
1793        // Those would be added here in a production system.
1794        Ok(final_docs)
1795    }
1796
1797    /// Helper method to parse a string value back to a Value for comparison
1798    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
1799        match reference_value {
1800            Value::Int(_) => {
1801                if let Ok(i) = value_str.parse::<i64>() {
1802                    Ok(Value::Int(i))
1803                } else {
1804                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
1805                }
1806            }
1807            Value::Float(_) => {
1808                if let Ok(f) = value_str.parse::<f64>() {
1809                    Ok(Value::Float(f))
1810                } else {
1811                    Err(AuroraError::InvalidOperation(
1812                        "Failed to parse float".into(),
1813                    ))
1814                }
1815            }
1816            Value::String(_) => Ok(Value::String(value_str.to_string())),
1817            _ => Ok(Value::String(value_str.to_string())),
1818        }
1819    }
1820
1821    pub async fn execute_dynamic_query(
1822        &self,
1823        collection: &str,
1824        payload: &QueryPayload,
1825    ) -> Result<Vec<Document>> {
1826        let mut docs = self.get_all_collection(collection).await?;
1827
1828        // 1. Apply Filters
1829        if let Some(filters) = &payload.filters {
1830            docs.retain(|doc| {
1831                filters.iter().all(|filter| {
1832                    doc.data
1833                        .get(&filter.field)
1834                        .map_or(false, |doc_val| check_filter(doc_val, filter))
1835                })
1836            });
1837        }
1838
1839        // 2. Apply Sorting
1840        if let Some(sort_options) = &payload.sort {
1841            docs.sort_by(|a, b| {
1842                let a_val = a.data.get(&sort_options.field);
1843                let b_val = b.data.get(&sort_options.field);
1844                let ordering = a_val
1845                    .partial_cmp(&b_val)
1846                    .unwrap_or(std::cmp::Ordering::Equal);
1847                if sort_options.ascending {
1848                    ordering
1849                } else {
1850                    ordering.reverse()
1851                }
1852            });
1853        }
1854
1855        // 3. Apply Pagination
1856        let mut docs = docs;
1857        if let Some(offset) = payload.offset {
1858            docs = docs.into_iter().skip(offset).collect();
1859        }
1860        if let Some(limit) = payload.limit {
1861            docs = docs.into_iter().take(limit).collect();
1862        }
1863
1864        // 4. Apply Field Selection (Projection)
1865        if let Some(select_fields) = &payload.select {
1866            if !select_fields.is_empty() {
1867                docs = docs
1868                    .into_iter()
1869                    .map(|mut doc| {
1870                        doc.data.retain(|key, _| select_fields.contains(key));
1871                        doc
1872                    })
1873                    .collect();
1874            }
1875        }
1876
1877        Ok(docs)
1878    }
1879
1880    pub async fn process_network_request(
1881        &self,
1882        request: crate::network::protocol::Request,
1883    ) -> crate::network::protocol::Response {
1884        use crate::network::protocol::Response;
1885
1886        match request {
1887            crate::network::protocol::Request::Get(key) => match self.get(&key) {
1888                Ok(value) => Response::Success(value),
1889                Err(e) => Response::Error(e.to_string()),
1890            },
1891            crate::network::protocol::Request::Put(key, value) => {
1892                match self.put(key, value, None) {
1893                    Ok(_) => Response::Done,
1894                    Err(e) => Response::Error(e.to_string()),
1895                }
1896            }
1897            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
1898                Ok(_) => Response::Done,
1899                Err(e) => Response::Error(e.to_string()),
1900            },
1901            crate::network::protocol::Request::NewCollection { name, fields } => {
1902                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
1903                    .iter()
1904                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
1905                    .collect();
1906
1907                match self.new_collection(&name, fields_for_db) {
1908                    Ok(_) => Response::Done,
1909                    Err(e) => Response::Error(e.to_string()),
1910                }
1911            }
1912            crate::network::protocol::Request::Insert { collection, data } => {
1913                match self.insert_map(&collection, data).await {
1914                    Ok(id) => Response::Message(id),
1915                    Err(e) => Response::Error(e.to_string()),
1916                }
1917            }
1918            crate::network::protocol::Request::GetDocument { collection, id } => {
1919                match self.get_document(&collection, &id) {
1920                    Ok(doc) => Response::Document(doc),
1921                    Err(e) => Response::Error(e.to_string()),
1922                }
1923            }
1924            crate::network::protocol::Request::Query(builder) => {
1925                match self.execute_simple_query(&builder).await {
1926                    Ok(docs) => Response::Documents(docs),
1927                    Err(e) => Response::Error(e.to_string()),
1928                }
1929            }
1930            crate::network::protocol::Request::BeginTransaction => match self.begin_transaction() {
1931                Ok(_) => Response::Done,
1932                Err(e) => Response::Error(e.to_string()),
1933            },
1934            crate::network::protocol::Request::CommitTransaction => match self.commit_transaction()
1935            {
1936                Ok(_) => Response::Done,
1937                Err(e) => Response::Error(e.to_string()),
1938            },
1939            crate::network::protocol::Request::RollbackTransaction => {
1940                match self.rollback_transaction() {
1941                    Ok(_) => Response::Done,
1942                    Err(e) => Response::Error(e.to_string()),
1943                }
1944            }
1945        }
1946    }
1947
1948    /// Create indices for commonly queried fields automatically
1949    ///
1950    /// This is a convenience method that creates indices for fields that are
1951    /// likely to be queried frequently, improving performance.
1952    ///
1953    /// # Arguments
1954    /// * `collection` - Name of the collection
1955    /// * `fields` - List of field names to create indices for
1956    ///
1957    /// # Examples
1958    /// ```
1959    /// // Create indices for commonly queried fields
1960    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
1961    /// ```
1962    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
1963        for field in fields {
1964            if let Err(e) = self.create_index(collection, field).await {
1965                eprintln!(
1966                    "Warning: Failed to create index for {}.{}: {}",
1967                    collection, field, e
1968                );
1969            } else {
1970                println!("✅ Created index for {}.{}", collection, field);
1971            }
1972        }
1973        Ok(())
1974    }
1975
1976    /// Get index statistics for a collection
1977    ///
1978    /// This helps understand which indices exist and how effective they are.
1979    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
1980        let mut stats = HashMap::new();
1981
1982        for entry in self.secondary_indices.iter() {
1983            let key = entry.key();
1984            if key.starts_with(&format!("{}:", collection)) {
1985                let field = key.split(':').nth(1).unwrap_or("unknown");
1986                let index = entry.value();
1987
1988                let unique_values = index.len();
1989                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
1990
1991                stats.insert(
1992                    field.to_string(),
1993                    IndexStats {
1994                        unique_values,
1995                        total_documents,
1996                        avg_docs_per_value: if unique_values > 0 {
1997                            total_documents / unique_values
1998                        } else {
1999                            0
2000                        },
2001                    },
2002                );
2003            }
2004        }
2005
2006        stats
2007    }
2008
2009    /// Optimize a collection by creating indices for frequently filtered fields
2010    ///
2011    /// This analyzes common query patterns and suggests/creates optimal indices.
2012    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
2013        // For now, this is a simple implementation that creates indices for all fields
2014        // In a more sophisticated version, this would analyze query logs
2015
2016        if let Ok(collection_def) = self.get_collection_definition(collection) {
2017            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
2018            self.create_indices(collection, &field_names).await?;
2019            println!(
2020                "🚀 Optimized collection '{}' with {} indices",
2021                collection,
2022                field_names.len()
2023            );
2024        }
2025
2026        Ok(())
2027    }
2028
2029    // Helper method to get unique fields from a collection
2030    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
2031        collection
2032            .fields
2033            .iter()
2034            .filter(|(_, def)| def.unique)
2035            .map(|(name, _)| name.clone())
2036            .collect()
2037    }
2038
2039    // Update the validation method to use the helper
2040    async fn validate_unique_constraints(
2041        &self,
2042        collection: &str,
2043        data: &HashMap<String, Value>,
2044    ) -> Result<()> {
2045        self.ensure_indices_initialized().await?;
2046        let collection_def = self.get_collection_definition(collection)?;
2047        let unique_fields = self.get_unique_fields(&collection_def);
2048
2049        for unique_field in &unique_fields {
2050            if let Some(value) = data.get(unique_field) {
2051                let index_key = format!("{}:{}", collection, unique_field);
2052                if let Some(index) = self.secondary_indices.get(&index_key) {
2053                    // Get the raw string value without JSON formatting
2054                    let value_str = match value {
2055                        Value::String(s) => s.clone(),
2056                        _ => value.to_string(),
2057                    };
2058                    if index.contains_key(&value_str) {
2059                        return Err(AuroraError::UniqueConstraintViolation(
2060                            unique_field.clone(),
2061                            value_str,
2062                        ));
2063                    }
2064                }
2065            }
2066        }
2067        Ok(())
2068    }
2069
2070    /// Validate unique constraints excluding a specific document ID (for updates)
2071    async fn validate_unique_constraints_excluding(
2072        &self,
2073        collection: &str,
2074        data: &HashMap<String, Value>,
2075        exclude_id: &str,
2076    ) -> Result<()> {
2077        self.ensure_indices_initialized().await?;
2078        let collection_def = self.get_collection_definition(collection)?;
2079        let unique_fields = self.get_unique_fields(&collection_def);
2080
2081        for unique_field in &unique_fields {
2082            if let Some(value) = data.get(unique_field) {
2083                let index_key = format!("{}:{}", collection, unique_field);
2084                if let Some(index) = self.secondary_indices.get(&index_key) {
2085                    // Get the raw string value without JSON formatting
2086                    let value_str = match value {
2087                        Value::String(s) => s.clone(),
2088                        _ => value.to_string(),
2089                    };
2090                    if let Some(doc_ids) = index.get(&value_str) {
2091                        // Check if any document other than the excluded one has this value
2092                        let exclude_key = format!("{}:{}", collection, exclude_id);
2093                        for doc_key in doc_ids.value() {
2094                            if doc_key != &exclude_key {
2095                                return Err(AuroraError::UniqueConstraintViolation(
2096                                    unique_field.clone(),
2097                                    value_str,
2098                                ));
2099                            }
2100                        }
2101                    }
2102                }
2103            }
2104        }
2105        Ok(())
2106    }
2107}
2108
2109fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
2110    let filter_val = match json_to_value(&filter.value) {
2111        Ok(v) => v,
2112        Err(_) => return false,
2113    };
2114
2115    match filter.operator {
2116        FilterOperator::Eq => doc_val == &filter_val,
2117        FilterOperator::Ne => doc_val != &filter_val,
2118        FilterOperator::Gt => doc_val > &filter_val,
2119        FilterOperator::Gte => doc_val >= &filter_val,
2120        FilterOperator::Lt => doc_val < &filter_val,
2121        FilterOperator::Lte => doc_val <= &filter_val,
2122        FilterOperator::Contains => match (doc_val, &filter_val) {
2123            (Value::String(s), Value::String(fv)) => s.contains(fv),
2124            (Value::Array(arr), _) => arr.contains(&filter_val),
2125            _ => false,
2126        },
2127    }
2128}
2129
2130/// Results of importing a document
2131enum ImportResult {
2132    Imported,
2133    Skipped,
2134}
2135
2136/// Statistics from an import operation
2137#[derive(Debug, Default)]
2138pub struct ImportStats {
2139    /// Number of documents successfully imported
2140    pub imported: usize,
2141    /// Number of documents skipped (usually because they already exist)
2142    pub skipped: usize,
2143    /// Number of documents that failed to import
2144    pub failed: usize,
2145}
2146
2147/// Statistics for a specific collection
2148#[derive(Debug)]
2149pub struct CollectionStats {
2150    /// Number of documents in the collection
2151    pub count: usize,
2152    /// Total size of the collection in bytes
2153    pub size_bytes: usize,
2154    /// Average document size in bytes
2155    pub avg_doc_size: usize,
2156}
2157
2158/// Statistics for an index
2159#[derive(Debug)]
2160pub struct IndexStats {
2161    /// Number of unique values in the index
2162    pub unique_values: usize,
2163    /// Total number of documents covered by the index
2164    pub total_documents: usize,
2165    /// Average number of documents per unique value
2166    pub avg_docs_per_value: usize,
2167}
2168
2169/// Combined database statistics
2170#[derive(Debug)]
2171pub struct DatabaseStats {
2172    /// Hot cache statistics
2173    pub hot_stats: crate::storage::hot::CacheStats,
2174    /// Cold storage statistics
2175    pub cold_stats: crate::storage::cold::ColdStoreStats,
2176    /// Estimated total database size in bytes
2177    pub estimated_size: u64,
2178    /// Statistics for each collection
2179    pub collections: HashMap<String, CollectionStats>,
2180}
2181
2182#[cfg(test)]
2183mod tests {
2184    use super::*;
2185    use tempfile::tempdir;
2186
2187    #[tokio::test]
2188    async fn test_basic_operations() -> Result<()> {
2189        let temp_dir = tempdir()?;
2190        let db_path = temp_dir.path().join("test.aurora");
2191        let db = Aurora::open(db_path.to_str().unwrap())?;
2192
2193        // Test collection creation
2194        db.new_collection(
2195            "users",
2196            vec![
2197                ("name".to_string(), FieldType::String, false),
2198                ("age".to_string(), FieldType::Int, false),
2199                ("email".to_string(), FieldType::String, true),
2200            ],
2201        )?;
2202
2203        // Test document insertion
2204        let doc_id = db
2205            .insert_into(
2206                "users",
2207                vec![
2208                    ("name", Value::String("John Doe".to_string())),
2209                    ("age", Value::Int(30)),
2210                    ("email", Value::String("john@example.com".to_string())),
2211                ],
2212            )
2213            .await?;
2214
2215        // Test document retrieval
2216        let doc = db.get_document("users", &doc_id)?.unwrap();
2217        assert_eq!(
2218            doc.data.get("name").unwrap(),
2219            &Value::String("John Doe".to_string())
2220        );
2221        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));
2222
2223        Ok(())
2224    }
2225
2226    #[tokio::test]
2227    async fn test_transactions() -> Result<()> {
2228        let temp_dir = tempdir()?;
2229        let db_path = temp_dir.path().join("test.aurora");
2230        let db = Aurora::open(db_path.to_str().unwrap())?;
2231
2232        // Start transaction
2233        db.begin_transaction()?;
2234
2235        // Insert document
2236        let doc_id = db
2237            .insert_into("test", vec![("field", Value::String("value".to_string()))])
2238            .await?;
2239
2240        // Commit transaction
2241        db.commit_transaction()?;
2242
2243        // Verify document exists
2244        let doc = db.get_document("test", &doc_id)?.unwrap();
2245        assert_eq!(
2246            doc.data.get("field").unwrap(),
2247            &Value::String("value".to_string())
2248        );
2249
2250        Ok(())
2251    }
2252
2253    #[tokio::test]
2254    async fn test_query_operations() -> Result<()> {
2255        let temp_dir = tempdir()?;
2256        let db_path = temp_dir.path().join("test.aurora");
2257        let db = Aurora::open(db_path.to_str().unwrap())?;
2258
2259        // Test collection creation
2260        db.new_collection(
2261            "books",
2262            vec![
2263                ("title".to_string(), FieldType::String, false),
2264                ("author".to_string(), FieldType::String, false),
2265                ("year".to_string(), FieldType::Int, false),
2266            ],
2267        )?;
2268
2269        // Test document insertion
2270        db.insert_into(
2271            "books",
2272            vec![
2273                ("title", Value::String("Book 1".to_string())),
2274                ("author", Value::String("Author 1".to_string())),
2275                ("year", Value::Int(2020)),
2276            ],
2277        )
2278        .await?;
2279
2280        db.insert_into(
2281            "books",
2282            vec![
2283                ("title", Value::String("Book 2".to_string())),
2284                ("author", Value::String("Author 2".to_string())),
2285                ("year", Value::Int(2021)),
2286            ],
2287        )
2288        .await?;
2289
2290        // Test query
2291        let results = db
2292            .query("books")
2293            .filter(|f| f.gt("year", Value::Int(2019)))
2294            .order_by("year", true)
2295            .collect()
2296            .await?;
2297
2298        assert_eq!(results.len(), 2);
2299        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());
2300
2301        Ok(())
2302    }
2303
2304    #[tokio::test]
2305    async fn test_blob_operations() -> Result<()> {
2306        let temp_dir = tempdir()?;
2307        let db_path = temp_dir.path().join("test.aurora");
2308        let db = Aurora::open(db_path.to_str().unwrap())?;
2309
2310        // Create test file
2311        let file_path = temp_dir.path().join("test.txt");
2312        std::fs::write(&file_path, b"Hello, World!")?;
2313
2314        // Test blob storage
2315        db.put_blob("test:blob".to_string(), &file_path).await?;
2316
2317        // Verify blob exists
2318        let data = db.get_data_by_pattern("test:blob")?;
2319        assert_eq!(data.len(), 1);
2320        match &data[0].1 {
2321            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
2322            _ => panic!("Expected Blob type"),
2323        }
2324
2325        Ok(())
2326    }
2327
2328    #[tokio::test]
2329    async fn test_blob_size_limit() -> Result<()> {
2330        let temp_dir = tempdir()?;
2331        let db_path = temp_dir.path().join("test.aurora");
2332        let db = Aurora::open(db_path.to_str().unwrap())?;
2333
2334        // Create a test file that's too large (201MB)
2335        let large_file_path = temp_dir.path().join("large_file.bin");
2336        let large_data = vec![0u8; 201 * 1024 * 1024];
2337        std::fs::write(&large_file_path, &large_data)?;
2338
2339        // Attempt to store the large file
2340        let result = db
2341            .put_blob("test:large_blob".to_string(), &large_file_path)
2342            .await;
2343
2344        assert!(result.is_err());
2345        assert!(matches!(
2346            result.unwrap_err(),
2347            AuroraError::InvalidOperation(_)
2348        ));
2349
2350        Ok(())
2351    }
2352
2353    #[tokio::test]
2354    async fn test_unique_constraints() -> Result<()> {
2355        let temp_dir = tempdir()?;
2356        let db_path = temp_dir.path().join("test.aurora");
2357        let db = Aurora::open(db_path.to_str().unwrap())?;
2358
2359        // Create collection with unique email field
2360        db.new_collection(
2361            "users",
2362            vec![
2363                ("name".to_string(), FieldType::String, false),
2364                ("email".to_string(), FieldType::String, true), // unique field
2365                ("age".to_string(), FieldType::Int, false),
2366            ],
2367        )?;
2368
2369        // Insert first document
2370        let _doc_id1 = db
2371            .insert_into(
2372                "users",
2373                vec![
2374                    ("name", Value::String("John Doe".to_string())),
2375                    ("email", Value::String("john@example.com".to_string())),
2376                    ("age", Value::Int(30)),
2377                ],
2378            )
2379            .await?;
2380
2381        // Try to insert second document with same email - should fail
2382        let result = db
2383            .insert_into(
2384                "users",
2385                vec![
2386                    ("name", Value::String("Jane Doe".to_string())),
2387                    ("email", Value::String("john@example.com".to_string())), // duplicate email
2388                    ("age", Value::Int(25)),
2389                ],
2390            )
2391            .await;
2392
2393        assert!(result.is_err());
2394        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
2395            assert_eq!(field, "email");
2396            assert_eq!(value, "john@example.com");
2397        } else {
2398            panic!("Expected UniqueConstraintViolation error");
2399        }
2400
2401        // Test upsert with unique constraint
2402        // Should succeed for new document
2403        let _doc_id2 = db
2404            .upsert(
2405                "users",
2406                "user2",
2407                vec![
2408                    ("name", Value::String("Alice Smith".to_string())),
2409                    ("email", Value::String("alice@example.com".to_string())),
2410                    ("age", Value::Int(28)),
2411                ],
2412            )
2413            .await?;
2414
2415        // Should fail when trying to upsert with duplicate email
2416        let result = db
2417            .upsert(
2418                "users",
2419                "user3",
2420                vec![
2421                    ("name", Value::String("Bob Wilson".to_string())),
2422                    ("email", Value::String("alice@example.com".to_string())), // duplicate
2423                    ("age", Value::Int(35)),
2424                ],
2425            )
2426            .await;
2427
2428        assert!(result.is_err());
2429
2430        // Should succeed when updating existing document with same email (no change)
2431        let result = db
2432            .upsert(
2433                "users",
2434                "user2",
2435                vec![
2436                    ("name", Value::String("Alice Updated".to_string())),
2437                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
2438                    ("age", Value::Int(29)),
2439                ],
2440            )
2441            .await;
2442
2443        assert!(result.is_ok());
2444
2445        Ok(())
2446    }
2447}