//! # Aurora Database
//!
//! Aurora is an embedded document database with tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true),  // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AqlError, ErrorCode, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::parser;
use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
use crate::query::FilterBuilder;
use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for the primary index.
// Instead of storing full Vec<u8> values, we store minimal metadata.
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

// DataInfo lives outside the impl block so it can be shared across the module.
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

/// Trait to convert field tuples into `FieldDefinition`
/// Supports both 3-tuples (defaults nullable to false) and 4-tuples (explicit nullable)
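///
/// For example (a sketch; it assumes `new_collection` is generic over this
/// trait, and since each call takes a homogeneous list, the two tuple arities
/// are shown in separate calls):
///
/// ```no_run
/// db.new_collection("users", vec![
///     ("name", FieldType::String, false),          // 3-tuple: nullable defaults to false
/// ])?;
/// db.new_collection("posts", vec![
///     ("title", FieldType::String, false, false),  // 4-tuple: explicit nullable flag
/// ])?;
/// ```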
pub trait IntoFieldDefinition {
    fn into_field_definition(self) -> (String, FieldDefinition);
}

// 3-tuple: (field_name, field_type, unique) - defaults nullable to false
impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool) {
    fn into_field_definition(self) -> (String, FieldDefinition) {
        let (name, field_type, unique) = self;
        (
            name.into(),
            FieldDefinition {
                field_type,
                unique,
                indexed: unique,
                nullable: false, // Default to false
            },
        )
    }
}

// 4-tuple: (field_name, field_type, unique, nullable) - explicit control
impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool, bool) {
    fn into_field_definition(self) -> (String, FieldDefinition) {
        let (name, field_type, unique, nullable) = self;
        (
            name.into(),
            FieldDefinition {
                field_type,
                unique,
                indexed: unique,
                nullable,
            },
        )
    }
}

#[derive(Debug)]
#[allow(dead_code)]
enum WalOperation {
    Put {
        key: Arc<String>,
        value: Arc<Vec<u8>>,
    },
    Delete {
        key: String,
    },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}
/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pub pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Worker system for background jobs
    pub workers: Option<Arc<crate::workers::WorkerSystem>>,
    // Asynchronous WAL writer channel
    wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
    // Computed fields manager
    pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Clone for Aurora {
    fn clone(&self) -> Self {
        Self {
            hot: self.hot.clone(),
            cold: Arc::clone(&self.cold),
            primary_indices: Arc::clone(&self.primary_indices),
            secondary_indices: Arc::clone(&self.secondary_indices),
            indices_initialized: Arc::clone(&self.indices_initialized),
            transaction_manager: self.transaction_manager.clone(),
            indices: Arc::clone(&self.indices),
            schema_cache: Arc::clone(&self.schema_cache),
            config: self.config.clone(),
            write_buffer: None,
            pubsub: self.pubsub.clone(),
            wal: self.wal.clone(),
            checkpoint_shutdown: self.checkpoint_shutdown.clone(),
            compaction_shutdown: self.compaction_shutdown.clone(),
            workers: self.workers.clone(),
            wal_writer: self.wal_writer.clone(),
            computed: Arc::clone(&self.computed),
        }
    }
}

/// Trait for converting inputs into execution parameters
pub trait ToExecParams {
    fn into_params(self) -> Result<(String, ExecutionOptions)>;
}

impl<'a> ToExecParams for &'a str {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self.to_string(), ExecutionOptions::default()))
    }
}

impl ToExecParams for String {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self, ExecutionOptions::default()))
    }
}

impl<'a, V> ToExecParams for (&'a str, V)
where
    V: Into<serde_json::Value>,
{
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        let (aql, vars) = self;
        let json_vars = vars.into();
        let map = match json_vars {
            serde_json::Value::Object(map) => map.into_iter().collect(),
            _ => {
                return Err(AqlError::new(
                    ErrorCode::InvalidInput,
                    "Variables must be a JSON object",
                ));
            }
        };
        Ok((
            aql.to_string(),
            ExecutionOptions {
                variables: map,
                ..Default::default()
            },
        ))
    }
}

impl Aurora {
    /// Execute AQL query (variables are optional)
    ///
    /// Supports two forms:
    /// 1. `db.execute("query").await`
    /// 2. `db.execute(("query", vars)).await`
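    ///
    /// # Example
    /// ```no_run
    /// // Without variables
    /// let result = db.execute("query { users { id name } }").await?;
    ///
    /// // With variables (a sketch; the exact AQL filter syntax shown here is
    /// // assumed - variables must be a JSON object)
    /// let vars = serde_json::json!({ "min": 18 });
    /// let result = db.execute(("query { users(where: { age: { gt: $min } }) { id } }", vars)).await?;
    /// ```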
    pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
        let (aql, options) = input.into_params()?;
        parser::executor::execute(self, &aql, options).await
    }

    /// Stream real-time changes using AQL subscription syntax
    ///
    /// This is a convenience method that extracts the `ChangeListener` from
    /// an AQL subscription query, providing a cleaner API than using `execute()` directly.
    ///
    /// # Example
    /// ```
    /// // Stream changes from active products
    /// let mut listener = db.stream(r#"
    ///     subscription {
    ///         products(where: { active: { eq: true } }) {
    ///             id
    ///             name
    ///         }
    ///     }
    /// "#).await?;
    ///
    /// // Receive real-time events
    /// while let Ok(event) = listener.recv().await {
    ///     println!("Change: {:?} on {}", event.change_type, event.id);
    /// }
    /// ```
    pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
        let result = self.execute(aql).await?;

        match result {
            ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
                crate::error::AqlError::new(
                    crate::error::ErrorCode::QueryError,
                    "Subscription did not return a stream".to_string(),
                )
            }),
            _ => Err(crate::error::AqlError::new(
                crate::error::ErrorCode::QueryError,
                "Expected a subscription query, got a different operation type".to_string(),
            )),
        }
    }

    /// Explain AQL query execution plan
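    ///
    /// # Example
    /// ```no_run
    /// // A sketch: inspect the plan without executing anything
    /// let plan = db.explain("query { users { id } }").await?;
    /// for op in &plan.operations {
    ///     println!("{}", op);
    /// }
    /// println!("estimated cost: {}", plan.estimated_cost);
    /// ```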
    pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
        let (aql, options) = input.into_params()?;

        // Parse and analyze without executing
        let doc = parser::parse_with_variables(
            &aql,
            serde_json::Value::Object(options.variables.clone().into_iter().collect()),
        )?;

        self.analyze_execution_plan(&doc).await
    }

    /// Analyze the execution plan for a parsed query
    pub async fn analyze_execution_plan(
        &self,
        doc: &crate::parser::ast::Document,
    ) -> Result<ExecutionPlan> {
        let mut operations = Vec::new();
        for op in &doc.operations {
            operations.push(format!("{:?}", op));
        }

        Ok(ExecutionPlan {
            operations,
            estimated_cost: 1.0,
        })
    }

    /// Remove stale lock files from a database directory
    ///
    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
    /// that prevent the database from being reopened. This method safely removes
    /// those lock files.
    ///
    /// # Safety
    /// Only call this when you're certain no other Aurora instance is using the database.
    /// Removing lock files while another process is running could cause data corruption.
    ///
    /// # Example
    /// ```no_run
    /// use aurora_db::Aurora;
    ///
    /// // If you get an "Access denied" error when opening:
    /// if let Err(e) = Aurora::open("my_db") {
    ///     eprintln!("Failed to open: {}", e);
    ///     // Try removing a stale lock
    ///     if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
    ///         println!("Removed stale lock, try opening again");
    ///         let db = Aurora::open("my_db")?;
    ///     }
    /// }
    /// # Ok::<(), aurora_db::error::AqlError>(())
    /// ```
    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
        let resolved_path = Self::resolve_path(path)?;
        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
    }

    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Or use a simple name resolved against the current directory
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve a database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to the current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AqlError::new(
                ErrorCode::IoError,
                format!("Failed to resolve current directory: {}", e),
            )),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,           // 512 MB cache
    ///     enable_write_buffering: true,     // Batch writes for speed
    ///     enable_wal: true,                 // Durability
    ///     auto_compact: true,               // Background compaction
    ///     compact_interval_mins: 60,        // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;
        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize the WAL
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Initialize the background WAL writer.
        // Only enable it if the WAL itself was successfully initialized.
        let wal_writer = if enable_wal && wal.is_some() {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            let wal_clone = wal.clone();

            // Spawn a single dedicated task that writes WAL entries sequentially
            tokio::spawn(async move {
                if let Some(wal) = wal_clone {
                    while let Some(op) = rx.recv().await {
                        let wal = wal.clone();
                        // Move blocking I/O to a blocking thread
                        let result = tokio::task::spawn_blocking(move || {
                            // Properly handle the RwLock write guard
                            match wal.write() {
                                Ok(mut guard) => {
                                    let result = match op {
                                        WalOperation::Put { key, value } => {
                                            // Deref the Arcs to pass slices
                                            guard.append(Operation::Put, &key, Some(value.as_ref()))
                                        }
                                        WalOperation::Delete { key } => {
                                            guard.append(Operation::Delete, &key, None)
                                        }
                                    };
                                    if let Err(e) = result {
                                        eprintln!(
                                            "CRITICAL: Failed to append to WAL: {}. Shutting down.",
                                            e
                                        );
                                        // A failed WAL write is a catastrophic failure.
                                        std::process::exit(1);
                                    }
                                }
                                Err(e) => {
                                    // This likely means the lock is poisoned, which is a critical state.
                                    eprintln!(
                                        "CRITICAL: Failed to acquire WAL lock: {}. Shutting down.",
                                        e
                                    );
                                    std::process::exit(1);
                                }
                            }
                        })
                        .await;

                        if let Err(e) = result {
                            eprintln!("WAL blocking task failed: {}", e);
                            // If the blocking task panics or is cancelled, that's critical
                            std::process::exit(1);
                        }
                    }
                }
            });
            Some(tx)
        } else {
            None
        };

        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                    let _ = wal_guard.truncate();
                                }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                    let _ = wal_guard.truncate();
                                }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let workers_path = path.join("workers.db");
        let worker_config = crate::workers::WorkerConfig {
            storage_path: workers_path.to_string_lossy().into_owned(),
            ..Default::default()
        };
        let workers = crate::workers::WorkerSystem::new(worker_config)
            .map(Arc::new)
            .ok();

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
            workers,
            wal_writer,
            computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
        };

        if !wal_entries.is_empty() {
            match tokio::runtime::Handle::try_current() {
                Ok(handle) => handle.block_on(db.replay_wal(wal_entries))?,
                Err(_) => {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .map_err(|e| {
                            AqlError::new(
                                ErrorCode::InternalError,
                                format!("Failed to create temp runtime: {}", e),
                            )
                        })?;
                    rt.block_on(db.replay_wal(wal_entries))?;
                }
            }
        }

        Ok(db)
    }

    // Lazy index initialization
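    /// Build indices on first use.
    ///
    /// Backed by a `OnceCell`, so repeated calls are idempotent and cheap after
    /// the first initialization:
    ///
    /// ```no_run
    /// db.ensure_indices_initialized().await?;
    /// ```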
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AqlError::new(ErrorCode::InvalidKey, "Invalid UTF-8".to_string()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                // Skip system/metadata prefixes (e.g., _collection, _sys_migration, _index)
                if collection_name.starts_with('_') {
                    continue;
                }
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support
    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks the hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check the hot cache first
        if let Some(value) = self.hot.get(key) {
            // Check if this is a blob reference (pointer to cold storage)
            if value.starts_with(b"BLOBREF:") {
                // It's a blob ref - fetch the actual data from cold storage
                return self.cold.get(key);
            }
            return Ok(Some(value));
        }

        // Fetch from cold storage
        let value = self.cold.get(key)?;

        if let Some(v) = &value {
            if self.should_cache_key(key) {
                self.hot
                    .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
            }
        }

        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference, which is much faster than
    /// `get()` because the value bytes are never cloned.
    /// Only checks the hot cache - returns `None` if the key is not cached.
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Helper to decide if a key should be cached.
    fn should_cache_key(&self, key: &str) -> bool {
        if let Some(collection_name) = key.split(':').next() {
            if !collection_name.starts_with('_') {
                if let Ok(collection_def) = self.get_collection_definition(collection_name) {
                    if collection_def
                        .fields
                        .values()
                        .any(|def| def.field_type == FieldType::Any)
                    {
                        return false; // Don't cache if collection has Any field
                    }
                }
            }
        }
        true // Cache by default
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Fraction of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///                   stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

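    /// Returns `true` if `field` in `collection` is indexed.
    /// Unique fields are always indexed, so they count as well.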
    pub fn has_index(&self, collection: &str, field: &str) -> bool {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            if let Some(field_def) = collection_def.fields.get(field) {
                return field_def.indexed || field_def.unique;
            }
        }
        false
    }

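    /// Look up document IDs through a secondary index.
    ///
    /// A sketch of expected usage (it assumes `users.email` is an indexed field):
    ///
    /// ```no_run
    /// let ids = db.get_ids_from_index(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// );
    /// ```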
    pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
        let index_key = format!("{}:{}", collection, field);
        if let Some(index) = self.secondary_indices.get(&index_key) {
            let value_str = match value {
                Value::String(s) => s.clone(),
                _ => value.to_string(),
            };
            if let Some(doc_ids) = index.get(&value_str) {
                // The doc_ids in the secondary index are full keys like "collection:id".
                // The query logic expects just the "id" part.
                return doc_ids
                    .value()
                    .iter()
                    .map(|key| key.split(':').nth(1).unwrap_or(key).to_string())
                    .collect();
            }
        }
        Vec::new()
    }

    /// Register a computed field definition
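    ///
    /// A sketch of usage; how a `ComputedExpression` is constructed depends on
    /// the `computed` module's API, so a placeholder is used here:
    ///
    /// ```no_run
    /// // Hypothetical expression value - build it with the computed module's API.
    /// let expr: aurora_db::computed::ComputedExpression = todo!();
    /// db.register_computed_field("orders", "total_with_tax", expr).await?;
    /// ```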
    pub async fn register_computed_field(
        &self,
        collection: &str,
        field: &str,
        expression: crate::computed::ComputedExpression,
    ) -> Result<()> {
        let mut computed = self.computed.write().unwrap();
        computed.register(collection, field, expression);
        Ok(())
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as a single-collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get the total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for durability guarantee
    /// - Truncates WAL after successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?;  // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = db.clone();
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?;  // Critical: ensure transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?;  // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow; WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When durability mode is already Immediate
    ///
    /// # Important Notes
    /// - WAL provides durability even without explicit flush()
    /// - flush() adds latency (~10-50ms) so use strategically
    /// - Automatic flush happens during graceful shutdown
    /// - After flush(), WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set durability mode
    /// - WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush the write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate the WAL after a successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None).await?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600))).await?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AqlError::invalid_operation(format!(
                "Blob size {}MB exceeds maximum allowed size of {}MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        // Wrap the key and value in Arc once so they can be shared cheaply below.
        let key_arc = Arc::new(key);
        let value_arc = Arc::new(value);

        // Check if this is a blob (blobs bypass the write buffer and hot cache)
        let is_blob = value_arc.starts_with(b"BLOB:");

        // --- 1. WAL write (non-blocking send of Arcs) ---
        if let Some(ref sender) = self.wal_writer
            && self.config.durability_mode != DurabilityMode::None
        {
            sender
                .send(WalOperation::Put {
                    key: Arc::clone(&key_arc),
                    value: Arc::clone(&value_arc),
                })
                .map_err(|_| {
                    AqlError::new(
                        ErrorCode::InternalError,
                        "WAL writer channel closed".to_string(),
                    )
                })?;
        }

        // Check if this key should be cached (false for Any-field collections)
        let should_cache = self.should_cache_key(&key_arc);

        // --- 2. Cold store write ---
        if is_blob || !should_cache {
            // Blobs and Any-field docs write directly to cold storage
            // (blobs: avoid memory pressure; Any-fields: ensure immediate queryability)
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        } else if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
        } else {
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        }

        // --- 3. Hot cache write ---
        if should_cache {
            if is_blob {
                // For blobs: cache only a lightweight reference (not the actual data).
                // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB.
                let blob_ref = format!("BLOBREF:{}", value_arc.len());
                self.hot
                    .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
            } else {
                self.hot
                    .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
            }
        }

        // --- 4. Indexing (skip for blobs and system keys) ---
        if !is_blob {
            if let Some(collection_name) = key_arc.split(':').next()
                && !collection_name.starts_with('_')
            {
                self.index_value(collection_name, &key_arc, &value_arc)?;
            }
        }

        Ok(())
    }

    /// Replay WAL entries to recover from a crash
    ///
    /// Handles transaction boundaries:
    /// - Operations within a committed transaction are applied
    /// - Operations within a rolled-back transaction are discarded
    /// - Operations within an uncommitted transaction (crash during tx) are discarded
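    ///
    /// For example, given the recovered sequence
    /// `BeginTx, Put(a), Put(b), CommitTx, Put(c), BeginTx, Put(d)` followed by
    /// a crash, replay applies `a`, `b`, and `c` but discards `d`.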
1450    async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
1451        // Buffer for operations within a transaction
1452        let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
1453        let mut in_transaction = false;
1454
1455        for entry in entries {
1456            match entry.operation {
1457                Operation::BeginTx => {
1458                    // Start buffering operations
1459                    in_transaction = true;
1460                    tx_buffer.clear();
1461                }
1462                Operation::CommitTx => {
1463                    // Apply all buffered operations
1464                    for buffered_entry in tx_buffer.drain(..) {
1465                        self.apply_wal_entry(buffered_entry).await?;
1466                    }
1467                    in_transaction = false;
1468                }
1469                Operation::RollbackTx => {
1470                    // Discard buffered operations
1471                    tx_buffer.clear();
1472                    in_transaction = false;
1473                }
1474                Operation::Put | Operation::Delete => {
1475                    if in_transaction {
1476                        // Buffer the operation for later
1477                        tx_buffer.push(entry);
1478                    } else {
1479                        // Apply immediately (not in transaction)
1480                        self.apply_wal_entry(entry).await?;
1481                    }
1482                }
1483            }
1484        }
1485
1486        // If we end with in_transaction = true, it means we crashed mid-transaction
1487        // Those operations in tx_buffer are discarded (not committed)
1488        if in_transaction {
1489            eprintln!(
1490                "WAL replay: Discarding {} uncommitted transaction operations",
1491                tx_buffer.len()
1492            );
1493        }
1494
1495        // Flush after replay and truncate WAL
1496        self.cold.flush()?;
1497        if let Some(ref wal) = self.wal {
1498            wal.write().unwrap().truncate()?;
1499        }
1500
1501        Ok(())
1502    }
1503
1504    /// Apply a single WAL entry to storage
1505    async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
1506        match entry.operation {
1507            Operation::Put => {
1508                if let Some(value) = entry.value {
1509                    // Write directly to cold storage (skip WAL, already logged)
1510                    self.cold.set(entry.key.clone(), value.clone())?;
1511
1512                    // Update hot cache
1513                    if self.should_cache_key(&entry.key) {
1514                        self.hot
1515                            .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
1516                    }
1517
1518                    // Rebuild indices
1519                    if let Some(collection) = entry.key.split(':').next()
1520                        && !collection.starts_with('_')
1521                    {
1522                        self.index_value(collection, &entry.key, &value)?;
1523                    }
1524                }
1525            }
1526            Operation::Delete => {
1527                // Remove from indices before deleting the data
1528                if let Some(collection) = entry.key.split(':').next()
1529                    && !collection.starts_with('_')
1530                {
1531                    let _ = self.remove_from_index(collection, &entry.key);
1532                }
1533                self.cold.delete(&entry.key)?;
1534                self.hot.delete(&entry.key);
1535            }
1536            _ => {} // Transaction markers handled in replay_wal
1537        }
1538        Ok(())
1539    }
1540
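    /// Resolve a collection schema through a three-tier lookup:
    /// parsed-object cache -> hot byte cache -> cold storage.
    /// A hit in a lower tier repopulates the tiers above it.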
1541    fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
1542        // 1. Check the high-performance object cache first (O(1))
1543        if let Some(schema) = self.schema_cache.get(collection) {
1544            return Ok(schema.value().clone());
1545        }
1546
1547        let collection_key = format!("_collection:{}", collection);
1548
1549        // 2. Fallback to Hot Byte Cache (parse if found)
1550        if let Some(data) = self.hot.get(&collection_key) {
1551            return serde_json::from_slice::<Collection>(&data)
1552                .map(|s| {
1553                    let arc_s = Arc::new(s);
1554                    // Populate object cache to avoid this parse next time
1555                    self.schema_cache
1556                        .insert(collection.to_string(), arc_s.clone());
1557                    arc_s
1558                })
1559                .map_err(|_| {
1560                    AqlError::new(
1561                        ErrorCode::SchemaError,
1562                        "Failed to parse schema from hot cache",
1563                    )
1564                });
1565        }
1566
1567        // 3. Fallback to Cold Storage
1568        if let Some(data) = self.get(&collection_key)? {
1569            // Update Hot Cache
1570            self.hot.set(
1571                Arc::new(collection_key.clone()),
1572                Arc::new(data.clone()),
1573                None,
1574            );
1575
1576            // Parse and update Schema Cache
1577            let schema = serde_json::from_slice::<Collection>(&data)?;
1578            let arc_schema = Arc::new(schema);
1579            self.schema_cache
1580                .insert(collection.to_string(), arc_schema.clone());
1581
1582            return Ok(arc_schema);
1583        }
1584
1585        Err(AqlError::new(
1586            ErrorCode::SchemaError,
1587            format!("Failed to load schema for collection '{}'", collection),
1588        ))
1589    }
1590
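    /// Record a document in the in-memory indices: the primary index keeps
    /// per-key size metadata, and every indexed or unique field gets a
    /// value -> document-key entry in its secondary index, capped at
    /// `max_index_entries_per_field` entries per value.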
1591    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
1592        // Update primary index with metadata only
1593        let location = DiskLocation::new(value.len());
1594        self.primary_indices
1595            .entry(collection.to_string())
1596            .or_default()
1597            .insert(key.to_string(), location);
1598
1599        // Use the optimized helper (Fast path via schema_cache)
1600        let collection_obj = self.ensure_schema_hot(collection)?;
1601
1602        // Build list of fields that should be indexed
1603        let indexed_fields: Vec<String> = collection_obj
1604            .fields
1605            .iter()
1606            .filter(|(_, def)| def.indexed || def.unique)
1607            .map(|(name, _)| name.clone())
1608            .collect();
1609
1610        if indexed_fields.is_empty() {
1611            return Ok(());
1612        }
1613
1614        // Update secondary indices
1615        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
1616            for (field, field_value) in doc.data {
1617                if !indexed_fields.contains(&field) {
1618                    continue;
1619                }
1620
1621                let value_str = match &field_value {
1622                    Value::String(s) => s.clone(),
1623                    _ => field_value.to_string(),
1624                };
1625
1626                let index_key = format!("{}:{}", collection, field);
1627                let secondary_index = self.secondary_indices.entry(index_key).or_default();
1628
1629                let max_entries = self.config.max_index_entries_per_field;
1630
1631                secondary_index
1632                    .entry(value_str)
1633                    .and_modify(|doc_ids| {
1634                        if doc_ids.len() < max_entries {
1635                            doc_ids.push(key.to_string());
1636                        }
1637                    })
1638                    .or_insert_with(|| vec![key.to_string()]);
1639            }
1640        }
1641        Ok(())
1642    }
1643
1644    /// Remove a document from all indices (called during delete operations)
1645    fn remove_from_index(&self, collection: &str, key: &str) -> Result<()> {
1646        // Remove from primary index
1647        if let Some(primary_index) = self.primary_indices.get_mut(collection) {
1648            primary_index.remove(key);
1649        }
1650
1651        // Get the document data to know which secondary index entries to remove
1652        // Try cold storage first since we may be replaying WAL
1653        let doc_data = match self.cold.get(key) {
1654            Ok(Some(data)) => Some(data),
1655            _ => self.hot.get(key),
1656        };
1657
1658        // If we have the document data, remove from secondary indices
1659        if let Some(data) = doc_data {
1660            if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
1661                for (field, field_value) in &doc.data {
1662                    let value_str = match field_value {
1663                        Value::String(s) => s.clone(),
1664                        _ => field_value.to_string(),
1665                    };
1666
1667                    let index_key = format!("{}:{}", collection, field);
1668                    if let Some(secondary_index) = self.secondary_indices.get_mut(&index_key) {
1669                        if let Some(mut doc_ids) = secondary_index.get_mut(&value_str) {
1670                            doc_ids.retain(|id| id != key);
1671                            // If empty, we could remove the entry but it's not strictly necessary
1672                        }
1673                    }
1674                }
1675            }
1676        }
1677
1678        Ok(())
1679    }
1680
1681    /// Scan collection with filter and early termination support
1682    /// Used by QueryBuilder for optimized queries with LIMIT
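    ///
    /// A minimal usage sketch (assumes a populated "users" collection):
    /// ```no_run
    /// # use aurora_db::types::Value;
    /// // Stop scanning as soon as 10 matching documents are found
    /// let docs = db.scan_and_filter(
    ///     "users",
    ///     |d| matches!(d.data.get("active"), Some(Value::Bool(true))),
    ///     Some(10),
    /// )?;
    /// ```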
1683    pub fn scan_and_filter<F>(
1684        &self,
1685        collection: &str,
1686        filter: F,
1687        limit: Option<usize>,
1688    ) -> Result<Vec<Document>>
1689    where
1690        F: Fn(&Document) -> bool,
1691    {
1692        let mut documents = Vec::new();
1693
1694        if let Some(index) = self.primary_indices.get(collection) {
1695            for entry in index.iter() {
1696                // Early termination
1697                if let Some(max) = limit {
1698                    if documents.len() >= max {
1699                        break;
1700                    }
1701                }
1702
1703                let key = entry.key();
1704                if let Some(data) = self.get(key)? {
1705                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1706                        // Apply computed fields
1707                        if let Ok(computed) = self.computed.read() {
1708                            let _ = computed.apply(collection, &mut doc);
1709                        }
1710
1711                        if filter(&doc) {
1712                            documents.push(doc);
1713                        }
1714                    }
1715                }
1716            }
1717        } else {
1718            // Fallback: scan from cold storage if primary index not yet initialized
1719            // Optimization: Apply filter and limit during scan to avoid full load
1720            let prefix = format!("{}:", collection);
1721            for result in self.cold.scan_prefix(&prefix) {
1722                // Early termination
1723                if let Some(max) = limit {
1724                    if documents.len() >= max {
1725                        break;
1726                    }
1727                }
1728
1729                if let Ok((_key, value)) = result {
1730                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
1731                        // Apply computed fields
1732                        if let Ok(computed) = self.computed.read() {
1733                            let _ = computed.apply(collection, &mut doc);
1734                        }
1735
1736                        if filter(&doc) {
1737                            documents.push(doc);
1738                        }
1739                    }
1740                }
1741            }
1742        }
1743
1744        Ok(documents)
1745    }
1746
1747    /// Smart collection scan that uses the primary index as a key directory
1748    /// Avoids forced flushes and leverages hot cache for better performance
1749    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
1750        let mut documents = Vec::new();
1751
1752        // Use the primary index as a "key directory" - it contains all document keys
1753        if let Some(index) = self.primary_indices.get(collection) {
1754            // Iterate through all keys in the primary index (fast, in-memory)
1755            for entry in index.iter() {
1756                let key = entry.key();
1757
1758                // Fetch document via hot cache -> cold storage fallback
1759                if let Some(data) = self.get(key)? {
1760                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1761                        // Apply computed fields
1762                        if let Ok(computed) = self.computed.read() {
1763                            let _ = computed.apply(collection, &mut doc);
1764                        }
1765                        documents.push(doc);
1766                    }
1767                }
1768            }
1769        } else {
1770            // Fallback: scan from cold storage if primary index not yet initialized
1771            let prefix = format!("{}:", collection);
1772            for result in self.cold.scan_prefix(&prefix) {
1773                if let Ok((_key, value)) = result {
1774                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
1775                        // Apply computed fields
1776                        if let Ok(computed) = self.computed.read() {
1777                            let _ = computed.apply(collection, &mut doc);
1778                        }
1779                        documents.push(doc);
1780                    }
1781                }
1782            }
1783        }
1784
1785        Ok(documents)
1786    }
1787
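    /// Store a file as a blob under the given key (50 MB size limit).
    ///
    /// The data is stored with a `BLOB:` prefix so reads can recognize it;
    /// raw blob bytes skip the write buffer and hot cache.
    ///
    /// A minimal usage sketch (key and path are illustrative):
    /// ```no_run
    /// # use std::path::Path;
    /// db.put_blob("images:logo".to_string(), Path::new("logo.png")).await?;
    /// ```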
1789    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
1790        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
1791
1792        // Get file metadata to check size before reading
1793        let metadata = tokio::fs::metadata(file_path).await?;
1794        let file_size = metadata.len() as usize;
1795
1796        if file_size > MAX_FILE_SIZE {
1797            return Err(AqlError::invalid_operation(format!(
1798                "File size {} MB exceeds maximum allowed size of {} MB",
1799                file_size / (1024 * 1024),
1800                MAX_FILE_SIZE / (1024 * 1024)
1801            )));
1802        }
1803
1804        let mut file = File::open(file_path).await?;
1805        let mut buffer = Vec::new();
1806        file.read_to_end(&mut buffer).await?;
1807
1808        // Add BLOB: prefix to mark this as blob data
1809        let mut blob_data = Vec::with_capacity(5 + buffer.len());
1810        blob_data.extend_from_slice(b"BLOB:");
1811        blob_data.extend_from_slice(&buffer);
1812
1813        self.put(key, blob_data, None).await
1814    }
1815
1816    /// Create a new collection with schema definition
1817    ///
1818    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third boolean parameter indicates whether the field carries a unique constraint.
1820    ///
1821    /// # Arguments
1822    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true to enforce a unique constraint, false otherwise
1827    ///
1828    /// # Performance
1829    /// - Indexed fields: Fast equality queries (O(1) lookup)
1830    /// - Non-indexed fields: Full scan required for queries
1831    /// - Unique fields are automatically indexed
1832    ///
1833    /// # Examples
1834    ///
1835    /// ```
1836    /// use aurora_db::{Aurora, types::FieldType};
1837    ///
1838    /// let db = Aurora::open("mydb.db")?;
1839    ///
1840    /// // Create a users collection
1841    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),      // No unique constraint
    ///     ("email", FieldType::String, true),      // Unique - automatically indexed
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ]).await?;
1848    ///
1849    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */]).await?; // OK!
1851    /// ```
1852    pub async fn new_collection<F: IntoFieldDefinition>(
1853        &self,
1854        name: &str,
1855        fields: Vec<F>,
1856    ) -> Result<()> {
1857        let collection_key = format!("_collection:{}", name);
1858
1859        // Check if collection already exists - if so, just return Ok (idempotent)
1860        if self.get(&collection_key)?.is_some() {
1861            return Ok(());
1862        }
1863
1864        // Create field definitions
1865        let mut field_definitions = HashMap::new();
1866        for field in fields {
1867            let (field_name, field_def) = field.into_field_definition();
1868
1869            if field_def.field_type == FieldType::Any && field_def.unique {
1870                return Err(AqlError::new(
1871                    ErrorCode::InvalidDefinition,
1872                    "Fields of type 'Any' cannot be unique or indexed.".to_string(),
1873                ));
1874            }
1875
1876            field_definitions.insert(field_name, field_def);
1877        }
1878
1879        let collection = Collection {
1880            name: name.to_string(),
1881            fields: field_definitions,
1882            // REMOVED: unique_fields is now derived from fields
1883        };
1884
1885        let collection_data = serde_json::to_vec(&collection)?;
1886        self.put(collection_key, collection_data, None).await?;
1887
1888        // Invalidate schema cache since we just created/updated the collection schema
1889        self.schema_cache.remove(name);
1890
1891        Ok(())
1892    }
1893
1894    /// Insert a document into a collection
1895    ///
1896    /// Automatically generates a UUID for the document and validates against
1897    /// collection schema and unique constraints. Returns the generated document ID.
1898    ///
1899    /// # Performance
1900    /// - Single insert: ~15,000 docs/sec
1901    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
1902    /// - Triggers PubSub events for real-time listeners
1903    ///
1904    /// # Arguments
1905    /// * `collection` - Name of the collection to insert into
1906    /// * `data` - Document fields and values to insert
1907    ///
1908    /// # Returns
1909    /// The auto-generated ID of the inserted document or an error
1910    ///
1911    /// # Errors
1912    /// - `CollectionNotFound`: Collection doesn't exist
1913    /// - `ValidationError`: Data violates schema or unique constraints
1914    /// - `SerializationError`: Invalid data format
1915    ///
1916    /// # Examples
1917    ///
    /// ```
1919    /// use aurora_db::{Aurora, types::Value};
1920    ///
1921    /// let db = Aurora::open("mydb.db")?;
1922    ///
1923    /// // Basic insertion
1924    /// let user_id = db.insert_into("users", vec![
1925    ///     ("name", Value::String("Alice Smith".to_string())),
1926    ///     ("email", Value::String("alice@example.com".to_string())),
1927    ///     ("age", Value::Int(28)),
1928    ///     ("active", Value::Bool(true)),
1929    /// ]).await?;
1930    ///
1931    /// println!("Created user with ID: {}", user_id);
1932    ///
1933    /// // Inserting with nested data
1934    /// let order_id = db.insert_into("orders", vec![
1935    ///     ("user_id", Value::String(user_id.clone())),
1936    ///     ("total", Value::Float(99.99)),
1937    ///     ("status", Value::String("pending".to_string())),
1938    ///     ("items", Value::Array(vec![
1939    ///         Value::String("item-123".to_string()),
1940    ///         Value::String("item-456".to_string()),
1941    ///     ])),
1942    /// ]).await?;
1943    ///
1944    /// // Error handling - unique constraint violation
1945    /// match db.insert_into("users", vec![
1946    ///     ("email", Value::String("alice@example.com".to_string())),  // Duplicate!
1947    ///     ("name", Value::String("Alice Clone".to_string())),
1948    /// ]).await {
1949    ///     Ok(id) => println!("Inserted: {}", id),
1950    ///     Err(e) => println!("Failed: {} (email already exists)", e),
1951    /// }
1952    ///
1953    /// // For bulk inserts (10+ documents), use batch_insert() instead
1954    /// let users = vec![
1955    ///     HashMap::from([
1956    ///         ("name".to_string(), Value::String("Bob".to_string())),
1957    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
1958    ///     ]),
1959    ///     HashMap::from([
1960    ///         ("name".to_string(), Value::String("Carol".to_string())),
1961    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
1962    ///     ]),
1963    ///     // ... more documents
1964    /// ];
1965    /// let ids = db.batch_insert("users", users).await?;  // 3x faster!
1966    /// println!("Inserted {} users", ids.len());
1967    /// ```
1968    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
1969        // Convert Vec<(&str, Value)> to HashMap<String, Value>
1970        let data_map: HashMap<String, Value> =
1971            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1972
1973        // Validate unique constraints before inserting
1974        self.validate_unique_constraints(collection, &data_map)
1975            .await?;
1976
1977        let doc_id = Uuid::now_v7().to_string();
1978        let document = Document {
1979            id: doc_id.clone(),
1980            data: data_map,
1981        };
1982
1983        self.put(
1984            format!("{}:{}", collection, doc_id),
1985            serde_json::to_vec(&document)?,
1986            None,
1987        )
1988        .await?;
1989
1990        // Publish insert event
1991        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1992        let _ = self.pubsub.publish(event);
1993
1994        Ok(doc_id)
1995    }
1996
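    /// Insert a document supplied as a `HashMap` of field values.
    ///
    /// Behaves like `insert_into()` (UUID generation, unique-constraint
    /// validation, PubSub event) but takes an owned map, which is convenient
    /// when fields are built dynamically. A minimal usage sketch:
    /// ```no_run
    /// # use std::collections::HashMap;
    /// # use aurora_db::types::Value;
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Dana".into()));
    /// let id = db.insert_map("users", data).await?;
    /// ```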
1997    pub async fn insert_map(
1998        &self,
1999        collection: &str,
2000        data: HashMap<String, Value>,
2001    ) -> Result<String> {
2002        // Validate unique constraints before inserting
2003        self.validate_unique_constraints(collection, &data).await?;
2004
2005        let doc_id = Uuid::now_v7().to_string();
2006        let document = Document {
2007            id: doc_id.clone(),
2008            data,
2009        };
2010
2011        self.put(
2012            format!("{}:{}", collection, doc_id),
2013            serde_json::to_vec(&document)?,
2014            None,
2015        )
2016        .await?;
2017
2018        // Publish insert event
2019        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2020        let _ = self.pubsub.publish(event);
2021
2022        Ok(doc_id)
2023    }
2024
2025    /// Batch insert multiple documents with optimized write path
2026    ///
2027    /// Inserts multiple documents in a single optimized operation, bypassing
2028    /// the write buffer for better performance. Ideal for bulk data loading,
2029    /// migrations, or initial database seeding. 3x faster than individual inserts.
2030    ///
2031    /// # Performance
2032    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
2033    /// - Batch writes to WAL and storage
2034    /// - Validates all unique constraints
2035    /// - Use for 10+ documents minimum
2036    ///
2037    /// # Arguments
2038    /// * `collection` - Name of the collection to insert into
2039    /// * `documents` - Vector of document data as HashMaps
2040    ///
2041    /// # Returns
2042    /// Vector of auto-generated document IDs or an error
2043    ///
2044    /// # Examples
2045    ///
    /// ```
2047    /// use aurora_db::{Aurora, types::Value};
2048    /// use std::collections::HashMap;
2049    ///
2050    /// let db = Aurora::open("mydb.db")?;
2051    ///
2052    /// // Bulk user import
2053    /// let users = vec![
2054    ///     HashMap::from([
2055    ///         ("name".to_string(), Value::String("Alice".into())),
2056    ///         ("email".to_string(), Value::String("alice@example.com".into())),
2057    ///         ("age".to_string(), Value::Int(28)),
2058    ///     ]),
2059    ///     HashMap::from([
2060    ///         ("name".to_string(), Value::String("Bob".into())),
2061    ///         ("email".to_string(), Value::String("bob@example.com".into())),
2062    ///         ("age".to_string(), Value::Int(32)),
2063    ///     ]),
2064    ///     HashMap::from([
2065    ///         ("name".to_string(), Value::String("Carol".into())),
2066    ///         ("email".to_string(), Value::String("carol@example.com".into())),
2067    ///         ("age".to_string(), Value::Int(25)),
2068    ///     ]),
2069    /// ];
2070    ///
2071    /// let ids = db.batch_insert("users", users).await?;
2072    /// println!("Inserted {} users", ids.len());
2073    ///
2074    /// // Seeding test data
2075    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
2076    ///     .map(|i| HashMap::from([
2077    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
2078    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
2079    ///         ("stock".to_string(), Value::Int(100)),
2080    ///     ]))
2081    ///     .collect();
2082    ///
2083    /// let ids = db.batch_insert("products", test_products).await?;
2084    /// // Much faster than 1000 individual insert_into() calls!
2085    ///
2086    /// // Migration from CSV data
2087    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
2088    /// let mut batch = Vec::new();
2089    ///
2090    /// for result in csv_reader.records() {
2091    ///     let record = result?;
2092    ///     let doc = HashMap::from([
2093    ///         ("field1".to_string(), Value::String(record[0].to_string())),
2094    ///         ("field2".to_string(), Value::String(record[1].to_string())),
2095    ///     ]);
2096    ///     batch.push(doc);
2097    ///
2098    ///     // Insert in batches of 1000
2099    ///     if batch.len() >= 1000 {
2100    ///         db.batch_insert("imported_data", batch.clone()).await?;
2101    ///         batch.clear();
2102    ///     }
2103    /// }
2104    ///
2105    /// // Insert remaining
2106    /// if !batch.is_empty() {
2107    ///     db.batch_insert("imported_data", batch).await?;
2108    /// }
2109    /// ```
2110    ///
2111    /// # Errors
2112    /// - `ValidationError`: Unique constraint violation on any document
2113    /// - `CollectionNotFound`: Collection doesn't exist
2114    /// - `IoError`: Storage write failure
2115    ///
2116    /// # Important Notes
2117    /// - All inserts are atomic - if one fails, none are inserted
2118    /// - UUIDs are auto-generated for all documents
2119    /// - PubSub events are published for each insert
2120    /// - For 10+ documents, this is 3x faster than individual inserts
2121    /// - For < 10 documents, use `insert_into()` instead
2122    ///
2123    /// # See Also
2124    /// - `insert_into()` for single document inserts
2125    /// - `import_from_json()` for file-based bulk imports
2126    /// - `batch_write()` for low-level batch operations
2127    pub async fn batch_insert(
2128        &self,
2129        collection: &str,
2130        documents: Vec<HashMap<String, Value>>,
2131    ) -> Result<Vec<String>> {
2132        let mut doc_ids = Vec::with_capacity(documents.len());
2133        let mut pairs = Vec::with_capacity(documents.len());
2134
2135        // Prepare all documents
2136        for data in documents {
2137            // Validate unique constraints
2138            self.validate_unique_constraints(collection, &data).await?;
2139
2140            let doc_id = Uuid::now_v7().to_string();
2141            let document = Document {
2142                id: doc_id.clone(),
2143                data,
2144            };
2145
2146            let key = format!("{}:{}", collection, doc_id);
2147            let value = serde_json::to_vec(&document)?;
2148
2149            pairs.push((key, value));
2150            doc_ids.push(doc_id);
2151        }
2152
2153        // Write to WAL in batch (if enabled)
2154        if let Some(ref wal) = self.wal
2155            && self.config.durability_mode != DurabilityMode::None
2156        {
2157            let mut wal_lock = wal.write().unwrap();
2158            for (key, value) in &pairs {
2159                wal_lock.append(Operation::Put, key, Some(value))?;
2160            }
2161        }
2162
2163        // Bypass write buffer - go directly to cold storage batch API
2164        self.cold.batch_set(pairs.clone())?;
2165
2166        // Note: Durability is handled by background checkpoint process
2167
2168        // Update hot cache and indices
2169        for (key, value) in pairs {
2170            if self.should_cache_key(&key) {
2171                self.hot
2172                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2173            }
2174
2175            if let Some(collection_name) = key.split(':').next()
2176                && !collection_name.starts_with('_')
2177            {
2178                self.index_value(collection_name, &key, &value)?;
2179            }
2180        }
2181
2182        // Publish events
2183        for doc_id in &doc_ids {
2184            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
2185                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2186                let _ = self.pubsub.publish(event);
2187            }
2188        }
2189
2190        Ok(doc_ids)
2191    }
2192
2193    /// Update a document by ID
2194    ///
2195    /// # Arguments
2196    /// * `collection` - Collection name
2197    /// * `doc_id` - Document ID to update
2198    /// * `data` - New field values to set
2199    ///
2200    /// # Returns
2201    /// Ok(()) on success, or an error if the document doesn't exist
2202    ///
2203    /// # Examples
2204    ///
2205    /// ```
2206    /// db.update_document("users", &user_id, vec![
2207    ///     ("status", Value::String("active".to_string())),
2208    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2209    /// ]).await?;
2210    /// ```
2211    pub async fn update_document(
2212        &self,
2213        collection: &str,
2214        doc_id: &str,
2215        updates: Vec<(&str, Value)>,
2216    ) -> Result<()> {
2217        // Get existing document
2218        let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2219            AqlError::new(
2220                ErrorCode::NotFound,
2221                format!("Document not found: {}", doc_id),
2222            )
2223        })?;
2224
2225        // Store old document for event
2226        let old_document = document.clone();
2227
2228        // Apply updates
2229        for (field, value) in updates {
2230            document.data.insert(field.to_string(), value);
2231        }
2232
2233        // Validate unique constraints after update (excluding current document)
2234        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2235            .await?;
2236
2237        // Save updated document
2238        self.put(
2239            format!("{}:{}", collection, doc_id),
2240            serde_json::to_vec(&document)?,
2241            None,
2242        )
2243        .await?;
2244
2245        // Publish update event
2246        let event =
2247            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2248        let _ = self.pubsub.publish(event);
2249
2250        Ok(())
2251    }
2252
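    /// Return every document in a collection.
    ///
    /// Initializes indices on first use, then scans via the primary index.
    /// A minimal usage sketch:
    /// ```no_run
    /// let users = db.get_all_collection("users").await?;
    /// println!("{} users", users.len());
    /// ```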
2253    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2254        self.ensure_indices_initialized().await?;
2255        self.scan_collection(collection)
2256    }
2257
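    /// List every key containing `pattern`, with lightweight metadata about
    /// each stored value (blob vs. document data, size, short preview).
    ///
    /// A minimal usage sketch:
    /// ```no_run
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     match info {
    ///         DataInfo::Blob { size } => println!("{key}: blob ({size} bytes)"),
    ///         DataInfo::Data { size, preview } => println!("{key}: {size} bytes: {preview}"),
    ///         _ => {}
    ///     }
    /// }
    /// ```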
2258    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2259        let mut data = Vec::new();
2260
2261        // Scan from cold storage instead of primary index
2262        for result in self.cold.scan() {
2263            if let Ok((key, value)) = result {
2264                if key.contains(pattern) {
2265                    let info = if value.starts_with(b"BLOB:") {
2266                        DataInfo::Blob { size: value.len() }
2267                    } else {
2268                        DataInfo::Data {
2269                            size: value.len(),
2270                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2271                                .into_owned(),
2272                        }
2273                    };
2274
2275                    data.push((key.clone(), info));
2276                }
2277            }
2278        }
2279
2280        Ok(data)
2281    }
2282
    /// Begin a new transaction for atomic operations
    ///
    /// Transactions ensure all-or-nothing execution: either all operations succeed,
    /// or none of them are applied. Operations within the transaction are buffered
    /// until either commit_transaction() or rollback_transaction() is called.
    ///
    /// # Returns
    /// The ID of the new transaction, for use with commit_transaction() or
    /// rollback_transaction()
    ///
2313    /// # Examples
2314    ///
    /// ```
2316    /// use aurora_db::{Aurora, types::Value};
2317    ///
2318    /// let db = Aurora::open("mydb.db")?;
2319    ///
2320    /// // Start transaction
2321    /// let tx_id = db.begin_transaction();
2322    ///
2323    /// // Perform multiple operations
2324    /// db.insert_into("accounts", vec![
2325    ///     ("user_id", Value::String("alice".into())),
2326    ///     ("balance", Value::Int(1000)),
2327    /// ]).await?;
2328    ///
2329    /// db.insert_into("accounts", vec![
2330    ///     ("user_id", Value::String("bob".into())),
2331    ///     ("balance", Value::Int(500)),
    /// ]).await?;
2333    ///
2334    /// // Commit if all succeeded
2335    /// db.commit_transaction(tx_id)?;
2336    ///
2337    /// // Or rollback on error
2338    /// // db.rollback_transaction(tx_id)?;
2339    /// ```
2340    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
2341        let buffer = self.transaction_manager.begin();
2342        buffer.id
2343    }
2344
2345    /// Commit a transaction, making all changes permanent
2346    ///
2347    /// All operations within the transaction are atomically applied to the database.
2348    /// If any operation fails, none are applied.
2349    ///
2350    /// # Arguments
2351    /// * `tx_id` - Transaction ID returned from begin_transaction()
2352    ///
2353    /// # Examples
2354    ///
    /// ```
2356    /// use aurora_db::{Aurora, types::Value};
2357    ///
2358    /// let db = Aurora::open("mydb.db")?;
2359    ///
2360    /// // Transfer money between accounts
2361    /// let tx_id = db.begin_transaction();
2362    ///
2363    /// // Deduct from Alice
2364    /// db.update_document("accounts", "alice", vec![
2365    ///     ("balance", Value::Int(900)),  // Was 1000
2366    /// ]).await?;
2367    ///
2368    /// // Add to Bob
2369    /// db.update_document("accounts", "bob", vec![
2370    ///     ("balance", Value::Int(600)),  // Was 500
2371    /// ]).await?;
2372    ///
2373    /// // Both updates succeed - commit them
2374    /// db.commit_transaction(tx_id)?;
2375    ///
2376    /// println!("Transfer completed!");
2377    /// ```
2378    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2379        let buffer = self
2380            .transaction_manager
2381            .active_transactions
2382            .get(&tx_id)
2383            .ok_or_else(|| {
2384                AqlError::invalid_operation(
2385                    "Transaction not found or already completed".to_string(),
2386                )
2387            })?;
2388
2389        for item in buffer.writes.iter() {
2390            let key = item.key();
2391            let value = item.value();
2392            self.cold.set(key.clone(), value.clone())?;
2393            if self.should_cache_key(key) {
2394                self.hot
2395                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2396            }
2397            if let Some(collection_name) = key.split(':').next()
2398                && !collection_name.starts_with('_')
2399            {
2400                self.index_value(collection_name, key, value)?;
2401            }
2402        }
2403
2404        for item in buffer.deletes.iter() {
2405            let key = item.key();
2406            if let Some((collection, id)) = key.split_once(':')
2407                && let Ok(Some(doc)) = self.get_document(collection, id)
2408            {
2409                self.remove_from_indices(collection, &doc)?;
2410            }
2411            self.cold.delete(key)?;
2412            self.hot.delete(key);
2413        }
2414
2415        // Drop the buffer reference to release the DashMap read lock
2416        // before calling commit which needs to remove the entry (write lock)
2417        drop(buffer);
2418
2419        self.transaction_manager.commit(tx_id)?;
2420
2421        self.cold.compact()?;
2422
2423        Ok(())
2424    }
2425
2426    /// Roll back a transaction, discarding all changes
2427    ///
2428    /// All operations within the transaction are discarded. The database state
2429    /// remains unchanged. Use this when an error occurs during transaction processing.
2430    ///
2431    /// # Arguments
2432    /// * `tx_id` - Transaction ID returned from begin_transaction()
2433    ///
2434    /// # Examples
2435    ///
    /// ```
2437    /// use aurora_db::{Aurora, types::Value};
2438    ///
2439    /// let db = Aurora::open("mydb.db")?;
2440    ///
2441    /// // Attempt a transfer with validation
2442    /// let tx_id = db.begin_transaction();
2443    ///
2444    /// let result = async {
2445    ///     // Deduct from Alice
    ///     let alice = db.get_document("accounts", "alice")?;
    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
2448    ///
2449    ///     if let Some(Value::Int(bal)) = balance {
2450    ///         if *bal < 100 {
2451    ///             return Err("Insufficient funds");
2452    ///         }
2453    ///
2454    ///         db.update_document("accounts", "alice", vec![
2455    ///             ("balance", Value::Int(bal - 100)),
2456    ///         ]).await?;
2457    ///
2458    ///         db.update_document("accounts", "bob", vec![
2459    ///             ("balance", Value::Int(600)),
2460    ///         ]).await?;
2461    ///
2462    ///         Ok(())
2463    ///     } else {
2464    ///         Err("Account not found")
2465    ///     }
2466    /// }.await;
2467    ///
2468    /// match result {
2469    ///     Ok(_) => {
2470    ///         db.commit_transaction(tx_id)?;
2471    ///         println!("Transfer completed");
2472    ///     }
2473    ///     Err(e) => {
2474    ///         db.rollback_transaction(tx_id)?;
2475    ///         println!("Transfer failed: {}, changes rolled back", e);
2476    ///     }
2477    /// }
2478    /// ```
2479    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2480        self.transaction_manager.rollback(tx_id)
2481    }
2482
2483    /// Create a secondary index on a field for faster queries
2484    ///
2485    /// Indexes dramatically improve query performance for frequently accessed fields,
2486    /// trading increased memory usage and slower writes for much faster reads.
2487    ///
2488    /// # When to Create Indexes
2489    /// - **Frequent queries**: Fields used in 80%+ of your queries
2490    /// - **High cardinality**: Fields with many unique values (user_id, email)
2491    /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
2492    /// - **Large collections**: Most beneficial with 10,000+ documents
2493    ///
2494    /// # When NOT to Index
2495    /// - Low cardinality fields (e.g., boolean flags, small enums)
2496    /// - Rarely queried fields
2497    /// - Fields that change frequently (write-heavy workloads)
2498    /// - Small collections (<1,000 documents) - full scans are fast enough
2499    ///
2500    /// # Performance Characteristics
2501    /// - **Query speedup**: O(n) → O(1) for equality filters
2502    /// - **Memory cost**: ~100-200 bytes per document per index
2503    /// - **Write slowdown**: ~20-30% longer insert/update times
2504    /// - **Build time**: ~5,000 docs/sec for initial indexing
    ///
    /// # Arguments
    /// * `collection` - Name of the collection containing the field
    /// * `field` - Name of the field to index
    ///
    /// # Examples
    /// ```no_run
    /// # use aurora_db::{Aurora, types::FieldType};
    /// // DO index 'email' - high cardinality, used in frequent lookups
    /// db.create_index("users", "email").await?;
    ///
    /// let user = db.query("users")
    ///     .filter(|f| f.eq("email", "jane@example.com"))
    ///     .first_one()
    ///     .await?;
    ///
2522    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
2523    /// // A full scan is fast enough for boolean fields
2524    ///
2525    /// // DO index 'age' if you frequently query age ranges
2526    /// db.create_index("users", "age").await?;
2527    ///
2528    /// let young_users = db.query("users")
2529    ///     .filter(|f| f.lt("age", 30))
2530    ///     .collect()
2531    ///     .await?;
2532    /// ```
2533    ///
2534    /// # Real-World Example: E-commerce Orders
2535    ///
2536    /// ```
2537    /// // Orders collection: 1 million documents
2538    /// db.new_collection("orders", vec![
    ///     ("user_id", FieldType::String, false),    // High cardinality
    ///     ("status", FieldType::String, false),     // Low cardinality (pending, shipped, delivered)
    ///     ("created_at", FieldType::String, false),
    ///     ("total", FieldType::Float, false),
    /// ]).await?;
2544    ///
2545    /// // Index user_id - queries like "show me my orders" are common
2546    /// db.create_index("orders", "user_id").await?;  // Good choice
2547    ///
2548    /// // Query speedup: 2.5s → 0.001s
2549    /// let my_orders = db.query("orders")
2550    ///     .filter(|f| f.eq("user_id", user_id))
2551    ///     .collect()
2552    ///     .await?;
2553    ///
2554    /// // DON'T index 'status' - only 3 possible values
2555    /// // Scanning 1M docs takes ~100ms, indexing won't help much
2556    ///
2557    /// // Index created_at if you frequently query recent orders
2558    /// db.create_index("orders", "created_at").await?;  // Good for time-based queries
2559    /// ```
2560    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2561        let collection_def = self.get_collection_definition(collection)?;
2562
2563        if let Some(field_def) = collection_def.fields.get(field) {
2564            if field_def.field_type == FieldType::Any {
2565                return Err(AqlError::new(
2566                    ErrorCode::InvalidDefinition,
2567                    "Cannot create an index on a field of type 'Any'.".to_string(),
2568                ));
2569            }
2570        } else {
2571            return Err(AqlError::new(
2572                ErrorCode::InvalidDefinition,
2573                format!(
2574                    "Field '{}' not found in collection '{}'.",
2575                    field, collection
2576                ),
2577            ));
2578        }
2579
2580        // Generate a default index name
2581        let index_name = format!("idx_{}_{}", collection, field);
2582
2583        // Create index definition
2584        let definition = IndexDefinition {
2585            name: index_name.clone(),
2586            collection: collection.to_string(),
2587            fields: vec![field.to_string()],
2588            index_type: IndexType::BTree,
2589            unique: false,
2590        };
2591
2592        // Create the index
2593        let index = Index::new(definition.clone());
2594
2595        // Index all existing documents in the collection
2596        let prefix = format!("{}:", collection);
2597        for result in self.cold.scan_prefix(&prefix) {
2598            if let Ok((_, data)) = result
2599                && let Ok(doc) = serde_json::from_slice::<Document>(&data)
2600            {
2601                let _ = index.insert(&doc);
2602            }
2603        }
2604
2605        // Store the index
2606        self.indices.insert(index_name, index);
2607
2608        // Store the index definition for persistence
2609        let index_key = format!("_index:{}:{}", collection, field);
2610        self.put(index_key, serde_json::to_vec(&definition)?, None)
2611            .await?;
2612
2613        Ok(())
2614    }
2615
2616    /// Query documents in a collection with filtering, sorting, and pagination
2617    ///
2618    /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2619    /// Queries use early termination for LIMIT clauses, making them extremely fast
2620    /// even on large collections (6,800x faster than naive implementations).
2621    ///
2622    /// # Performance
2623    /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2624    /// - Without LIMIT: O(n) where n = matching documents
2625    /// - Uses secondary indices when available for equality filters
2626    /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2627    ///
2628    /// # Examples
2629    ///
2630    /// ```
2631    /// use aurora_db::{Aurora, types::Value};
2632    ///
2633    /// let db = Aurora::open("mydb.db")?;
2634    ///
2635    /// // Simple equality query
2636    /// let active_users = db.query("users")
2637    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2638    ///     .collect()
2639    ///     .await?;
2640    ///
2641    /// // Range query with pagination (FAST - uses early termination!)
2642    /// let top_scorers = db.query("users")
2643    ///     .filter(|f| f.gt("score", Value::Int(1000)))
2644    ///     .order_by("score", false)  // descending
2645    ///     .limit(10)
2646    ///     .offset(20)
2647    ///     .collect()
2648    ///     .await?;
2649    ///
2650    /// // Multiple filters
2651    /// let premium_active = db.query("users")
2652    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
2653    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2654    ///     .limit(100)  // Only scans ~200 docs, not all million!
2655    ///     .collect()
2656    ///     .await?;
2657    ///
2658    /// // Text search in a field
2659    /// let matching = db.query("articles")
2660    ///     .filter(|f| f.contains("title", "rust"))
2661    ///     .collect()
2662    ///     .await?;
2663    /// ```
2664    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2665        QueryBuilder::new(self, collection)
2666    }
2667
2668    /// Create a search builder for full-text search
2669    ///
2670    /// # Arguments
2671    /// * `collection` - Name of the collection to search
2672    ///
2673    /// # Returns
2674    /// A `SearchBuilder` for configuring and executing searches
2675    ///
2676    /// # Examples
2677    ///
2678    /// ```
2679    /// // Search for documents containing text
2680    /// let search_results = db.search("articles")
2681    ///     .field("content")
2682    ///     .matching("quantum computing")
2683    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
2684    ///     .collect()
2685    ///     .await?;
2686    /// ```
2687    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2688        SearchBuilder::new(self, collection)
2689    }
2690
2691    /// Retrieve a document by ID
2692    ///
2693    /// Fast direct lookup when you know the document ID. Significantly faster
2694    /// than querying with filters when ID is known.
2695    ///
2696    /// # Performance
2697    /// - Hot cache: ~1,000,000 reads/sec (instant)
2698    /// - Cold storage: ~500,000 reads/sec (disk I/O)
2699    /// - Complexity: O(1) - constant time lookup
2700    /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2701    ///
2702    /// # Arguments
2703    /// * `collection` - Name of the collection to query
2704    /// * `id` - ID of the document to retrieve
2705    ///
2706    /// # Returns
2707    /// The document if found, None if not found, or an error
2708    ///
2709    /// # Examples
2710    ///
    /// ```
2712    /// use aurora_db::{Aurora, types::Value};
2713    ///
2714    /// let db = Aurora::open("mydb.db")?;
2715    ///
2716    /// // Basic retrieval
2717    /// if let Some(user) = db.get_document("users", &user_id)? {
2718    ///     println!("Found user: {}", user.id);
2719    ///
2720    ///     // Access fields safely
2721    ///     if let Some(Value::String(name)) = user.data.get("name") {
2722    ///         println!("Name: {}", name);
2723    ///     }
2724    ///
2725    ///     if let Some(Value::Int(age)) = user.data.get("age") {
2726    ///         println!("Age: {}", age);
2727    ///     }
2728    /// } else {
2729    ///     println!("User not found");
2730    /// }
2731    ///
2732    /// // Idiomatic error handling
2733    /// let user = db.get_document("users", &user_id)?
    ///     .ok_or_else(|| AqlError::new(ErrorCode::NotFound, "User not found".into()))?;
2735    ///
2736    /// // Checking existence before operations
2737    /// if db.get_document("users", &user_id)?.is_some() {
2738    ///     db.update_document("users", &user_id, vec![
2739    ///         ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2740    ///     ]).await?;
2741    /// }
2742    ///
2743    /// // Batch retrieval (fetch multiple by ID)
2744    /// let user_ids = vec!["user-1", "user-2", "user-3"];
2745    /// let users: Vec<Document> = user_ids.iter()
2746    ///     .filter_map(|id| db.get_document("users", id).ok().flatten())
2747    ///     .collect();
2748    ///
2749    /// println!("Found {} out of {} users", users.len(), user_ids.len());
2750    /// ```
2751    ///
2752    /// # When to Use
2753    /// - You know the document ID (from insert, previous query, or URL param)
2754    /// - Need fastest possible lookup (1M reads/sec)
2755    /// - Fetching a single document
2756    ///
2757    /// # When NOT to Use
2758    /// - Searching by other fields → Use `query().filter()` instead
2759    /// - Need multiple documents by criteria → Use `query().collect()` instead
2760    /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2761    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2762        let key = format!("{}:{}", collection, id);
2763        if let Some(data) = self.get(&key)? {
2764            Ok(Some(serde_json::from_slice(&data)?))
2765        } else {
2766            Ok(None)
2767        }
2768    }
2769
2770    /// Delete a document by ID
2771    ///
2772    /// Permanently removes a document from storage, cache, and all indices.
2773    /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2774    ///
2775    /// # Performance
2776    /// - Delete speed: ~50,000 deletes/sec
2777    /// - Cleans up hot cache, cold storage, primary + secondary indices
2778    /// - Triggers PubSub events for listeners
2779    ///
2780    /// # Arguments
2781    /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2782    ///
2783    /// # Returns
2784    /// Success or an error
2785    ///
2786    /// # Errors
2787    /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2788    /// - `IoError`: Storage deletion failed
2789    ///
2790    /// # Examples
2791    ///
2792    /// ```
2793    /// use aurora_db::Aurora;
2794    ///
2795    /// let db = Aurora::open("mydb.db")?;
2796    ///
2797    /// // Basic deletion (note: requires "collection:id" format)
2798    /// db.delete("users:abc123").await?;
2799    ///
2800    /// // Delete with existence check
2801    /// let user_id = "user-456";
2802    /// if db.get_document("users", user_id)?.is_some() {
2803    ///     db.delete(&format!("users:{}", user_id)).await?;
2804    ///     println!("User deleted");
2805    /// } else {
2806    ///     println!("User not found");
2807    /// }
2808    ///
2809    /// // Error handling
2810    /// match db.delete("users:nonexistent").await {
2811    ///     Ok(_) => println!("Deleted successfully"),
2812    ///     Err(e) => println!("Delete failed: {}", e),
2813    /// }
2814    ///
2815    /// // Batch deletion using query
2816    /// let inactive_count = db.delete_where("users", |f| {
2817    ///     f.eq("active", Value::Bool(false))
2818    /// }).await?;
2819    /// println!("Deleted {} inactive users", inactive_count);
2820    ///
2821    /// // Delete with cascading (manual cascade pattern)
2822    /// let user_id = "user-123";
2823    ///
2824    /// // Delete user's orders first
2825    /// let orders = db.query("orders")
2826    ///     .filter(|f| f.eq("user_id", user_id))
2827    ///     .collect()
2828    ///     .await?;
2829    ///
2830    /// for order in orders {
2831    ///     db.delete(&format!("orders:{}", order.id)).await?;
2832    /// }
2833    ///
2834    /// // Then delete the user
2835    /// db.delete(&format!("users:{}", user_id)).await?;
2836    /// println!("User and all orders deleted");
2837    /// ```
2838    ///
2839    /// # Alternative: Soft Delete Pattern
2840    ///
2841    /// For recoverable deletions, use soft deletes instead:
2842    ///
2843    /// ```
2844    /// // Soft delete - mark as deleted instead of removing
2845    /// db.update_document("users", &user_id, vec![
2846    ///     ("deleted", Value::Bool(true)),
2847    ///     ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2848    /// ]).await?;
2849    ///
2850    /// // Query excludes soft-deleted items
2851    /// let active_users = db.query("users")
2852    ///     .filter(|f| f.eq("deleted", Value::Bool(false)))
2853    ///     .collect()
2854    ///     .await?;
2855    ///
2856    /// // Later: hard delete after retention period
2857    /// let old_deletions = db.query("users")
2858    ///     .filter(|f| f.eq("deleted", Value::Bool(true)))
2859    ///     .filter(|f| f.lt("deleted_at", thirty_days_ago))
2860    ///     .collect()
2861    ///     .await?;
2862    ///
2863    /// for user in old_deletions {
2864    ///     db.delete(&format!("users:{}", user.id)).await?;
2865    /// }
2866    /// ```
2867    ///
2868    /// # Important Notes
2869    /// - Deletion is permanent - no undo/recovery
2870    /// - Consider soft deletes for recoverable operations
2871    /// - Use transactions for multi-document deletions
2872    /// - PubSub subscribers will receive delete events
2873    /// - All indices are automatically cleaned up
2874    pub async fn delete(&self, key: &str) -> Result<()> {
2875        // Extract collection and id from key (format: "collection:id")
2876        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2877            (coll, doc_id)
2878        } else {
2879            return Err(AqlError::invalid_operation(
2880                "Invalid key format, expected 'collection:id'".to_string(),
2881            ));
2882        };
2883
2884        // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2885        let document = self.get_document(collection, id)?;
2886
2887        // Delete from hot cache (a no-op if the key is not cached)
2888        self.hot.delete(key);
2891
2892        // Delete from cold storage
2893        self.cold.delete(key)?;
2894
2895        // Clean up all indices (primary + secondary)
2896        if let Some(doc) = document {
2897            self.remove_from_indices(collection, &doc)?;
2898        } else {
2899            // Fallback: at least remove from primary index
2900            if let Some(index) = self.primary_indices.get_mut(collection) {
2901                index.remove(id);
2902            }
2903        }
2904
2905        // Publish delete event
2906        let event = crate::pubsub::ChangeEvent::delete(collection, id);
2907        let _ = self.pubsub.publish(event);
2908
2909        Ok(())
2910    }
2911
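    /// Delete an entire collection: every stored document plus the
    /// collection's in-memory indices and cached schema.
    ///
    /// A minimal, hedged sketch (the collection name is illustrative;
    /// like `delete()`, this is irreversible):
    ///
    /// ```
    /// db.delete_collection("temp_imports").await?;
    /// ```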
2912    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2913        let prefix = format!("{}:", collection);
2914
2915        // Get all keys in collection
2916        let keys: Vec<String> = self
2917            .cold
2918            .scan()
2919            .filter_map(|r| r.ok())
2920            .filter(|(k, _)| k.starts_with(&prefix))
2921            .map(|(k, _)| k)
2922            .collect();
2923
2924        // Delete each key
2925        for key in keys {
2926            self.delete(&key).await?;
2927        }
2928
2929        // Remove collection indices
2930        self.primary_indices.remove(collection);
2931        self.secondary_indices
2932            .retain(|k, _| !k.starts_with(&prefix));
2933
2934        // Invalidate schema cache
2935        self.schema_cache.remove(collection);
2936
2937        Ok(())
2938    }
2939
2940    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2941        // Remove from primary index
2942        if let Some(index) = self.primary_indices.get(collection) {
2943            index.remove(&doc.id);
2944        }
2945
2946        // Remove from secondary indices
2947        for (field, value) in &doc.data {
2948            let index_key = format!("{}:{}", collection, field);
2949            if let Some(index) = self.secondary_indices.get(&index_key)
2950                && let Some(mut doc_ids) = index.get_mut(&value.to_string())
2951            {
2952                doc_ids.retain(|id| id != &doc.id);
2953            }
2954        }
2955
2956        Ok(())
2957    }
2958
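    /// Case-insensitive substring search over a single string field.
    ///
    /// A minimal, hedged sketch (assumes a "posts" collection with a string
    /// "title" field; the names are illustrative):
    ///
    /// ```
    /// let hits = db.search_text("posts", "title", "aurora").await?;
    /// for post in hits {
    ///     println!("matched: {:?}", post.data.get("title"));
    /// }
    /// ```
    ///
    /// Note: this scans every document in the collection; for large
    /// collections prefer `create_text_index()` + `full_text_search()`.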
2959    pub async fn search_text(
2960        &self,
2961        collection: &str,
2962        field: &str,
2963        query: &str,
2964    ) -> Result<Vec<Document>> {
2965        let mut results = Vec::new();
2966        let docs = self.get_all_collection(collection).await?;
2967
2968        for doc in docs {
2969            if let Some(Value::String(text)) = doc.data.get(field)
2970                && text.to_lowercase().contains(&query.to_lowercase())
2971            {
2972                results.push(doc);
2973            }
2974        }
2975
2976        Ok(results)
2977    }
2978
2979    /// Export a collection to a JSON file
2980    ///
2981    /// Creates a JSON file containing all documents in the collection.
2982    /// Useful for backups, data migration, or sharing datasets.
2983    /// Automatically appends `.json` extension if not present.
2984    ///
2985    /// # Performance
2986    /// - Export speed: ~10,000 docs/sec
2987    /// - Scans entire collection from cold storage
2988    /// - Buffers all documents in memory before writing the file
2989    ///
2990    /// # Arguments
2991    /// * `collection` - Name of the collection to export
2992    /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2993    ///
2994    /// # Returns
2995    /// Success or an error
2996    ///
2997    /// # Examples
2998    ///
2999    /// ```
3000    /// use aurora_db::Aurora;
3001    ///
3002    /// let db = Aurora::open("mydb.db")?;
3003    ///
3004    /// // Basic export
3005    /// db.export_as_json("users", "./backups/users_2024-01-15")?;
3006    /// // Creates: ./backups/users_2024-01-15.json
3007    ///
3008    /// // Timestamped backup
3009    /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
3010    /// let backup_path = format!("./backups/users_{}", timestamp);
3011    /// db.export_as_json("users", &backup_path)?;
3012    ///
3013    /// // Export multiple collections
3014    /// for collection in &["users", "orders", "products"] {
3015    ///     db.export_as_json(collection, &format!("./export/{}", collection))?;
3016    /// }
3017    /// ```
3018    ///
3019    /// # Output Format
3020    ///
3021    /// The exported JSON has this structure:
3022    /// ```json
3023    /// {
3024    ///   "users": [
3025    ///     { "id": "123", "name": "Alice", "email": "alice@example.com" },
3026    ///     { "id": "456", "name": "Bob", "email": "bob@example.com" }
3027    ///   ]
3028    /// }
3029    /// ```
3030    ///
3031    /// # See Also
3032    /// - `export_as_csv()` for CSV format export
3033    /// - `import_from_json()` to restore exported data
3034    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
3035        let output_path = if !output_path.ends_with(".json") {
3036            format!("{}.json", output_path)
3037        } else {
3038            output_path.to_string()
3039        };
3040
3041        let mut docs = Vec::new();
3042
3043        // Get all documents from the specified collection
3044        for result in self.cold.scan() {
3045            let (key, value) = result?;
3046
3047            // Only process documents from the specified collection
3048            if let Some(key_collection) = key.split(':').next()
3049                && key_collection == collection
3050                && !key.starts_with("_collection:")
3051                && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3052            {
3053                // Convert Value enum to raw JSON values
3054                let mut clean_doc = serde_json::Map::new();
3055                for (k, v) in doc.data {
3056                    match v {
3057                        Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
3058                        Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
3059                        Value::Float(f) => {
3060                            if let Some(n) = serde_json::Number::from_f64(f) {
3061                                clean_doc.insert(k, JsonValue::Number(n))
3062                            } else {
3063                                clean_doc.insert(k, JsonValue::Null)
3064                            }
3065                        }
3066                        Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
3067                        Value::Array(arr) => {
3068                            let clean_arr: Vec<JsonValue> = arr
3069                                .into_iter()
3070                                .map(|v| match v {
3071                                    Value::String(s) => JsonValue::String(s),
3072                                    Value::Int(i) => JsonValue::Number(i.into()),
3073                                    Value::Float(f) => serde_json::Number::from_f64(f)
3074                                        .map(JsonValue::Number)
3075                                        .unwrap_or(JsonValue::Null),
3076                                    Value::Bool(b) => JsonValue::Bool(b),
3077                                    Value::Null => JsonValue::Null,
3078                                    _ => JsonValue::Null,
3079                                })
3080                                .collect();
3081                            clean_doc.insert(k, JsonValue::Array(clean_arr))
3082                        }
3083                        Value::Uuid(u) => clean_doc.insert(k, JsonValue::String(u.to_string())),
3084                        Value::Null => clean_doc.insert(k, JsonValue::Null),
3085                        Value::Object(_) => None, // Nested objects are currently omitted from the export
3086                    };
3087                }
3088                docs.push(JsonValue::Object(clean_doc));
3089            }
3090        }
3091
3092        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
3093            collection.to_string(),
3094            JsonValue::Array(docs),
3095        )]));
3096
3097        let mut file = StdFile::create(&output_path)?;
3098        serde_json::to_writer_pretty(&mut file, &output)?;
3099        println!("Exported collection '{}' to {}", collection, &output_path);
3100        Ok(())
3101    }
3102
3103    /// Export a collection to a CSV file
3104    ///
3105    /// Creates a CSV file with headers from the first document and rows for each document.
3106    /// Useful for spreadsheet analysis, data science workflows, or reporting.
3107    /// Automatically appends `.csv` extension if not present.
3108    ///
3109    /// # Performance
3110    /// - Export speed: ~8,000 docs/sec
3111    /// - Memory efficient: streams rows to file
3112    /// - Headers determined from first document
3113    ///
3114    /// # Arguments
3115    /// * `collection` - Name of the collection to export
3116    /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3117    ///
3118    /// # Returns
3119    /// Success or an error
3120    ///
3121    /// # Examples
3122    ///
3123    /// ```
3124    /// use aurora_db::Aurora;
3125    ///
3126    /// let db = Aurora::open("mydb.db")?;
3127    ///
3128    /// // Basic CSV export
3129    /// db.export_as_csv("users", "./reports/users")?;
3130    /// // Creates: ./reports/users.csv
3131    ///
3132    /// // Export for analysis in Excel/Google Sheets
3133    /// db.export_as_csv("orders", "./analytics/sales_data")?;
3134    ///
3135    /// // Monthly report generation
3136    /// let month = chrono::Utc::now().format("%Y-%m");
3137    /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3138    /// ```
3139    ///
3140    /// # Output Format
3141    ///
3142    /// ```csv
3143    /// id,name,email,age
3144    /// 123,Alice,alice@example.com,28
3145    /// 456,Bob,bob@example.com,32
3146    /// ```
3147    ///
3148    /// # Important Notes
3149    /// - Headers are taken from the first document's fields
3150    /// - Documents missing a header field get empty values; fields absent from the headers are dropped
3151    /// - Nested objects/arrays are converted to strings
3152    /// - Best for flat document structures
3153    ///
3154    /// # See Also
3155    /// - `export_as_json()` for JSON format (better for nested data)
3156    /// - For complex nested structures, use JSON export instead
3157    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3158        let output_path = if !filename.ends_with(".csv") {
3159            format!("{}.csv", filename)
3160        } else {
3161            filename.to_string()
3162        };
3163
3164        let mut writer = csv::Writer::from_path(&output_path)?;
3165        let mut headers = Vec::new();
3166        let mut first_doc = true;
3167
3168        // Get all documents from the specified collection
3169        for result in self.cold.scan() {
3170            let (key, value) = result?;
3171
3172            // Only process documents from the specified collection
3173            if let Some(key_collection) = key.split(':').next()
3174                && key_collection == collection
3175                && !key.starts_with("_collection:")
3176                && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3177            {
3178                // Write headers from first document
3179                if first_doc && !doc.data.is_empty() {
3180                    headers = doc.data.keys().cloned().collect();
3181                    writer.write_record(&headers)?;
3182                    first_doc = false;
3183                }
3184
3185                // Write the document values
3186                let values: Vec<String> = headers
3187                    .iter()
3188                    .map(|header| {
3189                        doc.data
3190                            .get(header)
3191                            .map(|v| v.to_string())
3192                            .unwrap_or_default()
3193                    })
3194                    .collect();
3195                writer.write_record(&values)?;
3196            }
3197        }
3198
3199        writer.flush()?;
3200        println!("Exported collection '{}' to {}", collection, &output_path);
3201        Ok(())
3202    }
3203
3204    // Helper method to create filter-based queries
3205    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3206        self.query(collection)
3207    }
3208
3209    // Convenience methods that build on top of the FilterBuilder
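    // A hedged usage sketch for the helpers below ("users" and its fields
    // are illustrative, not part of the API):
    //
    //     let user = db.find_by_id("users", "user-123").await?;
    //     let admins = db
    //         .find_by_field("users", "role", Value::String("admin".into()))
    //         .await?;
    //     let twenties = db
    //         .find_in_range("users", "age", Value::Int(20), Value::Int(29))
    //         .await?;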
3210
3211    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3212        let id = id.to_string(); // own the id so the 'static filter closure can capture it
3213        self.query(collection)
3214            .filter(move |f| f.eq("id", id.clone()))
3215            .first_one().await
3216    }
3217
3218    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3219    where
3220        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3221    {
3222        self.query(collection).filter(filter_fn).first_one().await
3223    }
3224
3225    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3226        &self,
3227        collection: &str,
3228        field: &'static str,
3229        value: T,
3230    ) -> Result<Vec<Document>> {
3231        let value_clone = value.clone();
3232        self.query(collection)
3233            .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3234            .collect()
3235            .await
3236    }
3237
3238    pub async fn find_by_fields(
3239        &self,
3240        collection: &str,
3241        fields: Vec<(&str, Value)>,
3242    ) -> Result<Vec<Document>> {
3243        let mut query = self.query(collection);
3244
3245        for (field, value) in fields {
3246            let field_owned = field.to_owned();
3247            let value_owned = value.clone();
3248            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3249        }
3250
3251        query.collect().await
3252    }
3253
3254    // Advanced example: find documents with a field value in a specific range
3255    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3256        &self,
3257        collection: &str,
3258        field: &'static str,
3259        min: T,
3260        max: T,
3261    ) -> Result<Vec<Document>> {
3262        self.query(collection)
3263            .filter(move |f| f.between(field, min.clone(), max.clone()))
3264            .collect()
3265            .await
3266    }
3267
3268    // Complex query example: build with multiple combined filters
3269    pub fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3270        self.query(collection)
3271    }
3272
3273    // Create a full-text search query with added filter options
3274    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3275        self.search(collection)
3276    }
3277
3278    // Utility methods for common operations
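    /// Insert-or-update by ID: merges `data` into the existing document,
    /// or creates a new document with the given ID if none exists.
    ///
    /// A minimal sketch (assumes a "users" collection already exists):
    ///
    /// ```
    /// let id = db.upsert("users", "user-123", vec![
    ///     ("name", Value::String("Jane Doe".to_string())),
    ///     ("age", Value::Int(29)),
    /// ]).await?;
    /// assert_eq!(id, "user-123");
    /// ```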
3279    pub async fn upsert(
3280        &self,
3281        collection: &str,
3282        id: &str,
3283        data: Vec<(&str, Value)>,
3284    ) -> Result<String> {
3285        // Convert Vec<(&str, Value)> to HashMap<String, Value>
3286        let data_map: HashMap<String, Value> =
3287            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3288
3289        // Check if document exists
3290        if let Some(mut doc) = self.get_document(collection, id)? {
3291            // Clone for notification
3292            let old_doc = doc.clone();
3293
3294            // Update existing document - merge new data
3295            for (key, value) in data_map {
3296                doc.data.insert(key, value);
3297            }
3298
3299            // Validate unique constraints for the updated document
3300            // We need to exclude the current document from the uniqueness check
3301            self.validate_unique_constraints_excluding(collection, &doc.data, id)
3302                .await?;
3303
3304            self.put(
3305                format!("{}:{}", collection, id),
3306                serde_json::to_vec(&doc)?,
3307                None,
3308            )
3309            .await?;
3310
3311            // Publish update event
3312            let event = crate::pubsub::ChangeEvent::update(collection, id, old_doc, doc);
3313            let _ = self.pubsub.publish(event);
3314
3315            Ok(id.to_string())
3316        } else {
3317            // Insert new document with specified ID - validate unique constraints
3318            self.validate_unique_constraints(collection, &data_map)
3319                .await?;
3320
3321            let document = Document {
3322                id: id.to_string(),
3323                data: data_map,
3324            };
3325
3326            self.put(
3327                format!("{}:{}", collection, id),
3328                serde_json::to_vec(&document)?,
3329                None,
3330            )
3331            .await?;
3332
3333            // Publish insert event
3334            let event = crate::pubsub::ChangeEvent::insert(collection, id, document);
3335            let _ = self.pubsub.publish(event);
3336
3337            Ok(id.to_string())
3338        }
3339    }
3340
3341    // Atomic increment/decrement
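    /// A hedged usage sketch (assumes the documents exist; a missing or
    /// non-integer field is treated as 0 before the increment):
    ///
    /// ```
    /// let views = db.increment("posts", "post-1", "views", 1).await?;
    ///
    /// // Decrement by passing a negative amount
    /// let stock = db.increment("inventory", "sku-42", "stock", -1).await?;
    /// ```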
3342    pub async fn increment(
3343        &self,
3344        collection: &str,
3345        id: &str,
3346        field: &str,
3347        amount: i64,
3348    ) -> Result<i64> {
3349        if let Some(mut doc) = self.get_document(collection, id)? {
3350            // Get current value
3351            let current = match doc.data.get(field) {
3352                Some(Value::Int(i)) => *i,
3353                _ => 0,
3354            };
3355
3356            // Increment
3357            let new_value = current + amount;
3358            doc.data.insert(field.to_string(), Value::Int(new_value));
3359
3360            // Save changes
3361            self.put(
3362                format!("{}:{}", collection, id),
3363                serde_json::to_vec(&doc)?,
3364                None,
3365            )
3366            .await?;
3367
3368            Ok(new_value)
3369        } else {
3370            Err(AqlError::new(
3371                ErrorCode::NotFound,
3372                format!("Document {}:{} not found", collection, id),
3373            ))
3374        }
3375    }
3376
3377    // Delete documents by query
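    /// A minimal sketch (field names are illustrative): delete every
    /// document matching the filter and return how many were removed.
    ///
    /// ```
    /// let removed = db.delete_by_query("sessions", |f| {
    ///     f.eq("expired", Value::Bool(true))
    /// }).await?;
    /// println!("Removed {} expired sessions", removed);
    /// ```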
3378    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3379    where
3380        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3381    {
3382        let docs = self.query(collection).filter(filter_fn).collect().await?;
3383        let mut deleted_count = 0;
3384
3385        for doc in docs {
3386            let key = format!("{}:{}", collection, doc.id);
3387            self.delete(&key).await?;
3388            deleted_count += 1;
3389        }
3390
3391        Ok(deleted_count)
3392    }
3393
3394    /// Import documents from a JSON file into a collection
3395    ///
3396    /// Validates each document against the collection schema, skips duplicates (by ID),
3397    /// and provides detailed statistics about the import operation. Useful for restoring
3398    /// backups, migrating data, or seeding development databases.
3399    ///
3400    /// # Performance
3401    /// - Import speed: ~5,000 docs/sec (with validation)
3402    /// - Memory efficient: processes documents one at a time
3403    /// - Validates schema and unique constraints
3404    ///
3405    /// # Arguments
3406    /// * `collection` - Name of the collection to import into
3407    /// * `filename` - Path to the JSON file containing documents (array format)
3408    ///
3409    /// # Returns
3410    /// `ImportStats` containing counts of imported, skipped, and failed documents
3411    ///
3412    /// # Examples
3413    ///
3414    /// ```
3415    /// use aurora_db::Aurora;
3416    ///
3417    /// let db = Aurora::open("mydb.db")?;
3418    ///
3419    /// // Basic import
3420    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3421    /// println!("Imported: {}, Skipped: {}, Failed: {}",
3422    ///     stats.imported, stats.skipped, stats.failed);
3423    ///
3424    /// // Restore from backup
3425    /// let backup_file = "./backups/users_2024-01-15.json";
3426    /// let stats = db.import_from_json("users", backup_file).await?;
3427    ///
3428    /// if stats.failed > 0 {
3429    ///     eprintln!("Warning: {} documents failed validation", stats.failed);
3430    /// }
3431    ///
3432    /// // Idempotent import - duplicates are skipped
3433    /// let stats = db.import_from_json("users", "./data/users.json").await?;
3434    /// // Running again will skip all existing documents
3435    /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3436    /// assert_eq!(stats2.imported, 0);
3437    ///
3438    /// // Migration from another system
3439    /// db.new_collection("products", vec![
3440    ///     ("sku", FieldType::String),
3441    ///     ("name", FieldType::String),
3442    ///     ("price", FieldType::Float),
3443    /// ])?;
3444    ///
3445    /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3446    /// println!("Migration complete: {} products imported", stats.imported);
3447    /// ```
3448    ///
3449    /// # Expected JSON Format
3450    ///
3451    /// The JSON file should contain an array of document objects:
3452    /// ```json
3453    /// [
3454    ///   { "id": "123", "name": "Alice", "email": "alice@example.com" },
3455    ///   { "id": "456", "name": "Bob", "email": "bob@example.com" },
3456    ///   { "name": "Carol", "email": "carol@example.com" }
3457    /// ]
3458    /// ```
3459    ///
3460    /// # Behavior
3461    /// - Documents with existing IDs are skipped (duplicate detection)
3462    /// - Documents without IDs get auto-generated UUIDs
3463    /// - Schema validation is performed on all fields
3464    /// - Failed documents are counted but don't stop the import
3465    /// - Unique constraints are checked
3466    ///
3467    /// # See Also
3468    /// - `export_as_json()` to create compatible backup files
3469    /// - `batch_insert()` for programmatic bulk inserts
3470    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3471        // Validate that the collection exists
3472        let collection_def = self.get_collection_definition(collection)?;
3473
3474        // Load JSON file
3475        let json_string = read_to_string(filename).await.map_err(|e| {
3476            AqlError::new(
3477                ErrorCode::IoError,
3478                format!("Failed to read import file: {}", e),
3479            )
3480        })?;
3481
3482        // Parse JSON
3483        let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
3484            AqlError::new(
3485                ErrorCode::SerializationError,
3486                format!("Failed to parse JSON: {}", e),
3487            )
3488        })?;
3489
3490        let mut stats = ImportStats::default();
3491
3492        // Process each document
3493        for doc_json in documents {
3494            match self
3495                .import_document(collection, &collection_def, doc_json)
3496                .await
3497            {
3498                Ok(ImportResult::Imported) => stats.imported += 1,
3499                Ok(ImportResult::Skipped) => stats.skipped += 1,
3500                Err(_) => stats.failed += 1,
3501            }
3502        }
3503
3504        Ok(stats)
3505    }
3506
3507    /// Import a single document, performing schema validation and duplicate checking
3508    async fn import_document(
3509        &self,
3510        collection: &str,
3511        collection_def: &Collection,
3512        doc_json: JsonValue,
3513    ) -> Result<ImportResult> {
3514        if !doc_json.is_object() {
3515            return Err(AqlError::invalid_operation(
3516                "Expected JSON object".to_string(),
3517            ));
3518        }
3519
3520        // Extract document ID if present
3521        let doc_id = doc_json
3522            .get("id")
3523            .and_then(|id| id.as_str())
3524            .map(|s| s.to_string())
3525            .unwrap_or_else(|| Uuid::now_v7().to_string());
3526
3527        // Check if document with this ID already exists
3528        if self.get_document(collection, &doc_id)?.is_some() {
3529            return Ok(ImportResult::Skipped);
3530        }
3531
3532        // Convert JSON to our document format and validate against schema
3533        let mut data_map = HashMap::new();
3534
3535        if let Some(obj) = doc_json.as_object() {
3536            for (field_name, field_def) in &collection_def.fields {
3537                if let Some(json_value) = obj.get(field_name) {
3538                    // Validate value against field type
3539                    if !self.validate_field_value(json_value, &field_def.field_type) {
3540                        return Err(AqlError::invalid_operation(format!(
3541                            "Field '{}' has invalid type",
3542                            field_name
3543                        )));
3544                    }
3545
3546                    // Convert JSON value to our Value type
3547                    let value = self.json_to_value(json_value)?;
3548                    data_map.insert(field_name.clone(), value);
3549                } else if field_def.unique {
3550                    // Missing required unique field
3551                    return Err(AqlError::invalid_operation(format!(
3552                        "Missing required unique field '{}'",
3553                        field_name
3554                    )));
3555                }
3556            }
3557        }
3558
3559        // Check for duplicates by unique fields
3560        let unique_fields = self.get_unique_fields(collection_def);
3561        for unique_field in &unique_fields {
3562            if let Some(value) = data_map.get(unique_field) {
3563                // Clone the field and value so the 'static filter closure owns its captures
3564                let (field, value) = (unique_field.clone(), value.clone());
3565                let query_results = self.query(collection)
3566                    .filter(move |f| f.eq(&field, value.clone()))
3567                    .limit(1)
3568                    .collect()
3569                    .await?;
3570
3571                if !query_results.is_empty() {
3572                    // Found duplicate by unique field
3573                    return Ok(ImportResult::Skipped);
3574                }
3575            }
3576        }
3577
3578        // Create and insert document
3579        let document = Document {
3580            id: doc_id,
3581            data: data_map,
3582        };
3583
3584        self.put(
3585            format!("{}:{}", collection, document.id),
3586            serde_json::to_vec(&document)?,
3587            None,
3588        )
3589        .await?;
3590
3591        Ok(ImportResult::Imported)
3592    }
3593
3594    /// Validate that a JSON value matches the expected field type
3595    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3596        use crate::types::ScalarType;
3597        match field_type {
3598            FieldType::Scalar(ScalarType::String) => value.is_string(),
3599            FieldType::Scalar(ScalarType::Int) => value.is_i64() || value.is_u64(),
3600            FieldType::Scalar(ScalarType::Float) => value.is_number(),
3601            FieldType::Scalar(ScalarType::Bool) => value.is_boolean(),
3602            FieldType::Scalar(ScalarType::Uuid) => {
3603                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3604            }
3605            FieldType::Scalar(ScalarType::Any) => true,
3606
3607            FieldType::Array(_) => value.is_array(),
3608            FieldType::Object | FieldType::Nested(_) => value.is_object(),
3609            // Catch-all for any remaining or legacy field types
3610            _ => true,
3611        }
3612    }
3613
3614    /// Convert a JSON value to our internal Value type
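    ///
    /// Note that strings which parse as UUIDs become `Value::Uuid` rather
    /// than `Value::String`, e.g. the JSON string
    /// "550e8400-e29b-41d4-a716-446655440000" round-trips as a UUID.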
3615    #[allow(clippy::only_used_in_recursion)]
3616    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3617        match json_value {
3618            JsonValue::Null => Ok(Value::Null),
3619            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3620            JsonValue::Number(n) => {
3621                if let Some(i) = n.as_i64() {
3622                    Ok(Value::Int(i))
3623                } else if let Some(f) = n.as_f64() {
3624                    Ok(Value::Float(f))
3625                } else {
3626                    Err(AqlError::invalid_operation(
3627                        "Invalid number value".to_string(),
3628                    ))
3629                }
3630            }
3631            JsonValue::String(s) => {
3632                // Try parsing as UUID first
3633                if let Ok(uuid) = Uuid::parse_str(s) {
3634                    Ok(Value::Uuid(uuid))
3635                } else {
3636                    Ok(Value::String(s.clone()))
3637                }
3638            }
3639            JsonValue::Array(arr) => {
3640                let mut values = Vec::new();
3641                for item in arr {
3642                    values.push(self.json_to_value(item)?);
3643                }
3644                Ok(Value::Array(values))
3645            }
3646            JsonValue::Object(obj) => {
3647                let mut map = HashMap::new();
3648                for (k, v) in obj {
3649                    map.insert(k.clone(), self.json_to_value(v)?);
3650                }
3651                Ok(Value::Object(map))
3652            }
3653        }
3654    }
3655
3656    /// Get collection definition
3657    pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3658        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3659            let collection_def: Collection = serde_json::from_slice(&data)?;
3660            Ok(collection_def)
3661        } else {
3662            Err(AqlError::new(
3663                ErrorCode::CollectionNotFound,
3664                collection.to_string(),
3665            ))
3666        }
3667    }
3668
3669    /// Get storage statistics and information about the database
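    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let stats = db.get_database_stats()?;
    /// println!("~{} bytes on disk across {} collections",
    ///     stats.estimated_size, stats.collections.len());
    /// ```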
3670    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3671        let hot_stats = self.hot.get_stats();
3672        let cold_stats = self.cold.get_stats()?;
3673
3674        Ok(DatabaseStats {
3675            hot_stats,
3676            cold_stats,
3677            estimated_size: self.cold.estimated_size(),
3678            collections: self.get_collection_stats()?,
3679        })
3680    }
3681
3682    /// Check if a key is currently stored in the hot cache
3683    pub fn is_in_hot_cache(&self, key: &str) -> bool {
3684        self.hot.is_hot(key)
3685    }
3686
3687    /// Clear the hot cache (useful when memory needs to be freed)
3688    pub fn clear_hot_cache(&self) {
3689        self.hot.clear();
3690        println!(
3691            "Hot cache cleared, current hit ratio: {:.2}%",
3692            self.hot.hit_ratio() * 100.0
3693        );
3694    }
3695
3696    /// Prewarm the cache by loading frequently accessed data from cold storage
3697    ///
3698    /// Loads documents from a collection into memory cache to eliminate cold-start
3699    /// latency. Dramatically improves initial query performance after database startup
3700    /// by preloading the most commonly accessed data.
3701    ///
3702    /// # Performance Impact
3703    /// - Prewarming speed: ~20,000 docs/sec
3704    /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3705    /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3706    /// - Memory cost: ~500 bytes per document average
3707    ///
3708    /// # Arguments
3709    /// * `collection` - The collection to prewarm
3710    /// * `limit` - Maximum number of documents to load (`None` = load all)
3711    ///
3712    /// # Returns
3713    /// Number of documents loaded into cache
3714    ///
3715    /// # Examples
3716    ///
3717    /// ```
3718    /// use aurora_db::Aurora;
3719    ///
3720    /// let db = Aurora::open("mydb.db")?;
3721    ///
3722    /// // Prewarm frequently accessed collection
3723    /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3724    /// println!("Prewarmed {} user documents", loaded);
3725    ///
3726    /// // Now queries are fast from the start
3727    /// let stats_before = db.get_cache_stats();
3728    /// let users = db.query("users").collect().await?;
3729    /// let stats_after = db.get_cache_stats();
3730    ///
3731    /// // High hit rate thanks to prewarming
3732    /// assert!(stats_after.hit_rate > 0.95);
3733    ///
3734    /// // Startup optimization pattern
3735    /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3736    ///     println!("Prewarming caches...");
3737    ///
3738    ///     // Prewarm most frequently accessed collections
3739    ///     db.prewarm_cache("users", Some(5000)).await?;
3740    ///     db.prewarm_cache("sessions", Some(1000)).await?;
3741    ///     db.prewarm_cache("products", Some(500)).await?;
3742    ///
3743    ///     let stats = db.get_cache_stats();
3744    ///     println!("Cache prewarmed: {} entries loaded", stats.size);
3745    ///
3746    ///     Ok(())
3747    /// }
3748    ///
3749    /// // Web server startup
3750    /// #[tokio::main]
3751    /// async fn main() {
3752    ///     let db = Aurora::open("app.db").unwrap();
3753    ///
3754    ///     // Prewarm before accepting requests
3755    ///     db.prewarm_cache("users", Some(10000)).await.unwrap();
3756    ///
3757    ///     // Server is now ready with hot cache
3758    ///     start_web_server(db).await;
3759    /// }
3760    ///
3761    /// // Prewarm all documents (for small collections)
3762    /// let all_loaded = db.prewarm_cache("config", None).await?;
3763    /// // All config documents now in memory
3764    ///
3765    /// // Selective prewarming based on access patterns
3766    /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3767    ///     // Load recent users (they're accessed most)
3768    ///     db.prewarm_cache("users", Some(1000)).await?;
3769    ///
3770    ///     // Load active sessions only
3771    ///     let active_sessions = db.query("sessions")
3772    ///         .filter(|f| f.eq("active", Value::Bool(true)))
3773    ///         .limit(500)
3774    ///         .collect()
3775    ///         .await?;
3776    ///
3777    ///     // Manually populate cache with hot data
3778    ///     for session in active_sessions {
3779    ///         // Reading automatically caches
3780    ///         db.get_document("sessions", &session.id)?;
3781    ///     }
3782    ///
3783    ///     Ok(())
3784    /// }
3785    /// ```
3786    ///
3787    /// # Typical Prewarming Scenarios
3788    ///
3789    /// **Web Application Startup:**
3790    /// ```
3791    /// // Load user data, sessions, and active content
3792    /// db.prewarm_cache("users", Some(5000)).await?;
3793    /// db.prewarm_cache("sessions", Some(2000)).await?;
3794    /// db.prewarm_cache("posts", Some(1000)).await?;
3795    /// ```
3796    ///
3797    /// **E-commerce Site:**
3798    /// ```
3799    /// // Load products, categories, and user carts
3800    /// db.prewarm_cache("products", Some(500)).await?;
3801    /// db.prewarm_cache("categories", None).await?;  // All categories
3802    /// db.prewarm_cache("active_carts", Some(1000)).await?;
3803    /// ```
3804    ///
3805    /// **API Server:**
3806    /// ```
3807    /// // Load authentication data and rate limits
3808    /// db.prewarm_cache("api_keys", None).await?;
3809    /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3810    /// ```
3811    ///
3812    /// # When to Use
3813    /// - At application startup to eliminate cold-start latency
3814    /// - After cache clear operations
3815    /// - Before high-traffic events (product launches, etc.)
3816    /// - When deploying new instances (load balancer warm-up)
3817    ///
3818    /// # Memory Considerations
3819    /// - 1,000 docs ≈ 500 KB memory
3820    /// - 10,000 docs ≈ 5 MB memory
3821    /// - 100,000 docs ≈ 50 MB memory
3822    /// - Stay within configured cache capacity
3823    ///
3824    /// # See Also
3825    /// - `get_cache_stats()` to monitor cache effectiveness
3826    /// - `prewarm_all_collections()` to prewarm all collections
3827    /// - `Aurora::with_config()` to adjust cache capacity
3828    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
3829        let limit = limit.unwrap_or(usize::MAX); // None means no cap: load the whole collection
3830        let prefix = format!("{}:", collection);
3831        let mut loaded = 0;
3832
3833        for entry in self.cold.scan_prefix(&prefix) {
3834            if loaded >= limit {
3835                break;
3836            }
3837
3838            if let Ok((key, value)) = entry {
3839                // Load into hot cache
3840                self.hot.set(Arc::new(key), Arc::new(value), None);
3841                loaded += 1;
3842            }
3843        }
3844
3845        println!("Prewarmed {} with {} documents", collection, loaded);
3846        Ok(loaded)
3847    }
3848
3849    /// Prewarm cache for all collections
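    ///
    /// A minimal sketch (loads up to `docs_per_collection` documents from
    /// every collection and reports how many were cached for each):
    ///
    /// ```
    /// let per_collection = db.prewarm_all_collections(Some(500)).await?;
    /// for (collection, loaded) in &per_collection {
    ///     println!("{}: {} documents prewarmed", collection, loaded);
    /// }
    /// ```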
3850    pub async fn prewarm_all_collections(
3851        &self,
3852        docs_per_collection: Option<usize>,
3853    ) -> Result<HashMap<String, usize>> {
3854        let mut stats = HashMap::new();
3855
3856        // Get all collections
3857        let collections: Vec<String> = self
3858            .cold
3859            .scan()
3860            .filter_map(|r| r.ok())
3861            .map(|(k, _)| k)
3862            .filter(|k| k.starts_with("_collection:"))
3863            .map(|k| k.trim_start_matches("_collection:").to_string())
3864            .collect();
3865
3866        for collection in collections {
3867            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3868            stats.insert(collection, loaded);
3869        }
3870
3871        Ok(stats)
3872    }
3873
3874    /// Store multiple key-value pairs efficiently in a single batch operation
3875    ///
3876    /// Low-level batch write operation that bypasses document validation and
3877    /// writes raw byte data directly to storage. Useful for advanced use cases,
3878    /// custom serialization, or maximum performance scenarios.
3879    ///
3880    /// # Performance
3881    /// - Write speed: ~100,000 writes/sec
3882    /// - Single disk fsync for entire batch
3883    /// - No validation or schema checking
3884    /// - Direct storage access
3885    ///
3886    /// # Arguments
3887    /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3888    ///
3889    /// # Returns
3890    /// Success or an error
3891    ///
3892    /// # Examples
3893    ///
3894    /// ```
3895    /// use aurora_db::Aurora;
3896    ///
3897    /// let db = Aurora::open("mydb.db")?;
3898    ///
3899    /// // Low-level batch write
3900    /// let pairs = vec![
3901    ///     ("users:123".to_string(), b"raw data 1".to_vec()),
3902    ///     ("users:456".to_string(), b"raw data 2".to_vec()),
3903    ///     ("cache:key1".to_string(), b"cached value".to_vec()),
3904    /// ];
3905    ///
3906    /// db.batch_write(pairs).await?;
3907    ///
3908    /// // Custom binary serialization
3909    /// use serde::{Deserialize, Serialize};
3910    ///
3911    /// #[derive(Serialize, Deserialize)]
3912    /// struct CustomData {
3913    ///     id: u64,
3914    ///     payload: Vec<u8>,
3915    /// }
3916    ///
3917    /// let custom_data = vec![
3918    ///     CustomData { id: 1, payload: vec![1, 2, 3] },
3919    ///     CustomData { id: 2, payload: vec![4, 5, 6] },
3920    /// ];
3921    ///
3922    /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3923    ///     .iter()
3924    ///     .map(|data| {
3925    ///         let key = format!("binary:{}", data.id);
3926    ///         let value = bincode::serialize(data).unwrap();
3927    ///         (key, value)
3928    ///     })
3929    ///     .collect();
3930    ///
3931    /// db.batch_write(pairs).await?;
3932    ///
3933    /// // Bulk cache population
3934    /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3935    ///     .map(|i| {
3936    ///         let key = format!("cache:item_{}", i);
3937    ///         let value = format!("value_{}", i).into_bytes();
3938    ///         (key, value)
3939    ///     })
3940    ///     .collect();
3941    ///
3942    /// db.batch_write(cache_entries).await?;
3943    /// // Writes 10,000 entries in ~100ms
3944    /// ```
3945    ///
3946    /// # Important Notes
3947    /// - No schema validation performed
3948    /// - No unique constraint checking
3949    /// - No automatic indexing
3950    /// - Keys must follow "collection:id" format for proper grouping
3951    /// - Values are raw bytes - you handle serialization
3952    /// - Use `batch_insert()` for validated document inserts
3953    ///
3954    /// # When to Use
3955    /// - Maximum write performance needed
3956    /// - Custom serialization formats (bincode, msgpack, etc.)
3957    /// - Cache population
3958    /// - Low-level database operations
3959    /// - You're bypassing the document model
3960    ///
3961    /// # When NOT to Use
3962    /// - Regular document inserts → Use `batch_insert()` instead
3963    /// - Need validation → Use `batch_insert()` instead
3964    /// - Need indexing → Use `batch_insert()` instead
3965    ///
3966    /// # See Also
3967    /// - `batch_insert()` for validated document batch inserts
3968    /// - `put()` for single key-value writes
3969    pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3970        // Group pairs by collection name
3971        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3972        for (key, value) in &pairs {
3973            if let Some(collection_name) = key.split(':').next() {
3974                collections
3975                    .entry(collection_name.to_string())
3976                    .or_default()
3977                    .push((key.clone(), value.clone()));
3978            }
3979        }
3980
3981        // First, do the batch write to cold storage for all pairs
3982        self.cold.batch_set(pairs)?;
3983
3984        // Then, process each collection for in-memory updates
3985        for (collection_name, batch) in collections {
3986            // --- Optimized Batch Indexing ---
3987
3988            // 1. Get schema once for the entire collection batch
3989            let collection_obj = match self.schema_cache.get(&collection_name) {
3990                Some(cached_schema) => Arc::clone(cached_schema.value()),
3991                None => {
3992                    let collection_key = format!("_collection:{}", collection_name);
3993                    match self.get(&collection_key)? {
3994                        Some(data) => {
3995                            let obj: Collection = serde_json::from_slice(&data)?;
3996                            let arc_obj = Arc::new(obj);
3997                            self.schema_cache
3998                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3999                            arc_obj
4000                        }
4001                        None => continue,
4002                    }
4003                }
4004            };
4005
4006            let indexed_fields: Vec<String> = collection_obj
4007                .fields
4008                .iter()
4009                .filter(|(_, def)| def.indexed || def.unique)
4010                .map(|(name, _)| name.clone())
4011                .collect();
4012
4013            let primary_index = self
4014                .primary_indices
4015                .entry(collection_name.to_string())
4016                .or_default();
4017
4018            for (key, value) in batch {
4019                // 2. Update hot cache
4020                if self.should_cache_key(&key) {
4021                    self.hot
4022                        .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
4023                }
4024
4025                // 3. Update primary index with metadata only
4026                let location = DiskLocation::new(value.len());
4027                primary_index.insert(key.clone(), location);
4028
4029                // 4. Update secondary indices
4030                if !indexed_fields.is_empty()
4031                    && let Ok(doc) = serde_json::from_slice::<Document>(&value)
4032                {
4033                    for (field, field_value) in doc.data {
4034                        if indexed_fields.contains(&field) {
4035                            let value_str = match &field_value {
4036                                Value::String(s) => s.clone(),
4037                                _ => field_value.to_string(),
4038                            };
4039                            let index_key = format!("{}:{}", collection_name, field);
4040                            let secondary_index =
4041                                self.secondary_indices.entry(index_key).or_default();
4042
4043                            let max_entries = self.config.max_index_entries_per_field;
4044                            secondary_index
4045                                .entry(value_str)
4046                                .and_modify(|doc_ids| {
4047                                    if doc_ids.len() < max_entries {
4048                                        doc_ids.push(key.to_string());
4049                                    }
4050                                })
4051                                .or_insert_with(|| vec![key.to_string()]);
4052                        }
4053                    }
4054                }
4055            }
4056        }
4057
4058        Ok(())
4059    }
4060
4061    /// Scan for keys with a specific prefix
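    ///
    /// A hedged sketch (the "users:" prefix follows the "collection:id"
    /// key convention used throughout this module):
    ///
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{} ({} bytes)", key, bytes.len());
    /// }
    /// ```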
4062    pub fn scan_with_prefix(
4063        &self,
4064        prefix: &str,
4065    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
4066        self.cold.scan_prefix(prefix)
4067    }
4068
4069    /// Get storage efficiency metrics for the database
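    ///
    /// A minimal sketch:
    ///
    /// ```
    /// for (name, stats) in db.get_collection_stats()? {
    ///     println!("{}: {} docs, {} bytes (avg {} bytes/doc)",
    ///         name, stats.count, stats.size_bytes, stats.avg_doc_size);
    /// }
    /// ```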
4070    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
4071        let mut stats = HashMap::new();
4072
4073        // Scan all collections
4074        let collections: Vec<String> = self
4075            .cold
4076            .scan()
4077            .filter_map(|r| r.ok())
4078            .map(|(k, _)| k)
4079            .filter(|k| k.starts_with("_collection:"))
4080            .map(|k| k.trim_start_matches("_collection:").to_string())
4081            .collect();
4082
4083        for collection in collections {
4084            // Use primary index for fast stats (count + size from DiskLocation)
4085            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
4086                let count = index.len();
4087                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
4088                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
4089                (count, size)
4090            } else {
4091                // Fallback: a single pass over cold storage if no index is available,
4092                // accumulating document count and total size together
4093                let prefix = format!("{}:", collection);
4094                self.cold
4095                    .scan_prefix(&prefix)
4096                    .filter_map(|r| r.ok())
4097                    .fold((0usize, 0usize), |(count, size), (_, v)| {
4098                        (count + 1, size + v.len())
4099                    })
4101            };
4102
4103            stats.insert(
4104                collection,
4105                CollectionStats {
4106                    count,
4107                    size_bytes: size,
4108                    avg_doc_size: if count > 0 { size / count } else { 0 },
4109                },
4110            );
4111        }
4112
4113        Ok(stats)
4114    }
4115
4116    /// Search for documents by exact value using an index
4117    ///
4118    /// This method performs a fast lookup using a pre-created index
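    ///
    /// A hedged sketch (assumes an index on the "email" field of "users"
    /// was created beforehand; without one this returns an empty Vec):
    ///
    /// ```
    /// let email = Value::String("jane@example.com".to_string());
    /// let matches = db.search_by_value("users", "email", &email)?;
    /// ```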
4119    pub fn search_by_value(
4120        &self,
4121        collection: &str,
4122        field: &str,
4123        value: &Value,
4124    ) -> Result<Vec<Document>> {
4125        let index_key = format!("_index:{}:{}", collection, field);
4126
4127        if let Some(index_data) = self.get(&index_key)? {
4128            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4129            let index = Index::new(index_def);
4130
4131            // Look up document IDs matching this exact value in the index
4132            if let Some(doc_ids) = index.search(value) {
4133                // Load the documents by ID
4134                let mut docs = Vec::new();
4135                for id in doc_ids {
4136                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4137                        let doc: Document = serde_json::from_slice(&doc_data)?;
4138                        docs.push(doc);
4139                    }
4140                }
4141                return Ok(docs);
4142            }
4143        }
4144
4145        // Return empty result if no index or no matches
4146        Ok(Vec::new())
4147    }
4148
4149    /// Perform a full-text search on an indexed text field
4150    ///
4151    /// This provides more advanced text search capabilities including
4152    /// relevance ranking of results
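    ///
    /// A minimal sketch (assumes a full-text index exists on the field,
    /// e.g. created via `create_text_index()`):
    ///
    /// ```
    /// let ranked = db.full_text_search("articles", "body", "tiered storage")?;
    /// for doc in ranked {
    ///     println!("hit: {}", doc.id);
    /// }
    /// ```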
4153    pub fn full_text_search(
4154        &self,
4155        collection: &str,
4156        field: &str,
4157        query: &str,
4158    ) -> Result<Vec<Document>> {
4159        let index_key = format!("_index:{}:{}", collection, field);
4160
4161        if let Some(index_data) = self.get(&index_key)? {
4162            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4163
4164            // Ensure this is a full-text index
4165            if !matches!(index_def.index_type, IndexType::FullText) {
4166                return Err(AqlError::invalid_operation(format!(
4167                    "Field '{}' is not indexed as full-text",
4168                    field
4169                )));
4170            }
4171
4172            let index = Index::new(index_def);
4173
4174            // Rank matching documents by relevance using the full-text index
4175            if let Some(doc_id_scores) = index.search_text(query) {
4176                // Load the documents by ID, preserving score order
4177                let mut docs = Vec::new();
4178                for (id, _score) in doc_id_scores {
4179                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4180                        let doc: Document = serde_json::from_slice(&doc_data)?;
4181                        docs.push(doc);
4182                    }
4183                }
4184                return Ok(docs);
4185            }
4186        }
4187
4188        // Return empty result if no index or no matches
4189        Ok(Vec::new())
4190    }
4191
4192    /// Create a full-text search index on a text field
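    ///
    /// A minimal sketch (note: the stop-word flag is currently unused):
    ///
    /// ```
    /// db.create_text_index("articles", "body", true).await?;
    /// let hits = db.full_text_search("articles", "body", "durability")?;
    /// ```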
4193    pub async fn create_text_index(
4194        &self,
4195        collection: &str,
4196        field: &str,
4197        _enable_stop_words: bool,
4198    ) -> Result<()> {
4199        // Check if collection exists
4200        if self.get(&format!("_collection:{}", collection))?.is_none() {
4201            return Err(AqlError::new(
4202                ErrorCode::CollectionNotFound,
4203                collection.to_string(),
4204            ));
4205        }
4206
4207        // Create index definition
4208        let index_def = IndexDefinition {
4209            name: format!("{}_{}_fulltext", collection, field),
4210            collection: collection.to_string(),
4211            fields: vec![field.to_string()],
4212            index_type: IndexType::FullText,
4213            unique: false,
4214        };
4215
4216        // Store index definition
4217        let index_key = format!("_index:{}:{}", collection, field);
4218        self.put(index_key, serde_json::to_vec(&index_def)?, None)
4219            .await?;
4220
4221        // Create the actual index
4222        let index = Index::new(index_def);
4223
4224        // Index all existing documents in the collection
4225        let prefix = format!("{}:", collection);
4226        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4227            let doc: Document = serde_json::from_slice(&data)?;
4228            index.insert(&doc)?;
4229        }
4230
4231        Ok(())
4232    }
4233
4234    pub async fn execute_simple_query(
4235        &self,
4236        builder: &SimpleQueryBuilder,
4237    ) -> Result<Vec<Document>> {
4238        // Ensure indices are initialized
4239        self.ensure_indices_initialized().await?;
4240
4241        // A place to store the IDs of the documents we need to fetch
4242        let mut doc_ids_to_load: Option<Vec<String>> = None;
4243
4244        // --- The "Query Planner" ---
4245        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4246        // than collecting millions of IDs from secondary index
4247        let use_index_for_range = if let Some(limit) = builder.limit {
4248            // If limit is small (< 1000), prefer full scan for range queries
4249            // The secondary index would scan all entries anyway, might as well
4250            // scan documents directly and benefit from early termination
4251            limit >= 1000
4252        } else {
4253            // No limit? Index might still help if result set is small
4254            true
4255        };
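        // Illustration of the heuristic above (hypothetical queries):
        //   .filter(|f| f.gt("age", 18)).limit(10)   -> full scan with early termination
        //   .filter(|f| f.gt("age", 18)).limit(5000) -> range scan over the secondary index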
4256
4257        // Look for an opportunity to use an index
4258        for filter in builder.filters.iter() {
4259            match filter {
4260                Filter::Eq(field, value) => {
4261                    let index_key = format!("{}:{}", &builder.collection, field);
4262
4263                    // Do we have a secondary index for this field?
4264                    if let Some(index) = self.secondary_indices.get(&index_key) {
4265                        // Yes! Let's use it.
4266                        if let Some(matching_ids) = index.get(&value.to_string()) {
4267                            doc_ids_to_load = Some(matching_ids.clone());
4268                            break; // Stop searching for other indexes for now
4269                        }
4270                    }
4271                }
4272                Filter::Gt(field, value)
4273                | Filter::Gte(field, value)
4274                | Filter::Lt(field, value)
4275                | Filter::Lte(field, value) => {
4276                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
4277                    if !use_index_for_range {
4278                        continue;
4279                    }
4280
4281                    let index_key = format!("{}:{}", &builder.collection, field);
4282
4283                    // Do we have a secondary index for this field?
4284                    if let Some(index) = self.secondary_indices.get(&index_key) {
4285                        // For range queries, we need to scan through the index values
4286                        let mut matching_ids = Vec::new();
4287
4288                        for entry in index.iter() {
4289                            let index_value_str = entry.key();
4290
4291                            // Try to parse the index value to compare with our filter value
4292                            if let Ok(index_value) =
4293                                self.parse_value_from_string(index_value_str, value)
4294                            {
4295                                let matches = match filter {
4296                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
4297                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
4298                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
4299                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
4300                                    _ => false,
4301                                };
4302
4303                                if matches {
4304                                    matching_ids.extend(entry.value().clone());
4305                                }
4306                            }
4307                        }
4308
4309                        if !matching_ids.is_empty() {
4310                            doc_ids_to_load = Some(matching_ids);
4311                            break;
4312                        }
4313                    }
4314                }
4315                Filter::Contains(field, search_term) => {
4316                    let index_key = format!("{}:{}", &builder.collection, field);
4317
4318                    // Do we have a secondary index for this field?
4319                    if let Some(index) = self.secondary_indices.get(&index_key) {
4320                        let mut matching_ids = Vec::new();
4321
4322                        for entry in index.iter() {
4323                            let index_value_str = entry.key();
4324
4325                            // Check if this indexed value contains our search term
4326                            if index_value_str
4327                                .to_lowercase()
4328                                .contains(&search_term.to_lowercase())
4329                            {
4330                                matching_ids.extend(entry.value().clone());
4331                            }
4332                        }
4333
4334                        if !matching_ids.is_empty() {
4335                            // Remove duplicates since a document could match multiple indexed values
4336                            matching_ids.sort();
4337                            matching_ids.dedup();
4338
4339                            doc_ids_to_load = Some(matching_ids);
4340                            break;
4341                        }
4342                    }
4343                }
4344                Filter::And(_) => {
4345                    // For compound filters, we can't easily use a single index
4346                    // This would require more complex query planning
4347                    continue;
4348                }
4349                Filter::Or(sub_filters) => {
4350                    // For OR filters, an index lookup is only safe when *every*
4351                    // sub-filter resolves from an index; otherwise the union would
4352                    // silently drop matches from the non-indexed branches.
4353                    let mut union_ids: Vec<String> = Vec::new();
4354                    let mut all_resolved = true;
4355                    for sub_filter in sub_filters {
4356                        match sub_filter {
4357                            Filter::Eq(field, value) => {
4358                                let index_key = format!("{}:{}", &builder.collection, field);
4359                                if let Some(index) = self.secondary_indices.get(&index_key) {
4360                                    if let Some(matching_ids) = index.get(&value.to_string()) {
4361                                        union_ids.extend(matching_ids.clone());
4362                                    }
4363                                } else {
4364                                    all_resolved = false;
4365                                }
4366                            }
4367                            // Any other sub-filter type forces a full scan
4368                            _ => all_resolved = false,
4369                        }
4370                    }
4371                    if all_resolved && !union_ids.is_empty() {
4372                        // Remove duplicates across branches
4373                        union_ids.sort();
4374                        union_ids.dedup();
4375                        doc_ids_to_load = Some(union_ids);
4376                        break;
4377                    }
4378                    // Otherwise fall through to the full scan below
4379            }
4380        }
4381
4382        let mut final_docs: Vec<Document>;
4383
4384        if let Some(ids) = doc_ids_to_load {
4385            // --- Path 1: Index Lookup ---
4386            use std::io::Write;
4387            if let Ok(mut file) = std::fs::OpenOptions::new()
4388                .create(true)
4389                .append(true)
4390                .open("/tmp/aurora_query_stats.log")
4391            {
4392                let _ = writeln!(
4393                    file,
4394                    "[INDEX PATH] IDs to load: {} | Collection: {}",
4395                    ids.len(),
4396                    builder.collection
4397                );
4398            }
4399
4400            final_docs = Vec::with_capacity(ids.len());
4401
4402            for id in ids {
4403                let doc_key = format!("{}:{}", &builder.collection, id);
4404                if let Some(data) = self.get(&doc_key)?
4405                    && let Ok(doc) = serde_json::from_slice::<Document>(&data)
4406                {
4407                    final_docs.push(doc);
4408                }
4409            }

            // The index satisfied at most one filter; re-apply the full filter
            // set so documents failing the remaining filters are dropped.
            final_docs.retain(|doc| builder.filters.iter().all(|f| f.matches(doc)));
4410        } else {
4411            // --- Path 2: Full Collection Scan with Early Termination ---
4412
4413            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4414            // as soon as we have enough matching documents
4415            let early_termination_target = if builder.order_by.is_none() {
4416                builder.limit.map(|l| l + builder.offset.unwrap_or(0))
4417            } else {
4418                // With ORDER BY, we need all matching docs to sort correctly
4419                None
4420            };
4421
4422            // Smart scan with early termination support
4423            final_docs = Vec::new();
4424            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4425
4426            if let Some(index) = self.primary_indices.get(&builder.collection) {
4427                for entry in index.iter() {
4428                    let key = entry.key();
4429                    scan_stats.0 += 1; // keys scanned
4430
4431                    // Early termination check
4432                    if let Some(target) = early_termination_target {
4433                        if final_docs.len() >= target {
4434                            break; // We have enough documents!
4435                        }
4436                    }
4437
4438                    // Fetch and filter document
4439                    if let Some(data) = self.get(key)? {
4440                        scan_stats.1 += 1; // docs fetched
4441
4442                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
4443                            // Apply all filters
4444                            let matches_all_filters =
4445                                builder.filters.iter().all(|filter| match filter {
4446                                    Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4447                                    Filter::Gt(field, value) => {
4448                                        doc.data.get(field).is_some_and(|v| v > value)
4449                                    }
4450                                    Filter::Gte(field, value) => {
4451                                        doc.data.get(field).is_some_and(|v| v >= value)
4452                                    }
4453                                    Filter::Lt(field, value) => {
4454                                        doc.data.get(field).is_some_and(|v| v < value)
4455                                    }
4456                                    Filter::Lte(field, value) => {
4457                                        doc.data.get(field).is_some_and(|v| v <= value)
4458                                    }
4459                                    Filter::Contains(field, value_str) => {
4460                                        doc.data.get(field).is_some_and(|v| match v {
4461                                            Value::String(s) => s.contains(value_str),
4462                                            Value::Array(arr) => {
4463                                                arr.contains(&Value::String(value_str.clone()))
4464                                            }
4465                                            _ => false,
4466                                        })
4467                                    }
4468                                    Filter::And(filters) => filters.iter().all(|f| f.matches(&doc)),
4469                                    Filter::Or(filters) => filters.iter().any(|f| f.matches(&doc)),
4470                                });
4471
4472                            if matches_all_filters {
4473                                scan_stats.2 += 1; // matches found
4474                                final_docs.push(doc);
4475                            }
4476                        }
4477                    }
4478                }
4479
4480                // Debug logging for query performance analysis
4481                use std::io::Write;
4482                if let Ok(mut file) = std::fs::OpenOptions::new()
4483                    .create(true)
4484                    .append(true)
4485                    .open("/tmp/aurora_query_stats.log")
4486                {
4487                    let _ = writeln!(
4488                        file,
4489                        "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
4490                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
4491                    );
4492                }
4493            } else {
4494                // Fallback: scan from cold storage if index not initialized
4495                final_docs = self.get_all_collection(&builder.collection).await?;
4496
4497                // Apply filters
4498                final_docs.retain(|doc| {
4499                    builder.filters.iter().all(|filter| match filter {
4500                        Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4501                        Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
4502                        Filter::Gte(field, value) => {
4503                            doc.data.get(field).is_some_and(|v| v >= value)
4504                        }
4505                        Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
4506                        Filter::Lte(field, value) => {
4507                            doc.data.get(field).is_some_and(|v| v <= value)
4508                        }
4509                        Filter::Contains(field, value_str) => {
4510                            doc.data.get(field).is_some_and(|v| match v {
4511                                Value::String(s) => s.contains(value_str),
4512                                Value::Array(arr) => {
4513                                    arr.contains(&Value::String(value_str.clone()))
4514                                }
4515                                _ => false,
4516                            })
4517                        }
4518                        Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
4519                        Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
4520                    })
4521                });
4522            }
4523        }
4524
4525        // Apply ordering (documents missing the sort field sort first, in either direction)
4526        if let Some((field, ascending)) = &builder.order_by {
4527            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
4528                (Some(v1), Some(v2)) => {
4529                    let cmp = v1.cmp(v2);
4530                    if *ascending { cmp } else { cmp.reverse() }
4531                }
4532                (None, Some(_)) => std::cmp::Ordering::Less,
4533                (Some(_), None) => std::cmp::Ordering::Greater,
4534                (None, None) => std::cmp::Ordering::Equal,
4535            });
4536        }
4537
4538        // Apply offset and limit
4539        let start = builder.offset.unwrap_or(0);
4540        let end = builder
4541            .limit
4542            .map(|l| start.saturating_add(l))
4543            .unwrap_or(final_docs.len());
4544
4545        let end = end.min(final_docs.len());
4546        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
4547    }
4548
4549    /// Parse an indexed string back into a `Value`, using the reference value's type as a hint
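    ///
    /// Illustrative (hypothetical inputs): with a reference of `Value::Int(0)`,
    /// `"42"` parses to `Value::Int(42)`; with a string reference, the raw
    /// string is kept as a `Value::String`.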
4550    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
4551        match reference_value {
4552            Value::Int(_) => {
4553                if let Ok(i) = value_str.parse::<i64>() {
4554                    Ok(Value::Int(i))
4555                } else {
4556                    Err(AqlError::invalid_operation(
4557                        "Failed to parse int".to_string(),
4558                    ))
4559                }
4560            }
4561            Value::Float(_) => {
4562                if let Ok(f) = value_str.parse::<f64>() {
4563                    Ok(Value::Float(f))
4564                } else {
4565                    Err(AqlError::invalid_operation(
4566                        "Failed to parse float".to_string(),
4567                    ))
4568                }
4569            }
4570            // Strings and any other types fall back to the raw string form
4571            _ => Ok(Value::String(value_str.to_string())),
4572        }
4573    }
4574
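    /// Run a dynamic (HTTP-driven) query pipeline over a collection: filters,
    /// then sort, then offset/limit, then field projection.
    ///
    /// A minimal sketch of driving it, assuming `QueryPayload` derives
    /// `Deserialize` and mirrors these field names on the wire:
    ///
    /// ```
    /// let payload: QueryPayload = serde_json::from_str(r#"{
    ///     "filters": [{ "field": "age", "operator": "Gt", "value": 18 }],
    ///     "sort": { "field": "name", "ascending": true },
    ///     "offset": 0,
    ///     "limit": 20,
    ///     "select": ["name", "age"]
    /// }"#)?;
    /// let adults = db.execute_dynamic_query("users", &payload).await?;
    /// ```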
4575    pub async fn execute_dynamic_query(
4576        &self,
4577        collection: &str,
4578        payload: &QueryPayload,
4579    ) -> Result<Vec<Document>> {
4580        let mut docs = self.get_all_collection(collection).await?;
4581
4582        // 1. Apply Filters
4583        if let Some(filters) = &payload.filters {
4584            docs.retain(|doc| {
4585                filters.iter().all(|filter| {
4586                    doc.data
4587                        .get(&filter.field)
4588                        .is_some_and(|doc_val| check_filter(doc_val, filter))
4589                })
4590            });
4591        }
4592
4593        // 2. Apply Sorting
4594        if let Some(sort_options) = &payload.sort {
4595            docs.sort_by(|a, b| {
4596                let a_val = a.data.get(&sort_options.field);
4597                let b_val = b.data.get(&sort_options.field);
4598                let ordering = a_val
4599                    .partial_cmp(&b_val)
4600                    .unwrap_or(std::cmp::Ordering::Equal);
4601                if sort_options.ascending {
4602                    ordering
4603                } else {
4604                    ordering.reverse()
4605                }
4606            });
4607        }
4608
4609        // 3. Apply Pagination
4610        if let Some(offset) = payload.offset {
4611            docs = docs.into_iter().skip(offset).collect();
4612        }
4613        if let Some(limit) = payload.limit {
4614            docs = docs.into_iter().take(limit).collect();
4615        }
4616
4617        // 4. Apply Field Selection (Projection)
4618        if let Some(select_fields) = &payload.select
4619            && !select_fields.is_empty()
4620        {
4621            docs = docs
4622                .into_iter()
4623                .map(|mut doc| {
4624                    doc.data.retain(|key, _| select_fields.contains(key));
4625                    doc
4626                })
4627                .collect();
4628        }
4629
4630        Ok(docs)
4631    }
4632
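    /// Dispatch a wire-protocol `Request` to the matching database operation,
    /// flattening any error into `Response::Error` so the connection handler
    /// can always reply with a single `Response` value.
    ///
    /// A handling sketch on the caller side:
    ///
    /// ```
    /// match db.process_network_request(Request::Get("users:123".into())).await {
    ///     Response::Success(value) => { /* raw bytes, if the key existed */ }
    ///     Response::Error(msg) => eprintln!("request failed: {msg}"),
    ///     _ => { /* other variants are not expected for a Get */ }
    /// }
    /// ```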
4633    pub async fn process_network_request(
4634        &self,
4635        request: crate::network::protocol::Request,
4636    ) -> crate::network::protocol::Response {
4637        use crate::network::protocol::Response;
4638
4639        match request {
4640            crate::network::protocol::Request::Get(key) => match self.get(&key) {
4641                Ok(value) => Response::Success(value),
4642                Err(e) => Response::Error(e.to_string()),
4643            },
4644            crate::network::protocol::Request::Put(key, value) => {
4645                match self.put(key, value, None).await {
4646                    Ok(_) => Response::Done,
4647                    Err(e) => Response::Error(e.to_string()),
4648                }
4649            }
4650            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
4651                Ok(_) => Response::Done,
4652                Err(e) => Response::Error(e.to_string()),
4653            },
4654            crate::network::protocol::Request::NewCollection { name, fields } => {
4655                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
4656                    .iter()
4657                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
4658                    .collect();
4659
4660                match self.new_collection(&name, fields_for_db).await {
4661                    Ok(_) => Response::Done,
4662                    Err(e) => Response::Error(e.to_string()),
4663                }
4664            }
4665            crate::network::protocol::Request::Insert { collection, data } => {
4666                match self.insert_map(&collection, data).await {
4667                    Ok(id) => Response::Message(id),
4668                    Err(e) => Response::Error(e.to_string()),
4669                }
4670            }
4671            crate::network::protocol::Request::GetDocument { collection, id } => {
4672                match self.get_document(&collection, &id) {
4673                    Ok(doc) => Response::Document(doc),
4674                    Err(e) => Response::Error(e.to_string()),
4675                }
4676            }
4677            crate::network::protocol::Request::Query(builder) => {
4678                match self.execute_simple_query(&builder).await {
4679                    Ok(docs) => Response::Documents(docs),
4680                    Err(e) => Response::Error(e.to_string()),
4681                }
4682            }
4683            crate::network::protocol::Request::BeginTransaction => {
4684                let tx_id = self.begin_transaction();
4685                Response::TransactionId(tx_id.as_u64())
4686            }
4687            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4688                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4689                match self.commit_transaction(tx_id) {
4690                    Ok(_) => Response::Done,
4691                    Err(e) => Response::Error(e.to_string()),
4692                }
4693            }
4694            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4695                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4696                match self.rollback_transaction(tx_id) {
4697                    Ok(_) => Response::Done,
4698                    Err(e) => Response::Error(e.to_string()),
4699                }
4700            }
4701        }
4702    }
4703
4704    /// Create indices for commonly queried fields automatically
4705    ///
4706    /// This is a convenience method that creates indices for fields that are
4707    /// likely to be queried frequently, improving performance.
4708    ///
4709    /// # Arguments
4710    /// * `collection` - Name of the collection
4711    /// * `fields` - List of field names to create indices for
4712    ///
4713    /// # Examples
4714    /// ```
4715    /// // Create indices for commonly queried fields
4716    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4717    /// ```
4718    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4719        for field in fields {
4720            if let Err(e) = self.create_index(collection, field).await {
4721                eprintln!(
4722                    "Warning: Failed to create index for {}.{}: {}",
4723                    collection, field, e
4724                );
4725            } else {
4726                println!("Created index for {}.{}", collection, field);
4727            }
4728        }
4729        Ok(())
4730    }
4731
4732    /// Get index statistics for a collection
4733    ///
4734    /// This helps understand which indices exist and how effective they are.
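    ///
    /// A usage sketch:
    ///
    /// ```
    /// for (field, stats) in db.get_index_stats("users") {
    ///     println!(
    ///         "{field}: {} unique values, ~{} docs per value",
    ///         stats.unique_values, stats.avg_docs_per_value
    ///     );
    /// }
    /// ```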
4735    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4736        let mut stats = HashMap::new();
4737
4738        for entry in self.secondary_indices.iter() {
4739            let key = entry.key();
4740            if key.starts_with(&format!("{}:", collection)) {
4741                let field = key.split(':').nth(1).unwrap_or("unknown");
4742                let index = entry.value();
4743
4744                let unique_values = index.len();
4745                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4746
4747                stats.insert(
4748                    field.to_string(),
4749                    IndexStats {
4750                        unique_values,
4751                        total_documents,
4752                        avg_docs_per_value: if unique_values > 0 {
4753                            total_documents / unique_values
4754                        } else {
4755                            0
4756                        },
4757                    },
4758                );
4759            }
4760        }
4761
4762        stats
4763    }
4764
4765    /// Optimize a collection by creating indices for all of its schema fields
4766    ///
4767    /// Note: this indexes every field, trading index-maintenance overhead on writes for faster reads.
4768    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4769        if let Ok(collection_def) = self.get_collection_definition(collection) {
4770            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4771            self.create_indices(collection, &field_names).await?;
4772        }
4773
4774        Ok(())
4775    }
4776
4777    // Helper method to get unique fields from a collection
4778    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4779        collection
4780            .fields
4781            .iter()
4782            .filter(|(_, def)| def.unique)
4783            .map(|(name, _)| name.clone())
4784            .collect()
4785    }
4786
4787    // Validate unique constraints on insert, using the unique-field helper above
4788    async fn validate_unique_constraints(
4789        &self,
4790        collection: &str,
4791        data: &HashMap<String, Value>,
4792    ) -> Result<()> {
4793        self.ensure_indices_initialized().await?;
4794        let collection_def = match self.get_collection_definition(collection) {
4795            Ok(def) => def,
4796            Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
4797            Err(e) => return Err(e),
4798        };
4799        let unique_fields = self.get_unique_fields(&collection_def);
4800
4801        for unique_field in &unique_fields {
4802            if let Some(value) = data.get(unique_field) {
4803                let index_key = format!("{}:{}", collection, unique_field);
4804                if let Some(index) = self.secondary_indices.get(&index_key) {
4805                    // Get the raw string value without JSON formatting
4806                    let value_str = match value {
4807                        Value::String(s) => s.clone(),
4808                        _ => value.to_string(),
4809                    };
4810                    if index.contains_key(&value_str) {
4811                        return Err(AqlError::new(
4812                            ErrorCode::UniqueConstraintViolation,
4813                            format!(
4814                                "Unique constraint violation on field '{}' with value '{}'",
4815                                unique_field, value_str
4816                            ),
4817                        ));
4818                    }
4819                }
4820            }
4821        }
4822        Ok(())
4823    }
4824
4825    /// Validate unique constraints excluding a specific document ID (for updates)
4826    async fn validate_unique_constraints_excluding(
4827        &self,
4828        collection: &str,
4829        data: &HashMap<String, Value>,
4830        exclude_id: &str,
4831    ) -> Result<()> {
4832        self.ensure_indices_initialized().await?;
4833        let collection_def = self.get_collection_definition(collection)?;
4834        let unique_fields = self.get_unique_fields(&collection_def);
4835
4836        for unique_field in &unique_fields {
4837            if let Some(value) = data.get(unique_field) {
4838                let index_key = format!("{}:{}", collection, unique_field);
4839                if let Some(index) = self.secondary_indices.get(&index_key) {
4840                    // Get the raw string value without JSON formatting
4841                    let value_str = match value {
4842                        Value::String(s) => s.clone(),
4843                        _ => value.to_string(),
4844                    };
4845                    if let Some(doc_ids) = index.get(&value_str) {
4846                        // Check if any document other than the excluded one has this value
4847                        let exclude_key = format!("{}:{}", collection, exclude_id);
4848                        for doc_key in doc_ids.value() {
4849                            if doc_key != &exclude_key {
4850                                return Err(AqlError::new(
4851                                    ErrorCode::UniqueConstraintViolation,
4852                                    format!(
4853                                        "Unique constraint violation on field '{}' with value '{}'",
4854                                        unique_field, value_str
4855                                    ),
4856                                ));
4857                            }
4858                        }
4859                    }
4860                }
4861            }
4862        }
4863        Ok(())
4864    }
4865
4866    // =========================================================================
4867    // AQL Executor Helper Methods
4868    // These are wrapper methods for AQL executor integration.
4869    // They provide a simplified API compatible with the AQL query execution layer.
4870    // =========================================================================
4871
4872    /// Get all documents in a collection (AQL helper)
4873    ///
4874    /// This is a wrapper around the internal query system optimized for bulk retrieval.
4875    pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
4876        self.ensure_indices_initialized().await?;
4877
4878        let prefix = format!("{}:", collection);
4879        let mut docs = Vec::new();
4880
4881        for result in self.cold.scan_prefix(&prefix) {
4882            let (_, value) = result?;
4883            if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
4884                docs.push(doc);
4885            }
4886        }
4887
4888        Ok(docs)
4889    }
4890
4891    /// Insert a document from a HashMap (AQL helper)
4892    ///
4893    /// Returns the complete document (not just the ID) for the AQL executor.
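    ///
    /// A usage sketch:
    ///
    /// ```
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Ada".to_string()));
    /// let doc = db.aql_insert("users", data).await?;
    /// assert!(!doc.id.is_empty()); // a UUIDv7 id is generated for the document
    /// ```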
4894    pub async fn aql_insert(
4895        &self,
4896        collection: &str,
4897        data: HashMap<String, Value>,
4898    ) -> Result<Document> {
4899        // Validate unique constraints before inserting
4900        self.validate_unique_constraints(collection, &data).await?;
4901
4902        let doc_id = Uuid::now_v7().to_string();
4903        let document = Document {
4904            id: doc_id.clone(),
4905            data,
4906        };
4907
4908        self.put(
4909            format!("{}:{}", collection, doc_id),
4910            serde_json::to_vec(&document)?,
4911            None,
4912        )
4913        .await?;
4914
4915        // Publish insert event
4916        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
4917        let _ = self.pubsub.publish(event);
4918
4919        Ok(document)
4920    }
4921
4922    /// Update a document by ID with new data (AQL helper)
4923    ///
4924    /// Merges the new data into the existing document and returns the updated document.
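    ///
    /// Fields present in `updates` overwrite existing values; all other fields
    /// are preserved. A usage sketch:
    ///
    /// ```
    /// let mut patch = HashMap::new();
    /// patch.insert("age".to_string(), Value::Int(31));
    /// let updated = db.aql_update_document("users", &doc.id, patch).await?;
    /// assert_eq!(updated.data.get("age"), Some(&Value::Int(31)));
    /// ```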
4925    pub async fn aql_update_document(
4926        &self,
4927        collection: &str,
4928        doc_id: &str,
4929        updates: HashMap<String, Value>,
4930    ) -> Result<Document> {
4931        let key = format!("{}:{}", collection, doc_id);
4932
4933        // Get existing document
4934        let existing = self.get(&key)?.ok_or_else(|| {
4935            AqlError::new(
4936                ErrorCode::NotFound,
4937                format!("Document {} not found", doc_id),
4938            )
4939        })?;
4940
4941        let mut doc: Document = serde_json::from_slice(&existing)?;
4942        let old_doc = doc.clone();
4943
4944        // Merge updates into existing data
4945        for (k, v) in updates {
4946            doc.data.insert(k, v);
4947        }
4948
4949        // Validate unique constraints excluding this document
4950        self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
4951            .await?;
4952
4953        // Save updated document
4954        self.put(key, serde_json::to_vec(&doc)?, None).await?;
4955
4956        // Publish update event
4957        let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
4958        let _ = self.pubsub.publish(event);
4959
4960        Ok(doc)
4961    }
4962
4963    /// Delete a document by ID (AQL helper)
4964    ///
4965    /// Returns the deleted document
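    ///
    /// ```
    /// let removed = db.aql_delete_document("users", &doc.id).await?;
    /// assert_eq!(removed.id, doc.id); // the deleted document is returned intact
    /// ```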
4966    pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
4967        let key = format!("{}:{}", collection, doc_id);
4968
4969        // Get existing document first
4970        let existing = self.get(&key)?.ok_or_else(|| {
4971            AqlError::new(
4972                ErrorCode::NotFound,
4973                format!("Document {} not found", doc_id),
4974            )
4975        })?;
4976
4977        let doc: Document = serde_json::from_slice(&existing)?;
4978
4979        // Append the delete to the WAL for durability
4980        // (skipped entirely when the durability mode is None)
4981        if self.config.durability_mode != DurabilityMode::None {
4982            if let Some(wal_writer) = &self.wal_writer {
4983                // Use async writer if available
4984                let op = WalOperation::Delete { key: key.clone() };
4985                wal_writer.send(op).map_err(|_| {
4986                    AqlError::new(
4987                        ErrorCode::InternalError,
4988                        "Failed to send to WAL writer".to_string(),
4989                    )
4990                })?;
4991            } else if let Some(wal) = &self.wal {
4992                // Fallback to blocking write if async writer not set (should be rare/legacy)
4993                wal.write()
4994                    .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
4995                    .append(crate::wal::Operation::Delete, &key, None)?;
4996            }
4997        }
4998
4999        // Delete from storage
5000        self.cold.delete(&key)?;
5001        self.hot.delete(&key);
5002
5003        // Remove from primary index
5004        if let Some(index) = self.primary_indices.get_mut(collection) {
5005            index.remove(&key);
5006        }
5007
5008        // Remove from secondary indices
5009        for (field_name, value) in &doc.data {
5010            let index_key = format!("{}:{}", collection, field_name);
5011            if let Some(index) = self.secondary_indices.get_mut(&index_key) {
5012                let value_str = match value {
5013                    Value::String(s) => s.clone(),
5014                    _ => value.to_string(),
5015                };
5016                if let Some(mut doc_ids) = index.get_mut(&value_str) {
5017                    doc_ids.retain(|id| id != &key);
5018                }
5019            }
5020        }
5021
5022        // Publish delete event
5023        let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
5024        let _ = self.pubsub.publish(event);
5025
5026        Ok(doc)
5027    }
5028
5029    /// Get a single document by ID (AQL helper)
5030    pub async fn aql_get_document(
5031        &self,
5032        collection: &str,
5033        doc_id: &str,
5034    ) -> Result<Option<Document>> {
5035        let key = format!("{}:{}", collection, doc_id);
5036
5037        match self.get(&key)? {
5038            Some(data) => {
5039                let doc: Document = serde_json::from_slice(&data)?;
5040                Ok(Some(doc))
5041            }
5042            None => Ok(None),
5043        }
5044    }
5045
5046    /// Begin a transaction (AQL helper) - returns transaction ID
5047    pub fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
5048        Ok(self.begin_transaction())
5049    }
5050
5051    /// Commit a transaction (AQL helper)
5052    pub async fn aql_commit_transaction(
5053        &self,
5054        tx_id: crate::transaction::TransactionId,
5055    ) -> Result<()> {
5056        self.commit_transaction(tx_id)
5057    }
5058
5059    /// Rollback a transaction (AQL helper)
5060    pub async fn aql_rollback_transaction(
5061        &self,
5062        tx_id: crate::transaction::TransactionId,
5063    ) -> Result<()> {
5064        self.transaction_manager.rollback(tx_id)
5065    }
5066
5067    // ============================================
5068    // AQL Schema Management Wrappers
5069    // ============================================
5070
5071    /// Create a collection from AST schema definition
5072    pub async fn create_collection_schema(
5073        &self,
5074        name: &str,
5075        fields: HashMap<String, crate::types::FieldDefinition>,
5076    ) -> Result<()> {
5077        let collection_key = format!("_collection:{}", name);
5078
5079        // Check if collection already exists
5080        if self.get(&collection_key)?.is_some() {
5081            // Already exists: treat re-creation as a no-op
5082            return Ok(());
5083        }
5084
5085        let collection = Collection {
5086            name: name.to_string(),
5087            fields,
5088        };
5089
5090        let collection_data = serde_json::to_vec(&collection)?;
5091        self.put(collection_key, collection_data, None).await?;
5092        self.schema_cache.remove(name);
5093
5094        Ok(())
5095    }
5096
5097    /// Add a field to an existing collection schema
5098    pub async fn add_field_to_schema(
5099        &self,
5100        collection_name: &str,
5101        name: String,
5102        definition: crate::types::FieldDefinition,
5103    ) -> Result<()> {
5104        let mut collection = self
5105            .get_collection_definition(collection_name)
5106            .map_err(|_| {
5107                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5108            })?;
5109
5110        if collection.fields.contains_key(&name) {
5111            return Err(AqlError::new(
5112                ErrorCode::InvalidDefinition,
5113                format!(
5114                    "Field '{}' already exists in collection '{}'",
5115                    name, collection_name
5116                ),
5117            ));
5118        }
5119
5120        collection.fields.insert(name, definition);
5121
5122        let collection_key = format!("_collection:{}", collection_name);
5123        let collection_data = serde_json::to_vec(&collection)?;
5124        self.put(collection_key, collection_data, None).await?;
5125        self.schema_cache.remove(collection_name);
5126
5127        Ok(())
5128    }
5129
5130    /// Drop a field from an existing collection schema
5131    pub async fn drop_field_from_schema(
5132        &self,
5133        collection_name: &str,
5134        field_name: String,
5135    ) -> Result<()> {
5136        let mut collection = self
5137            .get_collection_definition(collection_name)
5138            .map_err(|_| {
5139                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5140            })?;
5141
5142        if !collection.fields.contains_key(&field_name) {
5143            return Err(AqlError::new(
5144                ErrorCode::InvalidDefinition,
5145                format!(
5146                    "Field '{}' does not exist in collection '{}'",
5147                    field_name, collection_name
5148                ),
5149            ));
5150        }
5151
5152        collection.fields.remove(&field_name);
5153
5154        let collection_key = format!("_collection:{}", collection_name);
5155        let collection_data = serde_json::to_vec(&collection)?;
5156        self.put(collection_key, collection_data, None).await?;
5157        self.schema_cache.remove(collection_name);
5158
5159        Ok(())
5160    }
5161
5162    /// Rename a field in an existing collection schema
5163    pub async fn rename_field_in_schema(
5164        &self,
5165        collection_name: &str,
5166        from: String,
5167        to: String,
5168    ) -> Result<()> {
5169        let mut collection = self
5170            .get_collection_definition(collection_name)
5171            .map_err(|_| {
5172                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5173            })?;
5174
5175        if let Some(def) = collection.fields.remove(&from) {
5176            if collection.fields.contains_key(&to) {
5177                return Err(AqlError::new(
5178                    ErrorCode::InvalidDefinition,
5179                    format!(
5180                        "Target field name '{}' already exists in collection '{}'",
5181                        to, collection_name
5182                    ),
5183                ));
5184            }
5185            collection.fields.insert(to, def);
5186        } else {
5187            return Err(AqlError::new(
5188                ErrorCode::InvalidDefinition,
5189                format!("Field '{}' not found", from),
5190            ));
5191        }
5192
5193        let collection_key = format!("_collection:{}", collection_name);
5194        let collection_data = serde_json::to_vec(&collection)?;
5195        self.put(collection_key, collection_data, None).await?;
5196        self.schema_cache.remove(collection_name);
5197
5198        Ok(())
5199    }
5200
5201    /// Modify a field in an existing collection schema
5202    pub async fn modify_field_in_schema(
5203        &self,
5204        collection_name: &str,
5205        name: String,
5206        definition: crate::types::FieldDefinition,
5207    ) -> Result<()> {
5208        let mut collection = self
5209            .get_collection_definition(collection_name)
5210            .map_err(|_| {
5211                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5212            })?;
5213
5214        if !collection.fields.contains_key(&name) {
5215            return Err(AqlError::new(
5216                ErrorCode::InvalidDefinition,
5217                format!(
5218                    "Field '{}' does not exist in collection '{}'",
5219                    name, collection_name
5220                ),
5221            ));
5222        }
5223
5224        collection.fields.insert(name, definition);
5225
5226        let collection_key = format!("_collection:{}", collection_name);
5227        let collection_data = serde_json::to_vec(&collection)?;
5228        self.put(collection_key, collection_data, None).await?;
5229        self.schema_cache.remove(collection_name);
5230
5231        Ok(())
5232    }
5233
5234    /// Drop an entire collection definition
5235    pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5236        let collection_key = format!("_collection:{}", collection_name);
5237        self.cold.delete(&collection_key)?;
5238        self.schema_cache.remove(collection_name);
5239        Ok(())
5240    }
5241
5242    // ============================================
5243    // AQL Migration Wrappers
5244    // ============================================
5245
5246    /// Check if a migration version has been applied
5247    pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
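    ///
    /// Together with [`Self::mark_migration_applied`], this supports an
    /// idempotent run-once guard (version label illustrative):
    ///
    /// ```
    /// if !db.is_migration_applied("2024_06_01_add_bio").await? {
    ///     // ... apply the migration ...
    ///     db.mark_migration_applied("2024_06_01_add_bio").await?;
    /// }
    /// ```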
5248        let migration_key = format!("_sys_migration:{}", version);
5249        Ok(self.get(&migration_key)?.is_some())
5250    }
5251
5252    /// Mark a migration version as applied
5253    pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5254        let migration_key = format!("_sys_migration:{}", version);
5255        let timestamp = chrono::Utc::now().to_rfc3339();
5256        self.put(migration_key, timestamp.as_bytes().to_vec(), None)
5257            .await?;
5258        Ok(())
5259    }
5260}
5261
5262impl Drop for Aurora {
5263    fn drop(&mut self) {
5264        // Signal checkpoint task to shutdown gracefully
5265        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
5266            let _ = shutdown_tx.send(());
5267        }
5268        // Signal compaction task to shutdown gracefully
5269        if let Some(ref shutdown_tx) = self.compaction_shutdown {
5270            let _ = shutdown_tx.send(());
5271        }
5272    }
5273}
5274
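/// Evaluate a single HTTP-level filter against one document value.
///
/// The JSON filter value is converted via `json_to_value`; a failed conversion
/// is treated as a non-match rather than surfaced as an error.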
5275fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
5276    let filter_val = match json_to_value(&filter.value) {
5277        Ok(v) => v,
5278        Err(_) => return false,
5279    };
5280
5281    match filter.operator {
5282        FilterOperator::Eq => doc_val == &filter_val,
5283        FilterOperator::Ne => doc_val != &filter_val,
5284        FilterOperator::Gt => doc_val > &filter_val,
5285        FilterOperator::Gte => doc_val >= &filter_val,
5286        FilterOperator::Lt => doc_val < &filter_val,
5287        FilterOperator::Lte => doc_val <= &filter_val,
5288        FilterOperator::Contains => match (doc_val, &filter_val) {
5289            (Value::String(s), Value::String(fv)) => s.contains(fv),
5290            (Value::Array(arr), _) => arr.contains(&filter_val),
5291            _ => false,
5292        },
5293    }
5294}
5295
5296/// Results of importing a document
5297enum ImportResult {
5298    Imported,
5299    Skipped,
5300}
5301
5302/// Statistics from an import operation
5303#[derive(Debug, Default)]
5304pub struct ImportStats {
5305    /// Number of documents successfully imported
5306    pub imported: usize,
5307    /// Number of documents skipped (usually because they already exist)
5308    pub skipped: usize,
5309    /// Number of documents that failed to import
5310    pub failed: usize,
5311}
5312
5313/// Statistics for a specific collection
5314#[derive(Debug)]
5315pub struct CollectionStats {
5316    /// Number of documents in the collection
5317    pub count: usize,
5318    /// Total size of the collection in bytes
5319    pub size_bytes: usize,
5320    /// Average document size in bytes
5321    pub avg_doc_size: usize,
5322}
5323
5324/// Statistics for an index
5325#[derive(Debug)]
5326pub struct IndexStats {
5327    /// Number of unique values in the index
5328    pub unique_values: usize,
5329    /// Total number of documents covered by the index
5330    pub total_documents: usize,
5331    /// Average number of documents per unique value
5332    pub avg_docs_per_value: usize,
5333}
5334
5335/// Combined database statistics
5336#[derive(Debug)]
5337pub struct DatabaseStats {
5338    /// Hot cache statistics
5339    pub hot_stats: crate::storage::hot::CacheStats,
5340    /// Cold storage statistics
5341    pub cold_stats: crate::storage::cold::ColdStoreStats,
5342    /// Estimated total database size in bytes
5343    pub estimated_size: u64,
5344    /// Statistics for each collection
5345    pub collections: HashMap<String, CollectionStats>,
5346}
5347
5348#[cfg(test)]
5349mod tests {
5350    use super::*;
5351    use tempfile::tempdir;
5352
5353    #[tokio::test]
5354    async fn test_async_wal_integration() {
5355        let temp_dir = tempfile::tempdir().unwrap();
5356        let db_path = temp_dir.path().join("test_wal_integration.db");
5357        let db = Aurora::open(db_path.to_str().unwrap()).unwrap();
5358
5359        // 1. WAL is assumed to be enabled here; depending on AuroraConfig it
5360        //    may already be on by default.
5361
5362        // Create collection schema first
5363        db.new_collection(
5364            "users",
5365            vec![
5366                ("id", FieldType::String, false),
5367                ("counter", FieldType::Int, false),
5368            ],
5369        )
5370        .await
5371        .unwrap();
5372
5373        let db_clone = db.clone();
5374
5375        // 2. Spawn 50 concurrent writes
5376        let handles: Vec<_> = (0..50)
5377            .map(|i| {
5378                let db = db_clone.clone();
5379                tokio::spawn(async move {
5380                    let doc_id = db
5381                        .insert_into(
5382                            "users",
5383                            vec![
5384                                ("id", Value::String(format!("user-{}", i))),
5385                                ("counter", Value::Int(i)),
5386                            ],
5387                        )
5388                        .await
5389                        .unwrap();
5390                    (i, doc_id) // Return both the index and the document ID
5391                })
5392            })
5393            .collect();
5394
5395        // 3. Wait for all writes to complete and collect the results
5396        let results = futures::future::join_all(handles).await;
5397        assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");
5398
5399        // Extract the document IDs
5400        let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();
5401
5402        // Find the document ID for the one with counter=25
5403        let target_doc_id = doc_ids
5404            .iter()
5405            .find(|(counter, _)| *counter == 25i64)
5406            .map(|(_, doc_id)| doc_id)
5407            .unwrap();
5408
5409        // 4. Verify Data Integrity
5410        let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
5411        assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
5412    }
5413
5414    #[tokio::test]
5415    async fn test_basic_operations() -> Result<()> {
5416        let temp_dir = tempdir()?;
5417        let db_path = temp_dir.path().join("test.aurora");
5418        let db = Aurora::open(db_path.to_str().unwrap())?;
5419
5420        // Test collection creation
5421        db.new_collection(
5422            "users",
5423            vec![
5424                ("name", FieldType::String, false),
5425                ("age", FieldType::Int, false),
5426                ("email", FieldType::String, true),
5427            ],
5428        )
5429        .await?;
5430
5431        // Test document insertion
5432        let doc_id = db
5433            .insert_into(
5434                "users",
5435                vec![
5436                    ("name", Value::String("John Doe".to_string())),
5437                    ("age", Value::Int(30)),
5438                    ("email", Value::String("john@example.com".to_string())),
5439                ],
5440            )
5441            .await?;
5442
5443        // Test document retrieval
5444        let doc = db.get_document("users", &doc_id)?.unwrap();
5445        assert_eq!(
5446            doc.data.get("name").unwrap(),
5447            &Value::String("John Doe".to_string())
5448        );
5449        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));
5450
5451        Ok(())
5452    }
5453
5454    #[tokio::test]
5455    async fn test_transactions() -> Result<()> {
5456        let temp_dir = tempdir()?;
5457        let db_path = temp_dir.path().join("test.aurora");
5458        let db = Aurora::open(db_path.to_str().unwrap())?;
5459
5460        // Create collection
5461        db.new_collection("test", vec![("field", FieldType::String, false)])
5462            .await?;
5463
5464        // Start transaction
5465        let tx_id = db.begin_transaction();
5466
5467        // Insert document
5468        let doc_id = db
5469            .insert_into("test", vec![("field", Value::String("value".to_string()))])
5470            .await?;
5471
5472        // Commit transaction
5473        db.commit_transaction(tx_id)?;
5474
5475        // Verify document exists
5476        let doc = db.get_document("test", &doc_id)?.unwrap();
5477        assert_eq!(
5478            doc.data.get("field").unwrap(),
5479            &Value::String("value".to_string())
5480        );
5481
5482        Ok(())
5483    }
5484
5485    #[tokio::test]
5486    async fn test_query_operations() -> Result<()> {
5487        let temp_dir = tempdir()?;
5488        let db_path = temp_dir.path().join("test.aurora");
5489        let db = Aurora::open(db_path.to_str().unwrap())?;
5490
5491        // Test collection creation
5492        db.new_collection(
5493            "books",
5494            vec![
5495                ("title", FieldType::String, false),
5496                ("author", FieldType::String, false),
5497                ("year", FieldType::Int, false),
5498            ],
5499        )
5500        .await?;
5501
5502        // Test document insertion
5503        db.insert_into(
5504            "books",
5505            vec![
5506                ("title", Value::String("Book 1".to_string())),
5507                ("author", Value::String("Author 1".to_string())),
5508                ("year", Value::Int(2020)),
5509            ],
5510        )
5511        .await?;
5512
5513        db.insert_into(
5514            "books",
5515            vec![
5516                ("title", Value::String("Book 2".to_string())),
5517                ("author", Value::String("Author 2".to_string())),
5518                ("year", Value::Int(2021)),
5519            ],
5520        )
5521        .await?;
5522
5523        // Test query
5524        let results = db
5525            .query("books")
5526            .filter(|f| f.gt("year", Value::Int(2019)))
5527            .order_by("year", true)
5528            .collect()
5529            .await?;
5530
5531        assert_eq!(results.len(), 2);
5532        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());
5533
5534        Ok(())
5535    }
5536
5537    #[tokio::test]
5538    async fn test_blob_operations() -> Result<()> {
5539        let temp_dir = tempdir()?;
5540        let db_path = temp_dir.path().join("test.aurora");
5541        let db = Aurora::open(db_path.to_str().unwrap())?;
5542
5543        // Create test file
5544        let file_path = temp_dir.path().join("test.txt");
5545        std::fs::write(&file_path, b"Hello, World!")?;
5546
5547        // Test blob storage
5548        db.put_blob("test:blob".to_string(), &file_path).await?;
5549
5550        // Verify blob exists
5551        let data = db.get_data_by_pattern("test:blob")?;
5552        assert_eq!(data.len(), 1);
5553        match &data[0].1 {
5554            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
5555            _ => panic!("Expected Blob type"),
5556        }
5557
5558        Ok(())
5559    }
5560
5561    #[tokio::test]
5562    async fn test_blob_size_limit() -> Result<()> {
5563        let temp_dir = tempdir()?;
5564        let db_path = temp_dir.path().join("test.aurora");
5565        let db = Aurora::open(db_path.to_str().unwrap())?;
5566
5567        // Create a test file that's too large (201MB)
5568        let large_file_path = temp_dir.path().join("large_file.bin");
5569        let large_data = vec![0u8; 201 * 1024 * 1024];
5570        std::fs::write(&large_file_path, &large_data)?;
5571
5572        // Attempt to store the large file
5573        let result = db
5574            .put_blob("test:large_blob".to_string(), &large_file_path)
5575            .await;
5576
5577        assert!(result.is_err());
5578        let err = result.unwrap_err();
5579        assert!(err.code == ErrorCode::InvalidOperation); // oversized blobs must be rejected
5580
5581        Ok(())
5582    }
5583
5584    #[tokio::test]
5585    async fn test_unique_constraints() -> Result<()> {
5586        let temp_dir = tempdir()?;
5587        let db_path = temp_dir.path().join("test.aurora");
5588        let db = Aurora::open(db_path.to_str().unwrap())?;
5589
5590        // Create collection with unique email field
5591        db.new_collection(
5592            "users",
5593            vec![
5594                ("name", FieldType::String, false),
5595                ("email", FieldType::String, true), // unique field
5596                ("age", FieldType::Int, false),
5597            ],
5598        )
5599        .await?;
5600
5601        // Insert first document
5602        let _doc_id1 = db
5603            .insert_into(
5604                "users",
5605                vec![
5606                    ("name", Value::String("John Doe".to_string())),
5607                    ("email", Value::String("john@example.com".to_string())),
5608                    ("age", Value::Int(30)),
5609                ],
5610            )
5611            .await?;
5612
5613        // Try to insert second document with same email - should fail
5614        let result = db
5615            .insert_into(
5616                "users",
5617                vec![
5618                    ("name", Value::String("Jane Doe".to_string())),
5619                    ("email", Value::String("john@example.com".to_string())), // duplicate email
5620                    ("age", Value::Int(25)),
5621                ],
5622            )
5623            .await;
5624
5625        assert!(result.is_err());
5626        if let Err(e) = result {
5627            if e.code == ErrorCode::UniqueConstraintViolation {
5628                let msg = e.message;
5629                assert!(msg.contains("email"));
5630                assert!(msg.contains("john@example.com")); // the duplicated value should appear in the message
5631            } else {
5632                panic!("Expected UniqueConstraintViolation, got {:?}", e);
5633            }
5634        } else {
5635            panic!("Expected UniqueConstraintViolation");
5636        }
5637
5638        // Test upsert with unique constraint
5639        // Should succeed for new document
5640        let _doc_id2 = db
5641            .upsert(
5642                "users",
5643                "user2",
5644                vec![
5645                    ("name", Value::String("Alice Smith".to_string())),
5646                    ("email", Value::String("alice@example.com".to_string())),
5647                    ("age", Value::Int(28)),
5648                ],
5649            )
5650            .await?;
5651
5652        // Should fail when trying to upsert with duplicate email
5653        let result = db
5654            .upsert(
5655                "users",
5656                "user3",
5657                vec![
5658                    ("name", Value::String("Bob Wilson".to_string())),
5659                    ("email", Value::String("alice@example.com".to_string())), // duplicate
5660                    ("age", Value::Int(35)),
5661                ],
5662            )
5663            .await;
5664
5665        assert!(result.is_err());
5666
5667        // Should succeed when updating existing document with same email (no change)
5668        let result = db
5669            .upsert(
5670                "users",
5671                "user2",
5672                vec![
5673                    ("name", Value::String("Alice Updated".to_string())),
5674                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
5675                    ("age", Value::Int(29)),
5676                ],
5677            )
5678            .await;
5679
5680        assert!(result.is_ok());
5681
5682        Ok(())
5683    }
5684
5685    #[tokio::test]
5686    async fn test_nullable_field_definition() -> Result<()> {
5687        use tempfile::TempDir;
5688
5689        let temp_dir = TempDir::new()?;
5690        let db_path = temp_dir.path().join("test.aurora");
5691        let db = Aurora::open(db_path.to_str().unwrap())?;
5692
5693        // Test 3-tuple format (defaults nullable to false)
5694        db.new_collection(
5695            "products",
5696            vec![
5697                ("name", FieldType::String, false),
5698                ("sku", FieldType::String, true), // unique
5699            ],
5700        )
5701        .await?;
5702
5703        // Test 4-tuple format with explicit nullable control
5704        db.new_collection(
5705            "users",
5706            vec![
5707                ("name", FieldType::String, false, false), // not nullable
5708                ("email", FieldType::String, true, false), // unique, not nullable
5709                ("bio", FieldType::String, false, true),   // nullable
5710                ("age", FieldType::Int, false, true),      // nullable
5711            ],
5712        )
5713        .await?;
5714
5715        // Verify the schema was created correctly
5716        let schema = db.ensure_schema_hot("users")?;
5717
5718        // Check that name is not nullable
5719        assert!(!schema.fields.get("name").unwrap().nullable);
5720
5721        // Check that email is not nullable
5722        assert!(!schema.fields.get("email").unwrap().nullable);
5723
5724        // Check that bio is nullable
5725        assert!(schema.fields.get("bio").unwrap().nullable);
5726
5727        // Check that age is nullable
5728        assert!(schema.fields.get("age").unwrap().nullable);
5729
5730        // Verify products collection (3-tuple defaults to false)
5731        let products_schema = db.ensure_schema_hot("products")?;
5732        assert!(!products_schema.fields.get("name").unwrap().nullable);
5733        assert!(!products_schema.fields.get("sku").unwrap().nullable);
5734
5735        Ok(())
5736    }
5737}