
aurora_db/db.rs

//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with a schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true),  // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AqlError, ErrorCode, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::parser;
use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
use crate::query::FilterBuilder;
use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for primary index
// Instead of storing full Vec<u8> values, we store minimal metadata
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

// Summary information about a stored value (used by data-inspection helpers)
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

#[derive(Debug)]
#[allow(dead_code)]
enum WalOperation {
    Put {
        key: Arc<String>,
        value: Arc<Vec<u8>>,
    },
    Delete {
        key: String,
    },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pub pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Worker system for background jobs
    pub workers: Option<Arc<crate::workers::WorkerSystem>>,
    // Asynchronous WAL writer channel
    wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
    // Computed fields manager
    pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Clone for Aurora {
    fn clone(&self) -> Self {
        Self {
            hot: self.hot.clone(),
            cold: Arc::clone(&self.cold),
            primary_indices: Arc::clone(&self.primary_indices),
            secondary_indices: Arc::clone(&self.secondary_indices),
            indices_initialized: Arc::clone(&self.indices_initialized),
            transaction_manager: self.transaction_manager.clone(),
            indices: Arc::clone(&self.indices),
            schema_cache: Arc::clone(&self.schema_cache),
            config: self.config.clone(),
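            // Note: a clone does not get its own write buffer; its writes go
            // straight to cold storage (see `put`), while all other state is shared.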
            write_buffer: None,
            pubsub: self.pubsub.clone(),
            wal: self.wal.clone(),
            checkpoint_shutdown: self.checkpoint_shutdown.clone(),
            compaction_shutdown: self.compaction_shutdown.clone(),
            workers: self.workers.clone(),
            wal_writer: self.wal_writer.clone(),
            computed: Arc::clone(&self.computed),
        }
    }
}

/// Trait for converting inputs into execution parameters
pub trait ToExecParams {
    fn into_params(self) -> Result<(String, ExecutionOptions)>;
}

impl<'a> ToExecParams for &'a str {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self.to_string(), ExecutionOptions::default()))
    }
}

impl ToExecParams for String {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self, ExecutionOptions::default()))
    }
}

impl<'a, V> ToExecParams for (&'a str, V)
where
    V: Into<serde_json::Value>,
{
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        let (aql, vars) = self;
        let json_vars = vars.into();
        let map = match json_vars {
            serde_json::Value::Object(map) => map.into_iter().collect(),
            _ => {
                return Err(AqlError::new(
                    ErrorCode::InvalidInput,
                    "Variables must be a JSON object",
                ));
            }
        };
        Ok((
            aql.to_string(),
            ExecutionOptions {
                variables: map,
                ..Default::default()
            },
        ))
    }
}

impl Aurora {
    /// Execute an AQL query (variables are optional)
    ///
    /// Supports two forms:
    /// 1. `db.execute("query").await`
    /// 2. `db.execute(("query", vars)).await`
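    ///
    /// # Example
    /// A minimal sketch (the AQL text and collection are illustrative):
    /// ```
    /// // Without variables
    /// let result = db.execute("query { users { id name } }").await?;
    ///
    /// // With variables bound from a JSON object
    /// let vars = serde_json::json!({ "min_age": 18 });
    /// let result = db.execute(("query { users }", vars)).await?;
    /// ```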
    pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
        let (aql, options) = input.into_params()?;
        parser::executor::execute(self, &aql, options).await
    }

    /// Stream real-time changes using AQL subscription syntax
    ///
    /// This is a convenience method that extracts the `ChangeListener` from
    /// an AQL subscription query, providing a cleaner API than using `execute()` directly.
    ///
    /// # Example
    /// ```
    /// // Stream changes from active products
    /// let mut listener = db.stream(r#"
    ///     subscription {
    ///         products(where: { active: { eq: true } }) {
    ///             id
    ///             name
    ///         }
    ///     }
    /// "#).await?;
    ///
    /// // Receive real-time events
    /// while let Ok(event) = listener.recv().await {
    ///     println!("Change: {:?} on {}", event.change_type, event.id);
    /// }
    /// ```
    pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
        let result = self.execute(aql).await?;

        match result {
            ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
                crate::error::AqlError::new(
                    crate::error::ErrorCode::QueryError,
                    "Subscription did not return a stream".to_string(),
                )
            }),
            _ => Err(crate::error::AqlError::new(
                crate::error::ErrorCode::QueryError,
                "Expected a subscription query, got a different operation type".to_string(),
            )),
        }
    }

    /// Explain AQL query execution plan
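    ///
    /// Parses and analyzes the query without executing it. A minimal sketch
    /// (the query string is illustrative):
    /// ```
    /// let plan = db.explain("query { users { id } }").await?;
    /// println!("ops: {:?}, cost: {}", plan.operations, plan.estimated_cost);
    /// ```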
    pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
        let (aql, options) = input.into_params()?;

        // Parse and analyze without executing
        let doc = parser::parse_with_variables(
            &aql,
            serde_json::Value::Object(options.variables.clone().into_iter().collect()),
        )?;

        self.analyze_execution_plan(&doc).await
    }

    /// Analyze execution plan for a parsed query
    pub async fn analyze_execution_plan(
        &self,
        doc: &crate::parser::ast::Document,
    ) -> Result<ExecutionPlan> {
        let mut operations = Vec::new();
        for op in &doc.operations {
            operations.push(format!("{:?}", op));
        }

        Ok(ExecutionPlan {
            operations,
            estimated_cost: 1.0,
        })
    }

    /// Remove stale lock files from a database directory
    ///
    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
    /// that prevent the database from being reopened. This method safely removes
    /// those lock files.
    ///
    /// # Safety
    /// Only call this when you're certain no other Aurora instance is using the database.
    /// Removing lock files while another process is running could cause data corruption.
    ///
    /// # Example
    /// ```no_run
    /// use aurora_db::Aurora;
    ///
    /// // If you get an "Access denied" error when opening:
    /// if let Err(e) = Aurora::open("my_db") {
    ///     eprintln!("Failed to open: {}", e);
    ///     // Try removing the stale lock
    ///     if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
    ///         println!("Removed stale lock, try opening again");
    ///         let db = Aurora::open("my_db")?;
    ///     }
    /// }
    /// # Ok::<(), aurora_db::error::AqlError>(())
    /// ```
    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
        let resolved_path = Self::resolve_path(path)?;
        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
    }

    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Or use a relative path
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AqlError::new(
                ErrorCode::IoError,
                format!("Failed to resolve current directory: {}", e),
            )),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,           // 512 MB cache
    ///     enable_write_buffering: true,     // Batch writes for speed
    ///     enable_wal: true,                 // Durability
    ///     auto_compact: true,               // Background compaction
    ///     compact_interval_mins: 60,        // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;
        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize WAL
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

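        // Design note: all WAL appends are funneled through one dedicated task via
        // an unbounded channel, so foreground writers enqueue without blocking on
        // the WAL lock and entries stay strictly ordered.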
        // --- Initialize the background WAL writer ---
        let wal_writer = if enable_wal {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            let wal_clone = wal.clone();

            // Spawn a single dedicated task that writes WAL entries sequentially
            tokio::spawn(async move {
                if let Some(wal) = wal_clone {
                    while let Some(op) = rx.recv().await {
                        // Properly handle the RwLock write guard
                        match wal.write() {
                            Ok(mut guard) => {
                                let result = match op {
                                    WalOperation::Put { key, value } => {
                                        // Deref the Arcs to pass slices
                                        guard.append(Operation::Put, &key, Some(value.as_ref()))
                                    }
                                    WalOperation::Delete { key } => {
                                        guard.append(Operation::Delete, &key, None)
                                    }
                                };
                                if let Err(e) = result {
                                    eprintln!("CRITICAL: Failed to append to WAL: {}. Shutting down.", e);
                                    // A failed WAL write is a catastrophic failure.
                                    std::process::exit(1);
                                }
                            }
                            Err(e) => {
                                // This likely means the lock is poisoned, which is a critical state.
                                eprintln!("CRITICAL: Failed to acquire WAL lock: {}. Shutting down.", e);
                                std::process::exit(1);
                            }
                        }
                    }
                }
            });
            Some(tx)
        } else {
            None
        };

        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                    let _ = wal_guard.truncate();
                                }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                    let _ = wal_guard.truncate();
                                }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let workers_path = path.join("workers.db");
        let worker_config = crate::workers::WorkerConfig {
            storage_path: workers_path.to_string_lossy().into_owned(),
            ..Default::default()
        };
        let workers = crate::workers::WorkerSystem::new(worker_config)
            .map(Arc::new)
            .ok();

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
            workers,
            wal_writer, // Sender half of the background WAL writer channel
            computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
        };

        if !wal_entries.is_empty() {
            let handle = tokio::runtime::Handle::try_current()
                .expect("Tokio runtime must be started to open database");
            handle.block_on(db.replay_wal(wal_entries))?;
        }

        Ok(db)
    }

    // Lazy index initialization
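    /// Builds the in-memory indices on first use by scanning cold storage.
    /// Backed by a `OnceCell`, so repeated calls are cheap and initialization
    /// runs at most once.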
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AqlError::new(ErrorCode::InvalidKey, "Invalid UTF-8".to_string()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                // Skip system/metadata prefixes (e.g., _collection, _sys_migration, _index)
                if collection_name.starts_with('_') {
                    continue;
                }
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support
    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check hot cache first
        if let Some(value) = self.hot.get(key) {
            // Check if this is a blob reference (pointer to cold storage)
            if value.starts_with(b"BLOBREF:") {
                // It's a blob ref - fetch actual data from cold storage
                return self.cold.get(key);
            }
            return Ok(Some(value));
        }

        // Fetch from cold storage
        let value = self.cold.get(key)?;

        if let Some(v) = &value {
            if self.should_cache_key(key) {
                self.hot
                    .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
            }
        }

        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference (10-100x faster than `get`!)
    /// Only checks the hot cache - returns `None` if the key is not cached.
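    ///
    /// ```
    /// // Sketch: zero-copy read that succeeds only for cache-resident keys
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     println!("cached value is {} bytes", bytes.len());
    /// }
    /// ```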
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Helper to decide if a key should be cached.
    fn should_cache_key(&self, key: &str) -> bool {
        if let Some(collection_name) = key.split(':').next() {
            if !collection_name.starts_with('_') {
                if let Ok(collection_def) = self.get_collection_definition(collection_name) {
                    if collection_def
                        .fields
                        .values()
                        .any(|def| def.field_type == FieldType::Any)
                    {
                        return false; // Don't cache if collection has Any field
                    }
                }
            }
        }
        true // Cache by default
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Percentage of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///                   stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

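    /// Returns `true` if `field` in `collection` is indexed, i.e. declared
    /// `indexed` or `unique` in the collection schema.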
    pub fn has_index(&self, collection: &str, field: &str) -> bool {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            if let Some(field_def) = collection_def.fields.get(field) {
                return field_def.indexed || field_def.unique;
            }
        }
        false
    }
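    /// Look up document IDs in the secondary index for an exact field value.
    ///
    /// Returns bare document IDs (the `id` part of the stored `collection:id`
    /// keys); returns an empty `Vec` if the field is not indexed or the value
    /// has no entries.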
    pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
        let index_key = format!("{}:{}", collection, field);
        if let Some(index) = self.secondary_indices.get(&index_key) {
            let value_str = match value {
                Value::String(s) => s.clone(),
                _ => value.to_string(),
            };
            if let Some(doc_ids) = index.get(&value_str) {
                // The doc_ids in the secondary index are full keys like "collection:id".
                // The query logic expects just the "id" part.
                return doc_ids
                    .value()
                    .iter()
                    .map(|key| key.split(':').nth(1).unwrap_or(key).to_string())
                    .collect();
            }
        }
        Vec::new()
    }

    /// Register a computed field definition
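    ///
    /// A minimal sketch (`expr` stands for any `ComputedExpression` built via
    /// the `crate::computed` API):
    /// ```
    /// db.register_computed_field("orders", "total", expr).await?;
    /// ```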
    pub async fn register_computed_field(
        &self,
        collection: &str,
        field: &str,
        expression: crate::computed::ComputedExpression,
    ) -> Result<()> {
        let mut computed = self.computed.write().unwrap();
        computed.register(collection, field, expression);
        Ok(())
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with a specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as a single-collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get the total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for durability guarantee
    /// - Truncates WAL after successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?;  // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = db.clone();
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?;  // Critical: ensure transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?;  // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow, WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When durability mode is already Immediate
    ///
    /// # Important Notes
    /// - WAL provides durability even without explicit flush()
    /// - flush() adds latency (~10-50ms) so use strategically
    /// - Automatic flush happens during graceful shutdown
    /// - After flush(), WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set durability mode
    /// - WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate WAL after successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None).await?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600))).await?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AqlError::invalid_operation(format!(
                "Blob size {}MB exceeds maximum allowed size of {}MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        // --- OPTIMIZATION: Wrap in Arc ONCE ---
        let key_arc = Arc::new(key);
        let value_arc = Arc::new(value);

        // Check if this is a blob (blobs bypass write buffer and hot cache)
        let is_blob = value_arc.starts_with(b"BLOB:");

        // --- 1. WAL Write (Non-blocking send of Arcs) ---
        if let Some(ref sender) = self.wal_writer
            && self.config.durability_mode != DurabilityMode::None
        {
            let _ = sender.send(WalOperation::Put {
                key: Arc::clone(&key_arc),
                value: Arc::clone(&value_arc),
            });
        }

        // Check if this key should be cached (false for Any-field collections)
        let should_cache = self.should_cache_key(&key_arc);

        // --- 2. Cold Store Write ---
        if is_blob || !should_cache {
            // Blobs and Any-field docs write directly to cold storage
            // (blobs: avoid memory pressure, Any-fields: ensure immediate queryability)
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        } else if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
        } else {
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        }

        // --- 3. Hot Cache Write ---
        if should_cache {
            if is_blob {
                // For blobs: cache only a lightweight reference (not the actual data)
                // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB
                let blob_ref = format!("BLOBREF:{}", value_arc.len());
                self.hot
                    .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
            } else {
                self.hot
                    .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
            }
        }

        // --- 4. Indexing (skip for blobs and system keys) ---
        if !is_blob {
            if let Some(collection_name) = key_arc.split(':').next()
                && !collection_name.starts_with('_')
            {
                self.index_value(collection_name, &key_arc, &value_arc)?;
            }
        }

        Ok(())
    }

    /// Replay WAL entries to recover from a crash
    ///
    /// Handles transaction boundaries:
    /// - Operations within a committed transaction are applied
    /// - Operations within a rolled-back transaction are discarded
    /// - Operations within an uncommitted transaction (crash during tx) are discarded
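    ///
    /// For example, the entry sequence `[BeginTx, Put a, Put b, CommitTx, Put c]`
    /// applies `a`, `b`, then `c`, while `[BeginTx, Put a, <crash>]` discards `a`.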
    async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        // Buffer for operations within a transaction
        let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
        let mut in_transaction = false;

        for entry in entries {
            match entry.operation {
                Operation::BeginTx => {
                    // Start buffering operations
                    in_transaction = true;
                    tx_buffer.clear();
                }
                Operation::CommitTx => {
                    // Apply all buffered operations
                    for buffered_entry in tx_buffer.drain(..) {
                        self.apply_wal_entry(buffered_entry).await?;
                    }
                    in_transaction = false;
                }
                Operation::RollbackTx => {
                    // Discard buffered operations
                    tx_buffer.clear();
                    in_transaction = false;
                }
                Operation::Put | Operation::Delete => {
                    if in_transaction {
                        // Buffer the operation for later
                        tx_buffer.push(entry);
                    } else {
                        // Apply immediately (not in transaction)
                        self.apply_wal_entry(entry).await?;
                    }
                }
            }
        }

        // If we end with in_transaction = true, it means we crashed mid-transaction
        // Those operations in tx_buffer are discarded (not committed)
        if in_transaction {
            eprintln!(
                "WAL replay: Discarding {} uncommitted transaction operations",
                tx_buffer.len()
            );
        }

        // Flush after replay and truncate WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

1429    /// Apply a single WAL entry to storage
1430    async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
1431        match entry.operation {
1432            Operation::Put => {
1433                if let Some(value) = entry.value {
1434                    // Write directly to cold storage (skip WAL, already logged)
1435                    self.cold.set(entry.key.clone(), value.clone())?;
1436
1437                    // Update hot cache
1438                    if self.should_cache_key(&entry.key) {
1439                        self.hot
1440                            .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
1441                    }
1442
1443                    // Rebuild indices
1444                    if let Some(collection) = entry.key.split(':').next()
1445                        && !collection.starts_with('_')
1446                    {
1447                        self.index_value(collection, &entry.key, &value)?;
1448                    }
1449                }
1450            }
1451            Operation::Delete => {
1452                // Remove from indices before deleting the data
1453                if let Some(collection) = entry.key.split(':').next()
1454                    && !collection.starts_with('_')
1455                {
1456                    let _ = self.remove_from_index(collection, &entry.key);
1457                }
1458                self.cold.delete(&entry.key)?;
1459                self.hot.delete(&entry.key);
1460            }
1461            _ => {} // Transaction markers handled in replay_wal
1462        }
1463        Ok(())
1464    }
1465
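    /// Resolve a collection's schema through the storage tiers: the parsed
    /// schema cache first (O(1)), then the hot byte cache (parsed once and
    /// promoted to the schema cache), then cold storage. Fails with a
    /// SchemaError if the collection does not exist in any tier.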
1466    fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
1467        // 1. Check the high-performance object cache first (O(1))
1468        if let Some(schema) = self.schema_cache.get(collection) {
1469            return Ok(schema.value().clone());
1470        }
1471
1472        let collection_key = format!("_collection:{}", collection);
1473
1474        // 2. Fallback to Hot Byte Cache (parse if found)
1475        if let Some(data) = self.hot.get(&collection_key) {
1476            return serde_json::from_slice::<Collection>(&data)
1477                .map(|s| {
1478                    let arc_s = Arc::new(s);
1479                    // Populate object cache to avoid this parse next time
1480                    self.schema_cache
1481                        .insert(collection.to_string(), arc_s.clone());
1482                    arc_s
1483                })
1484                .map_err(|_| {
1485                    AqlError::new(
1486                        ErrorCode::SchemaError,
1487                        "Failed to parse schema from hot cache",
1488                    )
1489                });
1490        }
1491
1492        // 3. Fallback to Cold Storage
1493        if let Some(data) = self.get(&collection_key)? {
1494            // Update Hot Cache
1495            self.hot.set(
1496                Arc::new(collection_key.clone()),
1497                Arc::new(data.clone()),
1498                None,
1499            );
1500
1501            // Parse and update Schema Cache
1502            let schema = serde_json::from_slice::<Collection>(&data)?;
1503            let arc_schema = Arc::new(schema);
1504            self.schema_cache
1505                .insert(collection.to_string(), arc_schema.clone());
1506
1507            return Ok(arc_schema);
1508        }
1509
1510        Err(AqlError::new(
1511            ErrorCode::SchemaError,
1512            format!("Failed to load schema for collection '{}'", collection),
1513        ))
1514    }
1515
1516    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
1517        // Update primary index with metadata only
1518        let location = DiskLocation::new(value.len());
1519        self.primary_indices
1520            .entry(collection.to_string())
1521            .or_default()
1522            .insert(key.to_string(), location);
1523
1524        // Use the optimized helper (Fast path via schema_cache)
1525        let collection_obj = self.ensure_schema_hot(collection)?;
1526
1527        // Build list of fields that should be indexed
1528        let indexed_fields: Vec<String> = collection_obj
1529            .fields
1530            .iter()
1531            .filter(|(_, def)| def.indexed || def.unique)
1532            .map(|(name, _)| name.clone())
1533            .collect();
1534
1535        if indexed_fields.is_empty() {
1536            return Ok(());
1537        }
1538
1539        // Update secondary indices
1540        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
1541            for (field, field_value) in doc.data {
1542                if !indexed_fields.contains(&field) {
1543                    continue;
1544                }
1545
1546                let value_str = match &field_value {
1547                    Value::String(s) => s.clone(),
1548                    _ => field_value.to_string(),
1549                };
1550
1551                let index_key = format!("{}:{}", collection, field);
1552                let secondary_index = self.secondary_indices.entry(index_key).or_default();
1553
1554                let max_entries = self.config.max_index_entries_per_field;
1555
1556                secondary_index
1557                    .entry(value_str)
1558                    .and_modify(|doc_ids| {
1559                        if doc_ids.len() < max_entries {
1560                            doc_ids.push(key.to_string());
1561                        }
1562                    })
1563                    .or_insert_with(|| vec![key.to_string()]);
1564            }
1565        }
1566        Ok(())
1567    }
1568
1569    /// Remove a document from all indices (called during delete operations)
1570    fn remove_from_index(&self, collection: &str, key: &str) -> Result<()> {
1571        // Remove from primary index
1572        if let Some(primary_index) = self.primary_indices.get_mut(collection) {
1573            primary_index.remove(key);
1574        }
1575
1576        // Get the document data to know which secondary index entries to remove
1577        // Try cold storage first since we may be replaying WAL
1578        let doc_data = match self.cold.get(key) {
1579            Ok(Some(data)) => Some(data),
1580            _ => self.hot.get(key),
1581        };
1582
1583        // If we have the document data, remove from secondary indices
1584        if let Some(data) = doc_data {
1585            if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
1586                for (field, field_value) in &doc.data {
1587                    let value_str = match field_value {
1588                        Value::String(s) => s.clone(),
1589                        _ => field_value.to_string(),
1590                    };
1591
1592                    let index_key = format!("{}:{}", collection, field);
1593                    if let Some(secondary_index) = self.secondary_indices.get_mut(&index_key) {
1594                        if let Some(mut doc_ids) = secondary_index.get_mut(&value_str) {
1595                            doc_ids.retain(|id| id != key);
1596                            // If empty, we could remove the entry but it's not strictly necessary
1597                        }
1598                    }
1599                }
1600            }
1601        }
1602
1603        Ok(())
1604    }
1605
1606    /// Scan collection with filter and early termination support
1607    /// Used by QueryBuilder for optimized queries with LIMIT
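    /// # Examples
    ///
    /// A minimal sketch (assumes a `users` collection whose documents carry an
    /// integer `age` field; the names are illustrative):
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// // Collect at most 10 matching documents; scanning stops once the limit is hit.
    /// let adults = db.scan_and_filter(
    ///     "users",
    ///     |doc| matches!(doc.data.get("age"), Some(Value::Int(age)) if *age >= 18),
    ///     Some(10),
    /// )?;
    /// ```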
1608    pub fn scan_and_filter<F>(
1609        &self,
1610        collection: &str,
1611        filter: F,
1612        limit: Option<usize>,
1613    ) -> Result<Vec<Document>>
1614    where
1615        F: Fn(&Document) -> bool,
1616    {
1617        let mut documents = Vec::new();
1618
1619        if let Some(index) = self.primary_indices.get(collection) {
1620            for entry in index.iter() {
1621                // Early termination
1622                if let Some(max) = limit {
1623                    if documents.len() >= max {
1624                        break;
1625                    }
1626                }
1627
1628                let key = entry.key();
1629                if let Some(data) = self.get(key)? {
1630                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1631                        // Apply computed fields
1632                        if let Ok(computed) = self.computed.read() {
1633                            let _ = computed.apply(collection, &mut doc);
1634                        }
1635
1636                        if filter(&doc) {
1637                            documents.push(doc);
1638                        }
1639                    }
1640                }
1641            }
1642        } else {
1643            // Fallback
1644            documents = self.scan_collection(collection)?;
1645            documents.retain(|doc| filter(doc));
1646            if let Some(max) = limit {
1647                documents.truncate(max);
1648            }
1649        }
1650
1651        Ok(documents)
1652    }
1653
1654    /// Smart collection scan that uses the primary index as a key directory
1655    /// Avoids forced flushes and leverages hot cache for better performance
1656    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
1657        let mut documents = Vec::new();
1658
1659        // Use the primary index as a "key directory" - it contains all document keys
1660        if let Some(index) = self.primary_indices.get(collection) {
1661            // Iterate through all keys in the primary index (fast, in-memory)
1662            for entry in index.iter() {
1663                let key = entry.key();
1664
1665                // Fetch document via hot cache -> cold storage fallback
1666                if let Some(data) = self.get(key)? {
1667                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1668                        // Apply computed fields
1669                        if let Ok(computed) = self.computed.read() {
1670                            let _ = computed.apply(collection, &mut doc);
1671                        }
1672                        documents.push(doc);
1673                    }
1674                }
1675            }
1676        } else {
1677            // Fallback: scan from cold storage if primary index not yet initialized
1678            let prefix = format!("{}:", collection);
1679            for result in self.cold.scan_prefix(&prefix) {
1680                if let Ok((_key, value)) = result {
1681                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
1682                        // Apply computed fields
1683                        if let Ok(computed) = self.computed.read() {
1684                            let _ = computed.apply(collection, &mut doc);
1685                        }
1686                        documents.push(doc);
1687                    }
1688                }
1689            }
1690        }
1691
1692        Ok(documents)
1693    }
1694
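    /// Store a file's contents as a binary blob under `key`.
    ///
    /// The payload is prefixed with `BLOB:` so readers can distinguish blob data,
    /// and files larger than 50 MB are rejected before being read.
    ///
    /// # Examples
    ///
    /// A minimal sketch (the key and path are illustrative):
    ///
    /// ```
    /// use std::path::Path;
    ///
    /// db.put_blob("avatars:user-123".to_string(), Path::new("./avatar.png")).await?;
    /// ```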
1696    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
1697        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
1698
1699        // Get file metadata to check size before reading
1700        let metadata = tokio::fs::metadata(file_path).await?;
1701        let file_size = metadata.len() as usize;
1702
1703        if file_size > MAX_FILE_SIZE {
1704            return Err(AqlError::invalid_operation(format!(
1705                "File size {} MB exceeds maximum allowed size of {} MB",
1706                file_size / (1024 * 1024),
1707                MAX_FILE_SIZE / (1024 * 1024)
1708            )));
1709        }
1710
1711        let mut file = File::open(file_path).await?;
1712        let mut buffer = Vec::new();
1713        file.read_to_end(&mut buffer).await?;
1714
1715        // Add BLOB: prefix to mark this as blob data
1716        let mut blob_data = Vec::with_capacity(5 + buffer.len());
1717        blob_data.extend_from_slice(b"BLOB:");
1718        blob_data.extend_from_slice(&buffer);
1719
1720        self.put(key, blob_data, None).await
1721    }
1722
1723    /// Create a new collection with schema definition
1724    ///
1725    /// Collections are like tables in SQL - they define the structure of your documents.
1726    /// The third boolean parameter marks a field as unique; unique fields are automatically indexed for fast lookups.
1727    ///
1728    /// # Arguments
1729    /// * `name` - Collection name
1730    /// * `fields` - Vector of (field_name, field_type, unique) tuples
1731    ///   - Field name (accepts both &str and String)
1732    ///   - Field type (String, Int, Float, Bool, etc.)
1733    ///   - Unique: true to enforce uniqueness (the field is auto-indexed), false otherwise
1734    ///
1735    /// # Performance
1736    /// - Indexed fields: Fast equality queries (O(1) lookup)
1737    /// - Non-indexed fields: Full scan required for queries
1738    /// - Unique fields are automatically indexed
1739    ///
1740    /// # Examples
1741    ///
1742    /// ```
1743    /// use aurora_db::{Aurora, types::FieldType};
1744    ///
1745    /// let db = Aurora::open("mydb.db")?;
1746    ///
1747    /// // Create a users collection
1748    /// db.new_collection("users", vec![
1749    ///     ("name", FieldType::String, false),      // Not unique
1750    ///     ("email", FieldType::String, true),      // Unique - auto-indexed for fast lookups
1751    ///     ("age", FieldType::Int, false),
1752    ///     ("active", FieldType::Bool, true),       // Unique (auto-indexed)
1753    ///     ("score", FieldType::Float, false),
1754    /// ]).await?;
1755    ///
1756    /// // Idempotent - calling again is safe
1757    /// db.new_collection("users", vec![/* ... */]).await?; // OK!
1758    /// ```
1759    pub async fn new_collection<S: Into<String>>(
1760        &self,
1761        name: &str,
1762        fields: Vec<(S, FieldType, bool)>,
1763    ) -> Result<()> {
1764        let collection_key = format!("_collection:{}", name);
1765
1766        // Check if collection already exists - if so, just return Ok (idempotent)
1767        if self.get(&collection_key)?.is_some() {
1768            return Ok(());
1769        }
1770
1771        // Create field definitions
1772        let mut field_definitions = HashMap::new();
1773        for (field_name, field_type, unique) in fields {
1774            if field_type == FieldType::Any && unique {
1775                return Err(AqlError::new(
1776                    ErrorCode::InvalidDefinition,
1777                    "Fields of type 'Any' cannot be unique or indexed.".to_string(),
1778                ));
1779            }
1780            field_definitions.insert(
1781                field_name.into(),
1782                FieldDefinition {
1783                    field_type,
1784                    unique,
1785                    indexed: unique, // Auto-index unique fields
1786                },
1787            );
1788        }
1789
1790        let collection = Collection {
1791            name: name.to_string(),
1792            fields: field_definitions,
1793            // Unique constraints are derived from `fields`; no separate unique_fields list is stored
1794        };
1795
1796        let collection_data = serde_json::to_vec(&collection)?;
1797        self.put(collection_key, collection_data, None).await?;
1798
1799        // Invalidate schema cache since we just created/updated the collection schema
1800        self.schema_cache.remove(name);
1801
1802        Ok(())
1803    }
1804
1805    /// Insert a document into a collection
1806    ///
1807    /// Automatically generates a UUID for the document and validates against
1808    /// collection schema and unique constraints. Returns the generated document ID.
1809    ///
1810    /// # Performance
1811    /// - Single insert: ~15,000 docs/sec
1812    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
1813    /// - Triggers PubSub events for real-time listeners
1814    ///
1815    /// # Arguments
1816    /// * `collection` - Name of the collection to insert into
1817    /// * `data` - Document fields and values to insert
1818    ///
1819    /// # Returns
1820    /// The auto-generated ID of the inserted document or an error
1821    ///
1822    /// # Errors
1823    /// - `CollectionNotFound`: Collection doesn't exist
1824    /// - `ValidationError`: Data violates schema or unique constraints
1825    /// - `SerializationError`: Invalid data format
1826    ///
1827    /// # Examples
1828    ///
1829    /// ```
1830    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
1831    ///
1832    /// let db = Aurora::open("mydb.db")?;
1833    ///
1834    /// // Basic insertion
1835    /// let user_id = db.insert_into("users", vec![
1836    ///     ("name", Value::String("Alice Smith".to_string())),
1837    ///     ("email", Value::String("alice@example.com".to_string())),
1838    ///     ("age", Value::Int(28)),
1839    ///     ("active", Value::Bool(true)),
1840    /// ]).await?;
1841    ///
1842    /// println!("Created user with ID: {}", user_id);
1843    ///
1844    /// // Inserting with nested data
1845    /// let order_id = db.insert_into("orders", vec![
1846    ///     ("user_id", Value::String(user_id.clone())),
1847    ///     ("total", Value::Float(99.99)),
1848    ///     ("status", Value::String("pending".to_string())),
1849    ///     ("items", Value::Array(vec![
1850    ///         Value::String("item-123".to_string()),
1851    ///         Value::String("item-456".to_string()),
1852    ///     ])),
1853    /// ]).await?;
1854    ///
1855    /// // Error handling - unique constraint violation
1856    /// match db.insert_into("users", vec![
1857    ///     ("email", Value::String("alice@example.com".to_string())),  // Duplicate!
1858    ///     ("name", Value::String("Alice Clone".to_string())),
1859    /// ]).await {
1860    ///     Ok(id) => println!("Inserted: {}", id),
1861    ///     Err(e) => println!("Failed: {} (email already exists)", e),
1862    /// }
1863    ///
1864    /// // For bulk inserts (10+ documents), use batch_insert() instead
1865    /// let users = vec![
1866    ///     HashMap::from([
1867    ///         ("name".to_string(), Value::String("Bob".to_string())),
1868    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
1869    ///     ]),
1870    ///     HashMap::from([
1871    ///         ("name".to_string(), Value::String("Carol".to_string())),
1872    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
1873    ///     ]),
1874    ///     // ... more documents
1875    /// ];
1876    /// let ids = db.batch_insert("users", users).await?;  // 3x faster!
1877    /// println!("Inserted {} users", ids.len());
1878    /// ```
1879    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
1880        // Convert Vec<(&str, Value)> to HashMap<String, Value>
1881        let data_map: HashMap<String, Value> =
1882            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1883
1884        // Validate unique constraints before inserting
1885        self.validate_unique_constraints(collection, &data_map)
1886            .await?;
1887
1888        let doc_id = Uuid::now_v7().to_string();
1889        let document = Document {
1890            id: doc_id.clone(),
1891            data: data_map,
1892        };
1893
1894        self.put(
1895            format!("{}:{}", collection, doc_id),
1896            serde_json::to_vec(&document)?,
1897            None,
1898        )
1899        .await?;
1900
1901        // Publish insert event
1902        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1903        let _ = self.pubsub.publish(event);
1904
1905        Ok(doc_id)
1906    }
1907
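    /// Insert a document from a pre-built `HashMap` of fields.
    ///
    /// Same validation and event semantics as `insert_into()`; useful when the
    /// fields are assembled dynamically.
    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a `users` collection:
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use aurora_db::types::Value;
    ///
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Dana".to_string()));
    /// let id = db.insert_map("users", data).await?;
    /// ```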
1908    pub async fn insert_map(
1909        &self,
1910        collection: &str,
1911        data: HashMap<String, Value>,
1912    ) -> Result<String> {
1913        // Validate unique constraints before inserting
1914        self.validate_unique_constraints(collection, &data).await?;
1915
1916        let doc_id = Uuid::now_v7().to_string();
1917        let document = Document {
1918            id: doc_id.clone(),
1919            data,
1920        };
1921
1922        self.put(
1923            format!("{}:{}", collection, doc_id),
1924            serde_json::to_vec(&document)?,
1925            None,
1926        )
1927        .await?;
1928
1929        // Publish insert event
1930        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1931        let _ = self.pubsub.publish(event);
1932
1933        Ok(doc_id)
1934    }
1935
1936    /// Batch insert multiple documents with optimized write path
1937    ///
1938    /// Inserts multiple documents in a single optimized operation, bypassing
1939    /// the write buffer for better performance. Ideal for bulk data loading,
1940    /// migrations, or initial database seeding. 3x faster than individual inserts.
1941    ///
1942    /// # Performance
1943    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
1944    /// - Batch writes to WAL and storage
1945    /// - Validates all unique constraints
1946    /// - Use for 10+ documents minimum
1947    ///
1948    /// # Arguments
1949    /// * `collection` - Name of the collection to insert into
1950    /// * `documents` - Vector of document data as HashMaps
1951    ///
1952    /// # Returns
1953    /// Vector of auto-generated document IDs or an error
1954    ///
1955    /// # Examples
1956    ///
1957    /// ```
1958    /// use aurora_db::{Aurora, types::Value};
1959    /// use std::collections::HashMap;
1960    ///
1961    /// let db = Aurora::open("mydb.db")?;
1962    ///
1963    /// // Bulk user import
1964    /// let users = vec![
1965    ///     HashMap::from([
1966    ///         ("name".to_string(), Value::String("Alice".into())),
1967    ///         ("email".to_string(), Value::String("alice@example.com".into())),
1968    ///         ("age".to_string(), Value::Int(28)),
1969    ///     ]),
1970    ///     HashMap::from([
1971    ///         ("name".to_string(), Value::String("Bob".into())),
1972    ///         ("email".to_string(), Value::String("bob@example.com".into())),
1973    ///         ("age".to_string(), Value::Int(32)),
1974    ///     ]),
1975    ///     HashMap::from([
1976    ///         ("name".to_string(), Value::String("Carol".into())),
1977    ///         ("email".to_string(), Value::String("carol@example.com".into())),
1978    ///         ("age".to_string(), Value::Int(25)),
1979    ///     ]),
1980    /// ];
1981    ///
1982    /// let ids = db.batch_insert("users", users).await?;
1983    /// println!("Inserted {} users", ids.len());
1984    ///
1985    /// // Seeding test data
1986    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
1987    ///     .map(|i| HashMap::from([
1988    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
1989    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
1990    ///         ("stock".to_string(), Value::Int(100)),
1991    ///     ]))
1992    ///     .collect();
1993    ///
1994    /// let ids = db.batch_insert("products", test_products).await?;
1995    /// // Much faster than 1000 individual insert_into() calls!
1996    ///
1997    /// // Migration from CSV data
1998    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
1999    /// let mut batch = Vec::new();
2000    ///
2001    /// for result in csv_reader.records() {
2002    ///     let record = result?;
2003    ///     let doc = HashMap::from([
2004    ///         ("field1".to_string(), Value::String(record[0].to_string())),
2005    ///         ("field2".to_string(), Value::String(record[1].to_string())),
2006    ///     ]);
2007    ///     batch.push(doc);
2008    ///
2009    ///     // Insert in batches of 1000
2010    ///     if batch.len() >= 1000 {
2011    ///         db.batch_insert("imported_data", batch.clone()).await?;
2012    ///         batch.clear();
2013    ///     }
2014    /// }
2015    ///
2016    /// // Insert remaining
2017    /// if !batch.is_empty() {
2018    ///     db.batch_insert("imported_data", batch).await?;
2019    /// }
2020    /// ```
2021    ///
2022    /// # Errors
2023    /// - `ValidationError`: Unique constraint violation on any document
2024    /// - `CollectionNotFound`: Collection doesn't exist
2025    /// - `IoError`: Storage write failure
2026    ///
2027    /// # Important Notes
2028    /// - All inserts are atomic - if one fails, none are inserted
2029    /// - UUIDs are auto-generated for all documents
2030    /// - PubSub events are published for each insert
2031    /// - For 10+ documents, this is 3x faster than individual inserts
2032    /// - For < 10 documents, use `insert_into()` instead
2033    ///
2034    /// # See Also
2035    /// - `insert_into()` for single document inserts
2036    /// - `import_from_json()` for file-based bulk imports
2037    /// - `batch_write()` for low-level batch operations
2038    pub async fn batch_insert(
2039        &self,
2040        collection: &str,
2041        documents: Vec<HashMap<String, Value>>,
2042    ) -> Result<Vec<String>> {
2043        let mut doc_ids = Vec::with_capacity(documents.len());
2044        let mut pairs = Vec::with_capacity(documents.len());
2045
2046        // Prepare all documents
2047        for data in documents {
2048            // Validate unique constraints
2049            self.validate_unique_constraints(collection, &data).await?;
2050
2051            let doc_id = Uuid::now_v7().to_string();
2052            let document = Document {
2053                id: doc_id.clone(),
2054                data,
2055            };
2056
2057            let key = format!("{}:{}", collection, doc_id);
2058            let value = serde_json::to_vec(&document)?;
2059
2060            pairs.push((key, value));
2061            doc_ids.push(doc_id);
2062        }
2063
2064        // Write to WAL in batch (if enabled)
2065        if let Some(ref wal) = self.wal
2066            && self.config.durability_mode != DurabilityMode::None
2067        {
2068            let mut wal_lock = wal.write().unwrap();
2069            for (key, value) in &pairs {
2070                wal_lock.append(Operation::Put, key, Some(value))?;
2071            }
2072        }
2073
2074        // Bypass write buffer - go directly to cold storage batch API
2075        self.cold.batch_set(pairs.clone())?;
2076
2077        // Note: durability is handled by the background checkpoint process
2078
2079        // Update hot cache and indices
2080        for (key, value) in pairs {
2081            if self.should_cache_key(&key) {
2082                self.hot
2083                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2084            }
2085
2086            if let Some(collection_name) = key.split(':').next()
2087                && !collection_name.starts_with('_')
2088            {
2089                self.index_value(collection_name, &key, &value)?;
2090            }
2091        }
2092
2093        // Publish events
2094        for doc_id in &doc_ids {
2095            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
2096                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2097                let _ = self.pubsub.publish(event);
2098            }
2099        }
2100
2101        Ok(doc_ids)
2102    }
2103
2104    /// Update a document by ID
2105    ///
2106    /// # Arguments
2107    /// * `collection` - Collection name
2108    /// * `doc_id` - Document ID to update
2109    /// * `data` - New field values to set
2110    ///
2111    /// # Returns
2112    /// Ok(()) on success, or an error if the document doesn't exist
2113    ///
2114    /// # Examples
2115    ///
2116    /// ```
2117    /// db.update_document("users", &user_id, vec![
2118    ///     ("status", Value::String("active".to_string())),
2119    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2120    /// ]).await?;
2121    /// ```
2122    pub async fn update_document(
2123        &self,
2124        collection: &str,
2125        doc_id: &str,
2126        updates: Vec<(&str, Value)>,
2127    ) -> Result<()> {
2128        // Get existing document
2129        let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2130            AqlError::new(
2131                ErrorCode::NotFound,
2132                format!("Document not found: {}", doc_id),
2133            )
2134        })?;
2135
2136        // Store old document for event
2137        let old_document = document.clone();
2138
2139        // Apply updates
2140        for (field, value) in updates {
2141            document.data.insert(field.to_string(), value);
2142        }
2143
2144        // Validate unique constraints after update (excluding current document)
2145        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2146            .await?;
2147
2148        // Save updated document
2149        self.put(
2150            format!("{}:{}", collection, doc_id),
2151            serde_json::to_vec(&document)?,
2152            None,
2153        )
2154        .await?;
2155
2156        // Publish update event
2157        let event =
2158            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2159        let _ = self.pubsub.publish(event);
2160
2161        Ok(())
2162    }
2163
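    /// Return every document in a collection.
    ///
    /// Ensures indices are initialized, then scans via the primary index
    /// (falling back to cold storage if the index is not yet built).
    ///
    /// # Examples
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let everyone = db.get_all_collection("users").await?;
    /// println!("{} documents", everyone.len());
    /// ```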
2164    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2165        self.ensure_indices_initialized().await?;
2166        self.scan_collection(collection)
2167    }
2168
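    /// List stored keys containing `pattern`, with size and preview metadata.
    ///
    /// # Examples
    ///
    /// A minimal sketch (the pattern is illustrative):
    ///
    /// ```
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{} -> {:?}", key, info);
    /// }
    /// ```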
2169    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2170        let mut data = Vec::new();
2171
2172        // Scan from cold storage instead of primary index
2173        for result in self.cold.scan() {
2174            if let Ok((key, value)) = result {
2175                if key.contains(pattern) {
2176                    let info = if value.starts_with(b"BLOB:") {
2177                        DataInfo::Blob { size: value.len() }
2178                    } else {
2179                        DataInfo::Data {
2180                            size: value.len(),
2181                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2182                                .into_owned(),
2183                        }
2184                    };
2185
2186                    data.push((key.clone(), info));
2187                }
2188            }
2189        }
2190
2191        Ok(data)
2192    }
2193
2219    /// Begin a new transaction for atomic operations
2220    ///
2221    /// Transactions ensure all-or-nothing execution: either all operations succeed,
2222    /// or none of them are applied. Perfect for maintaining data consistency.
2223    ///
2224    /// # Examples
2225    ///
2226    /// ```
2227    /// use aurora_db::{Aurora, types::Value};
2228    ///
2229    /// let db = Aurora::open("mydb.db")?;
2230    ///
2231    /// // Start transaction
2232    /// let tx_id = db.begin_transaction();
2233    ///
2234    /// // Perform multiple operations
2235    /// db.insert_into("accounts", vec![
2236    ///     ("user_id", Value::String("alice".into())),
2237    ///     ("balance", Value::Int(1000)),
2238    /// ]).await?;
2239    ///
2240    /// db.insert_into("accounts", vec![
2241    ///     ("user_id", Value::String("bob".into())),
2242    ///     ("balance", Value::Int(500)),
2243    /// ]).await?;
2244    ///
2245    /// // Commit if all succeeded
2246    /// db.commit_transaction(tx_id)?;
2247    ///
2248    /// // Or rollback on error
2249    /// // db.rollback_transaction(tx_id)?;
2250    /// ```
2251    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
2252        let buffer = self.transaction_manager.begin();
2253        buffer.id
2254    }
2255
2256    /// Commit a transaction, making all changes permanent
2257    ///
2258    /// All operations within the transaction are atomically applied to the database.
2259    /// If any operation fails, none are applied.
2260    ///
2261    /// # Arguments
2262    /// * `tx_id` - Transaction ID returned from begin_transaction()
2263    ///
2264    /// # Examples
2265    ///
2266    /// ```
2267    /// use aurora_db::{Aurora, types::Value};
2268    ///
2269    /// let db = Aurora::open("mydb.db")?;
2270    ///
2271    /// // Transfer money between accounts
2272    /// let tx_id = db.begin_transaction();
2273    ///
2274    /// // Deduct from Alice
2275    /// db.update_document("accounts", "alice", vec![
2276    ///     ("balance", Value::Int(900)),  // Was 1000
2277    /// ]).await?;
2278    ///
2279    /// // Add to Bob
2280    /// db.update_document("accounts", "bob", vec![
2281    ///     ("balance", Value::Int(600)),  // Was 500
2282    /// ]).await?;
2283    ///
2284    /// // Both updates succeed - commit them
2285    /// db.commit_transaction(tx_id)?;
2286    ///
2287    /// println!("Transfer completed!");
2288    /// ```
2289    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2290        let buffer = self
2291            .transaction_manager
2292            .active_transactions
2293            .get(&tx_id)
2294            .ok_or_else(|| {
2295                AqlError::invalid_operation(
2296                    "Transaction not found or already completed".to_string(),
2297                )
2298            })?;
2299
2300        for item in buffer.writes.iter() {
2301            let key = item.key();
2302            let value = item.value();
2303            self.cold.set(key.clone(), value.clone())?;
2304            if self.should_cache_key(key) {
2305                self.hot
2306                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2307            }
2308            if let Some(collection_name) = key.split(':').next()
2309                && !collection_name.starts_with('_')
2310            {
2311                self.index_value(collection_name, key, value)?;
2312            }
2313        }
2314
2315        for item in buffer.deletes.iter() {
2316            let key = item.key();
2317            if let Some((collection, id)) = key.split_once(':')
2318                && let Ok(Some(doc)) = self.get_document(collection, id)
2319            {
2320                self.remove_from_indices(collection, &doc)?;
2321            }
2322            self.cold.delete(key)?;
2323            self.hot.delete(key);
2324        }
2325
2326        // Drop the buffer reference to release the DashMap read lock
2327        // before calling commit which needs to remove the entry (write lock)
2328        drop(buffer);
2329
2330        self.transaction_manager.commit(tx_id)?;
2331
2332        self.cold.compact()?;
2333
2334        Ok(())
2335    }
2336
2337    /// Roll back a transaction, discarding all changes
2338    ///
2339    /// All operations within the transaction are discarded. The database state
2340    /// remains unchanged. Use this when an error occurs during transaction processing.
2341    ///
2342    /// # Arguments
2343    /// * `tx_id` - Transaction ID returned from begin_transaction()
2344    ///
2345    /// # Examples
2346    ///
2347    /// ```
2348    /// use aurora_db::{Aurora, types::Value};
2349    ///
2350    /// let db = Aurora::open("mydb.db")?;
2351    ///
2352    /// // Attempt a transfer with validation
2353    /// let tx_id = db.begin_transaction();
2354    ///
2355    /// let result = async {
2356    ///     // Check Alice's balance before transferring
2357    ///     let alice = db.get_document("accounts", "alice")?;
2358    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
2359    ///
2360    ///     if let Some(Value::Int(bal)) = balance {
2361    ///         if *bal < 100 {
2362    ///             return Err("Insufficient funds");
2363    ///         }
2364    ///
2365    ///         db.update_document("accounts", "alice", vec![
2366    ///             ("balance", Value::Int(bal - 100)),
2367    ///         ]).await?;
2368    ///
2369    ///         db.update_document("accounts", "bob", vec![
2370    ///             ("balance", Value::Int(600)),
2371    ///         ]).await?;
2372    ///
2373    ///         Ok(())
2374    ///     } else {
2375    ///         Err("Account not found")
2376    ///     }
2377    /// }.await;
2378    ///
2379    /// match result {
2380    ///     Ok(_) => {
2381    ///         db.commit_transaction(tx_id)?;
2382    ///         println!("Transfer completed");
2383    ///     }
2384    ///     Err(e) => {
2385    ///         db.rollback_transaction(tx_id)?;
2386    ///         println!("Transfer failed: {}, changes rolled back", e);
2387    ///     }
2388    /// }
2389    /// ```
2390    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2391        self.transaction_manager.rollback(tx_id)
2392    }
2393
2394    /// Create a secondary index on a field for faster queries
2395    ///
2396    /// Indexes dramatically improve query performance for frequently accessed fields,
2397    /// trading increased memory usage and slower writes for much faster reads.
2398    ///
2399    /// # When to Create Indexes
2400    /// - **Frequent queries**: Fields used in 80%+ of your queries
2401    /// - **High cardinality**: Fields with many unique values (user_id, email)
2402    /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
2403    /// - **Large collections**: Most beneficial with 10,000+ documents
2404    ///
2405    /// # When NOT to Index
2406    /// - Low cardinality fields (e.g., boolean flags, small enums)
2407    /// - Rarely queried fields
2408    /// - Fields that change frequently (write-heavy workloads)
2409    /// - Small collections (<1,000 documents) - full scans are fast enough
2410    ///
2411    /// # Performance Characteristics
2412    /// - **Query speedup**: O(n) → O(1) for equality filters
2413    /// - **Memory cost**: ~100-200 bytes per document per index
2414    /// - **Write slowdown**: ~20-30% longer insert/update times
2415    /// - **Build time**: ~5,000 docs/sec for initial indexing
2416    ///
2417    /// # Arguments
2418    /// * `collection` - Name of the collection to index
2419    /// * `field` - Name of the field to index
2420    ///
2421    /// # Examples
2422    ///
2423    /// ```
2424    /// use aurora_db::{Aurora, types::FieldType};
2425    ///
2426    /// let db = Aurora::open("mydb.db")?;
2427    /// db.new_collection("users", vec![
2428    ///     ("email", FieldType::String, false),
2429    ///     ("age", FieldType::Int, false),
2430    ///     ("active", FieldType::Bool, false),
2431    /// ]).await?;
2432    ///
2433    /// // Index email - frequently queried, high cardinality
2434    /// db.create_index("users", "email").await?;
2435    ///
2436    /// // Now this query is FAST (O(1) instead of O(n))
2437    /// let user = db.query("users")
2438    ///     .filter(|f| f.eq("email", "alice@example.com"))
2439    ///     .first_one()
2440    ///     .await?;
2441    ///
2442    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
2443    /// // A full scan is fast enough for boolean fields
2444    ///
2445    /// // DO index 'age' if you frequently query age ranges
2446    /// db.create_index("users", "age").await?;
2447    ///
2448    /// let young_users = db.query("users")
2449    ///     .filter(|f| f.lt("age", 30))
2450    ///     .collect()
2451    ///     .await?;
2452    /// ```
2453    ///
2454    /// # Real-World Example: E-commerce Orders
2455    ///
2456    /// ```
2457    /// // Orders collection: 1 million documents
2458    /// db.new_collection("orders", vec![
2459    ///     ("user_id", FieldType::String, false),    // High cardinality
2460    ///     ("status", FieldType::String, false),     // Low cardinality (pending, shipped, delivered)
2461    ///     ("created_at", FieldType::String, false),
2462    ///     ("total", FieldType::Float, false),
2463    /// ]).await?;
2464    ///
2465    /// // Index user_id - queries like "show me my orders" are common
2466    /// db.create_index("orders", "user_id").await?;  // Good choice
2467    ///
2468    /// // Query speedup: 2.5s → 0.001s
2469    /// let my_orders = db.query("orders")
2470    ///     .filter(|f| f.eq("user_id", user_id))
2471    ///     .collect()
2472    ///     .await?;
2473    ///
2474    /// // DON'T index 'status' - only 3 possible values
2475    /// // Scanning 1M docs takes ~100ms, indexing won't help much
2476    ///
2477    /// // Index created_at if you frequently query recent orders
2478    /// db.create_index("orders", "created_at").await?;  // Good for time-based queries
2479    /// ```
2480    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2481        let collection_def = self.get_collection_definition(collection)?;
2482
2483        if let Some(field_def) = collection_def.fields.get(field) {
2484            if field_def.field_type == FieldType::Any {
2485                return Err(AqlError::new(
2486                    ErrorCode::InvalidDefinition,
2487                    "Cannot create an index on a field of type 'Any'.".to_string(),
2488                ));
2489            }
2490        } else {
2491            return Err(AqlError::new(
2492                ErrorCode::InvalidDefinition,
2493                format!(
2494                    "Field '{}' not found in collection '{}'.",
2495                    field, collection
2496                ),
2497            ));
2498        }
2499
2500        // Generate a default index name
2501        let index_name = format!("idx_{}_{}", collection, field);
2502
2503        // Create index definition
2504        let definition = IndexDefinition {
2505            name: index_name.clone(),
2506            collection: collection.to_string(),
2507            fields: vec![field.to_string()],
2508            index_type: IndexType::BTree,
2509            unique: false,
2510        };
2511
2512        // Create the index
2513        let index = Index::new(definition.clone());
2514
2515        // Index all existing documents in the collection
2516        let prefix = format!("{}:", collection);
2517        for result in self.cold.scan_prefix(&prefix) {
2518            if let Ok((_, data)) = result
2519                && let Ok(doc) = serde_json::from_slice::<Document>(&data)
2520            {
2521                let _ = index.insert(&doc);
2522            }
2523        }
2524
2525        // Store the index
2526        self.indices.insert(index_name, index);
2527
2528        // Store the index definition for persistence
2529        let index_key = format!("_index:{}:{}", collection, field);
2530        self.put(index_key, serde_json::to_vec(&definition)?, None)
2531            .await?;
2532
2533        Ok(())
2534    }
2535
2536    /// Query documents in a collection with filtering, sorting, and pagination
2537    ///
2538    /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2539    /// Queries use early termination for LIMIT clauses, making them extremely fast
2540    /// even on large collections (6,800x faster than naive implementations).
2541    ///
2542    /// # Performance
2543    /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2544    /// - Without LIMIT: O(n) where n = matching documents
2545    /// - Uses secondary indices when available for equality filters
2546    /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2547    ///
2548    /// # Examples
2549    ///
2550    /// ```
2551    /// use aurora_db::{Aurora, types::Value};
2552    ///
2553    /// let db = Aurora::open("mydb.db")?;
2554    ///
2555    /// // Simple equality query
2556    /// let active_users = db.query("users")
2557    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2558    ///     .collect()
2559    ///     .await?;
2560    ///
2561    /// // Range query with pagination (FAST - uses early termination!)
2562    /// let top_scorers = db.query("users")
2563    ///     .filter(|f| f.gt("score", Value::Int(1000)))
2564    ///     .order_by("score", false)  // descending
2565    ///     .limit(10)
2566    ///     .offset(20)
2567    ///     .collect()
2568    ///     .await?;
2569    ///
2570    /// // Multiple filters
2571    /// let premium_active = db.query("users")
2572    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
2573    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2574    ///     .limit(100)  // Only scans ~200 docs, not all million!
2575    ///     .collect()
2576    ///     .await?;
2577    ///
2578    /// // Text search in a field
2579    /// let matching = db.query("articles")
2580    ///     .filter(|f| f.contains("title", "rust"))
2581    ///     .collect()
2582    ///     .await?;
2583    /// ```
2584    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2585        QueryBuilder::new(self, collection)
2586    }
2587
2588    /// Create a search builder for full-text search
2589    ///
2590    /// # Arguments
2591    /// * `collection` - Name of the collection to search
2592    ///
2593    /// # Returns
2594    /// A `SearchBuilder` for configuring and executing searches
2595    ///
2596    /// # Examples
2597    ///
2598    /// ```
2599    /// // Search for documents containing text
2600    /// let search_results = db.search("articles")
2601    ///     .field("content")
2602    ///     .matching("quantum computing")
2603    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
2604    ///     .collect()
2605    ///     .await?;
2606    /// ```
2607    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2608        SearchBuilder::new(self, collection)
2609    }
2610
2611    /// Retrieve a document by ID
2612    ///
2613    /// Fast direct lookup when you know the document ID. Significantly faster
2614    /// than querying with filters when ID is known.
2615    ///
2616    /// # Performance
2617    /// - Hot cache: ~1,000,000 reads/sec (instant)
2618    /// - Cold storage: ~500,000 reads/sec (disk I/O)
2619    /// - Complexity: O(1) - constant time lookup
2620    /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2621    ///
2622    /// # Arguments
2623    /// * `collection` - Name of the collection to query
2624    /// * `id` - ID of the document to retrieve
2625    ///
2626    /// # Returns
2627    /// The document if found, None if not found, or an error
2628    ///
2629    /// # Examples
2630    ///
2631    /// ```
2632    /// use aurora_db::{Aurora, types::Value};
2633    ///
2634    /// let db = Aurora::open("mydb.db")?;
2635    ///
2636    /// // Basic retrieval
2637    /// if let Some(user) = db.get_document("users", &user_id)? {
2638    ///     println!("Found user: {}", user.id);
2639    ///
2640    ///     // Access fields safely
2641    ///     if let Some(Value::String(name)) = user.data.get("name") {
2642    ///         println!("Name: {}", name);
2643    ///     }
2644    ///
2645    ///     if let Some(Value::Int(age)) = user.data.get("age") {
2646    ///         println!("Age: {}", age);
2647    ///     }
2648    /// } else {
2649    ///     println!("User not found");
2650    /// }
2651    ///
2652    /// // Idiomatic error handling
2653    /// let user = db.get_document("users", &user_id)?
2654    ///     .ok_or_else(|| AqlError::new(ErrorCode::NotFound, "User not found"))?;
2655    ///
2656    /// // Checking existence before operations
2657    /// if db.get_document("users", &user_id)?.is_some() {
2658    ///     db.update_document("users", &user_id, vec![
2659    ///         ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2660    ///     ]).await?;
2661    /// }
2662    ///
2663    /// // Batch retrieval (fetch multiple by ID)
2664    /// let user_ids = vec!["user-1", "user-2", "user-3"];
2665    /// let users: Vec<Document> = user_ids.iter()
2666    ///     .filter_map(|id| db.get_document("users", id).ok().flatten())
2667    ///     .collect();
2668    ///
2669    /// println!("Found {} out of {} users", users.len(), user_ids.len());
2670    /// ```
2671    ///
2672    /// # When to Use
2673    /// - You know the document ID (from insert, previous query, or URL param)
2674    /// - Need fastest possible lookup (1M reads/sec)
2675    /// - Fetching a single document
2676    ///
2677    /// # When NOT to Use
2678    /// - Searching by other fields → Use `query().filter()` instead
2679    /// - Need multiple documents by criteria → Use `query().collect()` instead
2680    /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2681    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2682        let key = format!("{}:{}", collection, id);
2683        if let Some(data) = self.get(&key)? {
2684            Ok(Some(serde_json::from_slice(&data)?))
2685        } else {
2686            Ok(None)
2687        }
2688    }
2689
2690    /// Delete a document by ID
2691    ///
2692    /// Permanently removes a document from storage, cache, and all indices.
2693    /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2694    ///
2695    /// # Performance
2696    /// - Delete speed: ~50,000 deletes/sec
2697    /// - Cleans up hot cache, cold storage, primary + secondary indices
2698    /// - Triggers PubSub events for listeners
2699    ///
2700    /// # Arguments
2701    /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2702    ///
2703    /// # Returns
2704    /// Success or an error
2705    ///
2706    /// # Errors
2707    /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2708    /// - `IoError`: Storage deletion failed
2709    ///
2710    /// # Examples
2711    ///
2712    /// ```
2713    /// use aurora_db::Aurora;
2714    ///
2715    /// let db = Aurora::open("mydb.db")?;
2716    ///
2717    /// // Basic deletion (note: requires "collection:id" format)
2718    /// db.delete("users:abc123").await?;
2719    ///
2720    /// // Delete with existence check
2721    /// let user_id = "user-456";
2722    /// if db.get_document("users", user_id)?.is_some() {
2723    ///     db.delete(&format!("users:{}", user_id)).await?;
2724    ///     println!("User deleted");
2725    /// } else {
2726    ///     println!("User not found");
2727    /// }
2728    ///
2729    /// // Error handling
2730    /// match db.delete("users:nonexistent").await {
2731    ///     Ok(_) => println!("Deleted successfully"),
2732    ///     Err(e) => println!("Delete failed: {}", e),
2733    /// }
2734    ///
2735    /// // Batch deletion using query
2736    /// let inactive_count = db.delete_where("users", |f| {
2737    ///     f.eq("active", Value::Bool(false))
2738    /// }).await?;
2739    /// println!("Deleted {} inactive users", inactive_count);
2740    ///
2741    /// // Delete with cascading (manual cascade pattern)
2742    /// let user_id = "user-123";
2743    ///
2744    /// // Delete user's orders first
2745    /// let orders = db.query("orders")
2746    ///     .filter(|f| f.eq("user_id", user_id))
2747    ///     .collect()
2748    ///     .await?;
2749    ///
2750    /// for order in orders {
2751    ///     db.delete(&format!("orders:{}", order.id)).await?;
2752    /// }
2753    ///
2754    /// // Then delete the user
2755    /// db.delete(&format!("users:{}", user_id)).await?;
2756    /// println!("User and all orders deleted");
2757    /// ```
2758    ///
2759    /// # Alternative: Soft Delete Pattern
2760    ///
2761    /// For recoverable deletions, use soft deletes instead:
2762    ///
2763    /// ```
2764    /// // Soft delete - mark as deleted instead of removing
2765    /// db.update_document("users", &user_id, vec![
2766    ///     ("deleted", Value::Bool(true)),
2767    ///     ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2768    /// ]).await?;
2769    ///
2770    /// // Query excludes soft-deleted items
2771    /// let active_users = db.query("users")
2772    ///     .filter(|f| f.eq("deleted", Value::Bool(false)))
2773    ///     .collect()
2774    ///     .await?;
2775    ///
2776    /// // Later: hard delete after retention period
2777    /// let old_deletions = db.query("users")
2778    ///     .filter(|f| f.eq("deleted", Value::Bool(true)))
2779    ///     .filter(|f| f.lt("deleted_at", thirty_days_ago))
2780    ///     .collect()
2781    ///     .await?;
2782    ///
2783    /// for user in old_deletions {
2784    ///     db.delete(&format!("users:{}", user.id)).await?;
2785    /// }
2786    /// ```
2787    ///
2788    /// # Important Notes
2789    /// - Deletion is permanent - no undo/recovery
2790    /// - Consider soft deletes for recoverable operations
2791    /// - Use transactions for multi-document deletions
2792    /// - PubSub subscribers will receive delete events
2793    /// - All indices are automatically cleaned up
2794    pub async fn delete(&self, key: &str) -> Result<()> {
2795        // Extract collection and id from key (format: "collection:id")
2796        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2797            (coll, doc_id)
2798        } else {
2799            return Err(AqlError::invalid_operation(
2800                "Invalid key format, expected 'collection:id'".to_string(),
2801            ));
2802        };
2803
2804        // Fetch the document before deleting so secondary index entries can be cleaned up
2805        let document = self.get_document(collection, id)?;
2806
2807        // Delete from hot cache (safe no-op if the key is not cached)
2808        self.hot.delete(key);
2811
2812        // Delete from cold storage
2813        self.cold.delete(key)?;
2814
2815        // Clean up all indices (primary + secondary)
2816        if let Some(doc) = document {
2817            self.remove_from_indices(collection, &doc)?;
2818        } else {
2819            // Fallback: at least remove from primary index
2820            if let Some(index) = self.primary_indices.get_mut(collection) {
2821                index.remove(key); // Primary index entries are keyed by the full "collection:id" key
2822            }
2823        }
2824
2825        // Publish delete event
2826        let event = crate::pubsub::ChangeEvent::delete(collection, id);
2827        let _ = self.pubsub.publish(event);
2828
2829        Ok(())
2830    }
2831
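    /// Delete every document in a collection, along with its in-memory indices
    /// and cached schema. Like `delete()`, this is permanent.
    ///
    /// # Examples
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// db.delete_collection("temp_imports").await?;
    /// ```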
2832    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2833        let prefix = format!("{}:", collection);
2834
2835        // Get all keys in collection
2836        let keys: Vec<String> = self
2837            .cold
2838            .scan_prefix(&prefix)
2839            .filter_map(|r| r.ok())
2840            .map(|(k, _)| k)
2841            .collect();
2843
2844        // Delete each key
2845        for key in keys {
2846            self.delete(&key).await?;
2847        }
2848
2849        // Remove collection indices
2850        self.primary_indices.remove(collection);
2851        self.secondary_indices
2852            .retain(|k, _| !k.starts_with(&prefix));
2853
2854        // Invalidate schema cache
2855        self.schema_cache.remove(collection);
2856
2857        Ok(())
2858    }
2859
2860    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2861        // Primary index entries are keyed by the full "collection:id" key
2862        let full_key = format!("{}:{}", collection, doc.id);
2863        if let Some(index) = self.primary_indices.get(collection) {
2864            index.remove(&full_key);
2865        }
2866
2867        // Remove from secondary indices, using the same string form as index_value
2868        for (field, value) in &doc.data {
2869            let value_str = match value { Value::String(s) => s.clone(), _ => value.to_string() };
2870            let index_key = format!("{}:{}", collection, field);
2871            if let Some(index) = self.secondary_indices.get(&index_key)
2872                && let Some(mut doc_ids) = index.get_mut(&value_str)
2873            {
2874                doc_ids.retain(|id| id != &full_key);
2875            }
2876        }
2877        Ok(())
    }
2878
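    /// Case-insensitive substring search over a single string field.
    ///
    /// For ranked, fuzzy full-text search use `search()` instead.
    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming an `articles` collection with a `title` field:
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} matching articles", hits.len());
    /// ```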
2879    pub async fn search_text(
2880        &self,
2881        collection: &str,
2882        field: &str,
2883        query: &str,
2884    ) -> Result<Vec<Document>> {
2885        let mut results = Vec::new();
2886        let docs = self.get_all_collection(collection).await?;
2887
2888        for doc in docs {
2889            if let Some(Value::String(text)) = doc.data.get(field)
2890                && text.to_lowercase().contains(&query.to_lowercase())
2891            {
2892                results.push(doc);
2893            }
2894        }
2895
2896        Ok(results)
2897    }
2898
2899    /// Export a collection to a JSON file
2900    ///
2901    /// Creates a JSON file containing all documents in the collection.
2902    /// Useful for backups, data migration, or sharing datasets.
2903    /// Automatically appends `.json` extension if not present.
2904    ///
2905    /// # Performance
2906    /// - Export speed: ~10,000 docs/sec
2907    /// - Scans entire collection from cold storage
2908    /// - Buffers all documents in memory before writing the file
2909    ///
2910    /// # Arguments
2911    /// * `collection` - Name of the collection to export
2912    /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2913    ///
2914    /// # Returns
2915    /// Success or an error
2916    ///
2917    /// # Examples
2918    ///
2919    /// ```
2920    /// use aurora_db::Aurora;
2921    ///
2922    /// let db = Aurora::open("mydb.db")?;
2923    ///
2924    /// // Basic export
2925    /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2926    /// // Creates: ./backups/users_2024-01-15.json
2927    ///
2928    /// // Timestamped backup
2929    /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2930    /// let backup_path = format!("./backups/users_{}", timestamp);
2931    /// db.export_as_json("users", &backup_path)?;
2932    ///
2933    /// // Export multiple collections
2934    /// for collection in &["users", "orders", "products"] {
2935    ///     db.export_as_json(collection, &format!("./export/{}", collection))?;
2936    /// }
2937    /// ```
2938    ///
2939    /// # Output Format
2940    ///
2941    /// The exported JSON has this structure:
2942    /// ```json
2943    /// {
2944    ///   "users": [
2945    ///     { "id": "123", "name": "Alice", "email": "alice@example.com" },
2946    ///     { "id": "456", "name": "Bob", "email": "bob@example.com" }
2947    ///   ]
2948    /// }
2949    /// ```
2950    ///
2951    /// # See Also
2952    /// - `export_as_csv()` for CSV format export
2953    /// - `import_from_json()` to restore exported data
2954    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2955        let output_path = if !output_path.ends_with(".json") {
2956            format!("{}.json", output_path)
2957        } else {
2958            output_path.to_string()
2959        };
2960
2961        let mut docs = Vec::new();
2962
2963        // Get all documents from the specified collection
2964        for result in self.cold.scan() {
2965            let (key, value) = result?;
2966
2967            // Only process documents from the specified collection
2968            if let Some(key_collection) = key.split(':').next()
2969                && key_collection == collection
2970                && !key.starts_with("_collection:")
2971                && let Ok(doc) = serde_json::from_slice::<Document>(&value)
2972            {
2973                // Convert Value enum to raw JSON values
2974                let mut clean_doc = serde_json::Map::new();
2975                for (k, v) in doc.data {
2976                    match v {
2977                        Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2978                        Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2979                        Value::Float(f) => {
2980                            if let Some(n) = serde_json::Number::from_f64(f) {
2981                                clean_doc.insert(k, JsonValue::Number(n))
2982                            } else {
2983                                clean_doc.insert(k, JsonValue::Null)
2984                            }
2985                        }
2986                        Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2987                        Value::Array(arr) => {
2988                            let clean_arr: Vec<JsonValue> = arr
2989                                .into_iter()
2990                                .map(|v| match v {
2991                                    Value::String(s) => JsonValue::String(s),
2992                                    Value::Int(i) => JsonValue::Number(i.into()),
2993                                    Value::Float(f) => serde_json::Number::from_f64(f)
2994                                        .map(JsonValue::Number)
2995                                        .unwrap_or(JsonValue::Null),
2996                                    Value::Bool(b) => JsonValue::Bool(b),
2997                                    Value::Null => JsonValue::Null,
2998                                    _ => JsonValue::Null,
2999                                })
3000                                .collect();
3001                            clean_doc.insert(k, JsonValue::Array(clean_arr))
3002                        }
3003                        Value::Uuid(u) => clean_doc.insert(k, JsonValue::String(u.to_string())),
3004                        Value::Null => clean_doc.insert(k, JsonValue::Null),
3005                        Value::Object(_) => None, // Nested objects are skipped (not exported)
3006                    };
3007                }
3008                docs.push(JsonValue::Object(clean_doc));
3009            }
3010        }
3011
3012        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
3013            collection.to_string(),
3014            JsonValue::Array(docs),
3015        )]));
3016
3017        let mut file = StdFile::create(&output_path)?;
3018        serde_json::to_writer_pretty(&mut file, &output)?;
3019        println!("Exported collection '{}' to {}", collection, &output_path);
3020        Ok(())
3021    }
3022
3023    /// Export a collection to a CSV file
3024    ///
3025    /// Creates a CSV file with headers from the first document and rows for each document.
3026    /// Useful for spreadsheet analysis, data science workflows, or reporting.
3027    /// Automatically appends `.csv` extension if not present.
3028    ///
3029    /// # Performance
3030    /// - Export speed: ~8,000 docs/sec
3031    /// - Memory efficient: streams rows to file
3032    /// - Headers determined from first document
3033    ///
3034    /// # Arguments
3035    /// * `collection` - Name of the collection to export
3036    /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3037    ///
3038    /// # Returns
3039    /// Success or an error
3040    ///
3041    /// # Examples
3042    ///
3043    /// ```
3044    /// use aurora_db::Aurora;
3045    ///
3046    /// let db = Aurora::open("mydb.db")?;
3047    ///
3048    /// // Basic CSV export
3049    /// db.export_as_csv("users", "./reports/users")?;
3050    /// // Creates: ./reports/users.csv
3051    ///
3052    /// // Export for analysis in Excel/Google Sheets
3053    /// db.export_as_csv("orders", "./analytics/sales_data")?;
3054    ///
3055    /// // Monthly report generation
3056    /// let month = chrono::Utc::now().format("%Y-%m");
3057    /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3058    /// ```
3059    ///
3060    /// # Output Format
3061    ///
3062    /// ```csv
3063    /// id,name,email,age
3064    /// 123,Alice,alice@example.com,28
3065    /// 456,Bob,bob@example.com,32
3066    /// ```
3067    ///
3068    /// # Important Notes
3069    /// - Headers are taken from the first document's fields
3070    /// - Documents with different fields will have empty values for missing fields
3071    /// - Nested objects/arrays are converted to strings
3072    /// - Best for flat document structures
3073    ///
3074    /// # See Also
3075    /// - `export_as_json()` for JSON format, which is better suited to
3076    ///   nested and complex document structures
3077    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3078        let output_path = if !filename.ends_with(".csv") {
3079            format!("{}.csv", filename)
3080        } else {
3081            filename.to_string()
3082        };
3083
3084        let mut writer = csv::Writer::from_path(&output_path)?;
3085        let mut headers = Vec::new();
3086        let mut first_doc = true;
3087
3088        // Get all documents from the specified collection
3089        for result in self.cold.scan() {
3090            let (key, value) = result?;
3091
3092            // Only process documents from the specified collection
3093            if let Some(key_collection) = key.split(':').next()
3094                && key_collection == collection
3095                && !key.starts_with("_collection:")
3096                && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3097            {
3098                // Write headers from first document
3099                if first_doc && !doc.data.is_empty() {
3100                    headers = doc.data.keys().cloned().collect();
3101                    writer.write_record(&headers)?;
3102                    first_doc = false;
3103                }
3104
3105                // Write the document values
3106                let values: Vec<String> = headers
3107                    .iter()
3108                    .map(|header| {
3109                        doc.data
3110                            .get(header)
3111                            .map(|v| v.to_string())
3112                            .unwrap_or_default()
3113                    })
3114                    .collect();
3115                writer.write_record(&values)?;
3116            }
3117        }
3118
3119        writer.flush()?;
3120        println!("Exported collection '{}' to {}", collection, &output_path);
3121        Ok(())
3122    }
3123
3124    // Helper method to create filter-based queries
3125    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3126        self.query(collection)
3127    }
3128
3129    // Convenience methods that build on top of the FilterBuilder
3130
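    // A minimal usage sketch for these helpers, assuming a `users` collection
    // with `email` and `age` fields:
    //
    //     let user = db.find_by_id("users", "123").await?;
    //     let by_email = db
    //         .find_by_field("users", "email", Value::String("jane@example.com".into()))
    //         .await?;
    //     let adults = db
    //         .find_in_range("users", "age", Value::Int(18), Value::Int(65))
    //         .await?;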
3131    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3132        self.query(collection)
3133            .filter(|f| f.eq("id", id))
3134            .first_one()
3135            .await
3136    }
3137
3138    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3139    where
3140        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3141    {
3142        self.query(collection).filter(filter_fn).first_one().await
3143    }
3144
3145    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3146        &self,
3147        collection: &str,
3148        field: &'static str,
3149        value: T,
3150    ) -> Result<Vec<Document>> {
3151        let value_clone = value.clone();
3152        self.query(collection)
3153            .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3154            .collect()
3155            .await
3156    }
3157
3158    pub async fn find_by_fields(
3159        &self,
3160        collection: &str,
3161        fields: Vec<(&str, Value)>,
3162    ) -> Result<Vec<Document>> {
3163        let mut query = self.query(collection);
3164
3165        for (field, value) in fields {
3166            let field_owned = field.to_owned();
3167            let value_owned = value.clone();
3168            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3169        }
3170
3171        query.collect().await
3172    }
3173
3174    // Advanced example: find documents with a field value in a specific range
3175    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3176        &self,
3177        collection: &str,
3178        field: &'static str,
3179        min: T,
3180        max: T,
3181    ) -> Result<Vec<Document>> {
3182        self.query(collection)
3183            .filter(move |f| f.between(field, min.clone(), max.clone()))
3184            .collect()
3185            .await
3186    }
3187
3188    // Complex query example: build with multiple combined filters
3189    pub fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3190        self.query(collection)
3191    }
3192
3193    // Create a full-text search query with added filter options
3194    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3195        self.search(collection)
3196    }
3197
3198    // Utility methods for common operations
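    /// Insert the document if `id` does not exist yet, otherwise merge `data`
    /// into the existing document. Unique constraints are validated in both
    /// paths. A minimal sketch, assuming a `users` collection:
    ///
    /// ```
    /// let id = db.upsert("users", "user-1", vec![
    ///     ("name", Value::String("Jane Doe".into())),
    /// ]).await?;
    /// ```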
3199    pub async fn upsert(
3200        &self,
3201        collection: &str,
3202        id: &str,
3203        data: Vec<(&str, Value)>,
3204    ) -> Result<String> {
3205        // Convert Vec<(&str, Value)> to HashMap<String, Value>
3206        let data_map: HashMap<String, Value> =
3207            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3208
3209        // Check if document exists
3210        if let Some(mut doc) = self.get_document(collection, id)? {
3211            // Update existing document - merge new data
3212            for (key, value) in data_map {
3213                doc.data.insert(key, value);
3214            }
3215
3216            // Validate unique constraints for the updated document
3217            // We need to exclude the current document from the uniqueness check
3218            self.validate_unique_constraints_excluding(collection, &doc.data, id)
3219                .await?;
3220
3221            self.put(
3222                format!("{}:{}", collection, id),
3223                serde_json::to_vec(&doc)?,
3224                None,
3225            )
3226            .await?;
3227            Ok(id.to_string())
3228        } else {
3229            // Insert new document with specified ID - validate unique constraints
3230            self.validate_unique_constraints(collection, &data_map)
3231                .await?;
3232
3233            let document = Document {
3234                id: id.to_string(),
3235                data: data_map,
3236            };
3237
3238            self.put(
3239                format!("{}:{}", collection, id),
3240                serde_json::to_vec(&document)?,
3241                None,
3242            )
3243            .await?;
3244            Ok(id.to_string())
3245        }
3246    }
3247
3248    /// Increment/decrement a numeric field via read-modify-write on the stored document.
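    ///
    /// Returns the new value. A minimal sketch, assuming a `stats` collection
    /// (a missing or non-integer field is treated as 0):
    ///
    /// ```
    /// let views = db.increment("stats", "page-1", "views", 1).await?;
    /// let after_undo = db.increment("stats", "page-1", "views", -1).await?;
    /// ```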
3249    pub async fn increment(
3250        &self,
3251        collection: &str,
3252        id: &str,
3253        field: &str,
3254        amount: i64,
3255    ) -> Result<i64> {
3256        if let Some(mut doc) = self.get_document(collection, id)? {
3257            // Get current value
3258            let current = match doc.data.get(field) {
3259                Some(Value::Int(i)) => *i,
3260                _ => 0,
3261            };
3262
3263            // Increment
3264            let new_value = current + amount;
3265            doc.data.insert(field.to_string(), Value::Int(new_value));
3266
3267            // Save changes
3268            self.put(
3269                format!("{}:{}", collection, id),
3270                serde_json::to_vec(&doc)?,
3271                None,
3272            )
3273            .await?;
3274
3275            Ok(new_value)
3276        } else {
3277            Err(AqlError::new(
3278                ErrorCode::NotFound,
3279                format!("Document {}:{} not found", collection, id),
3280            ))
3281        }
3282    }
3283
3284    /// Delete every document in `collection` that matches `filter_fn`, returning the count.
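    ///
    /// A minimal sketch, assuming a `sessions` collection with a boolean
    /// `active` field:
    ///
    /// ```
    /// let removed = db
    ///     .delete_by_query("sessions", |f| f.eq("active", Value::Bool(false)))
    ///     .await?;
    /// println!("removed {} stale sessions", removed);
    /// ```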
3285    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3286    where
3287        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3288    {
3289        let docs = self.query(collection).filter(filter_fn).collect().await?;
3290        let mut deleted_count = 0;
3291
3292        for doc in docs {
3293            let key = format!("{}:{}", collection, doc.id);
3294            self.delete(&key).await?;
3295            deleted_count += 1;
3296        }
3297
3298        Ok(deleted_count)
3299    }
3300
3301    /// Import documents from a JSON file into a collection
3302    ///
3303    /// Validates each document against the collection schema, skips duplicates (by ID),
3304    /// and provides detailed statistics about the import operation. Useful for restoring
3305    /// backups, migrating data, or seeding development databases.
3306    ///
3307    /// # Performance
3308    /// - Import speed: ~5,000 docs/sec (with validation)
3309    /// - Memory efficient: processes documents one at a time
3310    /// - Validates schema and unique constraints
3311    ///
3312    /// # Arguments
3313    /// * `collection` - Name of the collection to import into
3314    /// * `filename` - Path to the JSON file containing documents (array format)
3315    ///
3316    /// # Returns
3317    /// `ImportStats` containing counts of imported, skipped, and failed documents
3318    ///
3319    /// # Examples
3320    ///
3321    /// ```
3322    /// use aurora_db::Aurora;
3323    ///
3324    /// let db = Aurora::open("mydb.db")?;
3325    ///
3326    /// // Basic import
3327    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3328    /// println!("Imported: {}, Skipped: {}, Failed: {}",
3329    ///     stats.imported, stats.skipped, stats.failed);
3330    ///
3331    /// // Restore from backup
3332    /// let backup_file = "./backups/users_2024-01-15.json";
3333    /// let stats = db.import_from_json("users", backup_file).await?;
3334    ///
3335    /// if stats.failed > 0 {
3336    ///     eprintln!("Warning: {} documents failed validation", stats.failed);
3337    /// }
3338    ///
3339    /// // Idempotent import - duplicates are skipped
3340    /// let stats = db.import_from_json("users", "./data/users.json").await?;
3341    /// // Running again will skip all existing documents
3342    /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3343    /// assert_eq!(stats2.skipped, stats.imported);
3344    ///
3345    /// // Migration from another system
3346    /// db.new_collection("products", vec![
3347    ///     ("sku", FieldType::String),
3348    ///     ("name", FieldType::String),
3349    ///     ("price", FieldType::Float),
3350    /// ])?;
3351    ///
3352    /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3353    /// println!("Migration complete: {} products imported", stats.imported);
3354    /// ```
3355    ///
3356    /// # Expected JSON Format
3357    ///
3358    /// The JSON file should contain an array of document objects:
3359    /// ```json
3360    /// [
3361    ///   { "id": "123", "name": "Alice", "email": "alice@example.com" },
3362    ///   { "id": "456", "name": "Bob", "email": "bob@example.com" },
3363    ///   { "name": "Carol", "email": "carol@example.com" }
3364    /// ]
3365    /// ```
3366    ///
3367    /// # Behavior
3368    /// - Documents with existing IDs are skipped (duplicate detection)
3369    /// - Documents without IDs get auto-generated UUIDs
3370    /// - Schema validation is performed on all fields
3371    /// - Failed documents are counted but don't stop the import
3372    /// - Unique constraints are checked
3373    ///
3374    /// # See Also
3375    /// - `export_as_json()` to create compatible backup files
3376    /// - `batch_insert()` for programmatic bulk inserts
3377    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3378        // Validate that the collection exists
3379        let collection_def = self.get_collection_definition(collection)?;
3380
3381        // Load JSON file
3382        let json_string = read_to_string(filename).await.map_err(|e| {
3383            AqlError::new(
3384                ErrorCode::IoError,
3385                format!("Failed to read import file: {}", e),
3386            )
3387        })?;
3388
3389        // Parse JSON
3390        let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
3391            AqlError::new(
3392                ErrorCode::SerializationError,
3393                format!("Failed to parse JSON: {}", e),
3394            )
3395        })?;
3396
3397        let mut stats = ImportStats::default();
3398
3399        // Process each document
3400        for doc_json in documents {
3401            match self
3402                .import_document(collection, &collection_def, doc_json)
3403                .await
3404            {
3405                Ok(ImportResult::Imported) => stats.imported += 1,
3406                Ok(ImportResult::Skipped) => stats.skipped += 1,
3407                Err(_) => stats.failed += 1,
3408            }
3409        }
3410
3411        Ok(stats)
3412    }
3413
3414    /// Import a single document, performing schema validation and duplicate checking
3415    async fn import_document(
3416        &self,
3417        collection: &str,
3418        collection_def: &Collection,
3419        doc_json: JsonValue,
3420    ) -> Result<ImportResult> {
3421        if !doc_json.is_object() {
3422            return Err(AqlError::invalid_operation(
3423                "Expected JSON object".to_string(),
3424            ));
3425        }
3426
3427        // Extract document ID if present
3428        let doc_id = doc_json
3429            .get("id")
3430            .and_then(|id| id.as_str())
3431            .map(|s| s.to_string())
3432            .unwrap_or_else(|| Uuid::now_v7().to_string());
3433
3434        // Check if document with this ID already exists
3435        if self.get_document(collection, &doc_id)?.is_some() {
3436            return Ok(ImportResult::Skipped);
3437        }
3438
3439        // Convert JSON to our document format and validate against schema
3440        let mut data_map = HashMap::new();
3441
3442        if let Some(obj) = doc_json.as_object() {
3443            for (field_name, field_def) in &collection_def.fields {
3444                if let Some(json_value) = obj.get(field_name) {
3445                    // Validate value against field type
3446                    if !self.validate_field_value(json_value, &field_def.field_type) {
3447                        return Err(AqlError::invalid_operation(format!(
3448                            "Field '{}' has invalid type",
3449                            field_name
3450                        )));
3451                    }
3452
3453                    // Convert JSON value to our Value type
3454                    let value = self.json_to_value(json_value)?;
3455                    data_map.insert(field_name.clone(), value);
3456                } else if field_def.unique {
3457                    // Missing required unique field
3458                    return Err(AqlError::invalid_operation(format!(
3459                        "Missing required unique field '{}'",
3460                        field_name
3461                    )));
3462                }
3463            }
3464        }
3465
3466        // Check for duplicates by unique fields
3467        let unique_fields = self.get_unique_fields(collection_def);
3468        for unique_field in &unique_fields {
3469            if let Some(value) = data_map.get(unique_field) {
3470                // Query for existing documents with this unique value
3471                let query_results = self
3472                    .query(collection)
3473                    .filter(move |f| f.eq(unique_field, value.clone()))
3474                    .limit(1)
3475                    .collect()
3476                    .await?;
3477
3478                if !query_results.is_empty() {
3479                    // Found duplicate by unique field
3480                    return Ok(ImportResult::Skipped);
3481                }
3482            }
3483        }
3484
3485        // Create and insert document
3486        let document = Document {
3487            id: doc_id,
3488            data: data_map,
3489        };
3490
3491        self.put(
3492            format!("{}:{}", collection, document.id),
3493            serde_json::to_vec(&document)?,
3494            None,
3495        )
3496        .await?;
3497
3498        Ok(ImportResult::Imported)
3499    }
3500
3501    /// Validate that a JSON value matches the expected field type
3502    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3503        match field_type {
3504            FieldType::String => value.is_string(),
3505            FieldType::Int => value.is_i64() || value.is_u64(),
3506            FieldType::Float => value.is_number(),
3507            FieldType::Bool => value.is_boolean(),
3508            FieldType::Array => value.is_array(),
3509            FieldType::Object => value.is_object(),
3510            FieldType::Uuid => {
3511                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3512            }
3513            FieldType::Any => true,
3514        }
3515    }
3516
3517    /// Convert a JSON value to our internal Value type
3518    #[allow(clippy::only_used_in_recursion)]
3519    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3520        match json_value {
3521            JsonValue::Null => Ok(Value::Null),
3522            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3523            JsonValue::Number(n) => {
3524                if let Some(i) = n.as_i64() {
3525                    Ok(Value::Int(i))
3526                } else if let Some(f) = n.as_f64() {
3527                    Ok(Value::Float(f))
3528                } else {
3529                    Err(AqlError::invalid_operation(
3530                        "Invalid number value".to_string(),
3531                    ))
3532                }
3533            }
3534            JsonValue::String(s) => {
3535                // Try parsing as UUID first
3536                if let Ok(uuid) = Uuid::parse_str(s) {
3537                    Ok(Value::Uuid(uuid))
3538                } else {
3539                    Ok(Value::String(s.clone()))
3540                }
3541            }
3542            JsonValue::Array(arr) => {
3543                let mut values = Vec::new();
3544                for item in arr {
3545                    values.push(self.json_to_value(item)?);
3546                }
3547                Ok(Value::Array(values))
3548            }
3549            JsonValue::Object(obj) => {
3550                let mut map = HashMap::new();
3551                for (k, v) in obj {
3552                    map.insert(k.clone(), self.json_to_value(v)?);
3553                }
3554                Ok(Value::Object(map))
3555            }
3556        }
3557    }
3558
3559    /// Get collection definition
3560    pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3561        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3562            let collection_def: Collection = serde_json::from_slice(&data)?;
3563            Ok(collection_def)
3564        } else {
3565            Err(AqlError::new(
3566                ErrorCode::CollectionNotFound,
3567                collection.to_string(),
3568            ))
3569        }
3570    }
3571
3572    /// Get storage statistics and information about the database
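    ///
    /// A minimal sketch of reading the per-collection numbers:
    /// ```
    /// let stats = db.get_database_stats()?;
    /// for (name, c) in &stats.collections {
    ///     println!("{}: {} docs, {} bytes", name, c.count, c.size_bytes);
    /// }
    /// ```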
3573    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3574        let hot_stats = self.hot.get_stats();
3575        let cold_stats = self.cold.get_stats()?;
3576
3577        Ok(DatabaseStats {
3578            hot_stats,
3579            cold_stats,
3580            estimated_size: self.cold.estimated_size(),
3581            collections: self.get_collection_stats()?,
3582        })
3583    }
3584
3585    /// Check if a key is currently stored in the hot cache
3586    pub fn is_in_hot_cache(&self, key: &str) -> bool {
3587        self.hot.is_hot(key)
3588    }
3589
3590    /// Clear the hot cache (useful when memory needs to be freed)
3591    pub fn clear_hot_cache(&self) {
3592        self.hot.clear();
3593        println!(
3594            "Hot cache cleared, current hit ratio: {:.2}%",
3595            self.hot.hit_ratio() * 100.0
3596        );
3597    }
3598
3599    /// Prewarm the cache by loading frequently accessed data from cold storage
3600    ///
3601    /// Loads documents from a collection into memory cache to eliminate cold-start
3602    /// latency. Dramatically improves initial query performance after database startup
3603    /// by preloading the most commonly accessed data.
3604    ///
3605    /// # Performance Impact
3606    /// - Prewarming speed: ~20,000 docs/sec
3607    /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3608    /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3609    /// - Memory cost: ~500 bytes per document average
3610    ///
3611    /// # Arguments
3612    /// * `collection` - The collection to prewarm
3613    /// * `limit` - Maximum number of documents to load (`None` = load all)
3614    ///
3615    /// # Returns
3616    /// Number of documents loaded into cache
3617    ///
3618    /// # Examples
3619    ///
3620    /// ```
3621    /// use aurora_db::Aurora;
3622    ///
3623    /// let db = Aurora::open("mydb.db")?;
3624    ///
3625    /// // Prewarm frequently accessed collection
3626    /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3627    /// println!("Prewarmed {} user documents", loaded);
3628    ///
3629    /// // Now queries are fast from the start
3630    /// let stats_before = db.get_cache_stats();
3631    /// let users = db.query("users").collect().await?;
3632    /// let stats_after = db.get_cache_stats();
3633    ///
3634    /// // High hit rate thanks to prewarming
3635    /// assert!(stats_after.hit_rate > 0.95);
3636    ///
3637    /// // Startup optimization pattern
3638    /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3639    ///     println!("Prewarming caches...");
3640    ///
3641    ///     // Prewarm most frequently accessed collections
3642    ///     db.prewarm_cache("users", Some(5000)).await?;
3643    ///     db.prewarm_cache("sessions", Some(1000)).await?;
3644    ///     db.prewarm_cache("products", Some(500)).await?;
3645    ///
3646    ///     let stats = db.get_cache_stats();
3647    ///     println!("Cache prewarmed: {} entries loaded", stats.size);
3648    ///
3649    ///     Ok(())
3650    /// }
3651    ///
3652    /// // Web server startup
3653    /// #[tokio::main]
3654    /// async fn main() {
3655    ///     let db = Aurora::open("app.db").unwrap();
3656    ///
3657    ///     // Prewarm before accepting requests
3658    ///     db.prewarm_cache("users", Some(10000)).await.unwrap();
3659    ///
3660    ///     // Server is now ready with hot cache
3661    ///     start_web_server(db).await;
3662    /// }
3663    ///
3664    /// // Prewarm all documents (for small collections)
3665    /// let all_loaded = db.prewarm_cache("config", None).await?;
3666    /// // All config documents now in memory
3667    ///
3668    /// // Selective prewarming based on access patterns
3669    /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3670    ///     // Load recent users (they're accessed most)
3671    ///     db.prewarm_cache("users", Some(1000)).await?;
3672    ///
3673    ///     // Load active sessions only
3674    ///     let active_sessions = db.query("sessions")
3675    ///         .filter(|f| f.eq("active", Value::Bool(true)))
3676    ///         .limit(500)
3677    ///         .collect()
3678    ///         .await?;
3679    ///
3680    ///     // Manually populate cache with hot data
3681    ///     for session in active_sessions {
3682    ///         // Reading automatically caches
3683    ///         db.get_document("sessions", &session.id)?;
3684    ///     }
3685    ///
3686    ///     Ok(())
3687    /// }
3688    /// ```
3689    ///
3690    /// # Typical Prewarming Scenarios
3691    ///
3692    /// **Web Application Startup:**
3693    /// ```
3694    /// // Load user data, sessions, and active content
3695    /// db.prewarm_cache("users", Some(5000)).await?;
3696    /// db.prewarm_cache("sessions", Some(2000)).await?;
3697    /// db.prewarm_cache("posts", Some(1000)).await?;
3698    /// ```
3699    ///
3700    /// **E-commerce Site:**
3701    /// ```
3702    /// // Load products, categories, and user carts
3703    /// db.prewarm_cache("products", Some(500)).await?;
3704    /// db.prewarm_cache("categories", None).await?;  // All categories
3705    /// db.prewarm_cache("active_carts", Some(1000)).await?;
3706    /// ```
3707    ///
3708    /// **API Server:**
3709    /// ```
3710    /// // Load authentication data and rate limits
3711    /// db.prewarm_cache("api_keys", None).await?;
3712    /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3713    /// ```
3714    ///
3715    /// # When to Use
3716    /// - At application startup to eliminate cold-start latency
3717    /// - After cache clear operations
3718    /// - Before high-traffic events (product launches, etc.)
3719    /// - When deploying new instances (load balancer warm-up)
3720    ///
3721    /// # Memory Considerations
3722    /// - 1,000 docs ≈ 500 KB memory
3723    /// - 10,000 docs ≈ 5 MB memory
3724    /// - 100,000 docs ≈ 50 MB memory
3725    /// - Stay within configured cache capacity
3726    ///
3727    /// # See Also
3728    /// - `get_cache_stats()` to monitor cache effectiveness
3729    /// - `prewarm_all_collections()` to prewarm all collections
3730    /// - `Aurora::with_config()` to adjust cache capacity
3731    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
3732        let limit = limit.unwrap_or(usize::MAX); // `None` = no limit: load everything, as documented
3733        let prefix = format!("{}:", collection);
3734        let mut loaded = 0;
3735
3736        for entry in self.cold.scan_prefix(&prefix) {
3737            if loaded >= limit {
3738                break;
3739            }
3740
3741            if let Ok((key, value)) = entry {
3742                // Load into hot cache
3743                self.hot.set(Arc::new(key), Arc::new(value), None);
3744                loaded += 1;
3745            }
3746        }
3747
3748        println!("Prewarmed {} with {} documents", collection, loaded);
3749        Ok(loaded)
3750    }
3751
3752    /// Prewarm cache for all collections
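    ///
    /// Returns a map of collection name to the number of documents loaded.
    /// A minimal sketch:
    /// ```
    /// let warmed = db.prewarm_all_collections(Some(1000)).await?;
    /// for (collection, n) in warmed {
    ///     println!("{}: {} docs prewarmed", collection, n);
    /// }
    /// ```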
3753    pub async fn prewarm_all_collections(
3754        &self,
3755        docs_per_collection: Option<usize>,
3756    ) -> Result<HashMap<String, usize>> {
3757        let mut stats = HashMap::new();
3758
3759        // Get all collections
3760        let collections: Vec<String> = self
3761            .cold
3762            .scan()
3763            .filter_map(|r| r.ok())
3764            .map(|(k, _)| k)
3765            .filter(|k| k.starts_with("_collection:"))
3766            .map(|k| k.trim_start_matches("_collection:").to_string())
3767            .collect();
3768
3769        for collection in collections {
3770            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3771            stats.insert(collection, loaded);
3772        }
3773
3774        Ok(stats)
3775    }
3776
3777    /// Store multiple key-value pairs efficiently in a single batch operation
3778    ///
3779    /// Low-level batch write operation that bypasses document validation and
3780    /// writes raw byte data directly to storage. Useful for advanced use cases,
3781    /// custom serialization, or maximum performance scenarios.
3782    ///
3783    /// # Performance
3784    /// - Write speed: ~100,000 writes/sec
3785    /// - Single disk fsync for entire batch
3786    /// - No validation or schema checking
3787    /// - Direct storage access
3788    ///
3789    /// # Arguments
3790    /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3791    ///
3792    /// # Returns
3793    /// Success or an error
3794    ///
3795    /// # Examples
3796    ///
3797    /// ```
3798    /// use aurora_db::Aurora;
3799    ///
3800    /// let db = Aurora::open("mydb.db")?;
3801    ///
3802    /// // Low-level batch write
3803    /// let pairs = vec![
3804    ///     ("users:123".to_string(), b"raw data 1".to_vec()),
3805    ///     ("users:456".to_string(), b"raw data 2".to_vec()),
3806    ///     ("cache:key1".to_string(), b"cached value".to_vec()),
3807    /// ];
3808    ///
3809    /// db.batch_write(pairs)?;
3810    ///
3811    /// // Custom binary serialization
3812    /// use bincode;
3813    ///
3814    /// #[derive(Serialize, Deserialize)]
3815    /// struct CustomData {
3816    ///     id: u64,
3817    ///     payload: Vec<u8>,
3818    /// }
3819    ///
3820    /// let custom_data = vec![
3821    ///     CustomData { id: 1, payload: vec![1, 2, 3] },
3822    ///     CustomData { id: 2, payload: vec![4, 5, 6] },
3823    /// ];
3824    ///
3825    /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3826    ///     .iter()
3827    ///     .map(|data| {
3828    ///         let key = format!("binary:{}", data.id);
3829    ///         let value = bincode::serialize(data).unwrap();
3830    ///         (key, value)
3831    ///     })
3832    ///     .collect();
3833    ///
3834    /// db.batch_write(pairs)?;
3835    ///
3836    /// // Bulk cache population
3837    /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3838    ///     .map(|i| {
3839    ///         let key = format!("cache:item_{}", i);
3840    ///         let value = format!("value_{}", i).into_bytes();
3841    ///         (key, value)
3842    ///     })
3843    ///     .collect();
3844    ///
3845    /// db.batch_write(cache_entries)?;
3846    /// // Writes 10,000 entries in ~100ms
3847    /// ```
3848    ///
3849    /// # Important Notes
3850    /// - No schema validation performed
3851    /// - No unique constraint checking
3852    /// - No automatic indexing
3853    /// - Keys must follow "collection:id" format for proper grouping
3854    /// - Values are raw bytes - you handle serialization
3855    /// - Use `batch_insert()` for validated document inserts
3856    ///
3857    /// # When to Use
3858    /// - Maximum write performance needed
3859    /// - Custom serialization formats (bincode, msgpack, etc.)
3860    /// - Cache population
3861    /// - Low-level database operations
3862    /// - You're bypassing the document model
3863    ///
3864    /// # When NOT to Use
3865    /// - Regular document inserts → Use `batch_insert()` instead
3866    /// - Need validation → Use `batch_insert()` instead
3867    /// - Need indexing → Use `batch_insert()` instead
3868    ///
3869    /// # See Also
3870    /// - `batch_insert()` for validated document batch inserts
3871    /// - `put()` for single key-value writes
3872    pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3873        // Group pairs by collection name
3874        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3875        for (key, value) in &pairs {
3876            if let Some(collection_name) = key.split(':').next() {
3877                collections
3878                    .entry(collection_name.to_string())
3879                    .or_default()
3880                    .push((key.clone(), value.clone()));
3881            }
3882        }
3883
3884        // First, do the batch write to cold storage for all pairs
3885        self.cold.batch_set(pairs)?;
3886
3887        // Then, process each collection for in-memory updates
3888        for (collection_name, batch) in collections {
3889            // --- Optimized Batch Indexing ---
3890
3891            // 1. Get schema once for the entire collection batch
3892            let collection_obj = match self.schema_cache.get(&collection_name) {
3893                Some(cached_schema) => Arc::clone(cached_schema.value()),
3894                None => {
3895                    let collection_key = format!("_collection:{}", collection_name);
3896                    match self.get(&collection_key)? {
3897                        Some(data) => {
3898                            let obj: Collection = serde_json::from_slice(&data)?;
3899                            let arc_obj = Arc::new(obj);
3900                            self.schema_cache
3901                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3902                            arc_obj
3903                        }
3904                        None => continue,
3905                    }
3906                }
3907            };
3908
3909            let indexed_fields: Vec<String> = collection_obj
3910                .fields
3911                .iter()
3912                .filter(|(_, def)| def.indexed || def.unique)
3913                .map(|(name, _)| name.clone())
3914                .collect();
3915
3916            let primary_index = self
3917                .primary_indices
3918                .entry(collection_name.to_string())
3919                .or_default();
3920
3921            for (key, value) in batch {
3922                // 2. Update hot cache
3923                if self.should_cache_key(&key) {
3924                    self.hot
3925                        .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
3926                }
3927
3928                // 3. Update primary index with metadata only
3929                let location = DiskLocation::new(value.len());
3930                primary_index.insert(key.clone(), location);
3931
3932                // 4. Update secondary indices
3933                if !indexed_fields.is_empty()
3934                    && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3935                {
3936                    for (field, field_value) in doc.data {
3937                        if indexed_fields.contains(&field) {
3938                            let value_str = match &field_value {
3939                                Value::String(s) => s.clone(),
3940                                _ => field_value.to_string(),
3941                            };
3942                            let index_key = format!("{}:{}", collection_name, field);
3943                            let secondary_index =
3944                                self.secondary_indices.entry(index_key).or_default();
3945
3946                            let max_entries = self.config.max_index_entries_per_field;
3947                            secondary_index
3948                                .entry(value_str)
3949                                .and_modify(|doc_ids| {
3950                                    if doc_ids.len() < max_entries {
3951                                        doc_ids.push(key.to_string());
3952                                    }
3953                                })
3954                                .or_insert_with(|| vec![key.to_string()]);
3955                        }
3956                    }
3957                }
3958            }
3959        }
3960
3961        Ok(())
3962    }
3963
3964    /// Scan for keys with a specific prefix
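    ///
    /// Keys follow the `collection:id` convention used throughout this module.
    /// A minimal sketch:
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, value) = entry?;
    ///     println!("{} ({} bytes)", key, value.len());
    /// }
    /// ```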
3965    pub fn scan_with_prefix(
3966        &self,
3967        prefix: &str,
3968    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3969        self.cold.scan_prefix(prefix)
3970    }
3971
3972    /// Get storage efficiency metrics for the database
3973    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
3974        let mut stats = HashMap::new();
3975
3976        // Scan all collections
3977        let collections: Vec<String> = self
3978            .cold
3979            .scan()
3980            .filter_map(|r| r.ok())
3981            .map(|(k, _)| k)
3982            .filter(|k| k.starts_with("_collection:"))
3983            .map(|k| k.trim_start_matches("_collection:").to_string())
3984            .collect();
3985
3986        for collection in collections {
3987            // Use primary index for fast stats (count + size from DiskLocation)
3988            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
3989                let count = index.len();
3990                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
3991                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
3992                (count, size)
3993            } else {
3994                // Fallback: scan from cold storage if index not available
3995                let prefix = format!("{}:", collection);
3996                let count = self.cold.scan_prefix(&prefix).count();
3997                let size: usize = self
3998                    .cold
3999                    .scan_prefix(&prefix)
4000                    .filter_map(|r| r.ok())
4001                    .map(|(_, v)| v.len())
4002                    .sum();
4003                (count, size)
4004            };
4005
4006            stats.insert(
4007                collection,
4008                CollectionStats {
4009                    count,
4010                    size_bytes: size,
4011                    avg_doc_size: if count > 0 { size / count } else { 0 },
4012                },
4013            );
4014        }
4015
4016        Ok(stats)
4017    }
4018
4019    /// Search for documents by exact value using an index
4020    ///
4021    /// This method performs a fast lookup using a pre-created index
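    ///
    /// A minimal sketch, assuming an index was created on `users.email`:
    /// ```
    /// let matches = db.search_by_value(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// )?;
    /// ```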
4022    pub fn search_by_value(
4023        &self,
4024        collection: &str,
4025        field: &str,
4026        value: &Value,
4027    ) -> Result<Vec<Document>> {
4028        let index_key = format!("_index:{}:{}", collection, field);
4029
4030        if let Some(index_data) = self.get(&index_key)? {
4031            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4032            let index = Index::new(index_def);
4033
4034            // Use the previously unused search method
4035            if let Some(doc_ids) = index.search(value) {
4036                // Load the documents by ID
4037                let mut docs = Vec::new();
4038                for id in doc_ids {
4039                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4040                        let doc: Document = serde_json::from_slice(&doc_data)?;
4041                        docs.push(doc);
4042                    }
4043                }
4044                return Ok(docs);
4045            }
4046        }
4047
4048        // Return empty result if no index or no matches
4049        Ok(Vec::new())
4050    }
4051
4052    /// Perform a full-text search on an indexed text field
4053    ///
4054    /// This provides more advanced text search capabilities including
4055    /// relevance ranking of results
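    ///
    /// A minimal sketch, assuming a full-text index exists on `articles.body`
    /// (see `create_text_index()`):
    /// ```
    /// let ranked = db.full_text_search("articles", "body", "tiered storage")?;
    /// ```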
4056    pub fn full_text_search(
4057        &self,
4058        collection: &str,
4059        field: &str,
4060        query: &str,
4061    ) -> Result<Vec<Document>> {
4062        let index_key = format!("_index:{}:{}", collection, field);
4063
4064        if let Some(index_data) = self.get(&index_key)? {
4065            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4066
4067            // Ensure this is a full-text index
4068            if !matches!(index_def.index_type, IndexType::FullText) {
4069                return Err(AqlError::invalid_operation(format!(
4070                    "Field '{}' is not indexed as full-text",
4071                    field
4072                )));
4073            }
4074
4075            let index = Index::new(index_def);
4076
4077            // Use the previously unused search_text method
4078            if let Some(doc_id_scores) = index.search_text(query) {
4079                // Load the documents by ID, preserving score order
4080                let mut docs = Vec::new();
4081                for (id, _score) in doc_id_scores {
4082                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4083                        let doc: Document = serde_json::from_slice(&doc_data)?;
4084                        docs.push(doc);
4085                    }
4086                }
4087                return Ok(docs);
4088            }
4089        }
4090
4091        // Return empty result if no index or no matches
4092        Ok(Vec::new())
4093    }
4094
4095    /// Create a full-text search index on a text field
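    ///
    /// A minimal sketch:
    /// ```
    /// db.create_text_index("articles", "body", true).await?;
    /// let ranked = db.full_text_search("articles", "body", "storage")?;
    /// ```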
4096    pub async fn create_text_index(
4097        &self,
4098        collection: &str,
4099        field: &str,
4100        _enable_stop_words: bool, // reserved: stop-word filtering is not yet implemented
4101    ) -> Result<()> {
4102        // Check if collection exists
4103        if self.get(&format!("_collection:{}", collection))?.is_none() {
4104            return Err(AqlError::new(
4105                ErrorCode::CollectionNotFound,
4106                collection.to_string(),
4107            ));
4108        }
4109
4110        // Create index definition
4111        let index_def = IndexDefinition {
4112            name: format!("{}_{}_fulltext", collection, field),
4113            collection: collection.to_string(),
4114            fields: vec![field.to_string()],
4115            index_type: IndexType::FullText,
4116            unique: false,
4117        };
4118
4119        // Store index definition
4120        let index_key = format!("_index:{}:{}", collection, field);
4121        self.put(index_key, serde_json::to_vec(&index_def)?, None)
4122            .await?;
4123
4124        // Create the actual index
4125        let index = Index::new(index_def);
4126
4127        // Index all existing documents in the collection
4128        let prefix = format!("{}:", collection);
4129        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4130            let doc: Document = serde_json::from_slice(&data)?;
4131            index.insert(&doc)?;
4132        }
4133
4134        Ok(())
4135    }
4136
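    /// Execute a `SimpleQueryBuilder` against its collection.
    ///
    /// Planning is index-first: the first filter that can be answered from a
    /// secondary index supplies the candidate document IDs. Otherwise the
    /// query falls back to a full collection scan, which terminates early
    /// when a LIMIT without ORDER BY makes that safe.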
4137    pub async fn execute_simple_query(
4138        &self,
4139        builder: &SimpleQueryBuilder,
4140    ) -> Result<Vec<Document>> {
4141        // Ensure indices are initialized
4142        self.ensure_indices_initialized().await?;
4143
4144        // A place to store the IDs of the documents we need to fetch
4145        let mut doc_ids_to_load: Option<Vec<String>> = None;
4146
4147        // --- The "Query Planner" ---
4148        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4149        // than collecting millions of IDs from secondary index
4150        let use_index_for_range = if let Some(limit) = builder.limit {
4151            // If limit is small (< 1000), prefer full scan for range queries
4152            // The secondary index would scan all entries anyway, might as well
4153            // scan documents directly and benefit from early termination
4154            limit >= 1000
4155        } else {
4156            // No limit? Index might still help if result set is small
4157            true
4158        };
4159
4160        // Look for an opportunity to use an index
4161        for filter in builder.filters.iter() {
4162            match filter {
4163                Filter::Eq(field, value) => {
4164                    let index_key = format!("{}:{}", &builder.collection, field);
4165
4166                    // Do we have a secondary index for this field?
4167                    if let Some(index) = self.secondary_indices.get(&index_key) {
4168                        // Yes! Let's use it.
4169                        if let Some(matching_ids) = index.get(&value.to_string()) {
4170                            doc_ids_to_load = Some(matching_ids.clone());
4171                            break; // Stop searching for other indexes for now
4172                        }
4173                    }
4174                }
4175                Filter::Gt(field, value)
4176                | Filter::Gte(field, value)
4177                | Filter::Lt(field, value)
4178                | Filter::Lte(field, value) => {
4179                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
4180                    if !use_index_for_range {
4181                        continue;
4182                    }
4183
4184                    let index_key = format!("{}:{}", &builder.collection, field);
4185
4186                    // Do we have a secondary index for this field?
4187                    if let Some(index) = self.secondary_indices.get(&index_key) {
4188                        // For range queries, we need to scan through the index values
4189                        let mut matching_ids = Vec::new();
4190
4191                        for entry in index.iter() {
4192                            let index_value_str = entry.key();
4193
4194                            // Try to parse the index value to compare with our filter value
4195                            if let Ok(index_value) =
4196                                self.parse_value_from_string(index_value_str, value)
4197                            {
4198                                let matches = match filter {
4199                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
4200                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
4201                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
4202                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
4203                                    _ => false,
4204                                };
4205
4206                                if matches {
4207                                    matching_ids.extend(entry.value().clone());
4208                                }
4209                            }
4210                        }
4211
4212                        if !matching_ids.is_empty() {
4213                            doc_ids_to_load = Some(matching_ids);
4214                            break;
4215                        }
4216                    }
4217                }
4218                Filter::Contains(field, search_term) => {
4219                    let index_key = format!("{}:{}", &builder.collection, field);
4220
4221                    // Do we have a secondary index for this field?
4222                    if let Some(index) = self.secondary_indices.get(&index_key) {
4223                        let mut matching_ids = Vec::new();
4224
4225                        for entry in index.iter() {
4226                            let index_value_str = entry.key();
4227
4228                            // Check if this indexed value contains our search term
4229                            if index_value_str
4230                                .to_lowercase()
4231                                .contains(&search_term.to_lowercase())
4232                            {
4233                                matching_ids.extend(entry.value().clone());
4234                            }
4235                        }
4236
4237                        if !matching_ids.is_empty() {
4238                            // Remove duplicates since a document could match multiple indexed values
4239                            matching_ids.sort();
4240                            matching_ids.dedup();
4241
4242                            doc_ids_to_load = Some(matching_ids);
4243                            break;
4244                        }
4245                    }
4246                }
4247                Filter::And(_) => {
4248                    // For compound filters, we can't easily use a single index
4249                    // This would require more complex query planning
4250                    continue;
4251                }
4252                Filter::Or(sub_filters) => {
4253                    // For OR filters, collect union of all matching IDs from each sub-filter
4254                    let mut union_ids: Vec<String> = Vec::new();
4255                    let mut used_index = false;
4256
4257                    for sub_filter in sub_filters {
4258                        match sub_filter {
4259                            Filter::Eq(field, value) => {
4260                                let index_key = format!("{}:{}", &builder.collection, field);
4261                                if let Some(index) = self.secondary_indices.get(&index_key)
4262                                    && let Some(matching_ids) = index.get(&value.to_string())
4263                                {
4264                                    union_ids.extend(matching_ids.clone());
4265                                    used_index = true;
4266                                }
4267                            }
4268                            // For other filter types in OR, we fall back to full scan
4269                            _ => continue,
4270                        }
4271                    }
4272
4273                    if used_index && !union_ids.is_empty() {
4274                        // Remove duplicates
4275                        union_ids.sort();
4276                        union_ids.dedup();
4277                        doc_ids_to_load = Some(union_ids);
4278                        break;
4279                    }
4280                    // If no index was used, continue to full scan
4281                }
4282            }
4283        }
4284
4285        let mut final_docs: Vec<Document>;
4286
4287        if let Some(ids) = doc_ids_to_load {
4288            // --- Path 1: Index Lookup (diagnostics appended to /tmp/aurora_query_stats.log) ---
4289            use std::io::Write;
4290            if let Ok(mut file) = std::fs::OpenOptions::new()
4291                .create(true)
4292                .append(true)
4293                .open("/tmp/aurora_query_stats.log")
4294            {
4295                let _ = writeln!(
4296                    file,
4297                    "[INDEX PATH] IDs to load: {} | Collection: {}",
4298                    ids.len(),
4299                    builder.collection
4300                );
4301            }
4302
4303            final_docs = Vec::with_capacity(ids.len());
4304
            for id in ids {
                let doc_key = format!("{}:{}", &builder.collection, id);
                if let Some(data) = self.get(&doc_key)?
                    && let Ok(doc) = serde_json::from_slice::<Document>(&data)
                    // The index only produces candidates (e.g. its Contains match is
                    // case-insensitive), so re-check every filter against the document
                    && builder.filters.iter().all(|f| f.matches(&doc))
                {
                    final_docs.push(doc);
                }
            }
4313        } else {
4314            // --- Path 2: Full Collection Scan with Early Termination ---
4315
4316            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4317            // as soon as we have enough matching documents
4318            let early_termination_target = if builder.order_by.is_none() {
                builder
                    .limit
                    .map(|l| l.saturating_add(builder.offset.unwrap_or(0)))
4320            } else {
4321                // With ORDER BY, we need all matching docs to sort correctly
4322                None
4323            };
4324
4325            // Smart scan with early termination support
4326            final_docs = Vec::new();
4327            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4328
4329            if let Some(index) = self.primary_indices.get(&builder.collection) {
4330                for entry in index.iter() {
4331                    let key = entry.key();
4332                    scan_stats.0 += 1; // keys scanned
4333
4334                    // Early termination check
4335                    if let Some(target) = early_termination_target {
4336                        if final_docs.len() >= target {
4337                            break; // We have enough documents!
4338                        }
4339                    }
4340
4341                    // Fetch and filter document
4342                    if let Some(data) = self.get(key)? {
4343                        scan_stats.1 += 1; // docs fetched
4344
4345                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
4346                            // Apply all filters
4347                            let matches_all_filters =
4348                                builder.filters.iter().all(|filter| match filter {
4349                                    Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4350                                    Filter::Gt(field, value) => {
4351                                        doc.data.get(field).is_some_and(|v| v > value)
4352                                    }
4353                                    Filter::Gte(field, value) => {
4354                                        doc.data.get(field).is_some_and(|v| v >= value)
4355                                    }
4356                                    Filter::Lt(field, value) => {
4357                                        doc.data.get(field).is_some_and(|v| v < value)
4358                                    }
4359                                    Filter::Lte(field, value) => {
4360                                        doc.data.get(field).is_some_and(|v| v <= value)
4361                                    }
4362                                    Filter::Contains(field, value_str) => {
4363                                        doc.data.get(field).is_some_and(|v| match v {
4364                                            Value::String(s) => s.contains(value_str),
4365                                            Value::Array(arr) => {
4366                                                arr.contains(&Value::String(value_str.clone()))
4367                                            }
4368                                            _ => false,
4369                                        })
4370                                    }
4371                                    Filter::And(filters) => filters.iter().all(|f| f.matches(&doc)),
4372                                    Filter::Or(filters) => filters.iter().any(|f| f.matches(&doc)),
4373                                });
4374
4375                            if matches_all_filters {
4376                                scan_stats.2 += 1; // matches found
4377                                final_docs.push(doc);
4378                            }
4379                        }
4380                    }
4381                }
4382
4383                // Debug logging for query performance analysis
4384                use std::io::Write;
4385                if let Ok(mut file) = std::fs::OpenOptions::new()
4386                    .create(true)
4387                    .append(true)
4388                    .open("/tmp/aurora_query_stats.log")
4389                {
4390                    let _ = writeln!(
4391                        file,
4392                        "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
4393                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
4394                    );
4395                }
4396            } else {
4397                // Fallback: scan from cold storage if index not initialized
4398                final_docs = self.get_all_collection(&builder.collection).await?;
4399
4400                // Apply filters
4401                final_docs.retain(|doc| {
4402                    builder.filters.iter().all(|filter| match filter {
4403                        Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4404                        Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
4405                        Filter::Gte(field, value) => {
4406                            doc.data.get(field).is_some_and(|v| v >= value)
4407                        }
4408                        Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
4409                        Filter::Lte(field, value) => {
4410                            doc.data.get(field).is_some_and(|v| v <= value)
4411                        }
4412                        Filter::Contains(field, value_str) => {
4413                            doc.data.get(field).is_some_and(|v| match v {
4414                                Value::String(s) => s.contains(value_str),
4415                                Value::Array(arr) => {
4416                                    arr.contains(&Value::String(value_str.clone()))
4417                                }
4418                                _ => false,
4419                            })
4420                        }
4421                        Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
4422                        Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
4423                    })
4424                });
4425            }
4426        }
4427
4428        // Apply ordering
4429        if let Some((field, ascending)) = &builder.order_by {
4430            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
4431                (Some(v1), Some(v2)) => {
4432                    let cmp = v1.cmp(v2);
4433                    if *ascending { cmp } else { cmp.reverse() }
4434                }
4435                (None, Some(_)) => std::cmp::Ordering::Less,
4436                (Some(_), None) => std::cmp::Ordering::Greater,
4437                (None, None) => std::cmp::Ordering::Equal,
4438            });
4439        }
4440
4441        // Apply offset and limit
4442        let start = builder.offset.unwrap_or(0);
4443        let end = builder
4444            .limit
4445            .map(|l| start.saturating_add(l))
4446            .unwrap_or(final_docs.len());
4447
4448        let end = end.min(final_docs.len());
4449        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
4450    }
4451
4452    /// Helper method to parse a string value back to a Value for comparison
4453    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
4454        match reference_value {
            Value::Int(_) => value_str.parse::<i64>().map(Value::Int).map_err(|_| {
                AqlError::invalid_operation(format!("Failed to parse '{}' as Int", value_str))
            }),
            Value::Float(_) => value_str.parse::<f64>().map(Value::Float).map_err(|_| {
                AqlError::invalid_operation(format!("Failed to parse '{}' as Float", value_str))
            }),
            // Strings (and any unhandled type) compare via their string form
            _ => Ok(Value::String(value_str.to_string())),
4475        }
4476    }
4477
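    /// Execute a query described by an HTTP `QueryPayload` against a collection.
    ///
    /// The payload is applied in four stages: filters, sort, pagination
    /// (offset/limit), then field projection. A document that lacks a filtered
    /// field never matches, even for negative operators such as `Ne`.
    ///
    /// # Examples
    ///
    /// A sketch, assuming `QueryPayload` derives `Deserialize` as its use in the
    /// HTTP layer suggests; the JSON shape shown here is illustrative:
    ///
    /// ```
    /// let payload: QueryPayload = serde_json::from_str(r#"{
    ///     "filters": [{"field": "age", "operator": "Gte", "value": 18}],
    ///     "sort": {"field": "name", "ascending": true},
    ///     "offset": 0,
    ///     "limit": 20,
    ///     "select": ["name", "age"]
    /// }"#)?;
    /// let adults = db.execute_dynamic_query("users", &payload).await?;
    /// ```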
4478    pub async fn execute_dynamic_query(
4479        &self,
4480        collection: &str,
4481        payload: &QueryPayload,
4482    ) -> Result<Vec<Document>> {
4483        let mut docs = self.get_all_collection(collection).await?;
4484
4485        // 1. Apply Filters
4486        if let Some(filters) = &payload.filters {
4487            docs.retain(|doc| {
4488                filters.iter().all(|filter| {
4489                    doc.data
4490                        .get(&filter.field)
4491                        .is_some_and(|doc_val| check_filter(doc_val, filter))
4492                })
4493            });
4494        }
4495
4496        // 2. Apply Sorting
4497        if let Some(sort_options) = &payload.sort {
4498            docs.sort_by(|a, b| {
4499                let a_val = a.data.get(&sort_options.field);
4500                let b_val = b.data.get(&sort_options.field);
4501                let ordering = a_val
4502                    .partial_cmp(&b_val)
4503                    .unwrap_or(std::cmp::Ordering::Equal);
4504                if sort_options.ascending {
4505                    ordering
4506                } else {
4507                    ordering.reverse()
4508                }
4509            });
4510        }
4511
4512        // 3. Apply Pagination
4513        if let Some(offset) = payload.offset {
4514            docs = docs.into_iter().skip(offset).collect();
4515        }
4516        if let Some(limit) = payload.limit {
4517            docs = docs.into_iter().take(limit).collect();
4518        }
4519
4520        // 4. Apply Field Selection (Projection)
4521        if let Some(select_fields) = &payload.select
4522            && !select_fields.is_empty()
4523        {
4524            docs = docs
4525                .into_iter()
4526                .map(|mut doc| {
4527                    doc.data.retain(|key, _| select_fields.contains(key));
4528                    doc
4529                })
4530                .collect();
4531        }
4532
4533        Ok(docs)
4534    }
4535
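    /// Dispatch a wire-protocol `Request` to the corresponding database
    /// operation and wrap the outcome in a `Response`.
    ///
    /// Errors are never propagated to the caller; they are flattened into
    /// `Response::Error(String)` so the network layer can always serialize a reply.
    ///
    /// # Examples
    ///
    /// A sketch of a round-trip, using the variant names matched below:
    ///
    /// ```
    /// use crate::network::protocol::{Request, Response};
    ///
    /// match db.process_network_request(Request::Get("users:42".to_string())).await {
    ///     Response::Success(Some(bytes)) => println!("{} bytes", bytes.len()),
    ///     Response::Success(None) => println!("key not found"),
    ///     Response::Error(e) => eprintln!("request failed: {}", e),
    ///     _ => unreachable!("Get only yields Success or Error"),
    /// }
    /// ```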
4536    pub async fn process_network_request(
4537        &self,
4538        request: crate::network::protocol::Request,
4539    ) -> crate::network::protocol::Response {
4540        use crate::network::protocol::Response;
4541
4542        match request {
4543            crate::network::protocol::Request::Get(key) => match self.get(&key) {
4544                Ok(value) => Response::Success(value),
4545                Err(e) => Response::Error(e.to_string()),
4546            },
4547            crate::network::protocol::Request::Put(key, value) => {
4548                match self.put(key, value, None).await {
4549                    Ok(_) => Response::Done,
4550                    Err(e) => Response::Error(e.to_string()),
4551                }
4552            }
4553            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
4554                Ok(_) => Response::Done,
4555                Err(e) => Response::Error(e.to_string()),
4556            },
4557            crate::network::protocol::Request::NewCollection { name, fields } => {
4558                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
4559                    .iter()
4560                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
4561                    .collect();
4562
4563                match self.new_collection(&name, fields_for_db).await {
4564                    Ok(_) => Response::Done,
4565                    Err(e) => Response::Error(e.to_string()),
4566                }
4567            }
4568            crate::network::protocol::Request::Insert { collection, data } => {
4569                match self.insert_map(&collection, data).await {
4570                    Ok(id) => Response::Message(id),
4571                    Err(e) => Response::Error(e.to_string()),
4572                }
4573            }
4574            crate::network::protocol::Request::GetDocument { collection, id } => {
4575                match self.get_document(&collection, &id) {
4576                    Ok(doc) => Response::Document(doc),
4577                    Err(e) => Response::Error(e.to_string()),
4578                }
4579            }
4580            crate::network::protocol::Request::Query(builder) => {
4581                match self.execute_simple_query(&builder).await {
4582                    Ok(docs) => Response::Documents(docs),
4583                    Err(e) => Response::Error(e.to_string()),
4584                }
4585            }
4586            crate::network::protocol::Request::BeginTransaction => {
4587                let tx_id = self.begin_transaction();
4588                Response::TransactionId(tx_id.as_u64())
4589            }
4590            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4591                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4592                match self.commit_transaction(tx_id) {
4593                    Ok(_) => Response::Done,
4594                    Err(e) => Response::Error(e.to_string()),
4595                }
4596            }
4597            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4598                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4599                match self.rollback_transaction(tx_id) {
4600                    Ok(_) => Response::Done,
4601                    Err(e) => Response::Error(e.to_string()),
4602                }
4603            }
4604        }
4605    }
4606
4607    /// Create indices for commonly queried fields automatically
4608    ///
4609    /// This is a convenience method that creates indices for fields that are
4610    /// likely to be queried frequently, improving performance.
4611    ///
4612    /// # Arguments
4613    /// * `collection` - Name of the collection
4614    /// * `fields` - List of field names to create indices for
4615    ///
4616    /// # Examples
4617    /// ```
4618    /// // Create indices for commonly queried fields
4619    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4620    /// ```
4621    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4622        for field in fields {
4623            if let Err(e) = self.create_index(collection, field).await {
4624                eprintln!(
4625                    "Warning: Failed to create index for {}.{}: {}",
4626                    collection, field, e
4627                );
4628            } else {
4629                println!("Created index for {}.{}", collection, field);
4630            }
4631        }
4632        Ok(())
4633    }
4634
4635    /// Get index statistics for a collection
4636    ///
4637    /// This helps understand which indices exist and how effective they are.
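    ///
    /// # Examples
    ///
    /// A sketch of reading the returned map:
    ///
    /// ```
    /// for (field, stats) in db.get_index_stats("users") {
    ///     println!(
    ///         "{}: {} unique values over {} documents (~{} docs/value)",
    ///         field, stats.unique_values, stats.total_documents, stats.avg_docs_per_value
    ///     );
    /// }
    /// ```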
4638    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4639        let mut stats = HashMap::new();
4640
4641        for entry in self.secondary_indices.iter() {
4642            let key = entry.key();
4643            if key.starts_with(&format!("{}:", collection)) {
4644                let field = key.split(':').nth(1).unwrap_or("unknown");
4645                let index = entry.value();
4646
4647                let unique_values = index.len();
4648                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4649
4650                stats.insert(
4651                    field.to_string(),
4652                    IndexStats {
4653                        unique_values,
4654                        total_documents,
4655                        avg_docs_per_value: if unique_values > 0 {
4656                            total_documents / unique_values
4657                        } else {
4658                            0
4659                        },
4660                    },
4661                );
4662            }
4663        }
4664
4665        stats
4666    }
4667
    /// Optimize a collection by creating an index for every field in its schema
    ///
    /// This is a blunt but effective starting point; selecting indices from
    /// observed query patterns would be a future refinement.
4671    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4672        if let Ok(collection_def) = self.get_collection_definition(collection) {
4673            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4674            self.create_indices(collection, &field_names).await?;
4675        }
4676
4677        Ok(())
4678    }
4679
4680    // Helper method to get unique fields from a collection
4681    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4682        collection
4683            .fields
4684            .iter()
4685            .filter(|(_, def)| def.unique)
4686            .map(|(name, _)| name.clone())
4687            .collect()
4688    }
4689
4690    // Update the validation method to use the helper
4691    async fn validate_unique_constraints(
4692        &self,
4693        collection: &str,
4694        data: &HashMap<String, Value>,
4695    ) -> Result<()> {
4696        self.ensure_indices_initialized().await?;
4697        let collection_def = match self.get_collection_definition(collection) {
4698            Ok(def) => def,
4699            Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
4700            Err(e) => return Err(e),
4701        };
4702        let unique_fields = self.get_unique_fields(&collection_def);
4703
4704        for unique_field in &unique_fields {
4705            if let Some(value) = data.get(unique_field) {
4706                let index_key = format!("{}:{}", collection, unique_field);
4707                if let Some(index) = self.secondary_indices.get(&index_key) {
4708                    // Get the raw string value without JSON formatting
4709                    let value_str = match value {
4710                        Value::String(s) => s.clone(),
4711                        _ => value.to_string(),
4712                    };
4713                    if index.contains_key(&value_str) {
4714                        return Err(AqlError::new(
4715                            ErrorCode::UniqueConstraintViolation,
4716                            format!(
4717                                "Unique constraint violation on field '{}' with value '{}'",
4718                                unique_field, value_str
4719                            ),
4720                        ));
4721                    }
4722                }
4723            }
4724        }
4725        Ok(())
4726    }
4727
4728    /// Validate unique constraints excluding a specific document ID (for updates)
4729    async fn validate_unique_constraints_excluding(
4730        &self,
4731        collection: &str,
4732        data: &HashMap<String, Value>,
4733        exclude_id: &str,
4734    ) -> Result<()> {
4735        self.ensure_indices_initialized().await?;
4736        let collection_def = self.get_collection_definition(collection)?;
4737        let unique_fields = self.get_unique_fields(&collection_def);
4738
4739        for unique_field in &unique_fields {
4740            if let Some(value) = data.get(unique_field) {
4741                let index_key = format!("{}:{}", collection, unique_field);
4742                if let Some(index) = self.secondary_indices.get(&index_key) {
4743                    // Get the raw string value without JSON formatting
4744                    let value_str = match value {
4745                        Value::String(s) => s.clone(),
4746                        _ => value.to_string(),
4747                    };
4748                    if let Some(doc_ids) = index.get(&value_str) {
4749                        // Check if any document other than the excluded one has this value
4750                        let exclude_key = format!("{}:{}", collection, exclude_id);
4751                        for doc_key in doc_ids.value() {
4752                            if doc_key != &exclude_key {
4753                                return Err(AqlError::new(
4754                                    ErrorCode::UniqueConstraintViolation,
4755                                    format!(
4756                                        "Unique constraint violation on field '{}' with value '{}'",
4757                                        unique_field, value_str
4758                                    ),
4759                                ));
4760                            }
4761                        }
4762                    }
4763                }
4764            }
4765        }
4766        Ok(())
4767    }
4768
4769    // =========================================================================
4770    // AQL Executor Helper Methods
4771    // These are wrapper methods for AQL executor integration.
4772    // They provide a simplified API compatible with the AQL query execution layer.
4773    // =========================================================================
4774
4775    /// Get all documents in a collection (AQL helper)
4776    ///
4777    /// This is a wrapper around the internal query system optimized for bulk retrieval.
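    ///
    /// # Examples
    ///
    /// A minimal usage sketch:
    ///
    /// ```
    /// let all_users = db.aql_get_all_collection("users").await?;
    /// println!("scanned {} documents", all_users.len());
    /// ```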
4778    pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
4779        self.ensure_indices_initialized().await?;
4780
4781        let prefix = format!("{}:", collection);
4782        let mut docs = Vec::new();
4783
4784        for result in self.cold.scan_prefix(&prefix) {
4785            let (_, value) = result?;
4786            if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
4787                docs.push(doc);
4788            }
4789        }
4790
4791        Ok(docs)
4792    }
4793
4794    /// Insert a document from a HashMap (AQL helper)
4795    ///
    /// Returns the complete document (not just the ID) for the AQL executor
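    ///
    /// # Examples
    ///
    /// A sketch of the map-based insert (collection `"users"` assumed to exist):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Ada".to_string()));
    /// data.insert("age".to_string(), Value::Int(36));
    ///
    /// let doc = db.aql_insert("users", data).await?;
    /// assert!(!doc.id.is_empty()); // a UUIDv7 is generated for the new document
    /// ```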
4797    pub async fn aql_insert(
4798        &self,
4799        collection: &str,
4800        data: HashMap<String, Value>,
4801    ) -> Result<Document> {
4802        // Validate unique constraints before inserting
4803        self.validate_unique_constraints(collection, &data).await?;
4804
4805        let doc_id = Uuid::now_v7().to_string();
4806        let document = Document {
4807            id: doc_id.clone(),
4808            data,
4809        };
4810
4811        self.put(
4812            format!("{}:{}", collection, doc_id),
4813            serde_json::to_vec(&document)?,
4814            None,
4815        )
4816        .await?;
4817
4818        // Publish insert event
4819        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
4820        let _ = self.pubsub.publish(event);
4821
4822        Ok(document)
4823    }
4824
4825    /// Update a document by ID with new data (AQL helper)
4826    ///
    /// Merges the new data into the existing document and returns the updated document
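    ///
    /// # Examples
    ///
    /// Only the supplied keys are overwritten; all other fields are preserved.
    /// A sketch, where `doc` is a previously inserted document:
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let mut patch = HashMap::new();
    /// patch.insert("age".to_string(), Value::Int(37));
    ///
    /// let updated = db.aql_update_document("users", &doc.id, patch).await?;
    /// assert_eq!(updated.data.get("age"), Some(&Value::Int(37)));
    /// ```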
4828    pub async fn aql_update_document(
4829        &self,
4830        collection: &str,
4831        doc_id: &str,
4832        updates: HashMap<String, Value>,
4833    ) -> Result<Document> {
4834        let key = format!("{}:{}", collection, doc_id);
4835
4836        // Get existing document
4837        let existing = self.get(&key)?.ok_or_else(|| {
4838            AqlError::new(
4839                ErrorCode::NotFound,
4840                format!("Document {} not found", doc_id),
4841            )
4842        })?;
4843
4844        let mut doc: Document = serde_json::from_slice(&existing)?;
4845        let old_doc = doc.clone();
4846
4847        // Merge updates into existing data
4848        for (k, v) in updates {
4849            doc.data.insert(k, v);
4850        }
4851
4852        // Validate unique constraints excluding this document
4853        self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
4854            .await?;
4855
4856        // Save updated document
4857        self.put(key, serde_json::to_vec(&doc)?, None).await?;
4858
4859        // Publish update event
4860        let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
4861        let _ = self.pubsub.publish(event);
4862
4863        Ok(doc)
4864    }
4865
4866    /// Delete a document by ID (AQL helper)
4867    ///
4868    /// Returns the deleted document
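    ///
    /// # Examples
    ///
    /// A sketch, where `doc` is a previously inserted document:
    ///
    /// ```
    /// let removed = db.aql_delete_document("users", &doc.id).await?;
    /// assert_eq!(removed.id, doc.id); // the full document is returned one last time
    /// ```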
4869    pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
4870        let key = format!("{}:{}", collection, doc_id);
4871
4872        // Get existing document first
4873        let existing = self.get(&key)?.ok_or_else(|| {
4874            AqlError::new(
4875                ErrorCode::NotFound,
4876                format!("Document {} not found", doc_id),
4877            )
4878        })?;
4879
4880        let doc: Document = serde_json::from_slice(&existing)?;
4881
4882        // Append to WAL for durability
4883        if let Some(wal) = &self.wal {
4884            wal.write()
4885                .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
4886                .append(crate::wal::Operation::Delete, &key, None)?;
4887        }
4888
4889        // Delete from storage
4890        self.cold.delete(&key)?;
4891        self.hot.delete(&key);
4892
4893        // Remove from primary index
4894        if let Some(index) = self.primary_indices.get_mut(collection) {
4895            index.remove(&key);
4896        }
4897
4898        // Remove from secondary indices
4899        for (field_name, value) in &doc.data {
4900            let index_key = format!("{}:{}", collection, field_name);
4901            if let Some(index) = self.secondary_indices.get_mut(&index_key) {
4902                let value_str = match value {
4903                    Value::String(s) => s.clone(),
4904                    _ => value.to_string(),
4905                };
4906                if let Some(mut doc_ids) = index.get_mut(&value_str) {
4907                    doc_ids.retain(|id| id != &key);
4908                }
4909            }
4910        }
4911
4912        // Publish delete event
4913        let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
4914        let _ = self.pubsub.publish(event);
4915
4916        Ok(doc)
4917    }
4918
4919    /// Get a single document by ID (AQL helper)
4920    pub async fn aql_get_document(
4921        &self,
4922        collection: &str,
4923        doc_id: &str,
4924    ) -> Result<Option<Document>> {
4925        let key = format!("{}:{}", collection, doc_id);
4926
4927        match self.get(&key)? {
4928            Some(data) => {
4929                let doc: Document = serde_json::from_slice(&data)?;
4930                Ok(Some(doc))
4931            }
4932            None => Ok(None),
4933        }
4934    }
4935
4936    /// Begin a transaction (AQL helper) - returns transaction ID
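    ///
    /// # Examples
    ///
    /// A sketch of the begin/commit pair:
    ///
    /// ```
    /// let tx_id = db.aql_begin_transaction()?;
    /// // ... perform writes associated with the transaction ...
    /// db.aql_commit_transaction(tx_id).await?;
    /// ```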
4937    pub fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
4938        Ok(self.begin_transaction())
4939    }
4940
4941    /// Commit a transaction (AQL helper)
4942    pub async fn aql_commit_transaction(
4943        &self,
4944        tx_id: crate::transaction::TransactionId,
4945    ) -> Result<()> {
4946        self.commit_transaction(tx_id)
4947    }
4948
4949    /// Rollback a transaction (AQL helper)
4950    pub async fn aql_rollback_transaction(
4951        &self,
4952        tx_id: crate::transaction::TransactionId,
4953    ) -> Result<()> {
4954        self.transaction_manager.rollback(tx_id)
4955    }
4956
4957    // ============================================
4958    // AQL Schema Management Wrappers
4959    // ============================================
4960
4961    /// Create a collection from AST schema definition
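    ///
    /// # Examples
    ///
    /// A sketch; `fields` would normally come from the AQL parser rather than be
    /// built by hand:
    ///
    /// ```
    /// // `fields: HashMap<String, FieldDefinition>` produced by a parsed
    /// // CREATE COLLECTION statement
    /// db.create_collection_schema("users", fields).await?;
    /// ```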
4962    pub async fn create_collection_schema(
4963        &self,
4964        name: &str,
4965        fields: HashMap<String, crate::types::FieldDefinition>,
4966    ) -> Result<()> {
4967        let collection_key = format!("_collection:{}", name);
4968
        // Creating a collection that already exists is an idempotent no-op
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }
4974
4975        let collection = Collection {
4976            name: name.to_string(),
4977            fields,
4978        };
4979
4980        let collection_data = serde_json::to_vec(&collection)?;
4981        self.put(collection_key, collection_data, None).await?;
4982        self.schema_cache.remove(name);
4983
4984        Ok(())
4985    }
4986
4987    /// Add a field to an existing collection schema
4988    pub async fn add_field_to_schema(
4989        &self,
4990        collection_name: &str,
4991        name: String,
4992        definition: crate::types::FieldDefinition,
4993    ) -> Result<()> {
4994        let mut collection = self
4995            .get_collection_definition(collection_name)
4996            .map_err(|_| {
4997                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
4998            })?;
4999
5000        if collection.fields.contains_key(&name) {
5001            return Err(AqlError::new(
5002                ErrorCode::InvalidDefinition,
5003                format!(
5004                    "Field '{}' already exists in collection '{}'",
5005                    name, collection_name
5006                ),
5007            ));
5008        }
5009
5010        collection.fields.insert(name, definition);
5011
5012        let collection_key = format!("_collection:{}", collection_name);
5013        let collection_data = serde_json::to_vec(&collection)?;
5014        self.put(collection_key, collection_data, None).await?;
5015        self.schema_cache.remove(collection_name);
5016
5017        Ok(())
5018    }
5019
5020    /// Drop a field from an existing collection schema
5021    pub async fn drop_field_from_schema(
5022        &self,
5023        collection_name: &str,
5024        field_name: String,
5025    ) -> Result<()> {
5026        let mut collection = self
5027            .get_collection_definition(collection_name)
5028            .map_err(|_| {
5029                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5030            })?;
5031
5032        if !collection.fields.contains_key(&field_name) {
5033            return Err(AqlError::new(
5034                ErrorCode::InvalidDefinition,
5035                format!(
5036                    "Field '{}' does not exist in collection '{}'",
5037                    field_name, collection_name
5038                ),
5039            ));
5040        }
5041
5042        collection.fields.remove(&field_name);
5043
5044        let collection_key = format!("_collection:{}", collection_name);
5045        let collection_data = serde_json::to_vec(&collection)?;
5046        self.put(collection_key, collection_data, None).await?;
5047        self.schema_cache.remove(collection_name);
5048
5049        Ok(())
5050    }
5051
5052    /// Rename a field in an existing collection schema
5053    pub async fn rename_field_in_schema(
5054        &self,
5055        collection_name: &str,
5056        from: String,
5057        to: String,
5058    ) -> Result<()> {
5059        let mut collection = self
5060            .get_collection_definition(collection_name)
5061            .map_err(|_| {
5062                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5063            })?;
5064
5065        if let Some(def) = collection.fields.remove(&from) {
5066            if collection.fields.contains_key(&to) {
5067                return Err(AqlError::new(
5068                    ErrorCode::InvalidDefinition,
5069                    format!(
5070                        "Target field name '{}' already exists in collection '{}'",
5071                        to, collection_name
5072                    ),
5073                ));
5074            }
5075            collection.fields.insert(to, def);
5076        } else {
5077            return Err(AqlError::new(
5078                ErrorCode::InvalidDefinition,
5079                format!("Field '{}' not found", from),
5080            ));
5081        }
5082
5083        let collection_key = format!("_collection:{}", collection_name);
5084        let collection_data = serde_json::to_vec(&collection)?;
5085        self.put(collection_key, collection_data, None).await?;
5086        self.schema_cache.remove(collection_name);
5087
5088        Ok(())
5089    }
5090
5091    /// Modify a field in an existing collection schema
5092    pub async fn modify_field_in_schema(
5093        &self,
5094        collection_name: &str,
5095        name: String,
5096        definition: crate::types::FieldDefinition,
5097    ) -> Result<()> {
5098        let mut collection = self
5099            .get_collection_definition(collection_name)
5100            .map_err(|_| {
5101                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5102            })?;
5103
5104        if !collection.fields.contains_key(&name) {
5105            return Err(AqlError::new(
5106                ErrorCode::InvalidDefinition,
5107                format!(
5108                    "Field '{}' does not exist in collection '{}'",
5109                    name, collection_name
5110                ),
5111            ));
5112        }
5113
5114        collection.fields.insert(name, definition);
5115
5116        let collection_key = format!("_collection:{}", collection_name);
5117        let collection_data = serde_json::to_vec(&collection)?;
5118        self.put(collection_key, collection_data, None).await?;
5119        self.schema_cache.remove(collection_name);
5120
5121        Ok(())
5122    }
5123
5124    /// Drop an entire collection definition
5125    pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5126        let collection_key = format!("_collection:{}", collection_name);
5127        self.cold.delete(&collection_key)?;
5128        self.schema_cache.remove(collection_name);
5129        Ok(())
5130    }
5131
5132    // ============================================
5133    // AQL Migration Wrappers
5134    // ============================================
5135
5136    /// Check if a migration version has been applied
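    ///
    /// # Examples
    ///
    /// The typical apply-once pattern, pairing this check with
    /// `mark_migration_applied` (the version string is illustrative):
    ///
    /// ```
    /// if !db.is_migration_applied("2024-06-01-add-users").await? {
    ///     // ... run the migration's schema changes here ...
    ///     db.mark_migration_applied("2024-06-01-add-users").await?;
    /// }
    /// ```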
5137    pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
5138        let migration_key = format!("_sys_migration:{}", version);
5139        Ok(self.get(&migration_key)?.is_some())
5140    }
5141
5142    /// Mark a migration version as applied
5143    pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5144        let migration_key = format!("_sys_migration:{}", version);
5145        let timestamp = chrono::Utc::now().to_rfc3339();
5146        self.put(migration_key, timestamp.as_bytes().to_vec(), None)
5147            .await?;
5148        Ok(())
5149    }
5150}
5151
5152impl Drop for Aurora {
5153    fn drop(&mut self) {
5154        // Signal checkpoint task to shutdown gracefully
5155        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
5156            let _ = shutdown_tx.send(());
5157        }
5158        // Signal compaction task to shutdown gracefully
5159        if let Some(ref shutdown_tx) = self.compaction_shutdown {
5160            let _ = shutdown_tx.send(());
5161        }
5162    }
5163}
5164
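/// Evaluate a single HTTP-layer filter against one document value.
///
/// Comparison operators rely on `Value`'s ordering; `Contains` means substring
/// containment for strings and element membership for arrays.
///
/// # Examples
///
/// A sketch, assuming `HttpFilter` is a plain struct with the `field`,
/// `operator`, and `value` members used below:
///
/// ```
/// let filter = HttpFilter {
///     field: "tags".to_string(),
///     operator: FilterOperator::Contains,
///     value: serde_json::json!("rust"),
/// };
/// let tags = Value::Array(vec![Value::String("rust".to_string())]);
/// assert!(check_filter(&tags, &filter));
/// ```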
5165fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
5166    let filter_val = match json_to_value(&filter.value) {
5167        Ok(v) => v,
5168        Err(_) => return false,
5169    };
5170
5171    match filter.operator {
5172        FilterOperator::Eq => doc_val == &filter_val,
5173        FilterOperator::Ne => doc_val != &filter_val,
5174        FilterOperator::Gt => doc_val > &filter_val,
5175        FilterOperator::Gte => doc_val >= &filter_val,
5176        FilterOperator::Lt => doc_val < &filter_val,
5177        FilterOperator::Lte => doc_val <= &filter_val,
5178        FilterOperator::Contains => match (doc_val, &filter_val) {
5179            (Value::String(s), Value::String(fv)) => s.contains(fv),
5180            (Value::Array(arr), _) => arr.contains(&filter_val),
5181            _ => false,
5182        },
5183    }
5184}
5185
5186/// Results of importing a document
5187enum ImportResult {
5188    Imported,
5189    Skipped,
5190}
5191
5192/// Statistics from an import operation
5193#[derive(Debug, Default)]
5194pub struct ImportStats {
5195    /// Number of documents successfully imported
5196    pub imported: usize,
5197    /// Number of documents skipped (usually because they already exist)
5198    pub skipped: usize,
5199    /// Number of documents that failed to import
5200    pub failed: usize,
5201}
5202
5203/// Statistics for a specific collection
5204#[derive(Debug)]
5205pub struct CollectionStats {
5206    /// Number of documents in the collection
5207    pub count: usize,
5208    /// Total size of the collection in bytes
5209    pub size_bytes: usize,
5210    /// Average document size in bytes
5211    pub avg_doc_size: usize,
5212}
5213
5214/// Statistics for an index
5215#[derive(Debug)]
5216pub struct IndexStats {
5217    /// Number of unique values in the index
5218    pub unique_values: usize,
5219    /// Total number of documents covered by the index
5220    pub total_documents: usize,
5221    /// Average number of documents per unique value
5222    pub avg_docs_per_value: usize,
5223}
5224
5225/// Combined database statistics
5226#[derive(Debug)]
5227pub struct DatabaseStats {
5228    /// Hot cache statistics
5229    pub hot_stats: crate::storage::hot::CacheStats,
5230    /// Cold storage statistics
5231    pub cold_stats: crate::storage::cold::ColdStoreStats,
5232    /// Estimated total database size in bytes
5233    pub estimated_size: u64,
5234    /// Statistics for each collection
5235    pub collections: HashMap<String, CollectionStats>,
5236}
5237
5238#[cfg(test)]
5239mod tests {
5240    use super::*;
5241    use tempfile::tempdir;
5242
5243    #[tokio::test]
5244    async fn test_async_wal_integration() {
5245        let temp_dir = tempfile::tempdir().unwrap();
5246        let db_path = temp_dir.path().join("test_wal_integration.db");
5247        let db = Aurora::open(db_path.to_str().unwrap()).unwrap();
5248
        // 1. WAL setup: nothing explicit needed here; this test relies on the
        //    AuroraConfig default.
5251
5252        // Create collection schema first
5253        db.new_collection(
5254            "users",
5255            vec![
5256                ("id", FieldType::String, false),
5257                ("counter", FieldType::Int, false),
5258            ],
5259        )
5260        .await
5261        .unwrap();
5262
5263        let db_clone = db.clone();
5264
5265        // 2. Spawn 50 concurrent writes
5266        let handles: Vec<_> = (0..50)
5267            .map(|i| {
5268                let db = db_clone.clone();
5269                tokio::spawn(async move {
5270                    let doc_id = db
5271                        .insert_into(
5272                            "users",
5273                            vec![
5274                                ("id", Value::String(format!("user-{}", i))),
5275                                ("counter", Value::Int(i)),
5276                            ],
5277                        )
5278                        .await
5279                        .unwrap();
5280                    (i, doc_id) // Return both the index and the document ID
5281                })
5282            })
5283            .collect();
5284
        // 3. Wait for all writes to complete and collect the results
5286        let results = futures::future::join_all(handles).await;
5287        assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");
5288
5289        // Extract the document IDs
5290        let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();
5291
5292        // Find the document ID for the one with counter=25
5293        let target_doc_id = doc_ids
5294            .iter()
5295            .find(|(counter, _)| *counter == 25i64)
5296            .map(|(_, doc_id)| doc_id)
5297            .unwrap();
5298
        // 4. Verify data integrity
5300        let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
5301        assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
5302    }
5303
5304    #[tokio::test]
5305    async fn test_basic_operations() -> Result<()> {
5306        let temp_dir = tempdir()?;
5307        let db_path = temp_dir.path().join("test.aurora");
5308        let db = Aurora::open(db_path.to_str().unwrap())?;
5309
5310        // Test collection creation
5311        db.new_collection(
5312            "users",
5313            vec![
5314                ("name", FieldType::String, false),
5315                ("age", FieldType::Int, false),
5316                ("email", FieldType::String, true),
5317            ],
5318        )
5319        .await?;
5320
5321        // Test document insertion
5322        let doc_id = db
5323            .insert_into(
5324                "users",
5325                vec![
5326                    ("name", Value::String("John Doe".to_string())),
5327                    ("age", Value::Int(30)),
5328                    ("email", Value::String("john@example.com".to_string())),
5329                ],
5330            )
5331            .await?;
5332
5333        // Test document retrieval
5334        let doc = db.get_document("users", &doc_id)?.unwrap();
5335        assert_eq!(
5336            doc.data.get("name").unwrap(),
5337            &Value::String("John Doe".to_string())
5338        );
5339        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));
5340
5341        Ok(())
5342    }
5343
5344    #[tokio::test]
5345    async fn test_transactions() -> Result<()> {
5346        let temp_dir = tempdir()?;
5347        let db_path = temp_dir.path().join("test.aurora");
5348        let db = Aurora::open(db_path.to_str().unwrap())?;
5349
5350        // Create collection
5351        db.new_collection("test", vec![("field", FieldType::String, false)])
5352            .await?;
5353
5354        // Start transaction
5355        let tx_id = db.begin_transaction();
5356
5357        // Insert document
5358        let doc_id = db
5359            .insert_into("test", vec![("field", Value::String("value".to_string()))])
5360            .await?;
5361
5362        // Commit transaction
5363        db.commit_transaction(tx_id)?;
5364
5365        // Verify document exists
5366        let doc = db.get_document("test", &doc_id)?.unwrap();
5367        assert_eq!(
5368            doc.data.get("field").unwrap(),
5369            &Value::String("value".to_string())
5370        );
5371
5372        Ok(())
5373    }
5374
5375    #[tokio::test]
5376    async fn test_query_operations() -> Result<()> {
5377        let temp_dir = tempdir()?;
5378        let db_path = temp_dir.path().join("test.aurora");
5379        let db = Aurora::open(db_path.to_str().unwrap())?;
5380
5381        // Test collection creation
5382        db.new_collection(
5383            "books",
5384            vec![
5385                ("title", FieldType::String, false),
5386                ("author", FieldType::String, false),
5387                ("year", FieldType::Int, false),
5388            ],
5389        )
5390        .await?;
5391
5392        // Test document insertion
5393        db.insert_into(
5394            "books",
5395            vec![
5396                ("title", Value::String("Book 1".to_string())),
5397                ("author", Value::String("Author 1".to_string())),
5398                ("year", Value::Int(2020)),
5399            ],
5400        )
5401        .await?;
5402
5403        db.insert_into(
5404            "books",
5405            vec![
5406                ("title", Value::String("Book 2".to_string())),
5407                ("author", Value::String("Author 2".to_string())),
5408                ("year", Value::Int(2021)),
5409            ],
5410        )
5411        .await?;
5412
5413        // Test query
5414        let results = db
5415            .query("books")
5416            .filter(|f| f.gt("year", Value::Int(2019)))
5417            .order_by("year", true)
5418            .collect()
5419            .await?;
5420
5421        assert_eq!(results.len(), 2);
5422        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());
5423
5424        Ok(())
5425    }
5426
5427    #[tokio::test]
5428    async fn test_blob_operations() -> Result<()> {
5429        let temp_dir = tempdir()?;
5430        let db_path = temp_dir.path().join("test.aurora");
5431        let db = Aurora::open(db_path.to_str().unwrap())?;
5432
5433        // Create test file
5434        let file_path = temp_dir.path().join("test.txt");
5435        std::fs::write(&file_path, b"Hello, World!")?;
5436
5437        // Test blob storage
5438        db.put_blob("test:blob".to_string(), &file_path).await?;
5439
5440        // Verify blob exists
5441        let data = db.get_data_by_pattern("test:blob")?;
5442        assert_eq!(data.len(), 1);
5443        match &data[0].1 {
5444            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
5445            _ => panic!("Expected Blob type"),
5446        }
5447
5448        Ok(())
5449    }
5450
5451    #[tokio::test]
5452    async fn test_blob_size_limit() -> Result<()> {
5453        let temp_dir = tempdir()?;
5454        let db_path = temp_dir.path().join("test.aurora");
5455        let db = Aurora::open(db_path.to_str().unwrap())?;
5456
5457        // Create a test file that's too large (201MB)
5458        let large_file_path = temp_dir.path().join("large_file.bin");
5459        let large_data = vec![0u8; 201 * 1024 * 1024];
5460        std::fs::write(&large_file_path, &large_data)?;
5461
5462        // Attempt to store the large file
5463        let result = db
5464            .put_blob("test:large_blob".to_string(), &large_file_path)
5465            .await;
5466
5467        assert!(result.is_err());
5468        let err = result.unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidOperation); // blob exceeds the size limit
5470
5471        Ok(())
5472    }
5473
5474    #[tokio::test]
5475    async fn test_unique_constraints() -> Result<()> {
5476        let temp_dir = tempdir()?;
5477        let db_path = temp_dir.path().join("test.aurora");
5478        let db = Aurora::open(db_path.to_str().unwrap())?;
5479
5480        // Create collection with unique email field
5481        db.new_collection(
5482            "users",
5483            vec![
5484                ("name", FieldType::String, false),
5485                ("email", FieldType::String, true), // unique field
5486                ("age", FieldType::Int, false),
5487            ],
5488        )
5489        .await?;
5490
5491        // Insert first document
5492        let _doc_id1 = db
5493            .insert_into(
5494                "users",
5495                vec![
5496                    ("name", Value::String("John Doe".to_string())),
5497                    ("email", Value::String("john@example.com".to_string())),
5498                    ("age", Value::Int(30)),
5499                ],
5500            )
5501            .await?;
5502
5503        // Try to insert second document with same email - should fail
5504        let result = db
5505            .insert_into(
5506                "users",
5507                vec![
5508                    ("name", Value::String("Jane Doe".to_string())),
5509                    ("email", Value::String("john@example.com".to_string())), // duplicate email
5510                    ("age", Value::Int(25)),
5511                ],
5512            )
5513            .await;
5514
5515        assert!(result.is_err());
5516        if let Err(e) = result {
5517            if e.code == ErrorCode::UniqueConstraintViolation {
5518                let msg = e.message;
5519                assert!(msg.contains("email"));
                assert!(msg.contains("john@example.com")); // the duplicate value appears in the message
5521            } else {
5522                panic!("Expected UniqueConstraintViolation, got {:?}", e);
5523            }
5524        } else {
5525            panic!("Expected UniqueConstraintViolation");
5526        }
5527
5528        // Test upsert with unique constraint
5529        // Should succeed for new document
5530        let _doc_id2 = db
5531            .upsert(
5532                "users",
5533                "user2",
5534                vec![
5535                    ("name", Value::String("Alice Smith".to_string())),
5536                    ("email", Value::String("alice@example.com".to_string())),
5537                    ("age", Value::Int(28)),
5538                ],
5539            )
5540            .await?;
5541
5542        // Should fail when trying to upsert with duplicate email
5543        let result = db
5544            .upsert(
5545                "users",
5546                "user3",
5547                vec![
5548                    ("name", Value::String("Bob Wilson".to_string())),
5549                    ("email", Value::String("alice@example.com".to_string())), // duplicate
5550                    ("age", Value::Int(35)),
5551                ],
5552            )
5553            .await;
5554
5555        assert!(result.is_err());
5556
5557        // Should succeed when updating existing document with same email (no change)
5558        let result = db
5559            .upsert(
5560                "users",
5561                "user2",
5562                vec![
5563                    ("name", Value::String("Alice Updated".to_string())),
5564                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
5565                    ("age", Value::Int(29)),
5566                ],
5567            )
5568            .await;
5569
5570        assert!(result.is_ok());
5571
5572        Ok(())
5573    }
5574}