// aurora_db/db.rs

//! # Aurora Database
//!
//! Aurora is an embedded document database with tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora::Aurora;
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true),  // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document (async)
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{AuroraConfig, Collection, Document, FieldDefinition, FieldType, Value};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;
// Index types for faster lookups
/// Maps a full document key ("collection:id") to its serialized bytes.
type PrimaryIndex = DashMap<String, Vec<u8>>;
/// Maps an indexed field's string value to the keys of documents holding it.
type SecondaryIndex = DashMap<String, Vec<String>>;
72// Move DataInfo enum outside impl block
/// Summary of a stored value, as reported by `get_data_by_pattern`.
#[derive(Debug)]
pub enum DataInfo {
    /// Regular document data with a short preview of its bytes.
    Data { size: usize, preview: String },
    /// Binary blob payload (stored with a `BLOB:` prefix).
    Blob { size: usize },
    /// Compressed payload.
    Compressed { size: usize },
}

impl DataInfo {
    /// Size in bytes of the stored value, regardless of variant.
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. }
            | DataInfo::Blob { size }
            | DataInfo::Compressed { size } => *size,
        }
    }
}
89
/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates if doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document (async)
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    // Tiered storage: hot in-memory cache over persistent cold storage.
    hot: HotStore,
    cold: Arc<ColdStore>,
    // Indexing: per-collection primary index plus per-field secondary indices.
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    // Set exactly once by `ensure_indices_initialized`.
    indices_initialized: Arc<OnceCell<()>>,
    // New transaction system with proper isolation
    transaction_manager: crate::transaction::TransactionManager,
    // Named secondary indices created via `create_index`.
    indices: Arc<DashMap<String, Index>>,
    // Schema cache to avoid repeated deserialization
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    // Configuration
    config: AuroraConfig,
    // Write buffer (optional, based on config)
    write_buffer: Option<WriteBuffer>,
    // PubSub system for change notifications
    pubsub: crate::pubsub::PubSubSystem,
}
132
133impl fmt::Debug for Aurora {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        f.debug_struct("Aurora")
136            .field("hot", &"HotStore")
137            .field("cold", &"ColdStore")
138            .field("primary_indices_count", &self.primary_indices.len())
139            .field("secondary_indices_count", &self.secondary_indices.len())
140            .field(
141                "active_transactions",
142                &self.transaction_manager.active_count(),
143            )
144            .field("indices_count", &self.indices.len())
145            .finish()
146    }
147}
148
149impl Aurora {
150    /// Open or create a database at the specified location
151    ///
152    /// # Arguments
153    /// * `path` - Path to the database file or directory
154    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
155    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
156    ///   - Simple names (like `myapp.db`) use the current directory
157    ///
158    /// # Returns
159    /// An initialized `Aurora` database instance
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// // Use a specific location
165    /// let db = Aurora::open("./data/my_application.db")?;
166    ///
167    /// // Just use a name (creates in current directory)
168    /// let db = Aurora::open("customer_data.db")?;
169    /// ```
170    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
171        let mut config = AuroraConfig::default();
172        config.db_path = Self::resolve_path(path)?;
173        Self::with_config(config)
174    }
175
176    /// Helper method to resolve database path
177    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
178        let path = path.as_ref();
179
180        // If it's an absolute path, use it directly
181        if path.is_absolute() {
182            return Ok(path.to_path_buf());
183        }
184
185        // Otherwise, resolve relative to current directory
186        match std::env::current_dir() {
187            Ok(current_dir) => Ok(current_dir.join(path)),
188            Err(e) => Err(AuroraError::IoError(format!(
189                "Failed to resolve current directory: {}",
190                e
191            ))),
192        }
193    }
194
195    /// Open a database with custom configuration
196    pub fn with_config(config: AuroraConfig) -> Result<Self> {
197        let path = Self::resolve_path(&config.db_path)?;
198
199        if config.create_dirs {
200            if let Some(parent) = path.parent() {
201                if !parent.exists() {
202                    std::fs::create_dir_all(parent)?;
203                }
204            }
205        }
206
207        // Fix method calls to pass all required parameters
208        let cold = Arc::new(ColdStore::with_config(
209            path.to_str().unwrap(),
210            config.cold_cache_capacity_mb,
211            config.cold_flush_interval_ms,
212            config.cold_mode,
213        )?);
214
215        let hot = HotStore::with_config_and_eviction(
216            config.hot_cache_size_mb,
217            config.hot_cache_cleanup_interval_secs,
218            config.eviction_policy,
219        );
220
221        // Initialize write buffer if enabled
222        let write_buffer = if config.enable_write_buffering {
223            Some(WriteBuffer::new(
224                Arc::clone(&cold),
225                config.write_buffer_size,
226                config.write_buffer_flush_interval_ms,
227            ))
228        } else {
229            None
230        };
231
232        // Store auto_compact before moving config
233        let auto_compact = config.auto_compact;
234
235        // Initialize PubSub system (10K event buffer)
236        let pubsub = crate::pubsub::PubSubSystem::new(10000);
237
238        // Initialize the rest using the config...
239        let db = Self {
240            hot,
241            cold,
242            primary_indices: Arc::new(DashMap::new()),
243            secondary_indices: Arc::new(DashMap::new()),
244            indices_initialized: Arc::new(OnceCell::new()),
245            transaction_manager: crate::transaction::TransactionManager::new(),
246            indices: Arc::new(DashMap::new()),
247            schema_cache: Arc::new(DashMap::new()),
248            config,
249            write_buffer,
250            pubsub,
251        };
252
253        // Set up auto-compaction if enabled
254        if auto_compact {
255            // Implementation for auto-compaction scheduling
256            // ...
257        }
258
259        Ok(db)
260    }
261
262    // Lazy index initialization
263    pub async fn ensure_indices_initialized(&self) -> Result<()> {
264        self.indices_initialized
265            .get_or_init(|| async {
266                println!("Initializing indices...");
267                if let Err(e) = self.initialize_indices() {
268                    eprintln!("Failed to initialize indices: {:?}", e);
269                }
270                println!("Indices initialized");
271                ()
272            })
273            .await;
274        Ok(())
275    }
276
    /// Rebuild the in-memory primary/secondary indices by replaying every
    /// record persisted in cold storage.
    ///
    /// NOTE(review): this is a full scan — cost grows with database size,
    /// which is why callers defer it behind `ensure_indices_initialized`.
    /// Unlike `put`, internal "_collection"/"_index" keys are not filtered
    /// out here — confirm that is intentional.
    fn initialize_indices(&self) -> Result<()> {
        // Scan existing data and build indices
        for result in self.cold.scan() {
            let (key, value) = result?;
            // Keys are expected to be UTF-8 "<collection>:<id>" strings.
            let key_str = std::str::from_utf8(&key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            // The collection name is the prefix before the first ':'.
            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }
290
291    // Fast key-value operations with index support
292    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
293        // Check hot cache first
294        if let Some(value) = self.hot.get(key) {
295            return Ok(Some(value));
296        }
297
298        // Check primary index
299        if let Some(collection) = key.split(':').next() {
300            if let Some(index) = self.primary_indices.get(collection) {
301                if let Some(value) = index.get(key) {
302                    // Promote to hot cache
303                    self.hot.set(key.to_string(), value.clone(), None);
304                    return Ok(Some(value.clone()));
305                }
306            }
307        }
308
309        // Fallback to cold storage
310        let value = self.cold.get(key)?;
311        if let Some(v) = &value {
312            self.hot.set(key.to_string(), v.clone(), None);
313        }
314        Ok(value)
315    }
316
    /// Get value with zero-copy Arc reference (10-100x faster than get!)
    /// Only checks hot cache - returns None if not cached. Never touches
    /// cold storage and performs no promotion.
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Get cache statistics for the hot (in-memory) tier.
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }
327
    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for changes on a specific collection
    ///
    /// Returns a listener that receives an event for every change published
    /// for `collection`.
    ///
    /// # Examples
    ///
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// // In another task, insert a document
    /// db.insert_into("users", vec![...]).await?;
    ///
    /// // Listener receives the event
    /// let event = listener.recv().await?;
    /// println!("Change: {:?}", event);
    /// ```
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }
364
365    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
366        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
367
368        if value.len() > MAX_BLOB_SIZE {
369            return Err(AuroraError::InvalidOperation(format!(
370                "Blob size {} exceeds maximum allowed size of {}MB",
371                value.len() / (1024 * 1024),
372                MAX_BLOB_SIZE / (1024 * 1024)
373            )));
374        }
375
376        // Write to storage - use write buffer if enabled
377        if let Some(ref write_buffer) = self.write_buffer {
378            // Non-blocking write via buffer
379            write_buffer.write(key.clone(), value.clone())?;
380        } else {
381            // Direct synchronous write
382            self.cold.set(key.clone(), value.clone())?;
383        }
384
385        // Always update hot cache immediately
386        self.hot.set(key.clone(), value.clone(), ttl);
387
388        // FIX: Update the in-memory indices immediately after a successful write.
389        if let Some(collection_name) = key.split(':').next() {
390            // Don't index internal data like _collection or _index definitions
391            if !collection_name.starts_with('_') {
392                self.index_value(collection_name, &key, &value)?;
393            }
394        }
395
396        Ok(())
397    }
398
    /// Maintain the in-memory primary and secondary indices for one record.
    ///
    /// Called after every successful write. The primary index always stores
    /// the full serialized value; secondary indices are built only for fields
    /// the collection schema marks `indexed` or `unique`. Missing or invalid
    /// schemas silently skip secondary indexing.
    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update primary index (always index for fast full collection scans)
        // NOTE(review): this keeps a full in-memory copy of every value in
        // addition to the hot cache — confirm the memory budget is intended.
        self.primary_indices
            .entry(collection.to_string())
            .or_insert_with(DashMap::new)
            .insert(key.to_string(), value.to_vec());

        // Try to get schema from cache first, otherwise load and cache it
        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                // Schema not in cache - load it
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // No schema = no secondary indices
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // Invalid schema = skip indexing
                };

                // Cache the schema for future use
                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        // Build list of fields that should be indexed (unique or explicitly indexed)
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        // If no fields need indexing, we're done
        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices - ONLY for indexed/unique fields.
        // Values that don't deserialize as documents (e.g. blobs) are skipped.
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                // CRITICAL FIX: Skip fields that aren't indexed
                if !indexed_fields.contains(&field) {
                    continue;
                }

                // Use consistent string formatting for indexing
                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self
                    .secondary_indices
                    .entry(index_key)
                    .or_insert_with(DashMap::new);

                // Check if we're at the index limit
                let max_entries = self.config.max_index_entries_per_field;

                // NOTE(review): re-indexing an existing document appends its
                // key again rather than replacing it, so duplicate postings
                // are possible after updates — confirm consumers tolerate it.
                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        // Only add if we haven't exceeded the limit per value
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }
479
480    // Simplified collection scan (fallback)
481    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
482        let _prefix = format!("{}:", collection);
483        let mut documents = Vec::new();
484
485        if let Some(index) = self.primary_indices.get(collection) {
486            for entry in index.iter() {
487                if let Ok(doc) = serde_json::from_slice(entry.value()) {
488                    documents.push(doc);
489                }
490            }
491        }
492
493        Ok(documents)
494    }
495
496    // Restore missing methods
497    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
498        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
499
500        // Get file metadata to check size before reading
501        let metadata = tokio::fs::metadata(file_path).await?;
502        let file_size = metadata.len() as usize;
503
504        if file_size > MAX_FILE_SIZE {
505            return Err(AuroraError::InvalidOperation(format!(
506                "File size {} MB exceeds maximum allowed size of {} MB",
507                file_size / (1024 * 1024),
508                MAX_FILE_SIZE / (1024 * 1024)
509            )));
510        }
511
512        let mut file = File::open(file_path).await?;
513        let mut buffer = Vec::new();
514        file.read_to_end(&mut buffer).await?;
515
516        // Add BLOB: prefix to mark this as blob data
517        let mut blob_data = Vec::with_capacity(5 + buffer.len());
518        blob_data.extend_from_slice(b"BLOB:");
519        blob_data.extend_from_slice(&buffer);
520
521        self.put(key, blob_data, None)
522    }
523
524    /// Create a new collection with the given schema
525    ///
526    /// # Arguments
527    /// * `name` - Name of the collection to create
528    /// * `fields` - Schema definition as a list of field definitions:
529    ///   * Field name (accepts both &str and String)
530    ///   * Field type (String, Int, Float, Boolean, etc.)
531    ///   * Whether the field requires a unique value
532    ///
533    /// # Returns
534    /// Success or an error (e.g., collection already exists)
535    ///
536    /// # Examples
537    ///
538    /// ```
539    /// // Define a collection with schema - accepts &str
540    /// db.new_collection("products", vec![
541    ///     ("name", FieldType::String, false),
542    ///     ("price", FieldType::Float, false),
543    ///     ("sku", FieldType::String, true),  // unique field
544    ///     ("description", FieldType::String, false),
545    ///     ("in_stock", FieldType::Bool, false),
546    /// ])?;
547    /// ```
548    pub fn new_collection<S: Into<String>>(
549        &self,
550        name: &str,
551        fields: Vec<(S, FieldType, bool)>,
552    ) -> Result<()> {
553        let collection_key = format!("_collection:{}", name);
554
555        // Check if collection already exists - if so, just return Ok (idempotent)
556        if self.get(&collection_key)?.is_some() {
557            return Ok(());
558        }
559
560        // Create field definitions
561        let mut field_definitions = HashMap::new();
562        for (field_name, field_type, unique) in fields {
563            field_definitions.insert(
564                field_name.into(),
565                FieldDefinition {
566                    field_type,
567                    unique,
568                    indexed: unique, // Auto-index unique fields
569                },
570            );
571        }
572
573        let collection = Collection {
574            name: name.to_string(),
575            fields: field_definitions,
576            // REMOVED: unique_fields is now derived from fields
577        };
578
579        let collection_data = serde_json::to_vec(&collection)?;
580        self.put(collection_key, collection_data, None)?;
581
582        // Invalidate schema cache since we just created/updated the collection schema
583        self.schema_cache.remove(name);
584
585        Ok(())
586    }
587
588    /// Insert a document into a collection
589    ///
590    /// # Arguments
591    /// * `collection` - Name of the collection to insert into
592    /// * `data` - Document fields and values to insert
593    ///
594    /// # Returns
595    /// The ID of the inserted document or an error
596    ///
597    /// # Examples
598    ///
599    /// ```
600    /// // Insert a document
601    /// let doc_id = db.insert_into("users", vec![
602    ///     ("name", Value::String("John Doe".to_string())),
603    ///     ("email", Value::String("john@example.com".to_string())),
604    ///     ("active", Value::Bool(true)),
605    /// ])?;
606    /// ```
607    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
608        // Convert Vec<(&str, Value)> to HashMap<String, Value>
609        let data_map: HashMap<String, Value> =
610            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
611
612        // Validate unique constraints before inserting
613        self.validate_unique_constraints(collection, &data_map)
614            .await?;
615
616        let doc_id = Uuid::new_v4().to_string();
617        let document = Document {
618            id: doc_id.clone(),
619            data: data_map,
620        };
621
622        self.put(
623            format!("{}:{}", collection, doc_id),
624            serde_json::to_vec(&document)?,
625            None,
626        )?;
627
628        // Publish insert event
629        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
630        let _ = self.pubsub.publish(event);
631
632        Ok(doc_id)
633    }
634
635    pub async fn insert_map(
636        &self,
637        collection: &str,
638        data: HashMap<String, Value>,
639    ) -> Result<String> {
640        // Validate unique constraints before inserting
641        self.validate_unique_constraints(collection, &data).await?;
642
643        let doc_id = Uuid::new_v4().to_string();
644        let document = Document {
645            id: doc_id.clone(),
646            data,
647        };
648
649        self.put(
650            format!("{}:{}", collection, doc_id),
651            serde_json::to_vec(&document)?,
652            None,
653        )?;
654
655        // Publish insert event
656        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
657        let _ = self.pubsub.publish(event);
658
659        Ok(doc_id)
660    }
661
662    /// Update a document by ID
663    ///
664    /// # Arguments
665    /// * `collection` - Collection name
666    /// * `doc_id` - Document ID to update
667    /// * `data` - New field values to set
668    ///
669    /// # Returns
670    /// Ok(()) on success, or an error if the document doesn't exist
671    ///
672    /// # Examples
673    ///
674    /// ```
675    /// db.update_document("users", &user_id, vec![
676    ///     ("status", Value::String("active".to_string())),
677    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
678    /// ]).await?;
679    /// ```
680    pub async fn update_document(
681        &self,
682        collection: &str,
683        doc_id: &str,
684        updates: Vec<(&str, Value)>,
685    ) -> Result<()> {
686        // Get existing document
687        let mut document = self
688            .get_document(collection, doc_id)?
689            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;
690
691        // Store old document for event
692        let old_document = document.clone();
693
694        // Apply updates
695        for (field, value) in updates {
696            document.data.insert(field.to_string(), value);
697        }
698
699        // Validate unique constraints after update (excluding current document)
700        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
701            .await?;
702
703        // Save updated document
704        self.put(
705            format!("{}:{}", collection, doc_id),
706            serde_json::to_vec(&document)?,
707            None,
708        )?;
709
710        // Publish update event
711        let event =
712            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
713        let _ = self.pubsub.publish(event);
714
715        Ok(())
716    }
717
    /// Return every document in `collection`, lazily building the in-memory
    /// indices on first use.
    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }
722
723    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
724        let mut data = Vec::new();
725
726        if let Some(index) = self
727            .primary_indices
728            .get(pattern.split(':').next().unwrap_or(""))
729        {
730            for entry in index.iter() {
731                if entry.key().contains(pattern) {
732                    let value = entry.value();
733                    let info = if value.starts_with(b"BLOB:") {
734                        DataInfo::Blob { size: value.len() }
735                    } else {
736                        DataInfo::Data {
737                            size: value.len(),
738                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
739                                .into_owned(),
740                        }
741                    };
742
743                    data.push((entry.key().clone(), info));
744                }
745            }
746        }
747
748        Ok(data)
749    }
750
    /// Begin a transaction
    ///
    /// Returns a `TransactionId` that must be passed to
    /// `commit_transaction()` or `rollback_transaction()` to finish the
    /// transaction. Operations are buffered per-transaction by the
    /// transaction manager; multiple transactions may be active at once.
    ///
    /// # Returns
    /// The ID of the newly created transaction
    ///
    /// # Examples
    ///
    /// ```
    /// // Start a transaction for atomic operations
    /// let tx_id = db.begin_transaction();
    ///
    /// // ... perform operations associated with the transaction ...
    ///
    /// // Commit all changes or roll back if there's an error
    /// if all_ok {
    ///     db.commit_transaction(tx_id)?;
    /// } else {
    ///     db.rollback_transaction(tx_id)?;
    /// }
    /// ```
    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
        let buffer = self.transaction_manager.begin();
        buffer.id
    }
780
    /// Commit a transaction
    ///
    /// Makes all changes in the transaction permanent: buffered writes are
    /// applied to cold storage, the hot cache, and the in-memory indices;
    /// buffered deletes are removed from all tiers; finally cold storage is
    /// compacted.
    ///
    /// # Arguments
    /// * `tx_id` - Transaction ID returned from begin_transaction()
    ///
    /// # Returns
    /// Success or an error if transaction not found
    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
        let buffer = self
            .transaction_manager
            .active_transactions
            .get(&tx_id)
            .ok_or_else(|| {
                AuroraError::InvalidOperation("Transaction not found or already completed".into())
            })?;

        // Phase 1: apply buffered writes to both storage tiers and indices.
        for item in buffer.writes.iter() {
            let key = item.key();
            let value = item.value();
            self.cold.set(key.clone(), value.clone())?;
            self.hot.set(key.clone(), value.clone(), None);
            if let Some(collection_name) = key.split(':').next() {
                // Internal "_collection"/"_index" records are never indexed.
                if !collection_name.starts_with('_') {
                    self.index_value(collection_name, key, value)?;
                }
            }
        }

        // Phase 2: apply buffered deletes, stripping index entries first
        // (best-effort: documents that fail to load are simply skipped).
        for item in buffer.deletes.iter() {
            let key = item.key();
            if let Some((collection, id)) = key.split_once(':') {
                if let Ok(Some(doc)) = self.get_document(collection, id) {
                    self.remove_from_indices(collection, &doc)?;
                }
            }
            self.cold.delete(key)?;
            self.hot.delete(key);
        }

        // Phase 3: mark the transaction completed in the manager.
        self.transaction_manager.commit(tx_id)?;

        // NOTE(review): compacting after every commit may be expensive for
        // write-heavy workloads — confirm this is intentional.
        self.cold.compact()?;

        Ok(())
    }
828
    /// Roll back a transaction
    ///
    /// Discards all changes made in the transaction; nothing it buffered
    /// ever reaches storage or the indices.
    ///
    /// # Arguments
    /// * `tx_id` - Transaction ID returned from begin_transaction()
    ///
    /// # Returns
    /// Success or an error if transaction not found
    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
        self.transaction_manager.rollback(tx_id)
    }
841
    /// Create a named BTree index on `field` for `collection`.
    ///
    /// Backfills the index from all documents currently in cold storage and
    /// persists the definition under `_index:<collection>:<field>`.
    ///
    /// # Errors
    /// Returns `CollectionNotFound` if the collection has no schema record.
    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
        // Check if collection exists
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        // Generate a default index name
        let index_name = format!("idx_{}_{}", collection, field);

        // Create index definition
        let definition = IndexDefinition {
            name: index_name.clone(),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::BTree,
            unique: false,
        };

        // Create the index
        let index = Index::new(definition.clone());

        // Index all existing documents in the collection.
        // NOTE(review): only cold storage is scanned; writes still sitting
        // in the write buffer would be missed — confirm flush semantics
        // before relying on a freshly built index. Scan, deserialize, and
        // insert failures are all silently skipped (best-effort backfill).
        let prefix = format!("{}:", collection);
        for result in self.cold.scan_prefix(&prefix) {
            if let Ok((_, data)) = result {
                if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                    let _ = index.insert(&doc);
                }
            }
        }

        // Store the index
        self.indices.insert(index_name, index);

        // Store the index definition for persistence
        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&definition)?, None)?;

        Ok(())
    }
882
    /// Create a query builder for advanced document queries
    ///
    /// The returned builder borrows the database handle, so it cannot
    /// outlive `self`.
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to query
    ///
    /// # Returns
    /// A `QueryBuilder` for constructing and executing queries
    ///
    /// # Examples
    ///
    /// ```
    /// // Query for documents matching criteria
    /// let active_premium_users = db.query("users")
    ///     .filter(|f| f.eq("status", "active") && f.eq("plan", "premium"))
    ///     .order_by("joined_date", false)  // newest first
    ///     .limit(10)
    ///     .collect()
    ///     .await?;
    /// ```
    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        QueryBuilder::new(self, collection)
    }
905
    /// Create a search builder for full-text search
    ///
    /// The returned builder borrows the database handle, so it cannot
    /// outlive `self`.
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to search
    ///
    /// # Returns
    /// A `SearchBuilder` for configuring and executing searches
    ///
    /// # Examples
    ///
    /// ```
    /// // Search for documents containing text
    /// let search_results = db.search("articles")
    ///     .field("content")
    ///     .matching("quantum computing")
    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
    ///     .collect()
    ///     .await?;
    /// ```
    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        SearchBuilder::new(self, collection)
    }
928
929    /// Retrieve a document by ID
930    ///
931    /// # Arguments
932    /// * `collection` - Name of the collection to query
933    /// * `id` - ID of the document to retrieve
934    ///
935    /// # Returns
936    /// The document if found, None if not found, or an error
937    ///
938    /// # Examples
939    ///
940    /// ```
941    /// // Get a document by ID
942    /// if let Some(user) = db.get_document("users", &user_id)? {
943    ///     println!("Found user: {}", user.data.get("name").unwrap());
944    /// } else {
945    ///     println!("User not found");
946    /// }
947    /// ```
948    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
949        let key = format!("{}:{}", collection, id);
950        if let Some(data) = self.get(&key)? {
951            Ok(Some(serde_json::from_slice(&data)?))
952        } else {
953            Ok(None)
954        }
955    }
956
957    /// Delete a document by ID
958    ///
959    /// # Arguments
960    /// * `collection` - Name of the collection containing the document
961    /// * `id` - ID of the document to delete
962    ///
963    /// # Returns
964    /// Success or an error
965    ///
966    /// # Examples
967    ///
968    /// ```
969    /// // Delete a specific document
970    /// db.delete("users", &user_id)?;
971    /// ```
972    pub async fn delete(&self, key: &str) -> Result<()> {
973        // Extract collection and id from key (format: "collection:id")
974        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
975            (coll, doc_id)
976        } else {
977            return Err(AuroraError::InvalidOperation(
978                "Invalid key format, expected 'collection:id'".into(),
979            ));
980        };
981
982        // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
983        let document = self.get_document(collection, id)?;
984
985        // Delete from hot cache
986        if self.hot.get(key).is_some() {
987            self.hot.delete(key);
988        }
989
990        // Delete from cold storage
991        self.cold.delete(key)?;
992
993        // CRITICAL FIX: Clean up ALL indices (primary + secondary)
994        if let Some(doc) = document {
995            self.remove_from_indices(collection, &doc)?;
996        } else {
997            // Fallback: at least remove from primary index
998            if let Some(index) = self.primary_indices.get_mut(collection) {
999                index.remove(id);
1000            }
1001        }
1002
1003        // Publish delete event
1004        let event = crate::pubsub::ChangeEvent::delete(collection, id);
1005        let _ = self.pubsub.publish(event);
1006
1007        Ok(())
1008    }
1009
1010    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
1011        let prefix = format!("{}:", collection);
1012
1013        // Get all keys in collection
1014        let keys: Vec<String> = self
1015            .cold
1016            .scan()
1017            .filter_map(|r| r.ok())
1018            .filter(|(k, _)| k.starts_with(&prefix))
1019            .map(|(k, _)| k)
1020            .collect();
1021
1022        // Delete each key
1023        for key in keys {
1024            self.delete(&key).await?;
1025        }
1026
1027        // Remove collection indices
1028        self.primary_indices.remove(collection);
1029        self.secondary_indices
1030            .retain(|k, _| !k.starts_with(&prefix));
1031
1032        // Invalidate schema cache
1033        self.schema_cache.remove(collection);
1034
1035        Ok(())
1036    }
1037
1038    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
1039        // Remove from primary index
1040        if let Some(index) = self.primary_indices.get(collection) {
1041            index.remove(&doc.id);
1042        }
1043
1044        // Remove from secondary indices
1045        for (field, value) in &doc.data {
1046            let index_key = format!("{}:{}", collection, field);
1047            if let Some(index) = self.secondary_indices.get(&index_key) {
1048                if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
1049                    doc_ids.retain(|id| id != &doc.id);
1050                }
1051            }
1052        }
1053
1054        Ok(())
1055    }
1056
1057    pub async fn search_text(
1058        &self,
1059        collection: &str,
1060        field: &str,
1061        query: &str,
1062    ) -> Result<Vec<Document>> {
1063        let mut results = Vec::new();
1064        let docs = self.get_all_collection(collection).await?;
1065
1066        for doc in docs {
1067            if let Some(Value::String(text)) = doc.data.get(field) {
1068                if text.to_lowercase().contains(&query.to_lowercase()) {
1069                    results.push(doc);
1070                }
1071            }
1072        }
1073
1074        Ok(results)
1075    }
1076
1077    /// Export a collection to a JSON file
1078    ///
1079    /// # Arguments
1080    /// * `collection` - Name of the collection to export
1081    /// * `output_path` - Path to the output JSON file
1082    ///
1083    /// # Returns
1084    /// Success or an error
1085    ///
1086    /// # Examples
1087    ///
1088    /// ```
1089    /// // Backup a collection to JSON
1090    /// db.export_as_json("users", "./backups/users_2023-10-15.json")?;
1091    /// ```
1092    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
1093        let output_path = if !output_path.ends_with(".json") {
1094            format!("{}.json", output_path)
1095        } else {
1096            output_path.to_string()
1097        };
1098
1099        let mut docs = Vec::new();
1100
1101        // Get all documents from the specified collection
1102        for result in self.cold.scan() {
1103            let (key, value) = result?;
1104
1105            // Only process documents from the specified collection
1106            if let Some(key_collection) = key.split(':').next() {
1107                if key_collection == collection && !key.starts_with("_collection:") {
1108                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
1109                        // Convert Value enum to raw JSON values
1110                        let mut clean_doc = serde_json::Map::new();
1111                        for (k, v) in doc.data {
1112                            match v {
1113                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
1114                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
1115                                Value::Float(f) => {
1116                                    if let Some(n) = serde_json::Number::from_f64(f) {
1117                                        clean_doc.insert(k, JsonValue::Number(n))
1118                                    } else {
1119                                        clean_doc.insert(k, JsonValue::Null)
1120                                    }
1121                                }
1122                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
1123                                Value::Array(arr) => {
1124                                    let clean_arr: Vec<JsonValue> = arr
1125                                        .into_iter()
1126                                        .map(|v| match v {
1127                                            Value::String(s) => JsonValue::String(s),
1128                                            Value::Int(i) => JsonValue::Number(i.into()),
1129                                            Value::Float(f) => serde_json::Number::from_f64(f)
1130                                                .map(JsonValue::Number)
1131                                                .unwrap_or(JsonValue::Null),
1132                                            Value::Bool(b) => JsonValue::Bool(b),
1133                                            Value::Null => JsonValue::Null,
1134                                            _ => JsonValue::Null,
1135                                        })
1136                                        .collect();
1137                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
1138                                }
1139                                Value::Uuid(u) => {
1140                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
1141                                }
1142                                Value::Null => clean_doc.insert(k, JsonValue::Null),
1143                                Value::Object(_) => None, // Handle nested objects if needed
1144                            };
1145                        }
1146                        docs.push(JsonValue::Object(clean_doc));
1147                    }
1148                }
1149            }
1150        }
1151
1152        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
1153            collection.to_string(),
1154            JsonValue::Array(docs),
1155        )]));
1156
1157        let mut file = StdFile::create(&output_path)?;
1158        serde_json::to_writer_pretty(&mut file, &output)?;
1159        println!("Exported collection '{}' to {}", collection, &output_path);
1160        Ok(())
1161    }
1162
1163    /// Export specific collection to CSV file
1164    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
1165        let output_path = if !filename.ends_with(".csv") {
1166            format!("{}.csv", filename)
1167        } else {
1168            filename.to_string()
1169        };
1170
1171        let mut writer = csv::Writer::from_path(&output_path)?;
1172        let mut headers = Vec::new();
1173        let mut first_doc = true;
1174
1175        // Get all documents from the specified collection
1176        for result in self.cold.scan() {
1177            let (key, value) = result?;
1178
1179            // Only process documents from the specified collection
1180            if let Some(key_collection) = key.split(':').next() {
1181                if key_collection == collection && !key.starts_with("_collection:") {
1182                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
1183                        // Write headers from first document
1184                        if first_doc && !doc.data.is_empty() {
1185                            headers = doc.data.keys().cloned().collect();
1186                            writer.write_record(&headers)?;
1187                            first_doc = false;
1188                        }
1189
1190                        // Write the document values
1191                        let values: Vec<String> = headers
1192                            .iter()
1193                            .map(|header| {
1194                                doc.data
1195                                    .get(header)
1196                                    .map(|v| v.to_string())
1197                                    .unwrap_or_default()
1198                            })
1199                            .collect();
1200                        writer.write_record(&values)?;
1201                    }
1202                }
1203            }
1204        }
1205
1206        writer.flush()?;
1207        println!("Exported collection '{}' to {}", collection, &output_path);
1208        Ok(())
1209    }
1210
1211    // Helper method to create filter-based queries
1212    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
1213        self.query(collection)
1214    }
1215
1216    // Convenience methods that build on top of the FilterBuilder
1217
    /// Look up a single document whose `id` field equals `id`.
    ///
    /// NOTE(review): this filters on an "id" field inside the document's
    /// data map; whether the query engine also matches `Document::id` itself
    /// is not visible here — `get_document(collection, id)` is the direct
    /// key lookup. Confirm before relying on this for documents that do not
    /// store "id" as a data field.
    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        self.query(collection)
            .filter(|f| f.eq("id", id))
            .first_one()
            .await
    }
1224
    /// Return the first document in `collection` matching `filter_fn`,
    /// or `None` if nothing matches.
    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
    where
        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
    {
        self.query(collection).filter(filter_fn).first_one().await
    }
1231
1232    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
1233        &self,
1234        collection: &str,
1235        field: &'static str,
1236        value: T,
1237    ) -> Result<Vec<Document>> {
1238        let value_clone = value.clone();
1239        self.query(collection)
1240            .filter(move |f| f.eq(field, value_clone.clone()))
1241            .collect()
1242            .await
1243    }
1244
1245    pub async fn find_by_fields(
1246        &self,
1247        collection: &str,
1248        fields: Vec<(&str, Value)>,
1249    ) -> Result<Vec<Document>> {
1250        let mut query = self.query(collection);
1251
1252        for (field, value) in fields {
1253            let field_owned = field.to_owned();
1254            let value_owned = value.clone();
1255            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
1256        }
1257
1258        query.collect().await
1259    }
1260
    /// Find documents whose `field` value lies within `[min, max]`.
    ///
    /// `min`/`max` are cloned into each filter evaluation because the filter
    /// may run once per candidate document.
    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
        &self,
        collection: &str,
        field: &'static str,
        min: T,
        max: T,
    ) -> Result<Vec<Document>> {
        self.query(collection)
            .filter(move |f| f.between(field, min.clone(), max.clone()))
            .collect()
            .await
    }
1274
    // Complex query example: build with multiple combined filters
    //
    // NOTE(review): this fn is `async` yet awaits nothing and just returns a
    // builder, so callers must `.await` it before adding filters. Dropping
    // `async` would simplify use but would break existing `.await` callers.
    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
1279
    /// Create a full-text search builder; alias for `search`.
    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        self.search(collection)
    }
1284
1285    // Utility methods for common operations
1286    pub async fn upsert(
1287        &self,
1288        collection: &str,
1289        id: &str,
1290        data: Vec<(&str, Value)>,
1291    ) -> Result<String> {
1292        // Convert Vec<(&str, Value)> to HashMap<String, Value>
1293        let data_map: HashMap<String, Value> =
1294            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1295
1296        // Check if document exists
1297        if let Some(mut doc) = self.get_document(collection, id)? {
1298            // Update existing document - merge new data
1299            for (key, value) in data_map {
1300                doc.data.insert(key, value);
1301            }
1302
1303            // Validate unique constraints for the updated document
1304            // We need to exclude the current document from the uniqueness check
1305            self.validate_unique_constraints_excluding(collection, &doc.data, id)
1306                .await?;
1307
1308            self.put(
1309                format!("{}:{}", collection, id),
1310                serde_json::to_vec(&doc)?,
1311                None,
1312            )?;
1313            Ok(id.to_string())
1314        } else {
1315            // Insert new document with specified ID - validate unique constraints
1316            self.validate_unique_constraints(collection, &data_map)
1317                .await?;
1318
1319            let document = Document {
1320                id: id.to_string(),
1321                data: data_map,
1322            };
1323
1324            self.put(
1325                format!("{}:{}", collection, id),
1326                serde_json::to_vec(&document)?,
1327                None,
1328            )?;
1329            Ok(id.to_string())
1330        }
1331    }
1332
1333    // Atomic increment/decrement
1334    pub async fn increment(
1335        &self,
1336        collection: &str,
1337        id: &str,
1338        field: &str,
1339        amount: i64,
1340    ) -> Result<i64> {
1341        if let Some(mut doc) = self.get_document(collection, id)? {
1342            // Get current value
1343            let current = match doc.data.get(field) {
1344                Some(Value::Int(i)) => *i,
1345                _ => 0,
1346            };
1347
1348            // Increment
1349            let new_value = current + amount;
1350            doc.data.insert(field.to_string(), Value::Int(new_value));
1351
1352            // Save changes
1353            self.put(
1354                format!("{}:{}", collection, id),
1355                serde_json::to_vec(&doc)?,
1356                None,
1357            )?;
1358
1359            Ok(new_value)
1360        } else {
1361            Err(AuroraError::NotFound(format!(
1362                "Document {}:{} not found",
1363                collection, id
1364            )))
1365        }
1366    }
1367
1368    // Batch operations
1369    pub async fn batch_insert(
1370        &self,
1371        collection: &str,
1372        docs: Vec<Vec<(&str, Value)>>,
1373    ) -> Result<Vec<String>> {
1374        // Convert each Vec<(&str, Value)> to HashMap<String, Value>
1375        let doc_maps: Vec<HashMap<String, Value>> = docs
1376            .into_iter()
1377            .map(|doc| doc.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
1378            .collect();
1379
1380        // Begin transaction
1381        let tx_id = self.begin_transaction();
1382
1383        let mut ids = Vec::with_capacity(doc_maps.len());
1384
1385        // Insert all documents
1386        for data_map in doc_maps {
1387            // Convert HashMap back to Vec for insert_into method
1388            let data_vec: Vec<(&str, Value)> = data_map
1389                .iter()
1390                .map(|(k, v)| (k.as_str(), v.clone()))
1391                .collect();
1392
1393            match self.insert_into(collection, data_vec).await {
1394                Ok(id) => ids.push(id),
1395                Err(e) => {
1396                    self.rollback_transaction(tx_id)?;
1397                    return Err(e);
1398                }
1399            }
1400        }
1401
1402        // Commit transaction
1403        self.commit_transaction(tx_id)?;
1404
1405        Ok(ids)
1406    }
1407
1408    // Delete documents by query
1409    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
1410    where
1411        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
1412    {
1413        let docs = self.query(collection).filter(filter_fn).collect().await?;
1414
1415        let mut deleted_count = 0;
1416
1417        for doc in docs {
1418            let key = format!("{}:{}", collection, doc.id);
1419            self.delete(&key).await?;
1420            deleted_count += 1;
1421        }
1422
1423        Ok(deleted_count)
1424    }
1425
1426    /// Import documents from a JSON file into a collection
1427    ///
1428    /// This method validates documents against the collection schema
1429    /// and skips documents that already exist in the database.
1430    ///
1431    /// # Arguments
1432    /// * `collection` - Name of the collection to import into
1433    /// * `filename` - Path to the JSON file containing documents
1434    ///
1435    /// # Returns
1436    /// Statistics about the import operation or an error
1437    ///
1438    /// # Examples
1439    ///
1440    /// ```
1441    /// // Import documents from JSON
1442    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
1443    /// println!("Imported: {}, Skipped: {}, Failed: {}",
1444    ///     stats.imported, stats.skipped, stats.failed);
1445    /// ```
1446    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
1447        // Validate that the collection exists
1448        let collection_def = self.get_collection_definition(collection)?;
1449
1450        // Load JSON file
1451        let json_string = read_to_string(filename)
1452            .await
1453            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
1454
1455        // Parse JSON
1456        let documents: Vec<JsonValue> = from_str(&json_string)
1457            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
1458
1459        let mut stats = ImportStats::default();
1460
1461        // Process each document
1462        for doc_json in documents {
1463            match self
1464                .import_document(collection, &collection_def, doc_json)
1465                .await
1466            {
1467                Ok(ImportResult::Imported) => stats.imported += 1,
1468                Ok(ImportResult::Skipped) => stats.skipped += 1,
1469                Err(_) => stats.failed += 1,
1470            }
1471        }
1472
1473        Ok(stats)
1474    }
1475
1476    /// Import a single document, performing schema validation and duplicate checking
1477    async fn import_document(
1478        &self,
1479        collection: &str,
1480        collection_def: &Collection,
1481        doc_json: JsonValue,
1482    ) -> Result<ImportResult> {
1483        if !doc_json.is_object() {
1484            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
1485        }
1486
1487        // Extract document ID if present
1488        let doc_id = doc_json
1489            .get("id")
1490            .and_then(|id| id.as_str())
1491            .map(|s| s.to_string())
1492            .unwrap_or_else(|| Uuid::new_v4().to_string());
1493
1494        // Check if document with this ID already exists
1495        if let Some(_) = self.get_document(collection, &doc_id)? {
1496            return Ok(ImportResult::Skipped);
1497        }
1498
1499        // Convert JSON to our document format and validate against schema
1500        let mut data_map = HashMap::new();
1501
1502        if let Some(obj) = doc_json.as_object() {
1503            for (field_name, field_def) in &collection_def.fields {
1504                if let Some(json_value) = obj.get(field_name) {
1505                    // Validate value against field type
1506                    if !self.validate_field_value(json_value, &field_def.field_type) {
1507                        return Err(AuroraError::InvalidOperation(format!(
1508                            "Field '{}' has invalid type",
1509                            field_name
1510                        )));
1511                    }
1512
1513                    // Convert JSON value to our Value type
1514                    let value = self.json_to_value(json_value)?;
1515                    data_map.insert(field_name.clone(), value);
1516                } else if field_def.unique {
1517                    // Missing required unique field
1518                    return Err(AuroraError::InvalidOperation(format!(
1519                        "Missing required unique field '{}'",
1520                        field_name
1521                    )));
1522                }
1523            }
1524        }
1525
1526        // Check for duplicates by unique fields
1527        let unique_fields = self.get_unique_fields(&collection_def);
1528        for unique_field in &unique_fields {
1529            if let Some(value) = data_map.get(unique_field) {
1530                // Query for existing documents with this unique value
1531                let query_results = self
1532                    .query(collection)
1533                    .filter(move |f| f.eq(unique_field, value.clone()))
1534                    .limit(1)
1535                    .collect()
1536                    .await?;
1537
1538                if !query_results.is_empty() {
1539                    // Found duplicate by unique field
1540                    return Ok(ImportResult::Skipped);
1541                }
1542            }
1543        }
1544
1545        // Create and insert document
1546        let document = Document {
1547            id: doc_id,
1548            data: data_map,
1549        };
1550
1551        self.put(
1552            format!("{}:{}", collection, document.id),
1553            serde_json::to_vec(&document)?,
1554            None,
1555        )?;
1556
1557        Ok(ImportResult::Imported)
1558    }
1559
1560    /// Validate that a JSON value matches the expected field type
1561    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
1562        match field_type {
1563            FieldType::String => value.is_string(),
1564            FieldType::Int => value.is_i64() || value.is_u64(),
1565            FieldType::Float => value.is_number(),
1566            FieldType::Bool => value.is_boolean(),
1567            FieldType::Array => value.is_array(),
1568            FieldType::Object => value.is_object(),
1569            FieldType::Uuid => {
1570                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
1571            }
1572        }
1573    }
1574
1575    /// Convert a JSON value to our internal Value type
1576    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
1577        match json_value {
1578            JsonValue::Null => Ok(Value::Null),
1579            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
1580            JsonValue::Number(n) => {
1581                if let Some(i) = n.as_i64() {
1582                    Ok(Value::Int(i))
1583                } else if let Some(f) = n.as_f64() {
1584                    Ok(Value::Float(f))
1585                } else {
1586                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
1587                }
1588            }
1589            JsonValue::String(s) => {
1590                // Try parsing as UUID first
1591                if let Ok(uuid) = Uuid::parse_str(s) {
1592                    Ok(Value::Uuid(uuid))
1593                } else {
1594                    Ok(Value::String(s.clone()))
1595                }
1596            }
1597            JsonValue::Array(arr) => {
1598                let mut values = Vec::new();
1599                for item in arr {
1600                    values.push(self.json_to_value(item)?);
1601                }
1602                Ok(Value::Array(values))
1603            }
1604            JsonValue::Object(obj) => {
1605                let mut map = HashMap::new();
1606                for (k, v) in obj {
1607                    map.insert(k.clone(), self.json_to_value(v)?);
1608                }
1609                Ok(Value::Object(map))
1610            }
1611        }
1612    }
1613
1614    /// Get collection definition
1615    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
1616        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
1617            let collection_def: Collection = serde_json::from_slice(&data)?;
1618            Ok(collection_def)
1619        } else {
1620            Err(AuroraError::CollectionNotFound(collection.to_string()))
1621        }
1622    }
1623
1624    /// Get storage statistics and information about the database
1625    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
1626        let hot_stats = self.hot.get_stats();
1627        let cold_stats = self.cold.get_stats()?;
1628
1629        Ok(DatabaseStats {
1630            hot_stats,
1631            cold_stats,
1632            estimated_size: self.cold.estimated_size(),
1633            collections: self.get_collection_stats()?,
1634        })
1635    }
1636
    /// Check if a key is currently stored in the hot cache
    ///
    /// Diagnostic helper; delegates directly to `HotStore::is_hot`.
    pub fn is_in_hot_cache(&self, key: &str) -> bool {
        self.hot.is_hot(key)
    }
1641
    /// Start background cleanup of hot cache with specified interval
    ///
    /// NOTE(review): the clone is wrapped in an `Arc` before the call —
    /// presumably `start_cleanup_with_interval` needs a shared handle for
    /// its background task; confirm, otherwise the `Arc` is unnecessary.
    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
        let hot_store = Arc::new(self.hot.clone());
        hot_store.start_cleanup_with_interval(interval_secs).await;
    }
1647
1648    /// Clear the hot cache (useful when memory needs to be freed)
1649    pub fn clear_hot_cache(&self) {
1650        self.hot.clear();
1651        println!(
1652            "Hot cache cleared, current hit ratio: {:.2}%",
1653            self.hot.hit_ratio() * 100.0
1654        );
1655    }
1656
1657    /// Prewarm the cache by loading frequently accessed data from cold storage
1658    ///
1659    /// This loads the most recently modified documents from a collection into
1660    /// the hot cache to improve initial query performance after startup.
1661    ///
1662    /// # Arguments
1663    /// * `collection` - The collection to prewarm
1664    /// * `limit` - Maximum number of documents to load (default: 1000)
1665    ///
1666    /// # Returns
1667    /// Number of documents loaded into cache
1668    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
1669        let limit = limit.unwrap_or(1000);
1670        let prefix = format!("{}:", collection);
1671        let mut loaded = 0;
1672
1673        for entry in self.cold.scan_prefix(&prefix) {
1674            if loaded >= limit {
1675                break;
1676            }
1677
1678            if let Ok((key, value)) = entry {
1679                // Load into hot cache
1680                self.hot.set(key.clone(), value, None);
1681                loaded += 1;
1682            }
1683        }
1684
1685        println!("Prewarmed {} with {} documents", collection, loaded);
1686        Ok(loaded)
1687    }
1688
1689    /// Prewarm cache for all collections
1690    pub async fn prewarm_all_collections(
1691        &self,
1692        docs_per_collection: Option<usize>,
1693    ) -> Result<HashMap<String, usize>> {
1694        let mut stats = HashMap::new();
1695
1696        // Get all collections
1697        let collections: Vec<String> = self
1698            .cold
1699            .scan()
1700            .filter_map(|r| r.ok())
1701            .map(|(k, _)| k)
1702            .filter(|k| k.starts_with("_collection:"))
1703            .map(|k| k.trim_start_matches("_collection:").to_string())
1704            .collect();
1705
1706        for collection in collections {
1707            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
1708            stats.insert(collection, loaded);
1709        }
1710
1711        Ok(stats)
1712    }
1713
    /// Store multiple key-value pairs efficiently in a single batch operation
    ///
    /// Keys are expected in `collection:id` form; the collection name is the
    /// segment before the first `:`. The write path is durability-first:
    ///   1. one batched write of *all* pairs to cold storage,
    ///   2. per collection, update the hot cache, the primary index, and any
    ///      secondary indices declared by the collection schema.
    ///
    /// Pairs belonging to a collection with no stored schema are written to
    /// cold storage but skipped for in-memory indexing.
    ///
    /// # Errors
    /// Fails if the cold-storage batch write fails, or if collection metadata
    /// cannot be read or deserialized.
    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
        // Group pairs by collection name
        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
        for (key, value) in &pairs {
            // split(':').next() always yields at least the whole key, so a
            // key without ':' is grouped under its full text.
            if let Some(collection_name) = key.split(':').next() {
                collections
                    .entry(collection_name.to_string())
                    .or_default()
                    .push((key.clone(), value.clone()));
            }
        }

        // First, do the batch write to cold storage for all pairs
        self.cold.batch_set(pairs)?;

        // Then, process each collection for in-memory updates
        for (collection_name, batch) in collections {
            // --- Optimized Batch Indexing ---

            // 1. Get schema once for the entire collection batch
            let collection_obj = match self.schema_cache.get(&collection_name) {
                Some(cached_schema) => Arc::clone(cached_schema.value()),
                None => {
                    // Cache miss: load the schema from storage and memoize it.
                    let collection_key = format!("_collection:{}", collection_name);
                    match self.get(&collection_key)? {
                        Some(data) => {
                            let obj: Collection = serde_json::from_slice(&data)?;
                            let arc_obj = Arc::new(obj);
                            self.schema_cache
                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
                            arc_obj
                        }
                        // No schema on disk: data is already durable in cold
                        // storage, so just skip in-memory indexing.
                        None => continue,
                    }
                }
            };

            // Only fields declared indexed or unique get secondary indices.
            let indexed_fields: Vec<String> = collection_obj
                .fields
                .iter()
                .filter(|(_, def)| def.indexed || def.unique)
                .map(|(name, _)| name.clone())
                .collect();

            let primary_index = self
                .primary_indices
                .entry(collection_name.to_string())
                .or_insert_with(DashMap::new);

            for (key, value) in batch {
                // 2. Update hot cache
                self.hot.set(key.clone(), value.clone(), None);

                // 3. Update primary index
                primary_index.insert(key.clone(), value.clone());

                // 4. Update secondary indices
                if !indexed_fields.is_empty() {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        for (field, field_value) in doc.data {
                            if indexed_fields.contains(&field) {
                                // Strings are indexed by their raw contents;
                                // all other values by their string rendering.
                                let value_str = match &field_value {
                                    Value::String(s) => s.clone(),
                                    _ => field_value.to_string(),
                                };
                                let index_key = format!("{}:{}", collection_name, field);
                                let secondary_index = self
                                    .secondary_indices
                                    .entry(index_key)
                                    .or_insert_with(DashMap::new);

                                // Cap posting-list growth per indexed value.
                                // NOTE(review): re-writing the same document
                                // appends its key again (no dedup here) —
                                // confirm readers dedup, or that this is the
                                // intended behavior.
                                let max_entries = self.config.max_index_entries_per_field;
                                secondary_index
                                    .entry(value_str)
                                    .and_modify(|doc_ids| {
                                        if doc_ids.len() < max_entries {
                                            doc_ids.push(key.to_string());
                                        }
                                    })
                                    .or_insert_with(|| vec![key.to_string()]);
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
1804
    /// Scan for keys with a specific prefix
    ///
    /// Thin pass-through to cold storage: yields `(key, value)` pairs whose
    /// keys start with `prefix`, lazily. Note that only cold storage is
    /// consulted here — the hot cache is not read by this method.
    pub fn scan_with_prefix(
        &self,
        prefix: &str,
    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.cold.scan_prefix(prefix)
    }
1812
1813    /// Get storage efficiency metrics for the database
1814    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
1815        let mut stats = HashMap::new();
1816
1817        // Scan all collections
1818        let collections: Vec<String> = self
1819            .cold
1820            .scan()
1821            .filter_map(|r| r.ok())
1822            .map(|(k, _)| k)
1823            .filter(|k| k.starts_with("_collection:"))
1824            .map(|k| k.trim_start_matches("_collection:").to_string())
1825            .collect();
1826
1827        for collection in collections {
1828            let prefix = format!("{}:", collection);
1829
1830            // Count documents
1831            let count = self.cold.scan_prefix(&prefix).count();
1832
1833            // Estimate size
1834            let size: usize = self
1835                .cold
1836                .scan_prefix(&prefix)
1837                .filter_map(|r| r.ok())
1838                .map(|(_, v)| v.len())
1839                .sum();
1840
1841            stats.insert(
1842                collection,
1843                CollectionStats {
1844                    count,
1845                    size_bytes: size,
1846                    avg_doc_size: if count > 0 { size / count } else { 0 },
1847                },
1848            );
1849        }
1850
1851        Ok(stats)
1852    }
1853
1854    /// Search for documents by exact value using an index
1855    ///
1856    /// This method performs a fast lookup using a pre-created index
1857    pub fn search_by_value(
1858        &self,
1859        collection: &str,
1860        field: &str,
1861        value: &Value,
1862    ) -> Result<Vec<Document>> {
1863        let index_key = format!("_index:{}:{}", collection, field);
1864
1865        if let Some(index_data) = self.get(&index_key)? {
1866            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1867            let index = Index::new(index_def);
1868
1869            // Use the previously unused search method
1870            if let Some(doc_ids) = index.search(value) {
1871                // Load the documents by ID
1872                let mut docs = Vec::new();
1873                for id in doc_ids {
1874                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1875                        let doc: Document = serde_json::from_slice(&doc_data)?;
1876                        docs.push(doc);
1877                    }
1878                }
1879                return Ok(docs);
1880            }
1881        }
1882
1883        // Return empty result if no index or no matches
1884        Ok(Vec::new())
1885    }
1886
1887    /// Perform a full-text search on an indexed text field
1888    ///
1889    /// This provides more advanced text search capabilities including
1890    /// relevance ranking of results
1891    pub fn full_text_search(
1892        &self,
1893        collection: &str,
1894        field: &str,
1895        query: &str,
1896    ) -> Result<Vec<Document>> {
1897        let index_key = format!("_index:{}:{}", collection, field);
1898
1899        if let Some(index_data) = self.get(&index_key)? {
1900            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1901
1902            // Ensure this is a full-text index
1903            if !matches!(index_def.index_type, IndexType::FullText) {
1904                return Err(AuroraError::InvalidOperation(format!(
1905                    "Field '{}' is not indexed as full-text",
1906                    field
1907                )));
1908            }
1909
1910            let index = Index::new(index_def);
1911
1912            // Use the previously unused search_text method
1913            if let Some(doc_id_scores) = index.search_text(query) {
1914                // Load the documents by ID, preserving score order
1915                let mut docs = Vec::new();
1916                for (id, _score) in doc_id_scores {
1917                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1918                        let doc: Document = serde_json::from_slice(&doc_data)?;
1919                        docs.push(doc);
1920                    }
1921                }
1922                return Ok(docs);
1923            }
1924        }
1925
1926        // Return empty result if no index or no matches
1927        Ok(Vec::new())
1928    }
1929
    /// Create a full-text search index on a text field
    ///
    /// Persists an `IndexDefinition` under `_index:<collection>:<field>` and
    /// then feeds every existing document of the collection through a new
    /// `Index` instance.
    ///
    /// # Arguments
    /// * `collection` - Target collection (must already exist)
    /// * `field` - Text field to index
    /// * `_enable_stop_words` - Currently unused; reserved for future
    ///   stop-word filtering
    ///
    /// # Errors
    /// * `CollectionNotFound` when the collection metadata is missing
    /// * Deserialization errors when a stored document cannot be decoded
    ///
    /// NOTE(review): the populated `index` is a local value dropped when this
    /// function returns; only the *definition* is persisted via `put`.
    /// Confirm that `Index::insert` persists entries durably somewhere else,
    /// otherwise the built index contents are lost on return.
    pub fn create_text_index(
        &self,
        collection: &str,
        field: &str,
        _enable_stop_words: bool,
    ) -> Result<()> {
        // Check if collection exists
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        // Create index definition
        let index_def = IndexDefinition {
            name: format!("{}_{}_fulltext", collection, field),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::FullText,
            unique: false,
        };

        // Store index definition
        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;

        // Create the actual index
        let index = Index::new(index_def);

        // Index all existing documents in the collection.
        // Scan errors are skipped, but an undecodable document aborts the
        // whole operation with an error.
        let prefix = format!("{}:", collection);
        for result in self.cold.scan_prefix(&prefix) {
            if let Ok((_, data)) = result {
                let doc: Document = serde_json::from_slice(&data)?;
                index.insert(&doc)?;
            }
        }

        Ok(())
    }
1969
1970    pub async fn execute_simple_query(
1971        &self,
1972        builder: &SimpleQueryBuilder,
1973    ) -> Result<Vec<Document>> {
1974        // Ensure indices are initialized
1975        self.ensure_indices_initialized().await?;
1976
1977        // A place to store the IDs of the documents we need to fetch
1978        let mut doc_ids_to_load: Option<Vec<String>> = None;
1979        let mut used_filter_index: Option<usize> = None;
1980
1981        // --- The "Query Planner" ---
1982        // Look for an opportunity to use an index
1983        for (filter_idx, filter) in builder.filters.iter().enumerate() {
1984            match filter {
1985                Filter::Eq(field, value) => {
1986                    let index_key = format!("{}:{}", &builder.collection, field);
1987
1988                    // Do we have a secondary index for this field?
1989                    if let Some(index) = self.secondary_indices.get(&index_key) {
1990                        // Yes! Let's use it.
1991                        if let Some(matching_ids) = index.get(&value.to_string()) {
1992                            doc_ids_to_load = Some(matching_ids.clone());
1993                            used_filter_index = Some(filter_idx);
1994                            break; // Stop searching for other indexes for now
1995                        }
1996                    }
1997                }
1998                Filter::Gt(field, value)
1999                | Filter::Gte(field, value)
2000                | Filter::Lt(field, value)
2001                | Filter::Lte(field, value) => {
2002                    let index_key = format!("{}:{}", &builder.collection, field);
2003
2004                    // Do we have a secondary index for this field?
2005                    if let Some(index) = self.secondary_indices.get(&index_key) {
2006                        // For range queries, we need to scan through the index values
2007                        let mut matching_ids = Vec::new();
2008
2009                        for entry in index.iter() {
2010                            let index_value_str = entry.key();
2011
2012                            // Try to parse the index value to compare with our filter value
2013                            if let Ok(index_value) =
2014                                self.parse_value_from_string(index_value_str, value)
2015                            {
2016                                let matches = match filter {
2017                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
2018                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
2019                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
2020                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
2021                                    _ => false,
2022                                };
2023
2024                                if matches {
2025                                    matching_ids.extend(entry.value().clone());
2026                                }
2027                            }
2028                        }
2029
2030                        if !matching_ids.is_empty() {
2031                            doc_ids_to_load = Some(matching_ids);
2032                            used_filter_index = Some(filter_idx);
2033                            break;
2034                        }
2035                    }
2036                }
2037                Filter::Contains(field, search_term) => {
2038                    let index_key = format!("{}:{}", &builder.collection, field);
2039
2040                    // Do we have a secondary index for this field?
2041                    if let Some(index) = self.secondary_indices.get(&index_key) {
2042                        // For Contains queries, we need to scan through the index keys
2043                        // to find those that contain the search term
2044                        let mut matching_ids = Vec::new();
2045
2046                        for entry in index.iter() {
2047                            let index_value_str = entry.key();
2048
2049                            // Check if this indexed value contains our search term
2050                            if index_value_str
2051                                .to_lowercase()
2052                                .contains(&search_term.to_lowercase())
2053                            {
2054                                matching_ids.extend(entry.value().clone());
2055                            }
2056                        }
2057
2058                        if !matching_ids.is_empty() {
2059                            // Remove duplicates since a document could match multiple indexed values
2060                            matching_ids.sort();
2061                            matching_ids.dedup();
2062
2063                            doc_ids_to_load = Some(matching_ids);
2064                            used_filter_index = Some(filter_idx);
2065                            break;
2066                        }
2067                    }
2068                }
2069            }
2070        }
2071
2072        let mut final_docs: Vec<Document>;
2073
2074        if let Some(ids) = doc_ids_to_load {
2075            // --- Path 1: Index-based Fetch ---
2076            // We have a specific list of IDs to load. This is fast.
2077            println!("📊 Loading {} documents via index", ids.len());
2078            final_docs = Vec::with_capacity(ids.len());
2079
2080            for id in ids {
2081                let doc_key = format!("{}:{}", &builder.collection, id);
2082                if let Some(data) = self.get(&doc_key)? {
2083                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2084                        final_docs.push(doc);
2085                    }
2086                }
2087            }
2088        } else {
2089            // --- Path 2: Full Collection Scan (Fallback) ---
2090            // No suitable index was found, so we must do the slow scan
2091            println!(
2092                "⚠️  INDEX MISS. Falling back to full collection scan for '{}'",
2093                &builder.collection
2094            );
2095            final_docs = self.get_all_collection(&builder.collection).await?;
2096        }
2097
2098        // Now, apply the *rest* of the filters in memory
2099        // This is important for queries with multiple filters, where only one might be indexed
2100        // Skip the filter we already used for the index lookup
2101        final_docs.retain(|doc| {
2102            builder.filters.iter().enumerate().all(|(idx, filter)| {
2103                // Skip the filter we already used for index lookup
2104                if Some(idx) == used_filter_index {
2105                    return true;
2106                }
2107
2108                match filter {
2109                    Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
2110                    Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
2111                    Filter::Gte(field, value) => doc.data.get(field).map_or(false, |v| v >= value),
2112                    Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
2113                    Filter::Lte(field, value) => doc.data.get(field).map_or(false, |v| v <= value),
2114                    Filter::Contains(field, value_str) => {
2115                        doc.data.get(field).map_or(false, |v| match v {
2116                            Value::String(s) => s.contains(value_str),
2117                            Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
2118                            _ => false,
2119                        })
2120                    }
2121                }
2122            })
2123        });
2124
2125        println!(
2126            "✅ Query completed. Returning {} documents",
2127            final_docs.len()
2128        );
2129
2130        // NOTE: This implementation does not yet support ordering, limits, or offsets.
2131        // Those would be added here in a production system.
2132        Ok(final_docs)
2133    }
2134
2135    /// Helper method to parse a string value back to a Value for comparison
2136    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
2137        match reference_value {
2138            Value::Int(_) => {
2139                if let Ok(i) = value_str.parse::<i64>() {
2140                    Ok(Value::Int(i))
2141                } else {
2142                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
2143                }
2144            }
2145            Value::Float(_) => {
2146                if let Ok(f) = value_str.parse::<f64>() {
2147                    Ok(Value::Float(f))
2148                } else {
2149                    Err(AuroraError::InvalidOperation(
2150                        "Failed to parse float".into(),
2151                    ))
2152                }
2153            }
2154            Value::String(_) => Ok(Value::String(value_str.to_string())),
2155            _ => Ok(Value::String(value_str.to_string())),
2156        }
2157    }
2158
2159    pub async fn execute_dynamic_query(
2160        &self,
2161        collection: &str,
2162        payload: &QueryPayload,
2163    ) -> Result<Vec<Document>> {
2164        let mut docs = self.get_all_collection(collection).await?;
2165
2166        // 1. Apply Filters
2167        if let Some(filters) = &payload.filters {
2168            docs.retain(|doc| {
2169                filters.iter().all(|filter| {
2170                    doc.data
2171                        .get(&filter.field)
2172                        .map_or(false, |doc_val| check_filter(doc_val, filter))
2173                })
2174            });
2175        }
2176
2177        // 2. Apply Sorting
2178        if let Some(sort_options) = &payload.sort {
2179            docs.sort_by(|a, b| {
2180                let a_val = a.data.get(&sort_options.field);
2181                let b_val = b.data.get(&sort_options.field);
2182                let ordering = a_val
2183                    .partial_cmp(&b_val)
2184                    .unwrap_or(std::cmp::Ordering::Equal);
2185                if sort_options.ascending {
2186                    ordering
2187                } else {
2188                    ordering.reverse()
2189                }
2190            });
2191        }
2192
2193        // 3. Apply Pagination
2194        let mut docs = docs;
2195        if let Some(offset) = payload.offset {
2196            docs = docs.into_iter().skip(offset).collect();
2197        }
2198        if let Some(limit) = payload.limit {
2199            docs = docs.into_iter().take(limit).collect();
2200        }
2201
2202        // 4. Apply Field Selection (Projection)
2203        if let Some(select_fields) = &payload.select {
2204            if !select_fields.is_empty() {
2205                docs = docs
2206                    .into_iter()
2207                    .map(|mut doc| {
2208                        doc.data.retain(|key, _| select_fields.contains(key));
2209                        doc
2210                    })
2211                    .collect();
2212            }
2213        }
2214
2215        Ok(docs)
2216    }
2217
2218    pub async fn process_network_request(
2219        &self,
2220        request: crate::network::protocol::Request,
2221    ) -> crate::network::protocol::Response {
2222        use crate::network::protocol::Response;
2223
2224        match request {
2225            crate::network::protocol::Request::Get(key) => match self.get(&key) {
2226                Ok(value) => Response::Success(value),
2227                Err(e) => Response::Error(e.to_string()),
2228            },
2229            crate::network::protocol::Request::Put(key, value) => {
2230                match self.put(key, value, None) {
2231                    Ok(_) => Response::Done,
2232                    Err(e) => Response::Error(e.to_string()),
2233                }
2234            }
2235            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
2236                Ok(_) => Response::Done,
2237                Err(e) => Response::Error(e.to_string()),
2238            },
2239            crate::network::protocol::Request::NewCollection { name, fields } => {
2240                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
2241                    .iter()
2242                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
2243                    .collect();
2244
2245                match self.new_collection(&name, fields_for_db) {
2246                    Ok(_) => Response::Done,
2247                    Err(e) => Response::Error(e.to_string()),
2248                }
2249            }
2250            crate::network::protocol::Request::Insert { collection, data } => {
2251                match self.insert_map(&collection, data).await {
2252                    Ok(id) => Response::Message(id),
2253                    Err(e) => Response::Error(e.to_string()),
2254                }
2255            }
2256            crate::network::protocol::Request::GetDocument { collection, id } => {
2257                match self.get_document(&collection, &id) {
2258                    Ok(doc) => Response::Document(doc),
2259                    Err(e) => Response::Error(e.to_string()),
2260                }
2261            }
2262            crate::network::protocol::Request::Query(builder) => {
2263                match self.execute_simple_query(&builder).await {
2264                    Ok(docs) => Response::Documents(docs),
2265                    Err(e) => Response::Error(e.to_string()),
2266                }
2267            }
2268            crate::network::protocol::Request::BeginTransaction => {
2269                let tx_id = self.begin_transaction();
2270                Response::TransactionId(tx_id.as_u64())
2271            }
2272            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
2273                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
2274                match self.commit_transaction(tx_id) {
2275                    Ok(_) => Response::Done,
2276                    Err(e) => Response::Error(e.to_string()),
2277                }
2278            }
2279            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
2280                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
2281                match self.rollback_transaction(tx_id) {
2282                    Ok(_) => Response::Done,
2283                    Err(e) => Response::Error(e.to_string()),
2284                }
2285            }
2286        }
2287    }
2288
2289    /// Create indices for commonly queried fields automatically
2290    ///
2291    /// This is a convenience method that creates indices for fields that are
2292    /// likely to be queried frequently, improving performance.
2293    ///
2294    /// # Arguments
2295    /// * `collection` - Name of the collection
2296    /// * `fields` - List of field names to create indices for
2297    ///
2298    /// # Examples
2299    /// ```
2300    /// // Create indices for commonly queried fields
2301    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
2302    /// ```
2303    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
2304        for field in fields {
2305            if let Err(e) = self.create_index(collection, field).await {
2306                eprintln!(
2307                    "Warning: Failed to create index for {}.{}: {}",
2308                    collection, field, e
2309                );
2310            } else {
2311                println!("✅ Created index for {}.{}", collection, field);
2312            }
2313        }
2314        Ok(())
2315    }
2316
2317    /// Get index statistics for a collection
2318    ///
2319    /// This helps understand which indices exist and how effective they are.
2320    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
2321        let mut stats = HashMap::new();
2322
2323        for entry in self.secondary_indices.iter() {
2324            let key = entry.key();
2325            if key.starts_with(&format!("{}:", collection)) {
2326                let field = key.split(':').nth(1).unwrap_or("unknown");
2327                let index = entry.value();
2328
2329                let unique_values = index.len();
2330                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
2331
2332                stats.insert(
2333                    field.to_string(),
2334                    IndexStats {
2335                        unique_values,
2336                        total_documents,
2337                        avg_docs_per_value: if unique_values > 0 {
2338                            total_documents / unique_values
2339                        } else {
2340                            0
2341                        },
2342                    },
2343                );
2344            }
2345        }
2346
2347        stats
2348    }
2349
2350    /// Optimize a collection by creating indices for frequently filtered fields
2351    ///
2352    /// This analyzes common query patterns and suggests/creates optimal indices.
2353    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
2354        // For now, this is a simple implementation that creates indices for all fields
2355        // In a more sophisticated version, this would analyze query logs
2356
2357        if let Ok(collection_def) = self.get_collection_definition(collection) {
2358            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
2359            self.create_indices(collection, &field_names).await?;
2360            println!(
2361                "🚀 Optimized collection '{}' with {} indices",
2362                collection,
2363                field_names.len()
2364            );
2365        }
2366
2367        Ok(())
2368    }
2369
2370    // Helper method to get unique fields from a collection
2371    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
2372        collection
2373            .fields
2374            .iter()
2375            .filter(|(_, def)| def.unique)
2376            .map(|(name, _)| name.clone())
2377            .collect()
2378    }
2379
2380    // Update the validation method to use the helper
2381    async fn validate_unique_constraints(
2382        &self,
2383        collection: &str,
2384        data: &HashMap<String, Value>,
2385    ) -> Result<()> {
2386        self.ensure_indices_initialized().await?;
2387        let collection_def = self.get_collection_definition(collection)?;
2388        let unique_fields = self.get_unique_fields(&collection_def);
2389
2390        for unique_field in &unique_fields {
2391            if let Some(value) = data.get(unique_field) {
2392                let index_key = format!("{}:{}", collection, unique_field);
2393                if let Some(index) = self.secondary_indices.get(&index_key) {
2394                    // Get the raw string value without JSON formatting
2395                    let value_str = match value {
2396                        Value::String(s) => s.clone(),
2397                        _ => value.to_string(),
2398                    };
2399                    if index.contains_key(&value_str) {
2400                        return Err(AuroraError::UniqueConstraintViolation(
2401                            unique_field.clone(),
2402                            value_str,
2403                        ));
2404                    }
2405                }
2406            }
2407        }
2408        Ok(())
2409    }
2410
    /// Validate unique constraints excluding a specific document ID (for updates)
    ///
    /// Same check as `validate_unique_constraints`, but index entries that
    /// belong to `exclude_id` itself are ignored so a document can be updated
    /// without colliding with its own previously indexed values.
    ///
    /// # Errors
    /// `UniqueConstraintViolation` when another document already holds one of
    /// the unique values in `data`.
    async fn validate_unique_constraints_excluding(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
        exclude_id: &str,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Get the raw string value without JSON formatting
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if let Some(doc_ids) = index.get(&value_str) {
                        // Check if any document other than the excluded one has this value.
                        // Index entries store full "collection:id" keys (as
                        // written by batch_write), so compare against the
                        // fully-qualified key.
                        let exclude_key = format!("{}:{}", collection, exclude_id);
                        for doc_key in doc_ids.value() {
                            if doc_key != &exclude_key {
                                return Err(AuroraError::UniqueConstraintViolation(
                                    unique_field.clone(),
                                    value_str,
                                ));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
2448}
2449
2450fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
2451    let filter_val = match json_to_value(&filter.value) {
2452        Ok(v) => v,
2453        Err(_) => return false,
2454    };
2455
2456    match filter.operator {
2457        FilterOperator::Eq => doc_val == &filter_val,
2458        FilterOperator::Ne => doc_val != &filter_val,
2459        FilterOperator::Gt => doc_val > &filter_val,
2460        FilterOperator::Gte => doc_val >= &filter_val,
2461        FilterOperator::Lt => doc_val < &filter_val,
2462        FilterOperator::Lte => doc_val <= &filter_val,
2463        FilterOperator::Contains => match (doc_val, &filter_val) {
2464            (Value::String(s), Value::String(fv)) => s.contains(fv),
2465            (Value::Array(arr), _) => arr.contains(&filter_val),
2466            _ => false,
2467        },
2468    }
2469}
2470
/// Outcome of importing a single document during an import operation.
enum ImportResult {
    // The document was written into the target collection.
    Imported,
    // The document was left untouched (per `ImportStats`, typically because
    // it already existed).
    Skipped,
}
2476
/// Statistics from an import operation.
///
/// Plain-data counter struct; derives `Clone`/`Copy` (all fields are `usize`)
/// and `PartialEq`/`Eq` so callers can cheaply pass stats around and compare
/// them in assertions.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}
2487
/// Statistics for a specific collection.
///
/// Plain-data counter struct; derives `Clone`/`Copy` (all fields are `usize`)
/// and `PartialEq`/`Eq` for cheap passing and comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}
2498
/// Statistics for an index.
///
/// Plain-data counter struct; derives `Clone`/`Copy` (all fields are `usize`)
/// and `PartialEq`/`Eq` for cheap passing and comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}
2509
/// Combined database statistics.
///
/// Snapshot aggregating tier-level metrics (hot cache, cold store) with a
/// per-collection breakdown. Not `Clone`/`Copy`: the nested stats types are
/// declared elsewhere and their derivability is not guaranteed here.
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection, keyed by collection name
    pub collections: HashMap<String, CollectionStats>,
}
2522
// Integration-style tests: each test opens a fresh database in a tempdir so
// there is no shared state between tests.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// End-to-end smoke test: create a collection, insert a document, and
    /// read it back by id, checking the stored field values.
    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    /// Begin/commit cycle around an insert.
    ///
    /// NOTE(review): the insert is not passed `tx_id`, and there is no
    /// rollback assertion — so this only verifies that begin/commit succeed
    /// around an independent write. Confirm whether `insert_into` is meant
    /// to participate in the open transaction, and consider adding a
    /// rollback test.
    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    /// Filtering (`gt`) and ordering (`order_by` ascending) through the
    /// query builder over two inserted documents.
    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query: both books are newer than 2019, returned in ascending
        // year order.
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    /// Round-trips a small file through blob storage and checks the size
    /// reported by `get_data_by_pattern`.
    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    /// Oversized blobs must be rejected with `InvalidOperation`.
    ///
    /// NOTE(review): this allocates and writes 201 MB to disk, which makes
    /// the test slow and disk-hungry; if the blob size limit is configurable
    /// for tests, a much smaller fixture would exercise the same path.
    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    /// Unique-field enforcement across both the insert and upsert paths:
    /// duplicate values are rejected, but re-upserting the same document
    /// with its own unchanged unique value succeeds.
    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Try to insert second document with same email - should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        // The error carries the offending field name and the raw (unquoted)
        // value string.
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Test upsert with unique constraint
        // Should succeed for new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when trying to upsert with duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating existing document with same email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}