aurora_db/
db.rs

//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true),  // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for primary index
// Instead of storing full Vec<u8> values, we store minimal metadata
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

/// Summary information about a stored value (regular data, blob, or compressed)
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    /// Size of the stored value in bytes
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Aurora {
    /// Remove stale lock files from a database directory
    ///
    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
    /// that prevent the database from being reopened. This method safely removes
    /// those lock files.
    ///
    /// # Safety
    /// Only call this when you're certain no other Aurora instance is using the database.
    /// Removing lock files while another process is running could cause data corruption.
    ///
    /// # Example
    /// ```no_run
    /// use aurora_db::Aurora;
    ///
    /// // If you get an "Access denied" error when opening:
    /// if let Err(e) = Aurora::open("my_db") {
    ///     eprintln!("Failed to open: {}", e);
    ///     // Try removing a stale lock
    ///     if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
    ///         println!("Removed stale lock, try opening again");
    ///         let db = Aurora::open("my_db")?;
    ///     }
    /// }
    /// # Ok::<(), aurora_db::error::AuroraError>(())
    /// ```
    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
        let resolved_path = Self::resolve_path(path)?;
        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
    }

    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// // Use a specific location
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Just use a name (creates in current directory)
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,           // 512 MB cache
    ///     enable_write_buffering: true,     // Batch writes for speed
    ///     enable_wal: true,                 // Durability
    ///     auto_compact: true,               // Background compaction
    ///     compact_interval_mins: 60,        // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        // Open cold storage with the configured cache size, flush interval, and mode
        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        // Initialize write buffer if enabled
        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        // Store auto_compact before moving config
        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;

        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize WAL if enabled and check for recovery
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Spawn background checkpoint task if WAL is enabled
        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Checkpoint: flush cold storage
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            // Truncate WAL after successful flush
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write()
                            {
                                let _ = wal_guard.truncate();
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            // Final checkpoint before shutdown
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write()
                            {
                                let _ = wal_guard.truncate();
                            }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Spawn background compaction task if auto_compact is enabled
        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Initialize the database
        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
        };

        // Replay WAL entries if any
        if !wal_entries.is_empty() {
            db.replay_wal(wal_entries)?;
        }

        Ok(db)
    }

    /// Lazily initialize primary and secondary indices (runs at most once)
    ///
    /// Scans cold storage on the first call to rebuild the in-memory indices;
    /// subsequent calls return immediately.
    ///
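    /// A minimal usage sketch (call once at startup, before serving queries):
    ///
    /// ```
    /// db.ensure_indices_initialized().await?;
    /// ```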
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support
    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check hot cache first
        if let Some(value) = self.hot.get(key) {
            return Ok(Some(value));
        }

        // Check if key exists in primary index
        if let Some(collection) = key.split(':').next()
            && let Some(index) = self.primary_indices.get(collection)
            && index.contains_key(key)
        {
            // Key exists in index, fetch from cold storage
            if let Some(value) = self.cold.get(key)? {
                // Promote to hot cache for future fast access
                self.hot.set(key.to_string(), value.clone(), None);
                return Ok(Some(value));
            }
        }

        // Fallback to cold storage
        let value = self.cold.get(key)?;
        if let Some(v) = &value {
            self.hot.set(key.to_string(), v.clone(), None);
        }
        Ok(value)
    }

    /// Get value with zero-copy Arc reference (10-100x faster than get!)
    /// Only checks hot cache - returns None if not cached
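    ///
    /// A minimal sketch (assumes the key was read recently, so it is still in
    /// the hot cache):
    ///
    /// ```
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     // `bytes` is an Arc<Vec<u8>>; no copy of the value is made
    ///     println!("cached value is {} bytes", bytes.len());
    /// }
    /// ```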
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Fraction of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Check cache performance
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///                   stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as single collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
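    ///
    /// A small sketch (e.g., to skip building events when nobody is listening):
    ///
    /// ```
    /// if db.listener_count("users") > 0 {
    ///     println!("someone is watching the users collection");
    /// }
    /// ```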
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for durability guarantee
    /// - Truncates WAL after successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?;  // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern (share the handle via Arc)
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = Arc::new(db);
    /// let db_bg = Arc::clone(&db);
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db_bg.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?;  // Critical: ensure transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?;  // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow, WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When durability mode is already Immediate
    ///
    /// # Important Notes
    /// - WAL provides durability even without explicit flush()
    /// - flush() adds latency (~10-50ms) so use strategically
    /// - Automatic flush happens during graceful shutdown
    /// - After flush(), WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set durability mode
    /// - WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate WAL after successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None)?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "Blob size {} MB exceeds maximum allowed size of {} MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None
        {
            wal.write()
                .unwrap()
                .append(Operation::Put, &key, Some(&value))?;
        }

        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(key.clone(), value.clone())?;
        } else {
            self.cold.set(key.clone(), value.clone())?;
        }

        self.hot.set(key.clone(), value.clone(), ttl);

        if let Some(collection_name) = key.split(':').next()
            && !collection_name.starts_with('_')
        {
            self.index_value(collection_name, &key, &value)?;
        }

        Ok(())
    }

    /// Replay WAL entries to recover from crash
    fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        for entry in entries {
            match entry.operation {
                Operation::Put => {
                    if let Some(value) = entry.value {
                        // Write directly to cold storage (skip WAL, already logged)
                        self.cold.set(entry.key.clone(), value.clone())?;

                        // Update hot cache
                        self.hot.set(entry.key.clone(), value.clone(), None);

                        // Rebuild indices
                        if let Some(collection) = entry.key.split(':').next()
                            && !collection.starts_with('_')
                        {
                            self.index_value(collection, &entry.key, &value)?;
                        }
                    }
                }
                Operation::Delete => {
                    self.cold.delete(&entry.key)?;
                    self.hot.delete(&entry.key);
                    // TODO: Remove from indices
                }
                Operation::BeginTx | Operation::CommitTx | Operation::RollbackTx => {
                    // Transaction boundaries - future implementation
                }
            }
        }

        // Flush after replay and truncate WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update primary index with metadata only (not the full value)
        let location = DiskLocation::new(value.len());
        self.primary_indices
            .entry(collection.to_string())
            .or_default()
            .insert(key.to_string(), location);

        // Try to get schema from cache first, otherwise load and cache it
        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                // Schema not in cache - load it
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // No schema = no secondary indices
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // Invalid schema = skip indexing
                };

                // Cache the schema for future use
                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        // Build list of fields that should be indexed (unique or explicitly indexed)
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        // If no fields need indexing, we're done
        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices - ONLY for indexed/unique fields
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                // Skip fields that aren't indexed
                if !indexed_fields.contains(&field) {
                    continue;
                }

                // Use consistent string formatting for indexing
                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self.secondary_indices.entry(index_key).or_default();

                // Cap the number of index entries stored per field value
                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        // Only add if we haven't exceeded the limit per value
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    /// Scan collection with filter and early termination support
    /// Used by QueryBuilder for optimized queries with LIMIT
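    ///
    /// A minimal sketch (stops scanning once 10 matching documents are found):
    ///
    /// ```
    /// let adults = db.scan_and_filter(
    ///     "users",
    ///     |doc| matches!(doc.data.get("age"), Some(Value::Int(age)) if *age >= 18),
    ///     Some(10),
    /// )?;
    /// ```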
    pub fn scan_and_filter<F>(
        &self,
        collection: &str,
        filter: F,
        limit: Option<usize>,
    ) -> Result<Vec<Document>>
    where
        F: Fn(&Document) -> bool,
    {
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                // Early termination
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                let key = entry.key();
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        } else {
            // Fallback
            documents = self.scan_collection(collection)?;
            documents.retain(|doc| filter(doc));
            if let Some(max) = limit {
                documents.truncate(max);
            }
        }

        Ok(documents)
    }

    /// Smart collection scan that uses the primary index as a key directory
    /// Avoids forced flushes and leverages hot cache for better performance
    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let mut documents = Vec::new();

        // Use the primary index as a "key directory" - it contains all document keys
        if let Some(index) = self.primary_indices.get(collection) {
            // Iterate through all keys in the primary index (fast, in-memory)
            for entry in index.iter() {
                let key = entry.key();

                // Fetch each document using get() which checks:
                // 1. Hot cache first (instant)
                // 2. Cold storage if not cached (disk I/O only for uncached items)
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        documents.push(doc);
                    }
                }
            }
        } else {
            // Fallback: scan from cold storage if primary index not yet initialized
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                if let Ok((_key, value)) = result {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        documents.push(doc);
                    }
                }
            }
        }

        Ok(documents)
    }

    /// Store a file's contents as a blob (low-level binary storage)
    ///
    /// Reads the file at `file_path` into memory, prefixes it with a `BLOB:`
    /// marker, and stores it under `key`. Files larger than 50 MB are rejected.
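    ///
    /// # Examples
    ///
    /// A minimal sketch (the key and file path are illustrative):
    ///
    /// ```
    /// use std::path::Path;
    ///
    /// db.put_blob("files:report".to_string(), Path::new("report.pdf")).await?;
    /// ```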
    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit

        // Get file metadata to check size before reading
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Add BLOB: prefix to mark this as blob data
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None)
    }

    /// Create a new collection with schema definition
    ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third boolean parameter marks a field as unique; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true enforces uniqueness and creates an index
    ///
    /// # Performance
    /// - Indexed fields: Fast equality queries (O(1) lookup)
    /// - Non-indexed fields: Full scan required for queries
    /// - Unique fields are automatically indexed
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),      // Not unique
    ///     ("email", FieldType::String, true),      // Unique - enforced and indexed
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ])?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */])?; // OK!
    /// ```
    pub fn new_collection<S: Into<String>>(
        &self,
        name: &str,
        fields: Vec<(S, FieldType, bool)>,
    ) -> Result<()> {
        let collection_key = format!("_collection:{}", name);

        // Check if collection already exists - if so, just return Ok (idempotent)
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }

        // Create field definitions
        let mut field_definitions = HashMap::new();
        for (field_name, field_type, unique) in fields {
            field_definitions.insert(
                field_name.into(),
                FieldDefinition {
                    field_type,
                    unique,
                    indexed: unique, // Auto-index unique fields
                },
            );
        }

        let collection = Collection {
            name: name.to_string(),
            fields: field_definitions,
        };

        let collection_data = serde_json::to_vec(&collection)?;
        self.put(collection_key, collection_data, None)?;

        // Invalidate schema cache since we just created/updated the collection schema
        self.schema_cache.remove(name);

        Ok(())
    }
1367
1368    /// Insert a document into a collection
1369    ///
1370    /// Automatically generates a UUID for the document and validates against
1371    /// collection schema and unique constraints. Returns the generated document ID.
1372    ///
1373    /// # Performance
1374    /// - Single insert: ~15,000 docs/sec
1375    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
1376    /// - Triggers PubSub events for real-time listeners
1377    ///
1378    /// # Arguments
1379    /// * `collection` - Name of the collection to insert into
1380    /// * `data` - Document fields and values to insert
1381    ///
1382    /// # Returns
1383    /// The auto-generated ID of the inserted document or an error
1384    ///
1385    /// # Errors
1386    /// - `CollectionNotFound`: Collection doesn't exist
1387    /// - `ValidationError`: Data violates schema or unique constraints
1388    /// - `SerializationError`: Invalid data format
1389    ///
1390    /// # Examples
1391    ///
1392    /// ```
1393    /// use aurora_db::{Aurora, types::Value};
1394    ///
1395    /// let db = Aurora::open("mydb.db")?;
1396    ///
1397    /// // Basic insertion
1398    /// let user_id = db.insert_into("users", vec![
1399    ///     ("name", Value::String("Alice Smith".to_string())),
1400    ///     ("email", Value::String("alice@example.com".to_string())),
1401    ///     ("age", Value::Int(28)),
1402    ///     ("active", Value::Bool(true)),
1403    /// ]).await?;
1404    ///
1405    /// println!("Created user with ID: {}", user_id);
1406    ///
1407    /// // Inserting with nested data
1408    /// let order_id = db.insert_into("orders", vec![
1409    ///     ("user_id", Value::String(user_id.clone())),
1410    ///     ("total", Value::Float(99.99)),
1411    ///     ("status", Value::String("pending".to_string())),
1412    ///     ("items", Value::Array(vec![
1413    ///         Value::String("item-123".to_string()),
1414    ///         Value::String("item-456".to_string()),
1415    ///     ])),
1416    /// ]).await?;
1417    ///
1418    /// // Error handling - unique constraint violation
1419    /// match db.insert_into("users", vec![
1420    ///     ("email", Value::String("alice@example.com".to_string())),  // Duplicate!
1421    ///     ("name", Value::String("Alice Clone".to_string())),
1422    /// ]).await {
1423    ///     Ok(id) => println!("Inserted: {}", id),
1424    ///     Err(e) => println!("Failed: {} (email already exists)", e),
1425    /// }
1426    ///
1427    /// // For bulk inserts (10+ documents), use batch_insert() instead
1428    /// let users = vec![
1429    ///     HashMap::from([
1430    ///         ("name".to_string(), Value::String("Bob".to_string())),
1431    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
1432    ///     ]),
1433    ///     HashMap::from([
1434    ///         ("name".to_string(), Value::String("Carol".to_string())),
1435    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
1436    ///     ]),
1437    ///     // ... more documents
1438    /// ];
1439    /// let ids = db.batch_insert("users", users).await?;  // 3x faster!
1440    /// println!("Inserted {} users", ids.len());
1441    /// ```
1442    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
1443        // Convert Vec<(&str, Value)> to HashMap<String, Value>
1444        let data_map: HashMap<String, Value> =
1445            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1446
1447        // Validate unique constraints before inserting
1448        self.validate_unique_constraints(collection, &data_map)
1449            .await?;
1450
1451        let doc_id = Uuid::new_v4().to_string();
1452        let document = Document {
1453            id: doc_id.clone(),
1454            data: data_map,
1455        };
1456
1457        self.put(
1458            format!("{}:{}", collection, doc_id),
1459            serde_json::to_vec(&document)?,
1460            None,
1461        )?;
1462
1463        // Publish insert event
1464        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1465        let _ = self.pubsub.publish(event);
1466
1467        Ok(doc_id)
1468    }
1469
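    /// Insert a document from a pre-built `HashMap` of fields
    ///
    /// Equivalent to `insert_into()` but accepts an owned `HashMap<String, Value>`,
    /// which is convenient when the field set is assembled dynamically.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let mut fields = HashMap::new();
    /// fields.insert("name".to_string(), Value::String("Dana".into()));
    /// fields.insert("age".to_string(), Value::Int(41));
    ///
    /// let id = db.insert_map("users", fields).await?;
    /// println!("Inserted document {}", id);
    /// ```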
1470    pub async fn insert_map(
1471        &self,
1472        collection: &str,
1473        data: HashMap<String, Value>,
1474    ) -> Result<String> {
1475        // Validate unique constraints before inserting
1476        self.validate_unique_constraints(collection, &data).await?;
1477
1478        let doc_id = Uuid::new_v4().to_string();
1479        let document = Document {
1480            id: doc_id.clone(),
1481            data,
1482        };
1483
1484        self.put(
1485            format!("{}:{}", collection, doc_id),
1486            serde_json::to_vec(&document)?,
1487            None,
1488        )?;
1489
1490        // Publish insert event
1491        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1492        let _ = self.pubsub.publish(event);
1493
1494        Ok(doc_id)
1495    }
1496
1497    /// Batch insert multiple documents with optimized write path
1498    ///
1499    /// Inserts multiple documents in a single optimized operation, bypassing
1500    /// the write buffer for better performance. Ideal for bulk data loading,
1501    /// migrations, or initial database seeding. 3x faster than individual inserts.
1502    ///
1503    /// # Performance
1504    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
1505    /// - Batch writes to WAL and storage
1506    /// - Validates all unique constraints
1507    /// - Use for 10+ documents minimum
1508    ///
1509    /// # Arguments
1510    /// * `collection` - Name of the collection to insert into
1511    /// * `documents` - Vector of document data as HashMaps
1512    ///
1513    /// # Returns
1514    /// Vector of auto-generated document IDs or an error
1515    ///
1516    /// # Examples
1517    ///
1518    /// ```
1519    /// use aurora_db::{Aurora, types::Value};
1520    /// use std::collections::HashMap;
1521    ///
1522    /// let db = Aurora::open("mydb.db")?;
1523    ///
1524    /// // Bulk user import
1525    /// let users = vec![
1526    ///     HashMap::from([
1527    ///         ("name".to_string(), Value::String("Alice".into())),
1528    ///         ("email".to_string(), Value::String("alice@example.com".into())),
1529    ///         ("age".to_string(), Value::Int(28)),
1530    ///     ]),
1531    ///     HashMap::from([
1532    ///         ("name".to_string(), Value::String("Bob".into())),
1533    ///         ("email".to_string(), Value::String("bob@example.com".into())),
1534    ///         ("age".to_string(), Value::Int(32)),
1535    ///     ]),
1536    ///     HashMap::from([
1537    ///         ("name".to_string(), Value::String("Carol".into())),
1538    ///         ("email".to_string(), Value::String("carol@example.com".into())),
1539    ///         ("age".to_string(), Value::Int(25)),
1540    ///     ]),
1541    /// ];
1542    ///
1543    /// let ids = db.batch_insert("users", users).await?;
1544    /// println!("Inserted {} users", ids.len());
1545    ///
1546    /// // Seeding test data
1547    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
1548    ///     .map(|i| HashMap::from([
1549    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
1550    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
1551    ///         ("stock".to_string(), Value::Int(100)),
1552    ///     ]))
1553    ///     .collect();
1554    ///
1555    /// let ids = db.batch_insert("products", test_products).await?;
1556    /// // Much faster than 1000 individual insert_into() calls!
1557    ///
1558    /// // Migration from CSV data
1559    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
1560    /// let mut batch = Vec::new();
1561    ///
1562    /// for result in csv_reader.records() {
1563    ///     let record = result?;
1564    ///     let doc = HashMap::from([
1565    ///         ("field1".to_string(), Value::String(record[0].to_string())),
1566    ///         ("field2".to_string(), Value::String(record[1].to_string())),
1567    ///     ]);
1568    ///     batch.push(doc);
1569    ///
1570    ///     // Insert in batches of 1000
1571    ///     if batch.len() >= 1000 {
1572    ///         db.batch_insert("imported_data", batch.clone()).await?;
1573    ///         batch.clear();
1574    ///     }
1575    /// }
1576    ///
1577    /// // Insert remaining
1578    /// if !batch.is_empty() {
1579    ///     db.batch_insert("imported_data", batch).await?;
1580    /// }
1581    /// ```
1582    ///
1583    /// # Errors
1584    /// - `ValidationError`: Unique constraint violation on any document
1585    /// - `CollectionNotFound`: Collection doesn't exist
1586    /// - `IoError`: Storage write failure
1587    ///
1588    /// # Important Notes
1589    /// - All inserts are atomic - if one fails, none are inserted
1590    /// - UUIDs are auto-generated for all documents
1591    /// - PubSub events are published for each insert
1592    /// - For 10+ documents, this is 3x faster than individual inserts
1593    /// - For < 10 documents, use `insert_into()` instead
1594    ///
1595    /// # See Also
1596    /// - `insert_into()` for single document inserts
1597    /// - `import_from_json()` for file-based bulk imports
1598    /// - `batch_write()` for low-level batch operations
1599    pub async fn batch_insert(
1600        &self,
1601        collection: &str,
1602        documents: Vec<HashMap<String, Value>>,
1603    ) -> Result<Vec<String>> {
1604        let mut doc_ids = Vec::with_capacity(documents.len());
1605        let mut pairs = Vec::with_capacity(documents.len());
1606
1607        // Prepare all documents
1608        for data in documents {
1609            // Validate unique constraints
1610            self.validate_unique_constraints(collection, &data).await?;
1611
1612            let doc_id = Uuid::new_v4().to_string();
1613            let document = Document {
1614                id: doc_id.clone(),
1615                data,
1616            };
1617
1618            let key = format!("{}:{}", collection, doc_id);
1619            let value = serde_json::to_vec(&document)?;
1620
1621            pairs.push((key, value));
1622            doc_ids.push(doc_id);
1623        }
1624
1625        // Write to WAL in batch (if enabled)
1626        if let Some(ref wal) = self.wal
1627            && self.config.durability_mode != DurabilityMode::None {
1628                let mut wal_lock = wal.write().unwrap();
1629                for (key, value) in &pairs {
1630                    wal_lock.append(Operation::Put, key, Some(value))?;
1631                }
1632            }
1633
1634        // Bypass write buffer - go directly to cold storage batch API
1635        self.cold.batch_set(pairs.clone())?;
1636
1637        // Note: Durability is handled by background checkpoint process
1638
1639        // Update hot cache and indices
1640        for (key, value) in pairs {
1641            self.hot.set(key.clone(), value.clone(), None);
1642
1643            if let Some(collection_name) = key.split(':').next()
1644                && !collection_name.starts_with('_') {
1645                    self.index_value(collection_name, &key, &value)?;
1646                }
1647        }
1648
1649        // Publish events
1650        for doc_id in &doc_ids {
1651            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
1652                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
1653                let _ = self.pubsub.publish(event);
1654            }
1655        }
1656
1657        Ok(doc_ids)
1658    }
1659
1660    /// Update a document by ID
1661    ///
1662    /// # Arguments
1663    /// * `collection` - Collection name
1664    /// * `doc_id` - Document ID to update
1665    /// * `data` - New field values to set
1666    ///
1667    /// # Returns
1668    /// Ok(()) on success, or an error if the document doesn't exist
1669    ///
1670    /// # Examples
1671    ///
1672    /// ```
1673    /// db.update_document("users", &user_id, vec![
1674    ///     ("status", Value::String("active".to_string())),
1675    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
1676    /// ]).await?;
1677    /// ```
1678    pub async fn update_document(
1679        &self,
1680        collection: &str,
1681        doc_id: &str,
1682        updates: Vec<(&str, Value)>,
1683    ) -> Result<()> {
1684        // Get existing document
1685        let mut document = self
1686            .get_document(collection, doc_id)?
1687            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;
1688
1689        // Store old document for event
1690        let old_document = document.clone();
1691
1692        // Apply updates
1693        for (field, value) in updates {
1694            document.data.insert(field.to_string(), value);
1695        }
1696
1697        // Validate unique constraints after update (excluding current document)
1698        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
1699            .await?;
1700
1701        // Save updated document
1702        self.put(
1703            format!("{}:{}", collection, doc_id),
1704            serde_json::to_vec(&document)?,
1705            None,
1706        )?;
1707
1708        // Publish update event
1709        let event =
1710            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
1711        let _ = self.pubsub.publish(event);
1712
1713        Ok(())
1714    }
1715
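    /// Retrieve every document in a collection
    ///
    /// Performs a full scan, so prefer `query()` with filters and limits on large
    /// collections.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let users = db.get_all_collection("users").await?;
    /// println!("{} users total", users.len());
    /// ```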
1716    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
1717        self.ensure_indices_initialized().await?;
1718        self.scan_collection(collection)
1719    }
1720
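    /// List keys whose name contains the given pattern, with size and preview metadata
    ///
    /// Scans cold storage and returns a `DataInfo` summary for each matching key.
    /// Intended for debugging and introspection rather than hot-path queries.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{} ({} bytes)", key, info.size());
    /// }
    /// ```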
1721    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
1722        let mut data = Vec::new();
1723
1724        // Scan from cold storage instead of primary index
1725        for result in self.cold.scan() {
1726            if let Ok((key, value)) = result {
1727                if key.contains(pattern) {
1728                    let info = if value.starts_with(b"BLOB:") {
1729                        DataInfo::Blob { size: value.len() }
1730                    } else {
1731                        DataInfo::Data {
1732                            size: value.len(),
1733                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
1734                                .into_owned(),
1735                        }
1736                    };
1737
1738                    data.push((key.clone(), info));
1739                }
1740            }
1741        }
1742
1743        Ok(data)
1744    }
1745
    /// Begin a new transaction for atomic operations
    ///
    /// Transactions ensure all-or-nothing execution: either every operation in the
    /// transaction is applied, or none are. Use transactions to keep related writes
    /// consistent.
    ///
    /// # Returns
    /// The ID of the new transaction, to pass to `commit_transaction()` or
    /// `rollback_transaction()`.
    ///
1776    /// # Examples
1777    ///
1778    /// ```
1779    /// use aurora_db::{Aurora, types::Value};
1780    ///
1781    /// let db = Aurora::open("mydb.db")?;
1782    ///
1783    /// // Start transaction
1784    /// let tx_id = db.begin_transaction();
1785    ///
1786    /// // Perform multiple operations
1787    /// db.insert_into("accounts", vec![
1788    ///     ("user_id", Value::String("alice".into())),
1789    ///     ("balance", Value::Int(1000)),
1790    /// ]).await?;
1791    ///
1792    /// db.insert_into("accounts", vec![
1793    ///     ("user_id", Value::String("bob".into())),
1794    ///     ("balance", Value::Int(500)),
    /// ]).await?;
1796    ///
1797    /// // Commit if all succeeded
1798    /// db.commit_transaction(tx_id)?;
1799    ///
1800    /// // Or rollback on error
1801    /// // db.rollback_transaction(tx_id)?;
1802    /// ```
1803    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
1804        let buffer = self.transaction_manager.begin();
1805        buffer.id
1806    }
1807
1808    /// Commit a transaction, making all changes permanent
1809    ///
1810    /// All operations within the transaction are atomically applied to the database.
1811    /// If any operation fails, none are applied.
1812    ///
1813    /// # Arguments
1814    /// * `tx_id` - Transaction ID returned from begin_transaction()
1815    ///
1816    /// # Examples
1817    ///
1818    /// ```
1819    /// use aurora_db::{Aurora, types::Value};
1820    ///
1821    /// let db = Aurora::open("mydb.db")?;
1822    ///
1823    /// // Transfer money between accounts
1824    /// let tx_id = db.begin_transaction();
1825    ///
1826    /// // Deduct from Alice
1827    /// db.update_document("accounts", "alice", vec![
1828    ///     ("balance", Value::Int(900)),  // Was 1000
1829    /// ]).await?;
1830    ///
1831    /// // Add to Bob
1832    /// db.update_document("accounts", "bob", vec![
1833    ///     ("balance", Value::Int(600)),  // Was 500
1834    /// ]).await?;
1835    ///
1836    /// // Both updates succeed - commit them
1837    /// db.commit_transaction(tx_id)?;
1838    ///
1839    /// println!("Transfer completed!");
1840    /// ```
1841    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1842        let buffer = self
1843            .transaction_manager
1844            .active_transactions
1845            .get(&tx_id)
1846            .ok_or_else(|| {
1847                AuroraError::InvalidOperation("Transaction not found or already completed".into())
1848            })?;
1849
1850        for item in buffer.writes.iter() {
1851            let key = item.key();
1852            let value = item.value();
1853            self.cold.set(key.clone(), value.clone())?;
1854            self.hot.set(key.clone(), value.clone(), None);
1855            if let Some(collection_name) = key.split(':').next()
1856                && !collection_name.starts_with('_') {
1857                    self.index_value(collection_name, key, value)?;
1858                }
1859        }
1860
1861        for item in buffer.deletes.iter() {
1862            let key = item.key();
1863            if let Some((collection, id)) = key.split_once(':')
1864                && let Ok(Some(doc)) = self.get_document(collection, id) {
1865                    self.remove_from_indices(collection, &doc)?;
1866                }
1867            self.cold.delete(key)?;
1868            self.hot.delete(key);
1869        }
1870
1871        // Drop the buffer reference to release the DashMap read lock
1872        // before calling commit which needs to remove the entry (write lock)
1873        drop(buffer);
1874
1875        self.transaction_manager.commit(tx_id)?;
1876
1877        self.cold.compact()?;
1878
1879        Ok(())
1880    }
1881
1882    /// Roll back a transaction, discarding all changes
1883    ///
1884    /// All operations within the transaction are discarded. The database state
1885    /// remains unchanged. Use this when an error occurs during transaction processing.
1886    ///
1887    /// # Arguments
1888    /// * `tx_id` - Transaction ID returned from begin_transaction()
1889    ///
1890    /// # Examples
1891    ///
1892    /// ```
1893    /// use aurora_db::{Aurora, types::Value};
1894    ///
1895    /// let db = Aurora::open("mydb.db")?;
1896    ///
1897    /// // Attempt a transfer with validation
1898    /// let tx_id = db.begin_transaction();
1899    ///
1900    /// let result = async {
1901    ///     // Deduct from Alice
    ///     let alice = db.get_document("accounts", "alice")?;
    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
1904    ///
1905    ///     if let Some(Value::Int(bal)) = balance {
1906    ///         if *bal < 100 {
1907    ///             return Err("Insufficient funds");
1908    ///         }
1909    ///
1910    ///         db.update_document("accounts", "alice", vec![
1911    ///             ("balance", Value::Int(bal - 100)),
1912    ///         ]).await?;
1913    ///
1914    ///         db.update_document("accounts", "bob", vec![
1915    ///             ("balance", Value::Int(600)),
1916    ///         ]).await?;
1917    ///
1918    ///         Ok(())
1919    ///     } else {
1920    ///         Err("Account not found")
1921    ///     }
1922    /// }.await;
1923    ///
1924    /// match result {
1925    ///     Ok(_) => {
1926    ///         db.commit_transaction(tx_id)?;
1927    ///         println!("Transfer completed");
1928    ///     }
1929    ///     Err(e) => {
1930    ///         db.rollback_transaction(tx_id)?;
1931    ///         println!("Transfer failed: {}, changes rolled back", e);
1932    ///     }
1933    /// }
1934    /// ```
1935    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1936        self.transaction_manager.rollback(tx_id)
1937    }
1938
1939    /// Create a secondary index on a field for faster queries
1940    ///
1941    /// Indexes dramatically improve query performance for frequently accessed fields,
1942    /// trading increased memory usage and slower writes for much faster reads.
1943    ///
1944    /// # When to Create Indexes
1945    /// - **Frequent queries**: Fields used in 80%+ of your queries
1946    /// - **High cardinality**: Fields with many unique values (user_id, email)
1947    /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
1948    /// - **Large collections**: Most beneficial with 10,000+ documents
1949    ///
1950    /// # When NOT to Index
1951    /// - Low cardinality fields (e.g., boolean flags, small enums)
1952    /// - Rarely queried fields
1953    /// - Fields that change frequently (write-heavy workloads)
1954    /// - Small collections (<1,000 documents) - full scans are fast enough
1955    ///
1956    /// # Performance Characteristics
1957    /// - **Query speedup**: O(n) → O(1) for equality filters
1958    /// - **Memory cost**: ~100-200 bytes per document per index
1959    /// - **Write slowdown**: ~20-30% longer insert/update times
1960    /// - **Build time**: ~5,000 docs/sec for initial indexing
1961    ///
1962    /// # Arguments
1963    /// * `collection` - Name of the collection to index
1964    /// * `field` - Name of the field to index
1965    ///
1966    /// # Examples
1967    ///
1968    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
1970    ///
1971    /// let db = Aurora::open("mydb.db")?;
1972    /// db.new_collection("users", vec![
1973    ///     ("email", FieldType::String),
1974    ///     ("age", FieldType::Int),
1975    ///     ("active", FieldType::Bool),
1976    /// ])?;
1977    ///
1978    /// // Index email - frequently queried, high cardinality
1979    /// db.create_index("users", "email").await?;
1980    ///
1981    /// // Now this query is FAST (O(1) instead of O(n))
1982    /// let user = db.query("users")
1983    ///     .filter(|f| f.eq("email", "alice@example.com"))
1984    ///     .first_one()
1985    ///     .await?;
1986    ///
1987    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
1988    /// // A full scan is fast enough for boolean fields
1989    ///
1990    /// // DO index 'age' if you frequently query age ranges
1991    /// db.create_index("users", "age").await?;
1992    ///
1993    /// let young_users = db.query("users")
1994    ///     .filter(|f| f.lt("age", 30))
1995    ///     .collect()
1996    ///     .await?;
1997    /// ```
1998    ///
1999    /// # Real-World Example: E-commerce Orders
2000    ///
2001    /// ```
2002    /// // Orders collection: 1 million documents
2003    /// db.new_collection("orders", vec![
2004    ///     ("user_id", FieldType::String),    // High cardinality
2005    ///     ("status", FieldType::String),      // Low cardinality (pending, shipped, delivered)
2006    ///     ("created_at", FieldType::String),
2007    ///     ("total", FieldType::Float),
2008    /// ])?;
2009    ///
2010    /// // Index user_id - queries like "show me my orders" are common
2011    /// db.create_index("orders", "user_id").await?;  // Good choice
2012    ///
2013    /// // Query speedup: 2.5s → 0.001s
2014    /// let my_orders = db.query("orders")
2015    ///     .filter(|f| f.eq("user_id", user_id))
2016    ///     .collect()
2017    ///     .await?;
2018    ///
2019    /// // DON'T index 'status' - only 3 possible values
2020    /// // Scanning 1M docs takes ~100ms, indexing won't help much
2021    ///
2022    /// // Index created_at if you frequently query recent orders
2023    /// db.create_index("orders", "created_at").await?;  // Good for time-based queries
2024    /// ```
2025    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2026        // Check if collection exists
2027        if self.get(&format!("_collection:{}", collection))?.is_none() {
2028            return Err(AuroraError::CollectionNotFound(collection.to_string()));
2029        }
2030
2031        // Generate a default index name
2032        let index_name = format!("idx_{}_{}", collection, field);
2033
2034        // Create index definition
2035        let definition = IndexDefinition {
2036            name: index_name.clone(),
2037            collection: collection.to_string(),
2038            fields: vec![field.to_string()],
2039            index_type: IndexType::BTree,
2040            unique: false,
2041        };
2042
2043        // Create the index
2044        let index = Index::new(definition.clone());
2045
2046        // Index all existing documents in the collection
2047        let prefix = format!("{}:", collection);
2048        for result in self.cold.scan_prefix(&prefix) {
2049            if let Ok((_, data)) = result
2050                && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2051                    let _ = index.insert(&doc);
2052                }
2053        }
2054
2055        // Store the index
2056        self.indices.insert(index_name, index);
2057
2058        // Store the index definition for persistence
2059        let index_key = format!("_index:{}:{}", collection, field);
2060        self.put(index_key, serde_json::to_vec(&definition)?, None)?;
2061
2062        Ok(())
2063    }
2064
2065    /// Query documents in a collection with filtering, sorting, and pagination
2066    ///
2067    /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
    /// Queries use early termination for LIMIT clauses, so a limited query scans
    /// only about `limit + offset` documents instead of the entire collection,
    /// which can be orders of magnitude faster on large datasets.
2070    ///
2071    /// # Performance
2072    /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2073    /// - Without LIMIT: O(n) where n = matching documents
2074    /// - Uses secondary indices when available for equality filters
2075    /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2076    ///
2077    /// # Examples
2078    ///
2079    /// ```
2080    /// use aurora_db::{Aurora, types::Value};
2081    ///
2082    /// let db = Aurora::open("mydb.db")?;
2083    ///
2084    /// // Simple equality query
2085    /// let active_users = db.query("users")
2086    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2087    ///     .collect()
2088    ///     .await?;
2089    ///
2090    /// // Range query with pagination (FAST - uses early termination!)
2091    /// let top_scorers = db.query("users")
2092    ///     .filter(|f| f.gt("score", Value::Int(1000)))
2093    ///     .order_by("score", false)  // descending
2094    ///     .limit(10)
2095    ///     .offset(20)
2096    ///     .collect()
2097    ///     .await?;
2098    ///
2099    /// // Multiple filters
2100    /// let premium_active = db.query("users")
2101    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
2102    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2103    ///     .limit(100)  // Only scans ~200 docs, not all million!
2104    ///     .collect()
2105    ///     .await?;
2106    ///
2107    /// // Text search in a field
2108    /// let matching = db.query("articles")
2109    ///     .filter(|f| f.contains("title", "rust"))
2110    ///     .collect()
2111    ///     .await?;
2112    /// ```
2113    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2114        QueryBuilder::new(self, collection)
2115    }
2116
2117    /// Create a search builder for full-text search
2118    ///
2119    /// # Arguments
2120    /// * `collection` - Name of the collection to search
2121    ///
2122    /// # Returns
2123    /// A `SearchBuilder` for configuring and executing searches
2124    ///
2125    /// # Examples
2126    ///
2127    /// ```
2128    /// // Search for documents containing text
2129    /// let search_results = db.search("articles")
2130    ///     .field("content")
2131    ///     .matching("quantum computing")
2132    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
2133    ///     .collect()
2134    ///     .await?;
2135    /// ```
2136    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2137        SearchBuilder::new(self, collection)
2138    }
2139
2140    /// Retrieve a document by ID
2141    ///
2142    /// Fast direct lookup when you know the document ID. Significantly faster
2143    /// than querying with filters when ID is known.
2144    ///
2145    /// # Performance
2146    /// - Hot cache: ~1,000,000 reads/sec (instant)
2147    /// - Cold storage: ~500,000 reads/sec (disk I/O)
2148    /// - Complexity: O(1) - constant time lookup
2149    /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2150    ///
2151    /// # Arguments
2152    /// * `collection` - Name of the collection to query
2153    /// * `id` - ID of the document to retrieve
2154    ///
2155    /// # Returns
2156    /// The document if found, None if not found, or an error
2157    ///
2158    /// # Examples
2159    ///
2160    /// ```
2161    /// use aurora_db::{Aurora, types::Value};
2162    ///
2163    /// let db = Aurora::open("mydb.db")?;
2164    ///
2165    /// // Basic retrieval
2166    /// if let Some(user) = db.get_document("users", &user_id)? {
2167    ///     println!("Found user: {}", user.id);
2168    ///
2169    ///     // Access fields safely
2170    ///     if let Some(Value::String(name)) = user.data.get("name") {
2171    ///         println!("Name: {}", name);
2172    ///     }
2173    ///
2174    ///     if let Some(Value::Int(age)) = user.data.get("age") {
2175    ///         println!("Age: {}", age);
2176    ///     }
2177    /// } else {
2178    ///     println!("User not found");
2179    /// }
2180    ///
2181    /// // Idiomatic error handling
2182    /// let user = db.get_document("users", &user_id)?
2183    ///     .ok_or_else(|| AuroraError::NotFound("User not found".into()))?;
2184    ///
2185    /// // Checking existence before operations
2186    /// if db.get_document("users", &user_id)?.is_some() {
2187    ///     db.update_document("users", &user_id, vec![
2188    ///         ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2189    ///     ]).await?;
2190    /// }
2191    ///
2192    /// // Batch retrieval (fetch multiple by ID)
2193    /// let user_ids = vec!["user-1", "user-2", "user-3"];
2194    /// let users: Vec<Document> = user_ids.iter()
2195    ///     .filter_map(|id| db.get_document("users", id).ok().flatten())
2196    ///     .collect();
2197    ///
2198    /// println!("Found {} out of {} users", users.len(), user_ids.len());
2199    /// ```
2200    ///
2201    /// # When to Use
2202    /// - You know the document ID (from insert, previous query, or URL param)
2203    /// - Need fastest possible lookup (1M reads/sec)
2204    /// - Fetching a single document
2205    ///
2206    /// # When NOT to Use
2207    /// - Searching by other fields → Use `query().filter()` instead
2208    /// - Need multiple documents by criteria → Use `query().collect()` instead
2209    /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2210    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2211        let key = format!("{}:{}", collection, id);
2212        if let Some(data) = self.get(&key)? {
2213            Ok(Some(serde_json::from_slice(&data)?))
2214        } else {
2215            Ok(None)
2216        }
2217    }
2218
2219    /// Delete a document by ID
2220    ///
2221    /// Permanently removes a document from storage, cache, and all indices.
2222    /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2223    ///
2224    /// # Performance
2225    /// - Delete speed: ~50,000 deletes/sec
2226    /// - Cleans up hot cache, cold storage, primary + secondary indices
2227    /// - Triggers PubSub events for listeners
2228    ///
2229    /// # Arguments
2230    /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2231    ///
2232    /// # Returns
2233    /// Success or an error
2234    ///
2235    /// # Errors
2236    /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2237    /// - `IoError`: Storage deletion failed
2238    ///
2239    /// # Examples
2240    ///
2241    /// ```
    /// use aurora_db::{Aurora, types::Value};
2243    ///
2244    /// let db = Aurora::open("mydb.db")?;
2245    ///
2246    /// // Basic deletion (note: requires "collection:id" format)
2247    /// db.delete("users:abc123").await?;
2248    ///
2249    /// // Delete with existence check
2250    /// let user_id = "user-456";
2251    /// if db.get_document("users", user_id)?.is_some() {
2252    ///     db.delete(&format!("users:{}", user_id)).await?;
2253    ///     println!("User deleted");
2254    /// } else {
2255    ///     println!("User not found");
2256    /// }
2257    ///
2258    /// // Error handling
2259    /// match db.delete("users:nonexistent").await {
2260    ///     Ok(_) => println!("Deleted successfully"),
2261    ///     Err(e) => println!("Delete failed: {}", e),
2262    /// }
2263    ///
2264    /// // Batch deletion using query
    /// let inactive_count = db.delete_by_query("users", |f| {
    ///     f.eq("active", Value::Bool(false))
    /// }).await?;
2268    /// println!("Deleted {} inactive users", inactive_count);
2269    ///
2270    /// // Delete with cascading (manual cascade pattern)
2271    /// let user_id = "user-123";
2272    ///
2273    /// // Delete user's orders first
2274    /// let orders = db.query("orders")
2275    ///     .filter(|f| f.eq("user_id", user_id))
2276    ///     .collect()
2277    ///     .await?;
2278    ///
2279    /// for order in orders {
2280    ///     db.delete(&format!("orders:{}", order.id)).await?;
2281    /// }
2282    ///
2283    /// // Then delete the user
2284    /// db.delete(&format!("users:{}", user_id)).await?;
2285    /// println!("User and all orders deleted");
2286    /// ```
2287    ///
2288    /// # Alternative: Soft Delete Pattern
2289    ///
2290    /// For recoverable deletions, use soft deletes instead:
2291    ///
2292    /// ```
2293    /// // Soft delete - mark as deleted instead of removing
2294    /// db.update_document("users", &user_id, vec![
2295    ///     ("deleted", Value::Bool(true)),
2296    ///     ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2297    /// ]).await?;
2298    ///
2299    /// // Query excludes soft-deleted items
2300    /// let active_users = db.query("users")
2301    ///     .filter(|f| f.eq("deleted", Value::Bool(false)))
2302    ///     .collect()
2303    ///     .await?;
2304    ///
    /// // Later: hard delete after retention period
    /// let thirty_days_ago = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
    /// let old_deletions = db.query("users")
2307    ///     .filter(|f| f.eq("deleted", Value::Bool(true)))
2308    ///     .filter(|f| f.lt("deleted_at", thirty_days_ago))
2309    ///     .collect()
2310    ///     .await?;
2311    ///
2312    /// for user in old_deletions {
2313    ///     db.delete(&format!("users:{}", user.id)).await?;
2314    /// }
2315    /// ```
2316    ///
2317    /// # Important Notes
2318    /// - Deletion is permanent - no undo/recovery
2319    /// - Consider soft deletes for recoverable operations
2320    /// - Use transactions for multi-document deletions
2321    /// - PubSub subscribers will receive delete events
2322    /// - All indices are automatically cleaned up
2323    pub async fn delete(&self, key: &str) -> Result<()> {
2324        // Extract collection and id from key (format: "collection:id")
2325        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2326            (coll, doc_id)
2327        } else {
2328            return Err(AuroraError::InvalidOperation(
2329                "Invalid key format, expected 'collection:id'".into(),
2330            ));
2331        };
2332
        // Fetch the document before deleting it so secondary indices can be cleaned up
2334        let document = self.get_document(collection, id)?;
2335
2336        // Delete from hot cache
2337        if self.hot.get(key).is_some() {
2338            self.hot.delete(key);
2339        }
2340
2341        // Delete from cold storage
2342        self.cold.delete(key)?;
2343
        // Clean up all indices (primary + secondary)
2345        if let Some(doc) = document {
2346            self.remove_from_indices(collection, &doc)?;
2347        } else {
2348            // Fallback: at least remove from primary index
2349            if let Some(index) = self.primary_indices.get_mut(collection) {
2350                index.remove(id);
2351            }
2352        }
2353
2354        // Publish delete event
2355        let event = crate::pubsub::ChangeEvent::delete(collection, id);
2356        let _ = self.pubsub.publish(event);
2357
2358        Ok(())
2359    }
2360
2361    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2362        let prefix = format!("{}:", collection);
2363
        // Get all keys in the collection via a prefix scan (avoids walking the whole store)
        let keys: Vec<String> = self
            .cold
            .scan_prefix(&prefix)
            .filter_map(|r| r.ok())
            .map(|(k, _)| k)
            .collect();
2372
2373        // Delete each key
2374        for key in keys {
2375            self.delete(&key).await?;
2376        }
2377
2378        // Remove collection indices
2379        self.primary_indices.remove(collection);
2380        self.secondary_indices
2381            .retain(|k, _| !k.starts_with(&prefix));
2382
2383        // Invalidate schema cache
2384        self.schema_cache.remove(collection);
2385
2386        Ok(())
2387    }
2388
2389    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2390        // Remove from primary index
2391        if let Some(index) = self.primary_indices.get(collection) {
2392            index.remove(&doc.id);
2393        }
2394
2395        // Remove from secondary indices
2396        for (field, value) in &doc.data {
2397            let index_key = format!("{}:{}", collection, field);
2398            if let Some(index) = self.secondary_indices.get(&index_key)
2399                && let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
2400                    doc_ids.retain(|id| id != &doc.id);
2401                }
2402        }
2403
2404        Ok(())
2405    }
2406
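    /// Case-insensitive substring search over a single string field
    ///
    /// Scans the whole collection; for ranked, multi-field search use `search()`.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} matching articles", hits.len());
    /// ```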
2407    pub async fn search_text(
2408        &self,
2409        collection: &str,
2410        field: &str,
2411        query: &str,
2412    ) -> Result<Vec<Document>> {
        let mut results = Vec::new();
        let docs = self.get_all_collection(collection).await?;
        let query_lower = query.to_lowercase();

        for doc in docs {
            if let Some(Value::String(text)) = doc.data.get(field)
                && text.to_lowercase().contains(&query_lower) {
2419                    results.push(doc);
2420                }
2421        }
2422
2423        Ok(results)
2424    }
2425
2426    /// Export a collection to a JSON file
2427    ///
2428    /// Creates a JSON file containing all documents in the collection.
2429    /// Useful for backups, data migration, or sharing datasets.
2430    /// Automatically appends `.json` extension if not present.
2431    ///
2432    /// # Performance
2433    /// - Export speed: ~10,000 docs/sec
2434    /// - Scans entire collection from cold storage
2435    /// - Memory efficient: streams documents to file
2436    ///
2437    /// # Arguments
2438    /// * `collection` - Name of the collection to export
2439    /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2440    ///
2441    /// # Returns
2442    /// Success or an error
2443    ///
2444    /// # Examples
2445    ///
2446    /// ```
2447    /// use aurora_db::Aurora;
2448    ///
2449    /// let db = Aurora::open("mydb.db")?;
2450    ///
2451    /// // Basic export
2452    /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2453    /// // Creates: ./backups/users_2024-01-15.json
2454    ///
2455    /// // Timestamped backup
2456    /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2457    /// let backup_path = format!("./backups/users_{}", timestamp);
2458    /// db.export_as_json("users", &backup_path)?;
2459    ///
2460    /// // Export multiple collections
2461    /// for collection in &["users", "orders", "products"] {
2462    ///     db.export_as_json(collection, &format!("./export/{}", collection))?;
2463    /// }
2464    /// ```
2465    ///
2466    /// # Output Format
2467    ///
2468    /// The exported JSON has this structure:
2469    /// ```json
2470    /// {
2471    ///   "users": [
2472    ///     { "id": "123", "name": "Alice", "email": "alice@example.com" },
2473    ///     { "id": "456", "name": "Bob", "email": "bob@example.com" }
2474    ///   ]
2475    /// }
2476    /// ```
2477    ///
2478    /// # See Also
2479    /// - `export_as_csv()` for CSV format export
2480    /// - `import_from_json()` to restore exported data
2481    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2482        let output_path = if !output_path.ends_with(".json") {
2483            format!("{}.json", output_path)
2484        } else {
2485            output_path.to_string()
2486        };
2487
2488        let mut docs = Vec::new();
2489
2490        // Get all documents from the specified collection
2491        for result in self.cold.scan() {
2492            let (key, value) = result?;
2493
2494            // Only process documents from the specified collection
2495            if let Some(key_collection) = key.split(':').next()
2496                && key_collection == collection && !key.starts_with("_collection:")
2497                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2498                        // Convert Value enum to raw JSON values
2499                        let mut clean_doc = serde_json::Map::new();
2500                        for (k, v) in doc.data {
2501                            match v {
2502                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2503                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2504                                Value::Float(f) => {
2505                                    if let Some(n) = serde_json::Number::from_f64(f) {
2506                                        clean_doc.insert(k, JsonValue::Number(n))
2507                                    } else {
2508                                        clean_doc.insert(k, JsonValue::Null)
2509                                    }
2510                                }
2511                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2512                                Value::Array(arr) => {
2513                                    let clean_arr: Vec<JsonValue> = arr
2514                                        .into_iter()
2515                                        .map(|v| match v {
2516                                            Value::String(s) => JsonValue::String(s),
2517                                            Value::Int(i) => JsonValue::Number(i.into()),
2518                                            Value::Float(f) => serde_json::Number::from_f64(f)
2519                                                .map(JsonValue::Number)
2520                                                .unwrap_or(JsonValue::Null),
2521                                            Value::Bool(b) => JsonValue::Bool(b),
2522                                            Value::Null => JsonValue::Null,
2523                                            _ => JsonValue::Null,
2524                                        })
2525                                        .collect();
2526                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
2527                                }
2528                                Value::Uuid(u) => {
2529                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
2530                                }
2531                                Value::Null => clean_doc.insert(k, JsonValue::Null),
                                Value::Object(_) => None, // Nested objects are not exported yet
2533                            };
2534                        }
2535                        docs.push(JsonValue::Object(clean_doc));
2536                    }
2537        }
2538
2539        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
2540            collection.to_string(),
2541            JsonValue::Array(docs),
2542        )]));
2543
2544        let mut file = StdFile::create(&output_path)?;
2545        serde_json::to_writer_pretty(&mut file, &output)?;
2546        println!("Exported collection '{}' to {}", collection, &output_path);
2547        Ok(())
2548    }
2549
2550    /// Export a collection to a CSV file
2551    ///
2552    /// Creates a CSV file with headers from the first document and rows for each document.
2553    /// Useful for spreadsheet analysis, data science workflows, or reporting.
2554    /// Automatically appends `.csv` extension if not present.
2555    ///
2556    /// # Performance
2557    /// - Export speed: ~8,000 docs/sec
2558    /// - Memory efficient: streams rows to file
2559    /// - Headers determined from first document
2560    ///
2561    /// # Arguments
2562    /// * `collection` - Name of the collection to export
2563    /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
2564    ///
2565    /// # Returns
2566    /// Success or an error
2567    ///
2568    /// # Examples
2569    ///
2570    /// ```
2571    /// use aurora_db::Aurora;
2572    ///
2573    /// let db = Aurora::open("mydb.db")?;
2574    ///
2575    /// // Basic CSV export
2576    /// db.export_as_csv("users", "./reports/users")?;
2577    /// // Creates: ./reports/users.csv
2578    ///
2579    /// // Export for analysis in Excel/Google Sheets
2580    /// db.export_as_csv("orders", "./analytics/sales_data")?;
2581    ///
2582    /// // Monthly report generation
2583    /// let month = chrono::Utc::now().format("%Y-%m");
2584    /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
2585    /// ```
2586    ///
2587    /// # Output Format
2588    ///
2589    /// ```csv
2590    /// id,name,email,age
2591    /// 123,Alice,alice@example.com,28
2592    /// 456,Bob,bob@example.com,32
2593    /// ```
2594    ///
2595    /// # Important Notes
2596    /// - Headers are taken from the first document's fields
2597    /// - Documents with different fields will have empty values for missing fields
2598    /// - Nested objects/arrays are converted to strings
2599    /// - Best for flat document structures
2600    ///
2601    /// # See Also
2602    /// - `export_as_json()` for JSON format (better for nested data)
2603    /// - For complex nested structures, use JSON export instead
2604    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
2605        let output_path = if !filename.ends_with(".csv") {
2606            format!("{}.csv", filename)
2607        } else {
2608            filename.to_string()
2609        };
2610
2611        let mut writer = csv::Writer::from_path(&output_path)?;
2612        let mut headers = Vec::new();
2613        let mut first_doc = true;
2614
2615        // Get all documents from the specified collection
2616        for result in self.cold.scan() {
2617            let (key, value) = result?;
2618
2619            // Only process documents from the specified collection
2620            if let Some(key_collection) = key.split(':').next()
2621                && key_collection == collection && !key.starts_with("_collection:")
2622                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2623                        // Write headers from first document
2624                        if first_doc && !doc.data.is_empty() {
2625                            headers = doc.data.keys().cloned().collect();
2626                            writer.write_record(&headers)?;
2627                            first_doc = false;
2628                        }
2629
2630                        // Write the document values
2631                        let values: Vec<String> = headers
2632                            .iter()
2633                            .map(|header| {
2634                                doc.data
2635                                    .get(header)
2636                                    .map(|v| v.to_string())
2637                                    .unwrap_or_default()
2638                            })
2639                            .collect();
2640                        writer.write_record(&values)?;
2641                    }
2642        }
2643
2644        writer.flush()?;
2645        println!("Exported collection '{}' to {}", collection, &output_path);
2646        Ok(())
2647    }
2648
2649    // Helper method to create filter-based queries
2650    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2651        self.query(collection)
2652    }
2653
2654    // Convenience methods that build on top of the FilterBuilder
2655
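    /// Find a single document whose `id` field equals the given value
    ///
    /// Note: this filters on the `id` field via a query scan. When you already
    /// know the document ID, `get_document()` is the faster O(1) lookup.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// if let Some(doc) = db.find_by_id("users", "user-123").await? {
    ///     println!("Found {}", doc.id);
    /// }
    /// ```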
2656    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2657        self.query(collection)
2658            .filter(|f| f.eq("id", id))
2659            .first_one()
2660            .await
2661    }
2662
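    /// Find the first document matching a filter predicate
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let admin = db.find_one("users", |f| {
    ///     f.eq("role", Value::String("admin".into()))
    /// }).await?;
    /// ```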
2663    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
2664    where
2665        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2666    {
2667        self.query(collection).filter(filter_fn).first_one().await
2668    }
2669
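    /// Find all documents where `field` equals `value`
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled; assumes `&str` converts
    /// into `Value` as in the filter examples above):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let premium = db.find_by_field("users", "tier", "premium").await?;
    /// println!("{} premium users", premium.len());
    /// ```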
2670    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
2671        &self,
2672        collection: &str,
2673        field: &'static str,
2674        value: T,
2675    ) -> Result<Vec<Document>> {
2676        let value_clone = value.clone();
2677        self.query(collection)
2678            .filter(move |f| f.eq(field, value_clone.clone()))
2679            .collect()
2680            .await
2681    }
2682
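    /// Find documents matching all of the given field/value pairs (logical AND)
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let results = db.find_by_fields("users", vec![
    ///     ("tier", Value::String("premium".into())),
    ///     ("active", Value::Bool(true)),
    /// ]).await?;
    /// ```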
2683    pub async fn find_by_fields(
2684        &self,
2685        collection: &str,
2686        fields: Vec<(&str, Value)>,
2687    ) -> Result<Vec<Document>> {
2688        let mut query = self.query(collection);
2689
2690        for (field, value) in fields {
2691            let field_owned = field.to_owned();
2692            let value_owned = value.clone();
2693            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
2694        }
2695
2696        query.collect().await
2697    }
2698
    /// Find documents whose `field` value lies between `min` and `max`
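    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled; assumes integer literals
    /// convert into `Value::Int`):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let twenties = db.find_in_range("users", "age", 20, 29).await?;
    /// println!("{} users in their twenties", twenties.len());
    /// ```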
2700    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
2701        &self,
2702        collection: &str,
2703        field: &'static str,
2704        min: T,
2705        max: T,
2706    ) -> Result<Vec<Document>> {
2707        self.query(collection)
2708            .filter(move |f| f.between(field, min.clone(), max.clone()))
2709            .collect()
2710            .await
2711    }
2712
    // Build complex queries with multiple combined filters
    pub fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
2717
2718    // Create a full-text search query with added filter options
2719    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2720        self.search(collection)
2721    }
2722
2723    // Utility methods for common operations
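    /// Insert a document with a caller-supplied ID, or merge fields into it if it exists
    ///
    /// Existing fields not present in `data` are kept; matching fields are overwritten.
    /// Unique constraints are re-validated either way.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // First call inserts, second call updates the same document
    /// db.upsert("settings", "theme", vec![
    ///     ("value", Value::String("dark".into())),
    /// ]).await?;
    /// db.upsert("settings", "theme", vec![
    ///     ("value", Value::String("light".into())),
    /// ]).await?;
    /// ```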
2724    pub async fn upsert(
2725        &self,
2726        collection: &str,
2727        id: &str,
2728        data: Vec<(&str, Value)>,
2729    ) -> Result<String> {
2730        // Convert Vec<(&str, Value)> to HashMap<String, Value>
2731        let data_map: HashMap<String, Value> =
2732            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2733
2734        // Check if document exists
2735        if let Some(mut doc) = self.get_document(collection, id)? {
2736            // Update existing document - merge new data
2737            for (key, value) in data_map {
2738                doc.data.insert(key, value);
2739            }
2740
2741            // Validate unique constraints for the updated document
2742            // We need to exclude the current document from the uniqueness check
2743            self.validate_unique_constraints_excluding(collection, &doc.data, id)
2744                .await?;
2745
2746            self.put(
2747                format!("{}:{}", collection, id),
2748                serde_json::to_vec(&doc)?,
2749                None,
2750            )?;
2751            Ok(id.to_string())
2752        } else {
2753            // Insert new document with specified ID - validate unique constraints
2754            self.validate_unique_constraints(collection, &data_map)
2755                .await?;
2756
2757            let document = Document {
2758                id: id.to_string(),
2759                data: data_map,
2760            };
2761
2762            self.put(
2763                format!("{}:{}", collection, id),
2764                serde_json::to_vec(&document)?,
2765                None,
2766            )?;
2767            Ok(id.to_string())
2768        }
2769    }
2770
    // Increment/decrement a numeric field (read-modify-write)
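    /// Add `amount` (which may be negative) to an integer field and return the new value
    ///
    /// Missing or non-integer fields are treated as 0 before the increment.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let views = db.increment("articles", "article-1", "views", 1).await?;
    /// let stock = db.increment("products", "prod-9", "stock", -5).await?;
    /// println!("views={}, stock={}", views, stock);
    /// ```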
2772    pub async fn increment(
2773        &self,
2774        collection: &str,
2775        id: &str,
2776        field: &str,
2777        amount: i64,
2778    ) -> Result<i64> {
2779        if let Some(mut doc) = self.get_document(collection, id)? {
2780            // Get current value
2781            let current = match doc.data.get(field) {
2782                Some(Value::Int(i)) => *i,
2783                _ => 0,
2784            };
2785
2786            // Increment
2787            let new_value = current + amount;
2788            doc.data.insert(field.to_string(), Value::Int(new_value));
2789
2790            // Save changes
2791            self.put(
2792                format!("{}:{}", collection, id),
2793                serde_json::to_vec(&doc)?,
2794                None,
2795            )?;
2796
2797            Ok(new_value)
2798        } else {
2799            Err(AuroraError::NotFound(format!(
2800                "Document {}:{} not found",
2801                collection, id
2802            )))
2803        }
2804    }
2805
2806    // Delete documents by query
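    /// Delete every document matching a filter predicate, returning the count deleted
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative, not compiled):
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let removed = db.delete_by_query("sessions", |f| {
    ///     f.eq("expired", Value::Bool(true))
    /// }).await?;
    /// println!("Removed {} expired sessions", removed);
    /// ```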
2807    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
2808    where
2809        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2810    {
2811        let docs = self.query(collection).filter(filter_fn).collect().await?;
2812
2813        let mut deleted_count = 0;
2814
2815        for doc in docs {
2816            let key = format!("{}:{}", collection, doc.id);
2817            self.delete(&key).await?;
2818            deleted_count += 1;
2819        }
2820
2821        Ok(deleted_count)
2822    }
2823
2824    /// Import documents from a JSON file into a collection
2825    ///
2826    /// Validates each document against the collection schema, skips duplicates (by ID),
2827    /// and provides detailed statistics about the import operation. Useful for restoring
2828    /// backups, migrating data, or seeding development databases.
2829    ///
2830    /// # Performance
2831    /// - Import speed: ~5,000 docs/sec (with validation)
2832    /// - Memory efficient: processes documents one at a time
2833    /// - Validates schema and unique constraints
2834    ///
2835    /// # Arguments
2836    /// * `collection` - Name of the collection to import into
2837    /// * `filename` - Path to the JSON file containing documents (array format)
2838    ///
2839    /// # Returns
2840    /// `ImportStats` containing counts of imported, skipped, and failed documents
2841    ///
2842    /// # Examples
2843    ///
2844    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
2846    ///
2847    /// let db = Aurora::open("mydb.db")?;
2848    ///
2849    /// // Basic import
2850    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
2851    /// println!("Imported: {}, Skipped: {}, Failed: {}",
2852    ///     stats.imported, stats.skipped, stats.failed);
2853    ///
2854    /// // Restore from backup
2855    /// let backup_file = "./backups/users_2024-01-15.json";
2856    /// let stats = db.import_from_json("users", backup_file).await?;
2857    ///
2858    /// if stats.failed > 0 {
2859    ///     eprintln!("Warning: {} documents failed validation", stats.failed);
2860    /// }
2861    ///
2862    /// // Idempotent import - duplicates are skipped
2863    /// let stats = db.import_from_json("users", "./data/users.json").await?;
2864    /// // Running again will skip all existing documents
2865    /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
2866    /// assert_eq!(stats2.skipped, stats.imported);
2867    ///
2868    /// // Migration from another system
2869    /// db.new_collection("products", vec![
2870    ///     ("sku", FieldType::String),
2871    ///     ("name", FieldType::String),
2872    ///     ("price", FieldType::Float),
2873    /// ])?;
2874    ///
2875    /// let stats = db.import_from_json("products", "./migration/products.json").await?;
2876    /// println!("Migration complete: {} products imported", stats.imported);
2877    /// ```
2878    ///
2879    /// # Expected JSON Format
2880    ///
2881    /// The JSON file should contain an array of document objects:
2882    /// ```json
2883    /// [
2884    ///   { "id": "123", "name": "Alice", "email": "alice@example.com" },
2885    ///   { "id": "456", "name": "Bob", "email": "bob@example.com" },
2886    ///   { "name": "Carol", "email": "carol@example.com" }
2887    /// ]
2888    /// ```
2889    ///
2890    /// # Behavior
2891    /// - Documents with existing IDs are skipped (duplicate detection)
2892    /// - Documents without IDs get auto-generated UUIDs
2893    /// - Schema validation is performed on all fields
2894    /// - Failed documents are counted but don't stop the import
2895    /// - Unique constraints are checked
2896    ///
2897    /// # See Also
2898    /// - `export_as_json()` to create compatible backup files
2899    /// - `batch_insert()` for programmatic bulk inserts
2900    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
2901        // Validate that the collection exists
2902        let collection_def = self.get_collection_definition(collection)?;
2903
2904        // Load JSON file
2905        let json_string = read_to_string(filename)
2906            .await
2907            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
2908
2909        // Parse JSON
2910        let documents: Vec<JsonValue> = from_str(&json_string)
2911            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
2912
2913        let mut stats = ImportStats::default();
2914
2915        // Process each document
2916        for doc_json in documents {
2917            match self
2918                .import_document(collection, &collection_def, doc_json)
2919                .await
2920            {
2921                Ok(ImportResult::Imported) => stats.imported += 1,
2922                Ok(ImportResult::Skipped) => stats.skipped += 1,
2923                Err(_) => stats.failed += 1,
2924            }
2925        }
2926
2927        Ok(stats)
2928    }
2929
2930    /// Import a single document, performing schema validation and duplicate checking
2931    async fn import_document(
2932        &self,
2933        collection: &str,
2934        collection_def: &Collection,
2935        doc_json: JsonValue,
2936    ) -> Result<ImportResult> {
2937        if !doc_json.is_object() {
2938            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
2939        }
2940
2941        // Extract document ID if present
2942        let doc_id = doc_json
2943            .get("id")
2944            .and_then(|id| id.as_str())
2945            .map(|s| s.to_string())
2946            .unwrap_or_else(|| Uuid::new_v4().to_string());
2947
2948        // Check if document with this ID already exists
2949        if self.get_document(collection, &doc_id)?.is_some() {
2950            return Ok(ImportResult::Skipped);
2951        }
2952
2953        // Convert JSON to our document format and validate against schema
2954        let mut data_map = HashMap::new();
2955
2956        if let Some(obj) = doc_json.as_object() {
2957            for (field_name, field_def) in &collection_def.fields {
2958                if let Some(json_value) = obj.get(field_name) {
2959                    // Validate value against field type
2960                    if !self.validate_field_value(json_value, &field_def.field_type) {
2961                        return Err(AuroraError::InvalidOperation(format!(
2962                            "Field '{}' has invalid type",
2963                            field_name
2964                        )));
2965                    }
2966
2967                    // Convert JSON value to our Value type
2968                    let value = self.json_to_value(json_value)?;
2969                    data_map.insert(field_name.clone(), value);
2970                } else if field_def.unique {
2971                    // Missing required unique field
2972                    return Err(AuroraError::InvalidOperation(format!(
2973                        "Missing required unique field '{}'",
2974                        field_name
2975                    )));
2976                }
2977            }
2978        }
2979
2980        // Check for duplicates by unique fields
2981        let unique_fields = self.get_unique_fields(collection_def);
2982        for unique_field in &unique_fields {
2983            if let Some(value) = data_map.get(unique_field) {
2984                // Query for existing documents with this unique value
2985                let query_results = self
2986                    .query(collection)
2987                    .filter(move |f| f.eq(unique_field, value.clone()))
2988                    .limit(1)
2989                    .collect()
2990                    .await?;
2991
2992                if !query_results.is_empty() {
2993                    // Found duplicate by unique field
2994                    return Ok(ImportResult::Skipped);
2995                }
2996            }
2997        }
2998
2999        // Create and insert document
3000        let document = Document {
3001            id: doc_id,
3002            data: data_map,
3003        };
3004
3005        self.put(
3006            format!("{}:{}", collection, document.id),
3007            serde_json::to_vec(&document)?,
3008            None,
3009        )?;
3010
3011        Ok(ImportResult::Imported)
3012    }
3013
3014    /// Validate that a JSON value matches the expected field type
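    ///
    /// Illustrative behavior (a sketch; this is a private helper, shown with
    /// hypothetical calls for clarity):
    ///
    /// ```
    /// use serde_json::json;
    ///
    /// assert!(db.validate_field_value(&json!("hi"), &FieldType::String));
    /// assert!(db.validate_field_value(&json!(42), &FieldType::Int));
    /// assert!(!db.validate_field_value(&json!(4.5), &FieldType::Int));  // floats are not Ints
    /// assert!(db.validate_field_value(&json!(42), &FieldType::Float));  // ints pass as Float
    /// ```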
3015    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3016        match field_type {
3017            FieldType::String => value.is_string(),
3018            FieldType::Int => value.is_i64() || value.is_u64(),
3019            FieldType::Float => value.is_number(),
3020            FieldType::Bool => value.is_boolean(),
3021            FieldType::Array => value.is_array(),
3022            FieldType::Object => value.is_object(),
3023            FieldType::Uuid => {
3024                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3025            }
3026        }
3027    }
3028
3029    /// Convert a JSON value to our internal Value type
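    ///
    /// A sketch of the mapping (note the UUID special case: any string that
    /// parses as a UUID becomes `Value::Uuid`, not `Value::String`):
    ///
    /// ```
    /// let v = db.json_to_value(&serde_json::json!({ "n": 1, "ok": true }))?;
    /// // => Value::Object({ "n": Value::Int(1), "ok": Value::Bool(true) })
    /// ```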
3030    #[allow(clippy::only_used_in_recursion)]
3031    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3032        match json_value {
3033            JsonValue::Null => Ok(Value::Null),
3034            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3035            JsonValue::Number(n) => {
3036                if let Some(i) = n.as_i64() {
3037                    Ok(Value::Int(i))
3038                } else if let Some(f) = n.as_f64() {
3039                    Ok(Value::Float(f))
3040                } else {
3041                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
3042                }
3043            }
3044            JsonValue::String(s) => {
3045                // Try parsing as UUID first
3046                if let Ok(uuid) = Uuid::parse_str(s) {
3047                    Ok(Value::Uuid(uuid))
3048                } else {
3049                    Ok(Value::String(s.clone()))
3050                }
3051            }
3052            JsonValue::Array(arr) => {
3053                let mut values = Vec::new();
3054                for item in arr {
3055                    values.push(self.json_to_value(item)?);
3056                }
3057                Ok(Value::Array(values))
3058            }
3059            JsonValue::Object(obj) => {
3060                let mut map = HashMap::new();
3061                for (k, v) in obj {
3062                    map.insert(k.clone(), self.json_to_value(v)?);
3063                }
3064                Ok(Value::Object(map))
3065            }
3066        }
3067    }
3068
3069    /// Get collection definition
3070    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3071        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3072            let collection_def: Collection = serde_json::from_slice(&data)?;
3073            Ok(collection_def)
3074        } else {
3075            Err(AuroraError::CollectionNotFound(collection.to_string()))
3076        }
3077    }
3078
3079    /// Get storage statistics and information about the database
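    ///
    /// A quick usage sketch:
    ///
    /// ```
    /// let stats = db.get_database_stats()?;
    /// println!(
    ///     "{} collections, ~{} bytes on disk",
    ///     stats.collections.len(),
    ///     stats.estimated_size
    /// );
    /// ```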
3080    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3081        let hot_stats = self.hot.get_stats();
3082        let cold_stats = self.cold.get_stats()?;
3083
3084        Ok(DatabaseStats {
3085            hot_stats,
3086            cold_stats,
3087            estimated_size: self.cold.estimated_size(),
3088            collections: self.get_collection_stats()?,
3089        })
3090    }
3091
3092    /// Check if a key is currently stored in the hot cache
3093    pub fn is_in_hot_cache(&self, key: &str) -> bool {
3094        self.hot.is_hot(key)
3095    }
3096
3097    /// Start background cleanup of hot cache with specified interval
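    ///
    /// For example, to evict stale entries once a minute (illustrative):
    ///
    /// ```
    /// db.start_hot_cache_maintenance(60).await;
    /// ```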
3098    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
3099        let hot_store = Arc::new(self.hot.clone());
3100        hot_store.start_cleanup_with_interval(interval_secs).await;
3101    }
3102
3103    /// Clear the hot cache (useful when memory needs to be freed)
3104    pub fn clear_hot_cache(&self) {
3105        self.hot.clear();
3106        println!(
3107            "Hot cache cleared, current hit ratio: {:.2}%",
3108            self.hot.hit_ratio() * 100.0
3109        );
3110    }
3111
3112    /// Prewarm the cache by loading frequently accessed data from cold storage
3113    ///
3114    /// Loads documents from a collection into memory cache to eliminate cold-start
3115    /// latency. Dramatically improves initial query performance after database startup
3116    /// by preloading the most commonly accessed data.
3117    ///
3118    /// # Performance Impact
3119    /// - Prewarming speed: ~20,000 docs/sec
3120    /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3121    /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3122    /// - Memory cost: ~500 bytes per document average
3123    ///
3124    /// # Arguments
3125    /// * `collection` - The collection to prewarm
    /// * `limit` - Maximum number of documents to load (`None` = load all)
3127    ///
3128    /// # Returns
3129    /// Number of documents loaded into cache
3130    ///
3131    /// # Examples
3132    ///
3133    /// ```
3134    /// use aurora_db::Aurora;
3135    ///
3136    /// let db = Aurora::open("mydb.db")?;
3137    ///
3138    /// // Prewarm frequently accessed collection
3139    /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3140    /// println!("Prewarmed {} user documents", loaded);
3141    ///
3142    /// // Now queries are fast from the start
3143    /// let stats_before = db.get_cache_stats();
3144    /// let users = db.query("users").collect().await?;
3145    /// let stats_after = db.get_cache_stats();
3146    ///
3147    /// // High hit rate thanks to prewarming
3148    /// assert!(stats_after.hit_rate > 0.95);
3149    ///
3150    /// // Startup optimization pattern
3151    /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3152    ///     println!("Prewarming caches...");
3153    ///
3154    ///     // Prewarm most frequently accessed collections
3155    ///     db.prewarm_cache("users", Some(5000)).await?;
3156    ///     db.prewarm_cache("sessions", Some(1000)).await?;
3157    ///     db.prewarm_cache("products", Some(500)).await?;
3158    ///
3159    ///     let stats = db.get_cache_stats();
3160    ///     println!("Cache prewarmed: {} entries loaded", stats.size);
3161    ///
3162    ///     Ok(())
3163    /// }
3164    ///
3165    /// // Web server startup
3166    /// #[tokio::main]
3167    /// async fn main() {
3168    ///     let db = Aurora::open("app.db").unwrap();
3169    ///
3170    ///     // Prewarm before accepting requests
3171    ///     db.prewarm_cache("users", Some(10000)).await.unwrap();
3172    ///
3173    ///     // Server is now ready with hot cache
3174    ///     start_web_server(db).await;
3175    /// }
3176    ///
3177    /// // Prewarm all documents (for small collections)
3178    /// let all_loaded = db.prewarm_cache("config", None).await?;
3179    /// // All config documents now in memory
3180    ///
3181    /// // Selective prewarming based on access patterns
3182    /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3183    ///     // Load recent users (they're accessed most)
3184    ///     db.prewarm_cache("users", Some(1000)).await?;
3185    ///
3186    ///     // Load active sessions only
3187    ///     let active_sessions = db.query("sessions")
3188    ///         .filter(|f| f.eq("active", Value::Bool(true)))
3189    ///         .limit(500)
3190    ///         .collect()
3191    ///         .await?;
3192    ///
3193    ///     // Manually populate cache with hot data
3194    ///     for session in active_sessions {
3195    ///         // Reading automatically caches
3196    ///         db.get_document("sessions", &session.id)?;
3197    ///     }
3198    ///
3199    ///     Ok(())
3200    /// }
3201    /// ```
3202    ///
3203    /// # Typical Prewarming Scenarios
3204    ///
3205    /// **Web Application Startup:**
3206    /// ```
3207    /// // Load user data, sessions, and active content
3208    /// db.prewarm_cache("users", Some(5000)).await?;
3209    /// db.prewarm_cache("sessions", Some(2000)).await?;
3210    /// db.prewarm_cache("posts", Some(1000)).await?;
3211    /// ```
3212    ///
3213    /// **E-commerce Site:**
3214    /// ```
3215    /// // Load products, categories, and user carts
3216    /// db.prewarm_cache("products", Some(500)).await?;
3217    /// db.prewarm_cache("categories", None).await?;  // All categories
3218    /// db.prewarm_cache("active_carts", Some(1000)).await?;
3219    /// ```
3220    ///
3221    /// **API Server:**
3222    /// ```
3223    /// // Load authentication data and rate limits
3224    /// db.prewarm_cache("api_keys", None).await?;
3225    /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3226    /// ```
3227    ///
3228    /// # When to Use
3229    /// - At application startup to eliminate cold-start latency
3230    /// - After cache clear operations
3231    /// - Before high-traffic events (product launches, etc.)
3232    /// - When deploying new instances (load balancer warm-up)
3233    ///
3234    /// # Memory Considerations
3235    /// - 1,000 docs ≈ 500 KB memory
3236    /// - 10,000 docs ≈ 5 MB memory
3237    /// - 100,000 docs ≈ 50 MB memory
3238    /// - Stay within configured cache capacity
3239    ///
3240    /// # See Also
3241    /// - `get_cache_stats()` to monitor cache effectiveness
3242    /// - `prewarm_all_collections()` to prewarm all collections
3243    /// - `Aurora::with_config()` to adjust cache capacity
3244    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
        // `None` means no cap: load the entire collection into the hot cache
        let limit = limit.unwrap_or(usize::MAX);
3246        let prefix = format!("{}:", collection);
3247        let mut loaded = 0;
3248
3249        for entry in self.cold.scan_prefix(&prefix) {
3250            if loaded >= limit {
3251                break;
3252            }
3253
3254            if let Ok((key, value)) = entry {
3255                // Load into hot cache
3256                self.hot.set(key.clone(), value, None);
3257                loaded += 1;
3258            }
3259        }
3260
3261        println!("Prewarmed {} with {} documents", collection, loaded);
3262        Ok(loaded)
3263    }
3264
3265    /// Prewarm cache for all collections
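    ///
    /// A sketch of warming every collection with a per-collection cap:
    ///
    /// ```
    /// let warmed = db.prewarm_all_collections(Some(500)).await?;
    /// for (name, loaded) in &warmed {
    ///     println!("{}: {} documents cached", name, loaded);
    /// }
    /// ```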
3266    pub async fn prewarm_all_collections(
3267        &self,
3268        docs_per_collection: Option<usize>,
3269    ) -> Result<HashMap<String, usize>> {
3270        let mut stats = HashMap::new();
3271
3272        // Get all collections
3273        let collections: Vec<String> = self
3274            .cold
3275            .scan()
3276            .filter_map(|r| r.ok())
3277            .map(|(k, _)| k)
3278            .filter(|k| k.starts_with("_collection:"))
3279            .map(|k| k.trim_start_matches("_collection:").to_string())
3280            .collect();
3281
3282        for collection in collections {
3283            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3284            stats.insert(collection, loaded);
3285        }
3286
3287        Ok(stats)
3288    }
3289
3290    /// Store multiple key-value pairs efficiently in a single batch operation
3291    ///
3292    /// Low-level batch write operation that bypasses document validation and
3293    /// writes raw byte data directly to storage. Useful for advanced use cases,
3294    /// custom serialization, or maximum performance scenarios.
3295    ///
3296    /// # Performance
3297    /// - Write speed: ~100,000 writes/sec
3298    /// - Single disk fsync for entire batch
3299    /// - No validation or schema checking
3300    /// - Direct storage access
3301    ///
3302    /// # Arguments
3303    /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3304    ///
3305    /// # Returns
3306    /// Success or an error
3307    ///
3308    /// # Examples
3309    ///
3310    /// ```
3311    /// use aurora_db::Aurora;
3312    ///
3313    /// let db = Aurora::open("mydb.db")?;
3314    ///
3315    /// // Low-level batch write
3316    /// let pairs = vec![
3317    ///     ("users:123".to_string(), b"raw data 1".to_vec()),
3318    ///     ("users:456".to_string(), b"raw data 2".to_vec()),
3319    ///     ("cache:key1".to_string(), b"cached value".to_vec()),
3320    /// ];
3321    ///
3322    /// db.batch_write(pairs)?;
3323    ///
3324    /// // Custom binary serialization
    /// use bincode;
    /// use serde::{Deserialize, Serialize};
3326    ///
3327    /// #[derive(Serialize, Deserialize)]
3328    /// struct CustomData {
3329    ///     id: u64,
3330    ///     payload: Vec<u8>,
3331    /// }
3332    ///
3333    /// let custom_data = vec![
3334    ///     CustomData { id: 1, payload: vec![1, 2, 3] },
3335    ///     CustomData { id: 2, payload: vec![4, 5, 6] },
3336    /// ];
3337    ///
3338    /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3339    ///     .iter()
3340    ///     .map(|data| {
3341    ///         let key = format!("binary:{}", data.id);
3342    ///         let value = bincode::serialize(data).unwrap();
3343    ///         (key, value)
3344    ///     })
3345    ///     .collect();
3346    ///
3347    /// db.batch_write(pairs)?;
3348    ///
3349    /// // Bulk cache population
3350    /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3351    ///     .map(|i| {
3352    ///         let key = format!("cache:item_{}", i);
3353    ///         let value = format!("value_{}", i).into_bytes();
3354    ///         (key, value)
3355    ///     })
3356    ///     .collect();
3357    ///
3358    /// db.batch_write(cache_entries)?;
3359    /// // Writes 10,000 entries in ~100ms
3360    /// ```
3361    ///
3362    /// # Important Notes
3363    /// - No schema validation performed
3364    /// - No unique constraint checking
3365    /// - No automatic indexing
3366    /// - Keys must follow "collection:id" format for proper grouping
3367    /// - Values are raw bytes - you handle serialization
3368    /// - Use `batch_insert()` for validated document inserts
3369    ///
3370    /// # When to Use
3371    /// - Maximum write performance needed
3372    /// - Custom serialization formats (bincode, msgpack, etc.)
3373    /// - Cache population
3374    /// - Low-level database operations
3375    /// - You're bypassing the document model
3376    ///
3377    /// # When NOT to Use
3378    /// - Regular document inserts → Use `batch_insert()` instead
3379    /// - Need validation → Use `batch_insert()` instead
3380    /// - Need indexing → Use `batch_insert()` instead
3381    ///
3382    /// # See Also
3383    /// - `batch_insert()` for validated document batch inserts
3384    /// - `put()` for single key-value writes
3385    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3386        // Group pairs by collection name
3387        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3388        for (key, value) in &pairs {
3389            if let Some(collection_name) = key.split(':').next() {
3390                collections
3391                    .entry(collection_name.to_string())
3392                    .or_default()
3393                    .push((key.clone(), value.clone()));
3394            }
3395        }
3396
3397        // First, do the batch write to cold storage for all pairs
3398        self.cold.batch_set(pairs)?;
3399
3400        // Then, process each collection for in-memory updates
3401        for (collection_name, batch) in collections {
3402            // --- Optimized Batch Indexing ---
3403
3404            // 1. Get schema once for the entire collection batch
3405            let collection_obj = match self.schema_cache.get(&collection_name) {
3406                Some(cached_schema) => Arc::clone(cached_schema.value()),
3407                None => {
3408                    let collection_key = format!("_collection:{}", collection_name);
3409                    match self.get(&collection_key)? {
3410                        Some(data) => {
3411                            let obj: Collection = serde_json::from_slice(&data)?;
3412                            let arc_obj = Arc::new(obj);
3413                            self.schema_cache
3414                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3415                            arc_obj
3416                        }
3417                        None => continue,
3418                    }
3419                }
3420            };
3421
3422            let indexed_fields: Vec<String> = collection_obj
3423                .fields
3424                .iter()
3425                .filter(|(_, def)| def.indexed || def.unique)
3426                .map(|(name, _)| name.clone())
3427                .collect();
3428
3429            let primary_index = self
3430                .primary_indices
3431                .entry(collection_name.to_string())
3432                .or_default();
3433
3434            for (key, value) in batch {
3435                // 2. Update hot cache
3436                self.hot.set(key.clone(), value.clone(), None);
3437
3438                // 3. Update primary index with metadata only
3439                let location = DiskLocation::new(value.len());
3440                primary_index.insert(key.clone(), location);
3441
3442                // 4. Update secondary indices
3443                if !indexed_fields.is_empty()
3444                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
3445                        for (field, field_value) in doc.data {
3446                            if indexed_fields.contains(&field) {
3447                                let value_str = match &field_value {
3448                                    Value::String(s) => s.clone(),
3449                                    _ => field_value.to_string(),
3450                                };
3451                                let index_key = format!("{}:{}", collection_name, field);
3452                                let secondary_index = self
3453                                    .secondary_indices
3454                                    .entry(index_key)
3455                                    .or_default();
3456
3457                                let max_entries = self.config.max_index_entries_per_field;
3458                                secondary_index
3459                                    .entry(value_str)
3460                                    .and_modify(|doc_ids| {
3461                                        if doc_ids.len() < max_entries {
3462                                            doc_ids.push(key.to_string());
3463                                        }
3464                                    })
3465                                    .or_insert_with(|| vec![key.to_string()]);
3466                            }
3467                        }
3468                    }
3469            }
3470        }
3471
3472        Ok(())
3473    }
3474
3475    /// Scan for keys with a specific prefix
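    ///
    /// For example, to walk every raw record in one collection (illustrative):
    ///
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{}: {} bytes", key, bytes.len());
    /// }
    /// ```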
3476    pub fn scan_with_prefix(
3477        &self,
3478        prefix: &str,
3479    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3480        self.cold.scan_prefix(prefix)
3481    }
3482
3483    /// Get storage efficiency metrics for the database
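    ///
    /// A usage sketch:
    ///
    /// ```
    /// for (name, s) in db.get_collection_stats()? {
    ///     println!("{}: {} docs, avg {} bytes", name, s.count, s.avg_doc_size);
    /// }
    /// ```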
3484    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
3485        let mut stats = HashMap::new();
3486
3487        // Scan all collections
3488        let collections: Vec<String> = self
3489            .cold
3490            .scan()
3491            .filter_map(|r| r.ok())
3492            .map(|(k, _)| k)
3493            .filter(|k| k.starts_with("_collection:"))
3494            .map(|k| k.trim_start_matches("_collection:").to_string())
3495            .collect();
3496
3497        for collection in collections {
3498            // Use primary index for fast stats (count + size from DiskLocation)
3499            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
3500                let count = index.len();
3501                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
3502                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
3503                (count, size)
3504            } else {
3505                // Fallback: scan from cold storage if index not available
3506                let prefix = format!("{}:", collection);
3507                let count = self.cold.scan_prefix(&prefix).count();
3508                let size: usize = self
3509                    .cold
3510                    .scan_prefix(&prefix)
3511                    .filter_map(|r| r.ok())
3512                    .map(|(_, v)| v.len())
3513                    .sum();
3514                (count, size)
3515            };
3516
3517            stats.insert(
3518                collection,
3519                CollectionStats {
3520                    count,
3521                    size_bytes: size,
3522                    avg_doc_size: if count > 0 { size / count } else { 0 },
3523                },
3524            );
3525        }
3526
3527        Ok(stats)
3528    }
3529
3530    /// Search for documents by exact value using an index
3531    ///
3532    /// This method performs a fast lookup using a pre-created index
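    ///
    /// A sketch, assuming an index was previously created on `users.email`:
    ///
    /// ```
    /// let matches = db.search_by_value(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// )?;
    /// ```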
3533    pub fn search_by_value(
3534        &self,
3535        collection: &str,
3536        field: &str,
3537        value: &Value,
3538    ) -> Result<Vec<Document>> {
3539        let index_key = format!("_index:{}:{}", collection, field);
3540
3541        if let Some(index_data) = self.get(&index_key)? {
3542            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
3543            let index = Index::new(index_def);
3544
            // Look up the matching document IDs in the index
3546            if let Some(doc_ids) = index.search(value) {
3547                // Load the documents by ID
3548                let mut docs = Vec::new();
3549                for id in doc_ids {
3550                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
3551                        let doc: Document = serde_json::from_slice(&doc_data)?;
3552                        docs.push(doc);
3553                    }
3554                }
3555                return Ok(docs);
3556            }
3557        }
3558
3559        // Return empty result if no index or no matches
3560        Ok(Vec::new())
3561    }
3562
3563    /// Perform a full-text search on an indexed text field
3564    ///
3565    /// This provides more advanced text search capabilities including
3566    /// relevance ranking of results
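    ///
    /// A sketch, assuming a full-text index already exists on `articles.body`
    /// (see `create_text_index`):
    ///
    /// ```
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// // Results arrive ranked by relevance, best match first
    /// ```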
3567    pub fn full_text_search(
3568        &self,
3569        collection: &str,
3570        field: &str,
3571        query: &str,
3572    ) -> Result<Vec<Document>> {
3573        let index_key = format!("_index:{}:{}", collection, field);
3574
3575        if let Some(index_data) = self.get(&index_key)? {
3576            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
3577
3578            // Ensure this is a full-text index
3579            if !matches!(index_def.index_type, IndexType::FullText) {
3580                return Err(AuroraError::InvalidOperation(format!(
3581                    "Field '{}' is not indexed as full-text",
3582                    field
3583                )));
3584            }
3585
3586            let index = Index::new(index_def);
3587
            // Search the full-text index; it returns (doc_id, score) pairs
3589            if let Some(doc_id_scores) = index.search_text(query) {
3590                // Load the documents by ID, preserving score order
3591                let mut docs = Vec::new();
3592                for (id, _score) in doc_id_scores {
3593                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
3594                        let doc: Document = serde_json::from_slice(&doc_data)?;
3595                        docs.push(doc);
3596                    }
3597                }
3598                return Ok(docs);
3599            }
3600        }
3601
3602        // Return empty result if no index or no matches
3603        Ok(Vec::new())
3604    }
3605
3606    /// Create a full-text search index on a text field
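    ///
    /// For example (the stop-word flag is accepted but currently unused):
    ///
    /// ```
    /// db.create_text_index("articles", "body", true)?;
    /// ```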
3607    pub fn create_text_index(
3608        &self,
3609        collection: &str,
3610        field: &str,
3611        _enable_stop_words: bool,
3612    ) -> Result<()> {
3613        // Check if collection exists
3614        if self.get(&format!("_collection:{}", collection))?.is_none() {
3615            return Err(AuroraError::CollectionNotFound(collection.to_string()));
3616        }
3617
3618        // Create index definition
3619        let index_def = IndexDefinition {
3620            name: format!("{}_{}_fulltext", collection, field),
3621            collection: collection.to_string(),
3622            fields: vec![field.to_string()],
3623            index_type: IndexType::FullText,
3624            unique: false,
3625        };
3626
3627        // Store index definition
3628        let index_key = format!("_index:{}:{}", collection, field);
3629        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;
3630
3631        // Create the actual index
3632        let index = Index::new(index_def);
3633
3634        // Index all existing documents in the collection
3635        let prefix = format!("{}:", collection);
3636        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
3637            let doc: Document = serde_json::from_slice(&data)?;
3638            index.insert(&doc)?;
3639        }
3640
3641        Ok(())
3642    }
3643
3644    pub async fn execute_simple_query(
3645        &self,
3646        builder: &SimpleQueryBuilder,
3647    ) -> Result<Vec<Document>> {
3648        // Ensure indices are initialized
3649        self.ensure_indices_initialized().await?;
3650
3651        // A place to store the IDs of the documents we need to fetch
3652        let mut doc_ids_to_load: Option<Vec<String>> = None;
3653
3654        // --- The "Query Planner" ---
3655        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
3656        // than collecting millions of IDs from secondary index
3657        let use_index_for_range = if let Some(limit) = builder.limit {
3658            // If limit is small (< 1000), prefer full scan for range queries
3659            // The secondary index would scan all entries anyway, might as well
3660            // scan documents directly and benefit from early termination
3661            limit >= 1000
3662        } else {
            // No limit means every match is needed, so the index is worth using
3664            true
3665        };
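        // Illustrative: `.filter(|f| f.gt("age", 18)).limit(10)` takes the
        // scan path (it can stop after ~10 matches), while the same filter
        // with `.limit(5000)` goes through the secondary index.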
3666
3667        // Look for an opportunity to use an index
        for filter in &builder.filters {
3669            match filter {
3670                Filter::Eq(field, value) => {
3671                    let index_key = format!("{}:{}", &builder.collection, field);
3672
3673                    // Do we have a secondary index for this field?
3674                    if let Some(index) = self.secondary_indices.get(&index_key) {
3675                        // Yes! Let's use it.
3676                        if let Some(matching_ids) = index.get(&value.to_string()) {
3677                            doc_ids_to_load = Some(matching_ids.clone());
3678                            break; // Stop searching for other indexes for now
3679                        }
3680                    }
3681                }
3682                Filter::Gt(field, value)
3683                | Filter::Gte(field, value)
3684                | Filter::Lt(field, value)
3685                | Filter::Lte(field, value) => {
3686                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
3687                    if !use_index_for_range {
3688                        continue;
3689                    }
3690
3691                    let index_key = format!("{}:{}", &builder.collection, field);
3692
3693                    // Do we have a secondary index for this field?
3694                    if let Some(index) = self.secondary_indices.get(&index_key) {
3695                        // For range queries, we need to scan through the index values
3696                        let mut matching_ids = Vec::new();
3697
3698                        for entry in index.iter() {
3699                            let index_value_str = entry.key();
3700
3701                            // Try to parse the index value to compare with our filter value
3702                            if let Ok(index_value) =
3703                                self.parse_value_from_string(index_value_str, value)
3704                            {
3705                                let matches = match filter {
3706                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
3707                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
3708                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
3709                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
3710                                    _ => false,
3711                                };
3712
3713                                if matches {
3714                                    matching_ids.extend(entry.value().clone());
3715                                }
3716                            }
3717                        }
3718
3719                        if !matching_ids.is_empty() {
3720                            doc_ids_to_load = Some(matching_ids);
3721                            break;
3722                        }
3723                    }
3724                }
3725                Filter::Contains(field, search_term) => {
3726                    let index_key = format!("{}:{}", &builder.collection, field);
3727
3728                    // Do we have a secondary index for this field?
3729                    if let Some(index) = self.secondary_indices.get(&index_key) {
3730                        let mut matching_ids = Vec::new();
3731
3732                        for entry in index.iter() {
3733                            let index_value_str = entry.key();
3734
3735                            // Check if this indexed value contains our search term
3736                            if index_value_str
3737                                .to_lowercase()
3738                                .contains(&search_term.to_lowercase())
3739                            {
3740                                matching_ids.extend(entry.value().clone());
3741                            }
3742                        }
3743
3744                        if !matching_ids.is_empty() {
3745                            // Remove duplicates since a document could match multiple indexed values
3746                            matching_ids.sort();
3747                            matching_ids.dedup();
3748
3749                            doc_ids_to_load = Some(matching_ids);
3750                            break;
3751                        }
3752                    }
3753                }
3754            }
3755        }
3756
        let mut final_docs: Vec<Document>;

        // Shared filter predicate, used by the index path and both scan paths
        let matches_filters = |doc: &Document| {
            builder.filters.iter().all(|filter| match filter {
                Filter::Eq(field, value) => doc.data.get(field) == Some(value),
                Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
                Filter::Gte(field, value) => doc.data.get(field).is_some_and(|v| v >= value),
                Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
                Filter::Lte(field, value) => doc.data.get(field).is_some_and(|v| v <= value),
                Filter::Contains(field, value_str) => {
                    doc.data.get(field).is_some_and(|v| match v {
                        Value::String(s) => s.contains(value_str),
                        Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
                        _ => false,
                    })
                }
            })
        };

        if let Some(ids) = doc_ids_to_load {
            // --- Path 1: Index Lookup ---
            // Debug logging for query performance analysis (debug builds only)
            #[cfg(debug_assertions)]
            {
                use std::io::Write;
                if let Ok(mut file) = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open("/tmp/aurora_query_stats.log")
                {
                    let _ = writeln!(file, "[INDEX PATH] IDs to load: {} | Collection: {}",
                        ids.len(), builder.collection);
                }
            }

            final_docs = Vec::with_capacity(ids.len());

            // Secondary indices may hold either bare IDs or full "collection:id"
            // keys (batch_write stores full keys), so normalize before lookup.
            let full_key_prefix = format!("{}:", builder.collection);
            for id in ids {
                let doc_key = if id.starts_with(&full_key_prefix) {
                    id
                } else {
                    format!("{}:{}", builder.collection, id)
                };
                if let Some(data) = self.get(&doc_key)?
                    && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        // The index only guaranteed one filter matched;
                        // re-check the full filter set before keeping the doc.
                        if matches_filters(&doc) {
                            final_docs.push(doc);
                        }
                    }
            }
3779        } else {
3780            // --- Path 2: Full Collection Scan with Early Termination ---
3781
3782            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
3783            // as soon as we have enough matching documents
3784            let early_termination_target = if builder.order_by.is_none() {
3785                builder.limit.map(|l| l + builder.offset.unwrap_or(0))
3786            } else {
3787                // With ORDER BY, we need all matching docs to sort correctly
3788                None
3789            };
3790
3791            // Smart scan with early termination support
3792            final_docs = Vec::new();
3793            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
3794
3795            if let Some(index) = self.primary_indices.get(&builder.collection) {
3796                for entry in index.iter() {
3797                    let key = entry.key();
3798                    scan_stats.0 += 1; // keys scanned
3799
3800                    // Early termination check
3801                    if let Some(target) = early_termination_target {
3802                        if final_docs.len() >= target {
3803                            break; // We have enough documents!
3804                        }
3805                    }
3806
3807                    // Fetch and filter document
3808                    if let Some(data) = self.get(key)? {
3809                        scan_stats.1 += 1; // docs fetched
3810
                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                            // Apply all filters via the shared predicate
                            if matches_filters(&doc) {
                                scan_stats.2 += 1; // matches found
                                final_docs.push(doc);
                            }
                        }
3835                    }
3836                }
3837
                // Debug logging for query performance analysis (debug builds only)
                #[cfg(debug_assertions)]
                {
                    use std::io::Write;
                    if let Ok(mut file) = std::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open("/tmp/aurora_query_stats.log")
                    {
                        let _ = writeln!(file, "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
                            scan_stats.0, scan_stats.1, scan_stats.2, builder.collection);
                    }
                }
3847            } else {
3848                // Fallback: scan from cold storage if index not initialized
3849                final_docs = self.get_all_collection(&builder.collection).await?;
3850
                // Apply filters via the shared predicate
                final_docs.retain(|doc| matches_filters(doc));
3870            }
3871        }
3872
3873        // Apply ordering
3874        if let Some((field, ascending)) = &builder.order_by {
3875            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
3876                (Some(v1), Some(v2)) => {
3877                    let cmp = v1.cmp(v2);
3878                    if *ascending {
3879                        cmp
3880                    } else {
3881                        cmp.reverse()
3882                    }
3883                }
3884                (None, Some(_)) => std::cmp::Ordering::Less,
3885                (Some(_), None) => std::cmp::Ordering::Greater,
3886                (None, None) => std::cmp::Ordering::Equal,
3887            });
3888        }
3889
3890        // Apply offset and limit
3891        let start = builder.offset.unwrap_or(0);
3892        let end = builder
3893            .limit
3894            .map(|l| start.saturating_add(l))
3895            .unwrap_or(final_docs.len());
3896
3897        let end = end.min(final_docs.len());
3898        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
3899    }
3900
3901    /// Helper method to parse a string value back to a Value for comparison
3902    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
3903        match reference_value {
3904            Value::Int(_) => {
3905                if let Ok(i) = value_str.parse::<i64>() {
3906                    Ok(Value::Int(i))
3907                } else {
3908                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
3909                }
3910            }
3911            Value::Float(_) => {
3912                if let Ok(f) = value_str.parse::<f64>() {
3913                    Ok(Value::Float(f))
3914                } else {
3915                    Err(AuroraError::InvalidOperation(
3916                        "Failed to parse float".into(),
3917                    ))
3918                }
3919            }
3920            Value::String(_) => Ok(Value::String(value_str.to_string())),
3921            _ => Ok(Value::String(value_str.to_string())),
3922        }
3923    }
3924
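    /// Execute a query described by an HTTP `QueryPayload`: filter, then sort,
    /// then paginate, then project selected fields.
    ///
    /// A sketch of the kind of payload this accepts (field names follow
    /// `QueryPayload`; the exact JSON encoding of operators depends on its
    /// serde attributes, so treat this as illustrative):
    ///
    /// ```json
    /// {
    ///   "filters": [{ "field": "age", "operator": "Gt", "value": 18 }],
    ///   "sort": { "field": "name", "ascending": true },
    ///   "offset": 0,
    ///   "limit": 20,
    ///   "select": ["name", "email"]
    /// }
    /// ```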
3925    pub async fn execute_dynamic_query(
3926        &self,
3927        collection: &str,
3928        payload: &QueryPayload,
3929    ) -> Result<Vec<Document>> {
3930        let mut docs = self.get_all_collection(collection).await?;
3931
3932        // 1. Apply Filters
3933        if let Some(filters) = &payload.filters {
3934            docs.retain(|doc| {
3935                filters.iter().all(|filter| {
3936                    doc.data
3937                        .get(&filter.field)
3938                        .is_some_and(|doc_val| check_filter(doc_val, filter))
3939                })
3940            });
3941        }
3942
3943        // 2. Apply Sorting
3944        if let Some(sort_options) = &payload.sort {
3945            docs.sort_by(|a, b| {
3946                let a_val = a.data.get(&sort_options.field);
3947                let b_val = b.data.get(&sort_options.field);
3948                let ordering = a_val
3949                    .partial_cmp(&b_val)
3950                    .unwrap_or(std::cmp::Ordering::Equal);
3951                if sort_options.ascending {
3952                    ordering
3953                } else {
3954                    ordering.reverse()
3955                }
3956            });
3957        }
3958
3959        // 3. Apply Pagination
3960        if let Some(offset) = payload.offset {
3961            docs = docs.into_iter().skip(offset).collect();
3962        }
3963        if let Some(limit) = payload.limit {
3964            docs = docs.into_iter().take(limit).collect();
3965        }
3966
3967        // 4. Apply Field Selection (Projection)
3968        if let Some(select_fields) = &payload.select
3969            && !select_fields.is_empty() {
3970                docs = docs
3971                    .into_iter()
3972                    .map(|mut doc| {
3973                        doc.data.retain(|key, _| select_fields.contains(key));
3974                        doc
3975                    })
3976                    .collect();
3977            }
3978
3979        Ok(docs)
3980    }
3981
3982    pub async fn process_network_request(
3983        &self,
3984        request: crate::network::protocol::Request,
3985    ) -> crate::network::protocol::Response {
3986        use crate::network::protocol::Response;
3987
3988        match request {
3989            crate::network::protocol::Request::Get(key) => match self.get(&key) {
3990                Ok(value) => Response::Success(value),
3991                Err(e) => Response::Error(e.to_string()),
3992            },
3993            crate::network::protocol::Request::Put(key, value) => {
3994                match self.put(key, value, None) {
3995                    Ok(_) => Response::Done,
3996                    Err(e) => Response::Error(e.to_string()),
3997                }
3998            }
3999            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
4000                Ok(_) => Response::Done,
4001                Err(e) => Response::Error(e.to_string()),
4002            },
4003            crate::network::protocol::Request::NewCollection { name, fields } => {
4004                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
4005                    .iter()
4006                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
4007                    .collect();
4008
4009                match self.new_collection(&name, fields_for_db) {
4010                    Ok(_) => Response::Done,
4011                    Err(e) => Response::Error(e.to_string()),
4012                }
4013            }
4014            crate::network::protocol::Request::Insert { collection, data } => {
4015                match self.insert_map(&collection, data).await {
4016                    Ok(id) => Response::Message(id),
4017                    Err(e) => Response::Error(e.to_string()),
4018                }
4019            }
4020            crate::network::protocol::Request::GetDocument { collection, id } => {
4021                match self.get_document(&collection, &id) {
4022                    Ok(doc) => Response::Document(doc),
4023                    Err(e) => Response::Error(e.to_string()),
4024                }
4025            }
4026            crate::network::protocol::Request::Query(builder) => {
4027                match self.execute_simple_query(&builder).await {
4028                    Ok(docs) => Response::Documents(docs),
4029                    Err(e) => Response::Error(e.to_string()),
4030                }
4031            }
4032            crate::network::protocol::Request::BeginTransaction => {
4033                let tx_id = self.begin_transaction();
4034                Response::TransactionId(tx_id.as_u64())
4035            }
4036            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4037                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4038                match self.commit_transaction(tx_id) {
4039                    Ok(_) => Response::Done,
4040                    Err(e) => Response::Error(e.to_string()),
4041                }
4042            }
4043            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4044                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4045                match self.rollback_transaction(tx_id) {
4046                    Ok(_) => Response::Done,
4047                    Err(e) => Response::Error(e.to_string()),
4048                }
4049            }
4050        }
4051    }
4052
4053    /// Create indices for commonly queried fields automatically
4054    ///
4055    /// This is a convenience method that creates indices for fields that are
4056    /// likely to be queried frequently, improving performance.
4057    ///
4058    /// # Arguments
4059    /// * `collection` - Name of the collection
4060    /// * `fields` - List of field names to create indices for
4061    ///
4062    /// # Examples
4063    /// ```
4064    /// // Create indices for commonly queried fields
4065    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4066    /// ```
4067    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4068        for field in fields {
4069            if let Err(e) = self.create_index(collection, field).await {
4070                eprintln!(
4071                    "Warning: Failed to create index for {}.{}: {}",
4072                    collection, field, e
4073                );
4074            } else {
4075                println!("Created index for {}.{}", collection, field);
4076            }
4077        }
4078        Ok(())
4079    }
4080
4081    /// Get index statistics for a collection
4082    ///
4083    /// This helps understand which indices exist and how effective they are.
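    ///
    /// A usage sketch:
    ///
    /// ```
    /// let stats = db.get_index_stats("users");
    /// if let Some(email_idx) = stats.get("email") {
    ///     println!(
    ///         "email index: {} unique values across {} documents",
    ///         email_idx.unique_values, email_idx.total_documents
    ///     );
    /// }
    /// ```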
4084    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4085        let mut stats = HashMap::new();
4086
4087        for entry in self.secondary_indices.iter() {
4088            let key = entry.key();
4089            if key.starts_with(&format!("{}:", collection)) {
4090                let field = key.split(':').nth(1).unwrap_or("unknown");
4091                let index = entry.value();
4092
4093                let unique_values = index.len();
4094                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4095
4096                stats.insert(
4097                    field.to_string(),
4098                    IndexStats {
4099                        unique_values,
4100                        total_documents,
4101                        avg_docs_per_value: if unique_values > 0 {
4102                            total_documents / unique_values
4103                        } else {
4104                            0
4105                        },
4106                    },
4107                );
4108            }
4109        }
4110
4111        stats
4112    }
4113
4114    /// Optimize a collection by creating indices for frequently filtered fields
4115    ///
    /// Currently this simply creates an index for every field defined in the
    /// collection schema; analyzing real query patterns to pick a minimal set
    /// of indices is a natural extension.
4117    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4118        if let Ok(collection_def) = self.get_collection_definition(collection) {
4119            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4120            self.create_indices(collection, &field_names).await?;
4121        }
4122
4123        Ok(())
4124    }
4125
4126    // Helper method to get unique fields from a collection
4127    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4128        collection
4129            .fields
4130            .iter()
4131            .filter(|(_, def)| def.unique)
4132            .map(|(name, _)| name.clone())
4133            .collect()
4134    }
4135
4136    // Update the validation method to use the helper
4137    async fn validate_unique_constraints(
4138        &self,
4139        collection: &str,
4140        data: &HashMap<String, Value>,
4141    ) -> Result<()> {
4142        self.ensure_indices_initialized().await?;
4143        let collection_def = self.get_collection_definition(collection)?;
4144        let unique_fields = self.get_unique_fields(&collection_def);
4145
4146        for unique_field in &unique_fields {
4147            if let Some(value) = data.get(unique_field) {
4148                let index_key = format!("{}:{}", collection, unique_field);
4149                if let Some(index) = self.secondary_indices.get(&index_key) {
4150                    // Get the raw string value without JSON formatting
4151                    let value_str = match value {
4152                        Value::String(s) => s.clone(),
4153                        _ => value.to_string(),
4154                    };
4155                    if index.contains_key(&value_str) {
4156                        return Err(AuroraError::UniqueConstraintViolation(
4157                            unique_field.clone(),
4158                            value_str,
4159                        ));
4160                    }
4161                }
4162            }
4163        }
4164        Ok(())
4165    }
4166
4167    /// Validate unique constraints excluding a specific document ID (for updates)
4168    async fn validate_unique_constraints_excluding(
4169        &self,
4170        collection: &str,
4171        data: &HashMap<String, Value>,
4172        exclude_id: &str,
4173    ) -> Result<()> {
4174        self.ensure_indices_initialized().await?;
4175        let collection_def = self.get_collection_definition(collection)?;
4176        let unique_fields = self.get_unique_fields(&collection_def);
4177
4178        for unique_field in &unique_fields {
4179            if let Some(value) = data.get(unique_field) {
4180                let index_key = format!("{}:{}", collection, unique_field);
4181                if let Some(index) = self.secondary_indices.get(&index_key) {
4182                    // Get the raw string value without JSON formatting
4183                    let value_str = match value {
4184                        Value::String(s) => s.clone(),
4185                        _ => value.to_string(),
4186                    };
4187                    if let Some(doc_ids) = index.get(&value_str) {
4188                        // Check if any document other than the excluded one has this value
4189                        let exclude_key = format!("{}:{}", collection, exclude_id);
4190                        for doc_key in doc_ids.value() {
4191                            if doc_key != &exclude_key {
4192                                return Err(AuroraError::UniqueConstraintViolation(
4193                                    unique_field.clone(),
4194                                    value_str,
4195                                ));
4196                            }
4197                        }
4198                    }
4199                }
4200            }
4201        }
4202        Ok(())
4203    }
4204}
4205
4206impl Drop for Aurora {
4207    fn drop(&mut self) {
4208        // Signal checkpoint task to shutdown gracefully
4209        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
4210            let _ = shutdown_tx.send(());
4211        }
4212        // Signal compaction task to shutdown gracefully
4213        if let Some(ref shutdown_tx) = self.compaction_shutdown {
4214            let _ = shutdown_tx.send(());
4215        }
4216    }
4217}
4218
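/// Evaluate a single HTTP filter against one document field value.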
fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
    let filter_val = match json_to_value(&filter.value) {
        Ok(v) => v,
        // A filter value that cannot be converted never matches
        Err(_) => return false,
    };

    match filter.operator {
        FilterOperator::Eq => doc_val == &filter_val,
        FilterOperator::Ne => doc_val != &filter_val,
        FilterOperator::Gt => doc_val > &filter_val,
        FilterOperator::Gte => doc_val >= &filter_val,
        FilterOperator::Lt => doc_val < &filter_val,
        FilterOperator::Lte => doc_val <= &filter_val,
        FilterOperator::Contains => match (doc_val, &filter_val) {
            // Substring match for strings, membership test for arrays
            (Value::String(s), Value::String(fv)) => s.contains(fv),
            (Value::Array(arr), _) => arr.contains(&filter_val),
            _ => false,
        },
    }
}

/// Result of importing a single document
enum ImportResult {
    Imported,
    Skipped,
}

/// Statistics from an import operation
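///
/// Returned by the document-import APIs so callers can report partial
/// failures. A minimal usage sketch (the `import_collection` method name
/// is assumed for illustration; not a doctest):
///
/// ```rust,ignore
/// let stats: ImportStats = db.import_collection("users", &path).await?;
/// println!(
///     "imported {}, skipped {}, failed {}",
///     stats.imported, stats.skipped, stats.failed
/// );
/// ```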
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}

/// Statistics for a specific collection
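///
/// For example, a collection of 10 documents totalling 5,120 bytes has
/// `avg_doc_size == 512`.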
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes (total size divided by document count)
    pub avg_doc_size: usize,
}

/// Statistics for an index
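///
/// For example, an index covering 1,000 documents spread over 100 unique
/// values has `avg_docs_per_value == 10`.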
#[derive(Debug)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}

/// Combined database statistics
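///
/// Aggregates hot-cache, cold-storage, and per-collection statistics in one
/// snapshot. A minimal usage sketch (the `stats` accessor name is assumed
/// for illustration; not a doctest):
///
/// ```rust,ignore
/// let stats: DatabaseStats = db.stats().await?;
/// println!("~{} bytes on disk", stats.estimated_size);
/// for (name, coll) in &stats.collections {
///     println!("{}: {} docs, {} bytes", name, coll.count, coll.size_bytes);
/// }
/// ```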
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
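
    // Every test opens a fresh database inside its own `tempdir`; the
    // directory (and the database files in it) is removed when the guard
    // drops, so state cannot leak between tests.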

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            // 13 bytes of content plus the 5-byte "BLOB:" prefix
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5),
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a 201 MB test file, just over the blob size limit
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Try to insert a second document with the same email - should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Test upsert with unique constraint:
        // should succeed for a new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when trying to upsert with a duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating an existing document with the same email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}