aurora_db/
db.rs

//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::Aurora;
//! use aurora_db::types::{FieldType, Value};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true),  // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document (async)
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for primary index
// Instead of storing full Vec<u8> values, we store minimal metadata
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;
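// Key shapes, as maintained by index_value() below:
//   primary_indices["users"] maps "users:<uuid>" -> DiskLocation
//   secondary_indices["users:email"] maps "alice@example.com" -> ["users:<uuid>", ...]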

// Describes a stored value: raw data (with a preview), a blob, or compressed data
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document (async)
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Aurora {
    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// // Use a specific location
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Just use a name (creates in current directory)
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,           // 512 MB cache
    ///     enable_write_buffering: true,     // Batch writes for speed
    ///     enable_wal: true,                 // Durability
    ///     auto_compact: true,               // Background compaction
    ///     compact_interval_mins: 60,        // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        // Cold storage: pass cache capacity, flush interval, and mode from config
        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        // Initialize write buffer if enabled
        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        // Store auto_compact before moving config
        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;

        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize WAL if enabled and check for recovery
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Spawn background checkpoint task if WAL is enabled
        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Checkpoint: flush cold storage
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            // Truncate WAL after successful flush
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write()
                            {
                                let _ = wal_guard.truncate();
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            // Final checkpoint before shutdown
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write()
                            {
                                let _ = wal_guard.truncate();
                            }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Spawn background compaction task if auto_compact is enabled
        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Initialize the database
        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
        };

        // Replay WAL entries if any
        if !wal_entries.is_empty() {
            db.replay_wal(wal_entries)?;
        }

        Ok(db)
    }

    /// Ensure indices are initialized (lazy; runs at most once)
    ///
    /// The first call builds primary and secondary indices by scanning cold
    /// storage; subsequent calls return immediately.
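    ///
    /// A minimal usage sketch (assumes an async context and an open database):
    ///
    /// ```
    /// // Build indices once, e.g. before the first query-heavy workload
    /// db.ensure_indices_initialized().await?;
    /// ```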
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support
    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check hot cache first
        if let Some(value) = self.hot.get(key) {
            return Ok(Some(value));
        }

        // Check if key exists in primary index
        if let Some(collection) = key.split(':').next()
            && let Some(index) = self.primary_indices.get(collection)
            && index.contains_key(key)
        {
            // Key exists in index, fetch from cold storage
            if let Some(value) = self.cold.get(key)? {
                // Promote to hot cache for future fast access
                self.hot.set(key.to_string(), value.clone(), None);
                return Ok(Some(value));
            }
        }

        // Fallback to cold storage
        let value = self.cold.get(key)?;
        if let Some(v) = &value {
            self.hot.set(key.to_string(), v.clone(), None);
        }
        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference (often 10-100x faster than `get()`)
    ///
    /// Only checks the hot cache; returns `None` if the value is not currently cached.
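    ///
    /// A small sketch of the intended fast-path/fallback pattern:
    ///
    /// ```
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     // `bytes` is an Arc reference — the underlying data is not copied
    ///     println!("cached value is {} bytes", bytes.len());
    /// } else {
    ///     // Not cached: fall back to the tiered read, which also promotes
    ///     // the value into the hot cache
    ///     let _ = db.get("users:12345")?;
    /// }
    /// ```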
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Percentage of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Check cache performance
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///                   stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as single collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
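    ///
    /// A small sketch for gating expensive work on listener presence
    /// (`"orders"` is just an example collection name):
    ///
    /// ```
    /// // Skip building a detailed report if nobody is listening
    /// if db.listener_count("orders") > 0 {
    ///     println!("{} active listener(s) on orders ({} total)",
    ///         db.listener_count("orders"), db.total_listeners());
    /// }
    /// ```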
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for durability guarantee
    /// - Truncates WAL after successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?;  // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern (assumes the database is shared via Arc)
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = Arc::new(db);
    /// let db_bg = Arc::clone(&db);
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db_bg.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?;  // Critical: ensure transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?;  // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow, WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When durability mode is already Immediate
    ///
    /// # Important Notes
    /// - WAL provides durability even without explicit flush()
    /// - flush() adds latency (~10-50ms) so use strategically
    /// - Automatic flush happens during graceful shutdown
    /// - After flush(), WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set durability mode
    /// - WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate WAL after successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None)?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
    ///
    /// // Better: use insert_into() for documents (note: it's async)
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "Blob size {} MB exceeds maximum allowed size of {} MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None
        {
            wal.write()
                .unwrap()
                .append(Operation::Put, &key, Some(&value))?;
        }

        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(key.clone(), value.clone())?;
        } else {
            self.cold.set(key.clone(), value.clone())?;
        }

        self.hot.set(key.clone(), value.clone(), ttl);

        if let Some(collection_name) = key.split(':').next()
            && !collection_name.starts_with('_')
        {
            self.index_value(collection_name, &key, &value)?;
        }

        Ok(())
    }

    /// Replay WAL entries to recover from crash
    fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        for entry in entries {
            match entry.operation {
                Operation::Put => {
                    if let Some(value) = entry.value {
                        // Write directly to cold storage (skip WAL, already logged)
                        self.cold.set(entry.key.clone(), value.clone())?;

                        // Update hot cache
                        self.hot.set(entry.key.clone(), value.clone(), None);

                        // Rebuild indices
                        if let Some(collection) = entry.key.split(':').next()
                            && !collection.starts_with('_')
                        {
                            self.index_value(collection, &entry.key, &value)?;
                        }
                    }
                }
                Operation::Delete => {
                    self.cold.delete(&entry.key)?;
                    self.hot.delete(&entry.key);
                    // TODO: Remove from indices
                }
                Operation::BeginTx | Operation::CommitTx | Operation::RollbackTx => {
                    // Transaction boundaries - future implementation
                }
            }
        }

        // Flush after replay and truncate WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update primary index with metadata only (not the full value)
        let location = DiskLocation::new(value.len());
        self.primary_indices
            .entry(collection.to_string())
            .or_default()
            .insert(key.to_string(), location);

        // Try to get schema from cache first, otherwise load and cache it
        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                // Schema not in cache - load it
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // No schema = no secondary indices
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // Invalid schema = skip indexing
                };

                // Cache the schema for future use
                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        // Build list of fields that should be indexed (unique or explicitly indexed)
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        // If no fields need indexing, we're done
        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices - ONLY for indexed/unique fields
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                // Skip fields that aren't indexed or unique
                if !indexed_fields.contains(&field) {
                    continue;
                }

                // Use consistent string formatting for indexing
                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self.secondary_indices.entry(index_key).or_default();

                // Cap the number of index entries stored per value
                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        // Only add if we haven't exceeded the limit per value
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    /// Scan collection with filter and early termination support
    /// Used by QueryBuilder for optimized queries with LIMIT
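    ///
    /// Most callers should go through `query()`; a direct-use sketch of this
    /// low-level API (field names are illustrative):
    ///
    /// ```
    /// // Collect up to 10 documents whose `active` field is true
    /// let docs = db.scan_and_filter(
    ///     "users",
    ///     |doc| matches!(doc.data.get("active"), Some(Value::Bool(true))),
    ///     Some(10),
    /// )?;
    /// ```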
    pub fn scan_and_filter<F>(
        &self,
        collection: &str,
        filter: F,
        limit: Option<usize>,
    ) -> Result<Vec<Document>>
    where
        F: Fn(&Document) -> bool,
    {
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                // Early termination
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                let key = entry.key();
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        } else {
            // Fallback
            documents = self.scan_collection(collection)?;
            documents.retain(|doc| filter(doc));
            if let Some(max) = limit {
                documents.truncate(max);
            }
        }

        Ok(documents)
    }

    /// Smart collection scan that uses the primary index as a key directory
    /// Avoids forced flushes and leverages hot cache for better performance
    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let mut documents = Vec::new();

        // Use the primary index as a "key directory" - it contains all document keys
        if let Some(index) = self.primary_indices.get(collection) {
            // Iterate through all keys in the primary index (fast, in-memory)
            for entry in index.iter() {
                let key = entry.key();

                // Fetch each document using get() which checks:
                // 1. Hot cache first (instant)
                // 2. Cold storage if not cached (disk I/O only for uncached items)
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        documents.push(doc);
                    }
                }
            }
        } else {
            // Fallback: scan from cold storage if primary index not yet initialized
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                if let Ok((_key, value)) = result {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        documents.push(doc);
                    }
                }
            }
        }

        Ok(documents)
    }

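    /// Store a file's contents as a blob (maximum 50 MB)
    ///
    /// Reads the file asynchronously, prefixes the bytes with a `BLOB:` marker,
    /// and stores the result under `key` via the normal write path.
    ///
    /// A minimal sketch (assumes a Tokio runtime; the path is a placeholder):
    ///
    /// ```
    /// use std::path::Path;
    ///
    /// // Store an image file under a blob key
    /// db.put_blob("images:logo".to_string(), Path::new("logo.png")).await?;
    /// ```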
    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit

        // Get file metadata to check size before reading
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Add BLOB: prefix to mark this as blob data
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None)
    }

    /// Create a new collection with schema definition
    ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third boolean parameter marks the field as unique; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true to enforce uniqueness (the field is auto-indexed), false otherwise
    ///
    /// # Performance
    /// - Indexed fields: Fast equality queries (O(1) lookup)
    /// - Non-indexed fields: Full scan required for queries
    /// - Unique fields are automatically indexed
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),      // Not unique
    ///     ("email", FieldType::String, true),      // Unique - enforced and indexed
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ])?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */])?; // OK!
    /// ```
    pub fn new_collection<S: Into<String>>(
        &self,
        name: &str,
        fields: Vec<(S, FieldType, bool)>,
    ) -> Result<()> {
        let collection_key = format!("_collection:{}", name);

        // Check if collection already exists - if so, just return Ok (idempotent)
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }

        // Create field definitions
        let mut field_definitions = HashMap::new();
        for (field_name, field_type, unique) in fields {
            field_definitions.insert(
                field_name.into(),
                FieldDefinition {
                    field_type,
                    unique,
                    indexed: unique, // Auto-index unique fields
                },
            );
        }

        let collection = Collection {
            name: name.to_string(),
            fields: field_definitions,
            // Unique fields are derived from `fields`; no separate list is stored
        };

        let collection_data = serde_json::to_vec(&collection)?;
        self.put(collection_key, collection_data, None)?;

        // Invalidate schema cache since we just created/updated the collection schema
        self.schema_cache.remove(name);

        Ok(())
    }

    /// Insert a document into a collection
    ///
    /// Automatically generates a UUID for the document and validates against
    /// collection schema and unique constraints. Returns the generated document ID.
    ///
    /// # Performance
    /// - Single insert: ~15,000 docs/sec
    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
    /// - Triggers PubSub events for real-time listeners
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `data` - Document fields and values to insert
    ///
    /// # Returns
    /// The auto-generated ID of the inserted document or an error
    ///
    /// # Errors
    /// - `CollectionNotFound`: Collection doesn't exist
    /// - `ValidationError`: Data violates schema or unique constraints
    /// - `SerializationError`: Invalid data format
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic insertion
    /// let user_id = db.insert_into("users", vec![
    ///     ("name", Value::String("Alice Smith".to_string())),
    ///     ("email", Value::String("alice@example.com".to_string())),
    ///     ("age", Value::Int(28)),
    ///     ("active", Value::Bool(true)),
    /// ]).await?;
    ///
    /// println!("Created user with ID: {}", user_id);
    ///
    /// // Inserting with nested data
    /// let order_id = db.insert_into("orders", vec![
    ///     ("user_id", Value::String(user_id.clone())),
    ///     ("total", Value::Float(99.99)),
    ///     ("status", Value::String("pending".to_string())),
    ///     ("items", Value::Array(vec![
    ///         Value::String("item-123".to_string()),
    ///         Value::String("item-456".to_string()),
    ///     ])),
    /// ]).await?;
    ///
    /// // Error handling - unique constraint violation
    /// match db.insert_into("users", vec![
    ///     ("email", Value::String("alice@example.com".to_string())),  // Duplicate!
    ///     ("name", Value::String("Alice Clone".to_string())),
    /// ]).await {
    ///     Ok(id) => println!("Inserted: {}", id),
    ///     Err(e) => println!("Failed: {} (email already exists)", e),
    /// }
    ///
    /// // For bulk inserts (10+ documents), use batch_insert() instead
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".to_string())),
    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".to_string())),
    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
    ///     ]),
    ///     // ... more documents
    /// ];
    /// let ids = db.batch_insert("users", users).await?;  // 3x faster!
    /// println!("Inserted {} users", ids.len());
    /// ```
    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
        // Convert Vec<(&str, Value)> to HashMap<String, Value>
        let data_map: HashMap<String, Value> =
            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data_map)
            .await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data: data_map,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

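    /// Insert a document from a pre-built field map
    ///
    /// Behaves like `insert_into()` (UUID generation, unique-constraint validation,
    /// PubSub insert event), but takes an owned `HashMap`, which is convenient when
    /// the field set is assembled dynamically.
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // Field set built at runtime
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Dana".into()));
    /// data.insert("age".to_string(), Value::Int(41));
    ///
    /// let id = db.insert_map("users", data).await?;
    /// ```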
    pub async fn insert_map(
        &self,
        collection: &str,
        data: HashMap<String, Value>,
    ) -> Result<String> {
        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data).await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

    /// Batch insert multiple documents with optimized write path
    ///
    /// Inserts multiple documents in a single optimized operation, bypassing
    /// the write buffer for better performance. Ideal for bulk data loading,
    /// migrations, or initial database seeding. 3x faster than individual inserts.
    ///
    /// # Performance
    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
    /// - Batch writes to WAL and storage
    /// - Validates all unique constraints
    /// - Use for 10+ documents minimum
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `documents` - Vector of document data as HashMaps
    ///
    /// # Returns
    /// Vector of auto-generated document IDs or an error
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Bulk user import
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Alice".into())),
    ///         ("email".to_string(), Value::String("alice@example.com".into())),
    ///         ("age".to_string(), Value::Int(28)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".into())),
    ///         ("email".to_string(), Value::String("bob@example.com".into())),
    ///         ("age".to_string(), Value::Int(32)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".into())),
    ///         ("email".to_string(), Value::String("carol@example.com".into())),
    ///         ("age".to_string(), Value::Int(25)),
    ///     ]),
    /// ];
    ///
    /// let ids = db.batch_insert("users", users).await?;
    /// println!("Inserted {} users", ids.len());
    ///
    /// // Seeding test data
    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
    ///     .map(|i| HashMap::from([
    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
    ///         ("stock".to_string(), Value::Int(100)),
    ///     ]))
    ///     .collect();
    ///
    /// let ids = db.batch_insert("products", test_products).await?;
    /// // Much faster than 1000 individual insert_into() calls!
    ///
    /// // Migration from CSV data
    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
    /// let mut batch = Vec::new();
    ///
    /// for result in csv_reader.records() {
    ///     let record = result?;
    ///     let doc = HashMap::from([
    ///         ("field1".to_string(), Value::String(record[0].to_string())),
    ///         ("field2".to_string(), Value::String(record[1].to_string())),
    ///     ]);
    ///     batch.push(doc);
    ///
1538    ///     // Insert in batches of 1000
1539    ///     if batch.len() >= 1000 {
1540    ///         db.batch_insert("imported_data", batch.clone()).await?;
1541    ///         batch.clear();
1542    ///     }
1543    /// }
1544    ///
1545    /// // Insert remaining
1546    /// if !batch.is_empty() {
1547    ///     db.batch_insert("imported_data", batch).await?;
1548    /// }
1549    /// ```
1550    ///
1551    /// # Errors
1552    /// - `ValidationError`: Unique constraint violation on any document
1553    /// - `CollectionNotFound`: Collection doesn't exist
1554    /// - `IoError`: Storage write failure
1555    ///
1556    /// # Important Notes
1557    /// - All inserts are atomic - if one fails, none are inserted
1558    /// - UUIDs are auto-generated for all documents
1559    /// - PubSub events are published for each insert
1560    /// - For 10+ documents, this is 3x faster than individual inserts
1561    /// - For < 10 documents, use `insert_into()` instead
1562    ///
1563    /// # See Also
1564    /// - `insert_into()` for single document inserts
1565    /// - `import_from_json()` for file-based bulk imports
1566    /// - `batch_write()` for low-level batch operations
1567    pub async fn batch_insert(
1568        &self,
1569        collection: &str,
1570        documents: Vec<HashMap<String, Value>>,
1571    ) -> Result<Vec<String>> {
1572        let mut doc_ids = Vec::with_capacity(documents.len());
1573        let mut pairs = Vec::with_capacity(documents.len());
1574
1575        // Prepare all documents
1576        for data in documents {
1577            // Validate unique constraints
1578            self.validate_unique_constraints(collection, &data).await?;
1579
1580            let doc_id = Uuid::new_v4().to_string();
1581            let document = Document {
1582                id: doc_id.clone(),
1583                data,
1584            };
1585
1586            let key = format!("{}:{}", collection, doc_id);
1587            let value = serde_json::to_vec(&document)?;
1588
1589            pairs.push((key, value));
1590            doc_ids.push(doc_id);
1591        }
1592
1593        // Write to WAL in batch (if enabled)
1594        if let Some(ref wal) = self.wal
1595            && self.config.durability_mode != DurabilityMode::None {
1596                let mut wal_lock = wal.write().unwrap();
1597                for (key, value) in &pairs {
1598                    wal_lock.append(Operation::Put, key, Some(value))?;
1599                }
1600            }
1601
1602        // Bypass write buffer - go directly to cold storage batch API
1603        self.cold.batch_set(pairs.clone())?;
1604
1605        // Note: Durability is handled by background checkpoint process
1606
1607        // Update hot cache and indices
1608        for (key, value) in pairs {
1609            self.hot.set(key.clone(), value.clone(), None);
1610
1611            if let Some(collection_name) = key.split(':').next()
1612                && !collection_name.starts_with('_') {
1613                    self.index_value(collection_name, &key, &value)?;
1614                }
1615        }
1616
1617        // Publish events
1618        for doc_id in &doc_ids {
1619            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
1620                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
1621                let _ = self.pubsub.publish(event);
1622            }
1623        }
1624
1625        Ok(doc_ids)
1626    }
1627
1628    /// Update a document by ID
1629    ///
1630    /// # Arguments
1631    /// * `collection` - Collection name
1632    /// * `doc_id` - Document ID to update
1633    /// * `data` - New field values to set
1634    ///
1635    /// # Returns
1636    /// Ok(()) on success, or an error if the document doesn't exist
1637    ///
1638    /// # Examples
1639    ///
1640    /// ```
1641    /// db.update_document("users", &user_id, vec![
1642    ///     ("status", Value::String("active".to_string())),
1643    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
1644    /// ]).await?;
1645    /// ```
1646    pub async fn update_document(
1647        &self,
1648        collection: &str,
1649        doc_id: &str,
1650        updates: Vec<(&str, Value)>,
1651    ) -> Result<()> {
1652        // Get existing document
1653        let mut document = self
1654            .get_document(collection, doc_id)?
1655            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;
1656
1657        // Store old document for event
1658        let old_document = document.clone();
1659
1660        // Apply updates
1661        for (field, value) in updates {
1662            document.data.insert(field.to_string(), value);
1663        }
1664
1665        // Validate unique constraints after update (excluding current document)
1666        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
1667            .await?;
1668
1669        // Save updated document
1670        self.put(
1671            format!("{}:{}", collection, doc_id),
1672            serde_json::to_vec(&document)?,
1673            None,
1674        )?;
1675
1676        // Publish update event
1677        let event =
1678            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
1679        let _ = self.pubsub.publish(event);
1680
1681        Ok(())
1682    }
1683
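    /// Retrieve every document in a collection
    ///
    /// Ensures indices are initialized, then performs a full collection scan.
    /// For large collections, prefer `query()` with filters and `limit()` so the
    /// scan can terminate early.
    ///
    /// # Examples
    ///
    /// ```
    /// let all_users = db.get_all_collection("users").await?;
    /// println!("{} users total", all_users.len());
    /// ```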
1684    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
1685        self.ensure_indices_initialized().await?;
1686        self.scan_collection(collection)
1687    }
1688
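    /// List stored keys containing `pattern`, with lightweight metadata
    ///
    /// Scans cold storage and returns, for each matching key, a `DataInfo`
    /// describing whether the value is a blob or plain data, its size in bytes,
    /// and (for plain data) a preview of up to 50 bytes.
    ///
    /// # Examples
    ///
    /// ```
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{} -> {} bytes", key, info.size());
    /// }
    /// ```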
1689    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
1690        let mut data = Vec::new();
1691
        // Scan from cold storage instead of the primary index
        for (key, value) in self.cold.scan().filter_map(|r| r.ok()) {
            if key.contains(pattern) {
                let info = if value.starts_with(b"BLOB:") {
                    DataInfo::Blob { size: value.len() }
                } else {
                    DataInfo::Data {
                        size: value.len(),
                        preview: String::from_utf8_lossy(&value[..value.len().min(50)])
                            .into_owned(),
                    }
                };

                data.push((key, info));
            }
        }
1710
1711        Ok(data)
1712    }
1713
    /// Begin a new transaction for atomic operations
    ///
    /// All operations added to the transaction are buffered until either
    /// `commit_transaction()` or `rollback_transaction()` is called, ensuring
    /// all-or-nothing execution: either every operation is applied, or none are.
    ///
    /// # Returns
    /// A `TransactionId` to pass to `commit_transaction()` or `rollback_transaction()`
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Start a transaction
    /// let tx_id = db.begin_transaction();
    ///
    /// // Perform multiple operations
    /// db.insert_into("accounts", vec![
    ///     ("user_id", Value::String("alice".into())),
    ///     ("balance", Value::Int(1000)),
    /// ]).await?;
    ///
    /// db.insert_into("accounts", vec![
    ///     ("user_id", Value::String("bob".into())),
    ///     ("balance", Value::Int(500)),
    /// ]).await?;
    ///
    /// // Commit if everything succeeded
    /// db.commit_transaction(tx_id)?;
    ///
    /// // Or roll back on error
    /// // db.rollback_transaction(tx_id)?;
    /// ```
1771    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
1772        let buffer = self.transaction_manager.begin();
1773        buffer.id
1774    }
1775
1776    /// Commit a transaction, making all changes permanent
1777    ///
1778    /// All operations within the transaction are atomically applied to the database.
1779    /// If any operation fails, none are applied.
1780    ///
1781    /// # Arguments
1782    /// * `tx_id` - Transaction ID returned from begin_transaction()
1783    ///
1784    /// # Examples
1785    ///
1786    /// ```
1787    /// use aurora_db::{Aurora, types::Value};
1788    ///
1789    /// let db = Aurora::open("mydb.db")?;
1790    ///
1791    /// // Transfer money between accounts
1792    /// let tx_id = db.begin_transaction();
1793    ///
1794    /// // Deduct from Alice
1795    /// db.update_document("accounts", "alice", vec![
1796    ///     ("balance", Value::Int(900)),  // Was 1000
1797    /// ]).await?;
1798    ///
1799    /// // Add to Bob
1800    /// db.update_document("accounts", "bob", vec![
1801    ///     ("balance", Value::Int(600)),  // Was 500
1802    /// ]).await?;
1803    ///
1804    /// // Both updates succeed - commit them
1805    /// db.commit_transaction(tx_id)?;
1806    ///
1807    /// println!("Transfer completed!");
1808    /// ```
1809    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1810        let buffer = self
1811            .transaction_manager
1812            .active_transactions
1813            .get(&tx_id)
1814            .ok_or_else(|| {
1815                AuroraError::InvalidOperation("Transaction not found or already completed".into())
1816            })?;
1817
1818        for item in buffer.writes.iter() {
1819            let key = item.key();
1820            let value = item.value();
1821            self.cold.set(key.clone(), value.clone())?;
1822            self.hot.set(key.clone(), value.clone(), None);
1823            if let Some(collection_name) = key.split(':').next()
1824                && !collection_name.starts_with('_') {
1825                    self.index_value(collection_name, key, value)?;
1826                }
1827        }
1828
1829        for item in buffer.deletes.iter() {
1830            let key = item.key();
1831            if let Some((collection, id)) = key.split_once(':')
1832                && let Ok(Some(doc)) = self.get_document(collection, id) {
1833                    self.remove_from_indices(collection, &doc)?;
1834                }
1835            self.cold.delete(key)?;
1836            self.hot.delete(key);
1837        }
1838
1839        // Drop the buffer reference to release the DashMap read lock
1840        // before calling commit which needs to remove the entry (write lock)
1841        drop(buffer);
1842
1843        self.transaction_manager.commit(tx_id)?;
1844
1845        self.cold.compact()?;
1846
1847        Ok(())
1848    }
1849
1850    /// Roll back a transaction, discarding all changes
1851    ///
1852    /// All operations within the transaction are discarded. The database state
1853    /// remains unchanged. Use this when an error occurs during transaction processing.
1854    ///
1855    /// # Arguments
1856    /// * `tx_id` - Transaction ID returned from begin_transaction()
1857    ///
1858    /// # Examples
1859    ///
1860    /// ```
1861    /// use aurora_db::{Aurora, types::Value};
1862    ///
1863    /// let db = Aurora::open("mydb.db")?;
1864    ///
1865    /// // Attempt a transfer with validation
1866    /// let tx_id = db.begin_transaction();
1867    ///
    /// let result: Result<(), &str> = async {
    ///     // Check Alice's balance first (get_document is synchronous)
    ///     let alice = db.get_document("accounts", "alice")
    ///         .map_err(|_| "Lookup failed")?
    ///         .ok_or("Account not found")?;
    ///
    ///     match alice.data.get("balance") {
    ///         Some(Value::Int(bal)) if *bal >= 100 => {
    ///             // Deduct from Alice
    ///             db.update_document("accounts", "alice", vec![
    ///                 ("balance", Value::Int(*bal - 100)),
    ///             ]).await.map_err(|_| "Update failed")?;
    ///
    ///             // Add to Bob
    ///             db.update_document("accounts", "bob", vec![
    ///                 ("balance", Value::Int(600)),
    ///             ]).await.map_err(|_| "Update failed")?;
    ///
    ///             Ok(())
    ///         }
    ///         Some(Value::Int(_)) => Err("Insufficient funds"),
    ///         _ => Err("Account has no balance field"),
    ///     }
    /// }.await;
1891    ///
1892    /// match result {
1893    ///     Ok(_) => {
1894    ///         db.commit_transaction(tx_id)?;
1895    ///         println!("Transfer completed");
1896    ///     }
1897    ///     Err(e) => {
1898    ///         db.rollback_transaction(tx_id)?;
1899    ///         println!("Transfer failed: {}, changes rolled back", e);
1900    ///     }
1901    /// }
1902    /// ```
1903    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1904        self.transaction_manager.rollback(tx_id)
1905    }
1906
1907    /// Create a secondary index on a field for faster queries
1908    ///
1909    /// Indexes dramatically improve query performance for frequently accessed fields,
1910    /// trading increased memory usage and slower writes for much faster reads.
1911    ///
1912    /// # When to Create Indexes
1913    /// - **Frequent queries**: Fields used in 80%+ of your queries
1914    /// - **High cardinality**: Fields with many unique values (user_id, email)
1915    /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
1916    /// - **Large collections**: Most beneficial with 10,000+ documents
1917    ///
1918    /// # When NOT to Index
1919    /// - Low cardinality fields (e.g., boolean flags, small enums)
1920    /// - Rarely queried fields
1921    /// - Fields that change frequently (write-heavy workloads)
1922    /// - Small collections (<1,000 documents) - full scans are fast enough
1923    ///
1924    /// # Performance Characteristics
1925    /// - **Query speedup**: O(n) → O(1) for equality filters
1926    /// - **Memory cost**: ~100-200 bytes per document per index
1927    /// - **Write slowdown**: ~20-30% longer insert/update times
1928    /// - **Build time**: ~5,000 docs/sec for initial indexing
1929    ///
1930    /// # Arguments
1931    /// * `collection` - Name of the collection to index
1932    /// * `field` - Name of the field to index
1933    ///
1934    /// # Examples
1935    ///
1936    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// db.new_collection("users", vec![
    ///     ("email", FieldType::String, true),   // unique, high cardinality
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    /// ])?;
1945    ///
1946    /// // Index email - frequently queried, high cardinality
1947    /// db.create_index("users", "email").await?;
1948    ///
1949    /// // Now this query is FAST (O(1) instead of O(n))
1950    /// let user = db.query("users")
1951    ///     .filter(|f| f.eq("email", "alice@example.com"))
1952    ///     .first_one()
1953    ///     .await?;
1954    ///
1955    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
1956    /// // A full scan is fast enough for boolean fields
1957    ///
1958    /// // DO index 'age' if you frequently query age ranges
1959    /// db.create_index("users", "age").await?;
1960    ///
1961    /// let young_users = db.query("users")
1962    ///     .filter(|f| f.lt("age", 30))
1963    ///     .collect()
1964    ///     .await?;
1965    /// ```
1966    ///
1967    /// # Real-World Example: E-commerce Orders
1968    ///
1969    /// ```
1970    /// // Orders collection: 1 million documents
1971    /// db.new_collection("orders", vec![
1972    ///     ("user_id", FieldType::String),    // High cardinality
1973    ///     ("status", FieldType::String),      // Low cardinality (pending, shipped, delivered)
1974    ///     ("created_at", FieldType::String),
1975    ///     ("total", FieldType::Float),
1976    /// ])?;
1977    ///
1978    /// // Index user_id - queries like "show me my orders" are common
1979    /// db.create_index("orders", "user_id").await?;  // Good choice
1980    ///
1981    /// // Query speedup: 2.5s → 0.001s
1982    /// let my_orders = db.query("orders")
1983    ///     .filter(|f| f.eq("user_id", user_id))
1984    ///     .collect()
1985    ///     .await?;
1986    ///
1987    /// // DON'T index 'status' - only 3 possible values
1988    /// // Scanning 1M docs takes ~100ms, indexing won't help much
1989    ///
1990    /// // Index created_at if you frequently query recent orders
1991    /// db.create_index("orders", "created_at").await?;  // Good for time-based queries
1992    /// ```
1993    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
1994        // Check if collection exists
1995        if self.get(&format!("_collection:{}", collection))?.is_none() {
1996            return Err(AuroraError::CollectionNotFound(collection.to_string()));
1997        }
1998
1999        // Generate a default index name
2000        let index_name = format!("idx_{}_{}", collection, field);
2001
2002        // Create index definition
2003        let definition = IndexDefinition {
2004            name: index_name.clone(),
2005            collection: collection.to_string(),
2006            fields: vec![field.to_string()],
2007            index_type: IndexType::BTree,
2008            unique: false,
2009        };
2010
2011        // Create the index
2012        let index = Index::new(definition.clone());
2013
2014        // Index all existing documents in the collection
2015        let prefix = format!("{}:", collection);
2016        for result in self.cold.scan_prefix(&prefix) {
2017            if let Ok((_, data)) = result
2018                && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2019                    let _ = index.insert(&doc);
2020                }
2021        }
2022
2023        // Store the index
2024        self.indices.insert(index_name, index);
2025
2026        // Store the index definition for persistence
2027        let index_key = format!("_index:{}:{}", collection, field);
2028        self.put(index_key, serde_json::to_vec(&definition)?, None)?;
2029
2030        Ok(())
2031    }
2032
2033    /// Query documents in a collection with filtering, sorting, and pagination
2034    ///
2035    /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2036    /// Queries use early termination for LIMIT clauses, making them extremely fast
2037    /// even on large collections (6,800x faster than naive implementations).
2038    ///
2039    /// # Performance
2040    /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2041    /// - Without LIMIT: O(n) where n = matching documents
2042    /// - Uses secondary indices when available for equality filters
2043    /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2044    ///
2045    /// # Examples
2046    ///
2047    /// ```
2048    /// use aurora_db::{Aurora, types::Value};
2049    ///
2050    /// let db = Aurora::open("mydb.db")?;
2051    ///
2052    /// // Simple equality query
2053    /// let active_users = db.query("users")
2054    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2055    ///     .collect()
2056    ///     .await?;
2057    ///
2058    /// // Range query with pagination (FAST - uses early termination!)
2059    /// let top_scorers = db.query("users")
2060    ///     .filter(|f| f.gt("score", Value::Int(1000)))
2061    ///     .order_by("score", false)  // descending
2062    ///     .limit(10)
2063    ///     .offset(20)
2064    ///     .collect()
2065    ///     .await?;
2066    ///
2067    /// // Multiple filters
2068    /// let premium_active = db.query("users")
2069    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
2070    ///     .filter(|f| f.eq("active", Value::Bool(true)))
2071    ///     .limit(100)  // Only scans ~200 docs, not all million!
2072    ///     .collect()
2073    ///     .await?;
2074    ///
2075    /// // Text search in a field
2076    /// let matching = db.query("articles")
2077    ///     .filter(|f| f.contains("title", "rust"))
2078    ///     .collect()
2079    ///     .await?;
2080    /// ```
2081    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2082        QueryBuilder::new(self, collection)
2083    }
2084
2085    /// Create a search builder for full-text search
2086    ///
2087    /// # Arguments
2088    /// * `collection` - Name of the collection to search
2089    ///
2090    /// # Returns
2091    /// A `SearchBuilder` for configuring and executing searches
2092    ///
2093    /// # Examples
2094    ///
2095    /// ```
2096    /// // Search for documents containing text
2097    /// let search_results = db.search("articles")
2098    ///     .field("content")
2099    ///     .matching("quantum computing")
2100    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
2101    ///     .collect()
2102    ///     .await?;
2103    /// ```
2104    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2105        SearchBuilder::new(self, collection)
2106    }
2107
2108    /// Retrieve a document by ID
2109    ///
2110    /// Fast direct lookup when you know the document ID. Significantly faster
2111    /// than querying with filters when ID is known.
2112    ///
2113    /// # Performance
2114    /// - Hot cache: ~1,000,000 reads/sec (instant)
2115    /// - Cold storage: ~500,000 reads/sec (disk I/O)
2116    /// - Complexity: O(1) - constant time lookup
2117    /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2118    ///
2119    /// # Arguments
2120    /// * `collection` - Name of the collection to query
2121    /// * `id` - ID of the document to retrieve
2122    ///
2123    /// # Returns
2124    /// The document if found, None if not found, or an error
2125    ///
2126    /// # Examples
2127    ///
2128    /// ```
2129    /// use aurora_db::{Aurora, types::Value};
2130    ///
2131    /// let db = Aurora::open("mydb.db")?;
2132    ///
2133    /// // Basic retrieval
2134    /// if let Some(user) = db.get_document("users", &user_id)? {
2135    ///     println!("Found user: {}", user.id);
2136    ///
2137    ///     // Access fields safely
2138    ///     if let Some(Value::String(name)) = user.data.get("name") {
2139    ///         println!("Name: {}", name);
2140    ///     }
2141    ///
2142    ///     if let Some(Value::Int(age)) = user.data.get("age") {
2143    ///         println!("Age: {}", age);
2144    ///     }
2145    /// } else {
2146    ///     println!("User not found");
2147    /// }
2148    ///
2149    /// // Idiomatic error handling
2150    /// let user = db.get_document("users", &user_id)?
2151    ///     .ok_or_else(|| AuroraError::NotFound("User not found".into()))?;
2152    ///
2153    /// // Checking existence before operations
2154    /// if db.get_document("users", &user_id)?.is_some() {
2155    ///     db.update_document("users", &user_id, vec![
2156    ///         ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2157    ///     ]).await?;
2158    /// }
2159    ///
2160    /// // Batch retrieval (fetch multiple by ID)
2161    /// let user_ids = vec!["user-1", "user-2", "user-3"];
2162    /// let users: Vec<Document> = user_ids.iter()
2163    ///     .filter_map(|id| db.get_document("users", id).ok().flatten())
2164    ///     .collect();
2165    ///
2166    /// println!("Found {} out of {} users", users.len(), user_ids.len());
2167    /// ```
2168    ///
2169    /// # When to Use
2170    /// - You know the document ID (from insert, previous query, or URL param)
2171    /// - Need fastest possible lookup (1M reads/sec)
2172    /// - Fetching a single document
2173    ///
2174    /// # When NOT to Use
2175    /// - Searching by other fields → Use `query().filter()` instead
2176    /// - Need multiple documents by criteria → Use `query().collect()` instead
2177    /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2178    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2179        let key = format!("{}:{}", collection, id);
2180        if let Some(data) = self.get(&key)? {
2181            Ok(Some(serde_json::from_slice(&data)?))
2182        } else {
2183            Ok(None)
2184        }
2185    }
2186
2187    /// Delete a document by ID
2188    ///
2189    /// Permanently removes a document from storage, cache, and all indices.
2190    /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2191    ///
2192    /// # Performance
2193    /// - Delete speed: ~50,000 deletes/sec
2194    /// - Cleans up hot cache, cold storage, primary + secondary indices
2195    /// - Triggers PubSub events for listeners
2196    ///
2197    /// # Arguments
2198    /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2199    ///
2200    /// # Returns
2201    /// Success or an error
2202    ///
2203    /// # Errors
2204    /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2205    /// - `IoError`: Storage deletion failed
2206    ///
2207    /// # Examples
2208    ///
2209    /// ```
    /// use aurora_db::{Aurora, types::Value};
2211    ///
2212    /// let db = Aurora::open("mydb.db")?;
2213    ///
2214    /// // Basic deletion (note: requires "collection:id" format)
2215    /// db.delete("users:abc123").await?;
2216    ///
2217    /// // Delete with existence check
2218    /// let user_id = "user-456";
2219    /// if db.get_document("users", user_id)?.is_some() {
2220    ///     db.delete(&format!("users:{}", user_id)).await?;
2221    ///     println!("User deleted");
2222    /// } else {
2223    ///     println!("User not found");
2224    /// }
2225    ///
2226    /// // Error handling
2227    /// match db.delete("users:nonexistent").await {
2228    ///     Ok(_) => println!("Deleted successfully"),
2229    ///     Err(e) => println!("Delete failed: {}", e),
2230    /// }
2231    ///
2232    /// // Batch deletion using query
    /// let inactive_count = db.delete_by_query("users", |f| {
2234    ///     f.eq("active", Value::Bool(false))
2235    /// }).await?;
2236    /// println!("Deleted {} inactive users", inactive_count);
2237    ///
2238    /// // Delete with cascading (manual cascade pattern)
2239    /// let user_id = "user-123";
2240    ///
2241    /// // Delete user's orders first
2242    /// let orders = db.query("orders")
2243    ///     .filter(|f| f.eq("user_id", user_id))
2244    ///     .collect()
2245    ///     .await?;
2246    ///
2247    /// for order in orders {
2248    ///     db.delete(&format!("orders:{}", order.id)).await?;
2249    /// }
2250    ///
2251    /// // Then delete the user
2252    /// db.delete(&format!("users:{}", user_id)).await?;
2253    /// println!("User and all orders deleted");
2254    /// ```
2255    ///
2256    /// # Alternative: Soft Delete Pattern
2257    ///
2258    /// For recoverable deletions, use soft deletes instead:
2259    ///
2260    /// ```
2261    /// // Soft delete - mark as deleted instead of removing
2262    /// db.update_document("users", &user_id, vec![
2263    ///     ("deleted", Value::Bool(true)),
2264    ///     ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2265    /// ]).await?;
2266    ///
2267    /// // Query excludes soft-deleted items
2268    /// let active_users = db.query("users")
2269    ///     .filter(|f| f.eq("deleted", Value::Bool(false)))
2270    ///     .collect()
2271    ///     .await?;
2272    ///
2273    /// // Later: hard delete after retention period
2274    /// let old_deletions = db.query("users")
2275    ///     .filter(|f| f.eq("deleted", Value::Bool(true)))
2276    ///     .filter(|f| f.lt("deleted_at", thirty_days_ago))
2277    ///     .collect()
2278    ///     .await?;
2279    ///
2280    /// for user in old_deletions {
2281    ///     db.delete(&format!("users:{}", user.id)).await?;
2282    /// }
2283    /// ```
2284    ///
2285    /// # Important Notes
2286    /// - Deletion is permanent - no undo/recovery
2287    /// - Consider soft deletes for recoverable operations
2288    /// - Use transactions for multi-document deletions
2289    /// - PubSub subscribers will receive delete events
2290    /// - All indices are automatically cleaned up
2291    pub async fn delete(&self, key: &str) -> Result<()> {
2292        // Extract collection and id from key (format: "collection:id")
2293        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2294            (coll, doc_id)
2295        } else {
2296            return Err(AuroraError::InvalidOperation(
2297                "Invalid key format, expected 'collection:id'".into(),
2298            ));
2299        };
2300
2301        // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2302        let document = self.get_document(collection, id)?;
2303
        // Delete from hot cache (a no-op if the key is not cached)
        self.hot.delete(key);
2308
2309        // Delete from cold storage
2310        self.cold.delete(key)?;
2311
2312        // CRITICAL FIX: Clean up ALL indices (primary + secondary)
2313        if let Some(doc) = document {
2314            self.remove_from_indices(collection, &doc)?;
2315        } else {
2316            // Fallback: at least remove from primary index
2317            if let Some(index) = self.primary_indices.get_mut(collection) {
2318                index.remove(id);
2319            }
2320        }
2321
2322        // Publish delete event
2323        let event = crate::pubsub::ChangeEvent::delete(collection, id);
2324        let _ = self.pubsub.publish(event);
2325
2326        Ok(())
2327    }
2328
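    /// Delete an entire collection and all of its documents
    ///
    /// Removes every key with the collection prefix, drops the collection's
    /// primary and secondary indices, and invalidates its cached schema.
    /// Like `delete()`, this is permanent.
    ///
    /// # Examples
    ///
    /// ```
    /// db.delete_collection("temp_import").await?;
    /// ```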
2329    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2330        let prefix = format!("{}:", collection);
2331
2332        // Get all keys in collection
2333        let keys: Vec<String> = self
2334            .cold
2335            .scan()
2336            .filter_map(|r| r.ok())
2337            .filter(|(k, _)| k.starts_with(&prefix))
2338            .map(|(k, _)| k)
2339            .collect();
2340
2341        // Delete each key
2342        for key in keys {
2343            self.delete(&key).await?;
2344        }
2345
2346        // Remove collection indices
2347        self.primary_indices.remove(collection);
2348        self.secondary_indices
2349            .retain(|k, _| !k.starts_with(&prefix));
2350
2351        // Invalidate schema cache
2352        self.schema_cache.remove(collection);
2353
2354        Ok(())
2355    }
2356
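    /// Remove a document's entries from the primary index and from every
    /// secondary index that covers one of its fields.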
2357    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2358        // Remove from primary index
2359        if let Some(index) = self.primary_indices.get(collection) {
2360            index.remove(&doc.id);
2361        }
2362
2363        // Remove from secondary indices
2364        for (field, value) in &doc.data {
2365            let index_key = format!("{}:{}", collection, field);
2366            if let Some(index) = self.secondary_indices.get(&index_key)
2367                && let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
2368                    doc_ids.retain(|id| id != &doc.id);
2369                }
2370        }
2371
2372        Ok(())
2373    }
2374
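    /// Case-insensitive substring search over a single string field
    ///
    /// Scans the whole collection and returns documents whose `field` contains
    /// `query`, ignoring case. For ranked, typo-tolerant full-text search use
    /// `search()` instead.
    ///
    /// # Examples
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} matching articles", hits.len());
    /// ```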
2375    pub async fn search_text(
2376        &self,
2377        collection: &str,
2378        field: &str,
2379        query: &str,
2380    ) -> Result<Vec<Document>> {
2381        let mut results = Vec::new();
2382        let docs = self.get_all_collection(collection).await?;
2383
2384        for doc in docs {
2385            if let Some(Value::String(text)) = doc.data.get(field)
2386                && text.to_lowercase().contains(&query.to_lowercase()) {
2387                    results.push(doc);
2388                }
2389        }
2390
2391        Ok(results)
2392    }
2393
2394    /// Export a collection to a JSON file
2395    ///
2396    /// Creates a JSON file containing all documents in the collection.
2397    /// Useful for backups, data migration, or sharing datasets.
2398    /// Automatically appends `.json` extension if not present.
2399    ///
2400    /// # Performance
2401    /// - Export speed: ~10,000 docs/sec
2402    /// - Scans entire collection from cold storage
2403    /// - Memory efficient: streams documents to file
2404    ///
2405    /// # Arguments
2406    /// * `collection` - Name of the collection to export
2407    /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2408    ///
2409    /// # Returns
2410    /// Success or an error
2411    ///
2412    /// # Examples
2413    ///
2414    /// ```
2415    /// use aurora_db::Aurora;
2416    ///
2417    /// let db = Aurora::open("mydb.db")?;
2418    ///
2419    /// // Basic export
2420    /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2421    /// // Creates: ./backups/users_2024-01-15.json
2422    ///
2423    /// // Timestamped backup
2424    /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2425    /// let backup_path = format!("./backups/users_{}", timestamp);
2426    /// db.export_as_json("users", &backup_path)?;
2427    ///
2428    /// // Export multiple collections
2429    /// for collection in &["users", "orders", "products"] {
2430    ///     db.export_as_json(collection, &format!("./export/{}", collection))?;
2431    /// }
2432    /// ```
2433    ///
2434    /// # Output Format
2435    ///
2436    /// The exported JSON has this structure:
2437    /// ```json
2438    /// {
2439    ///   "users": [
2440    ///     { "id": "123", "name": "Alice", "email": "alice@example.com" },
2441    ///     { "id": "456", "name": "Bob", "email": "bob@example.com" }
2442    ///   ]
2443    /// }
2444    /// ```
2445    ///
2446    /// # See Also
2447    /// - `export_as_csv()` for CSV format export
2448    /// - `import_from_json()` to restore exported data
2449    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2450        let output_path = if !output_path.ends_with(".json") {
2451            format!("{}.json", output_path)
2452        } else {
2453            output_path.to_string()
2454        };
2455
2456        let mut docs = Vec::new();
2457
2458        // Get all documents from the specified collection
2459        for result in self.cold.scan() {
2460            let (key, value) = result?;
2461
2462            // Only process documents from the specified collection
2463            if let Some(key_collection) = key.split(':').next()
2464                && key_collection == collection && !key.starts_with("_collection:")
2465                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2466                        // Convert Value enum to raw JSON values
2467                        let mut clean_doc = serde_json::Map::new();
2468                        for (k, v) in doc.data {
2469                            match v {
2470                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2471                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2472                                Value::Float(f) => {
2473                                    if let Some(n) = serde_json::Number::from_f64(f) {
2474                                        clean_doc.insert(k, JsonValue::Number(n))
2475                                    } else {
2476                                        clean_doc.insert(k, JsonValue::Null)
2477                                    }
2478                                }
2479                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2480                                Value::Array(arr) => {
2481                                    let clean_arr: Vec<JsonValue> = arr
2482                                        .into_iter()
2483                                        .map(|v| match v {
2484                                            Value::String(s) => JsonValue::String(s),
2485                                            Value::Int(i) => JsonValue::Number(i.into()),
2486                                            Value::Float(f) => serde_json::Number::from_f64(f)
2487                                                .map(JsonValue::Number)
2488                                                .unwrap_or(JsonValue::Null),
2489                                            Value::Bool(b) => JsonValue::Bool(b),
2490                                            Value::Null => JsonValue::Null,
2491                                            _ => JsonValue::Null,
2492                                        })
2493                                        .collect();
2494                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
2495                                }
2496                                Value::Uuid(u) => {
2497                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
2498                                }
2499                                Value::Null => clean_doc.insert(k, JsonValue::Null),
2500                                Value::Object(_) => None, // Handle nested objects if needed
2501                            };
2502                        }
2503                        docs.push(JsonValue::Object(clean_doc));
2504                    }
2505        }
2506
2507        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
2508            collection.to_string(),
2509            JsonValue::Array(docs),
2510        )]));
2511
2512        let mut file = StdFile::create(&output_path)?;
2513        serde_json::to_writer_pretty(&mut file, &output)?;
2514        println!("Exported collection '{}' to {}", collection, &output_path);
2515        Ok(())
2516    }
2517
2518    /// Export a collection to a CSV file
2519    ///
2520    /// Creates a CSV file with headers from the first document and rows for each document.
2521    /// Useful for spreadsheet analysis, data science workflows, or reporting.
2522    /// Automatically appends `.csv` extension if not present.
2523    ///
2524    /// # Performance
2525    /// - Export speed: ~8,000 docs/sec
2526    /// - Memory efficient: streams rows to file
2527    /// - Headers determined from first document
2528    ///
2529    /// # Arguments
2530    /// * `collection` - Name of the collection to export
2531    /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
2532    ///
2533    /// # Returns
2534    /// Success or an error
2535    ///
2536    /// # Examples
2537    ///
2538    /// ```
2539    /// use aurora_db::Aurora;
2540    ///
2541    /// let db = Aurora::open("mydb.db")?;
2542    ///
2543    /// // Basic CSV export
2544    /// db.export_as_csv("users", "./reports/users")?;
2545    /// // Creates: ./reports/users.csv
2546    ///
2547    /// // Export for analysis in Excel/Google Sheets
2548    /// db.export_as_csv("orders", "./analytics/sales_data")?;
2549    ///
2550    /// // Monthly report generation
2551    /// let month = chrono::Utc::now().format("%Y-%m");
2552    /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
2553    /// ```
2554    ///
2555    /// # Output Format
2556    ///
2557    /// ```csv
2558    /// id,name,email,age
2559    /// 123,Alice,alice@example.com,28
2560    /// 456,Bob,bob@example.com,32
2561    /// ```
2562    ///
2563    /// # Important Notes
2564    /// - Headers are taken from the first document's fields
2565    /// - Documents with different fields will have empty values for missing fields
2566    /// - Nested objects/arrays are converted to strings
2567    /// - Best for flat document structures
2568    ///
2569    /// # See Also
2570    /// - `export_as_json()` for JSON format (better for nested data)
2571    /// - For complex nested structures, use JSON export instead
2572    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
2573        let output_path = if !filename.ends_with(".csv") {
2574            format!("{}.csv", filename)
2575        } else {
2576            filename.to_string()
2577        };
2578
2579        let mut writer = csv::Writer::from_path(&output_path)?;
2580        let mut headers = Vec::new();
2581        let mut first_doc = true;
2582
2583        // Get all documents from the specified collection
2584        for result in self.cold.scan() {
2585            let (key, value) = result?;
2586
2587            // Only process documents from the specified collection
2588            if let Some(key_collection) = key.split(':').next()
2589                && key_collection == collection && !key.starts_with("_collection:")
2590                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2591                        // Write headers from first document
2592                        if first_doc && !doc.data.is_empty() {
2593                            headers = doc.data.keys().cloned().collect();
2594                            writer.write_record(&headers)?;
2595                            first_doc = false;
2596                        }
2597
2598                        // Write the document values
2599                        let values: Vec<String> = headers
2600                            .iter()
2601                            .map(|header| {
2602                                doc.data
2603                                    .get(header)
2604                                    .map(|v| v.to_string())
2605                                    .unwrap_or_default()
2606                            })
2607                            .collect();
2608                        writer.write_record(&values)?;
2609                    }
2610        }
2611
2612        writer.flush()?;
2613        println!("Exported collection '{}' to {}", collection, &output_path);
2614        Ok(())
2615    }
2616
2617    // Helper method to create filter-based queries
2618    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2619        self.query(collection)
2620    }
2621
2622    // Convenience methods that build on top of the FilterBuilder
2623
    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        // Own the id so the filter closure can satisfy its 'static bound
        let id = id.to_string();
        self.query(collection)
            .filter(move |f| f.eq("id", id.clone()))
            .first_one()
            .await
    }
2630
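    /// Find the first document matching a filter, if any
    ///
    /// # Examples
    ///
    /// ```
    /// let alice = db.find_one("users", |f| f.eq("email", "alice@example.com")).await?;
    /// ```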
2631    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
2632    where
2633        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2634    {
2635        self.query(collection).filter(filter_fn).first_one().await
2636    }
2637
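    /// Find all documents where `field` equals `value`
    ///
    /// # Examples
    ///
    /// ```
    /// let admins = db.find_by_field("users", "role", "admin").await?;
    /// ```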
2638    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
2639        &self,
2640        collection: &str,
2641        field: &'static str,
2642        value: T,
2643    ) -> Result<Vec<Document>> {
2644        let value_clone = value.clone();
2645        self.query(collection)
2646            .filter(move |f| f.eq(field, value_clone.clone()))
2647            .collect()
2648            .await
2649    }
2650
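    /// Find documents matching several field/value equality pairs at once
    ///
    /// # Examples
    ///
    /// ```
    /// let pending = db.find_by_fields("orders", vec![
    ///     ("status", Value::String("pending".to_string())),
    ///     ("priority", Value::Int(1)),
    /// ]).await?;
    /// ```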
2651    pub async fn find_by_fields(
2652        &self,
2653        collection: &str,
2654        fields: Vec<(&str, Value)>,
2655    ) -> Result<Vec<Document>> {
2656        let mut query = self.query(collection);
2657
2658        for (field, value) in fields {
2659            let field_owned = field.to_owned();
2660            let value_owned = value.clone();
2661            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
2662        }
2663
2664        query.collect().await
2665    }
2666
2667    // Advanced example: find documents with a field value in a specific range
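    /// Find documents whose `field` value lies between `min` and `max`
    /// (range semantics follow `FilterBuilder::between`)
    ///
    /// # Examples
    ///
    /// ```
    /// let twenties = db.find_in_range("users", "age", Value::Int(20), Value::Int(29)).await?;
    /// ```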
2668    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
2669        &self,
2670        collection: &str,
2671        field: &'static str,
2672        min: T,
2673        max: T,
2674    ) -> Result<Vec<Document>> {
2675        self.query(collection)
2676            .filter(move |f| f.between(field, min.clone(), max.clone()))
2677            .collect()
2678            .await
2679    }
2680
2681    // Complex query example: build with multiple combined filters
2682    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2683        self.query(collection)
2684    }
2685
2686    // Create a full-text search query with added filter options
2687    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2688        self.search(collection)
2689    }
2690
2691    // Utility methods for common operations
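    /// Insert a document with a caller-chosen ID, or merge new fields into it if it already exists
    ///
    /// On update, the supplied fields overwrite existing values and all other
    /// fields are kept. Unique constraints are validated in both paths.
    ///
    /// # Examples
    ///
    /// ```
    /// // The same call works for both create and update
    /// db.upsert("settings", "app-config", vec![
    ///     ("theme", Value::String("dark".to_string())),
    /// ]).await?;
    /// ```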
2692    pub async fn upsert(
2693        &self,
2694        collection: &str,
2695        id: &str,
2696        data: Vec<(&str, Value)>,
2697    ) -> Result<String> {
2698        // Convert Vec<(&str, Value)> to HashMap<String, Value>
2699        let data_map: HashMap<String, Value> =
2700            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2701
2702        // Check if document exists
2703        if let Some(mut doc) = self.get_document(collection, id)? {
2704            // Update existing document - merge new data
2705            for (key, value) in data_map {
2706                doc.data.insert(key, value);
2707            }
2708
2709            // Validate unique constraints for the updated document
2710            // We need to exclude the current document from the uniqueness check
2711            self.validate_unique_constraints_excluding(collection, &doc.data, id)
2712                .await?;
2713
2714            self.put(
2715                format!("{}:{}", collection, id),
2716                serde_json::to_vec(&doc)?,
2717                None,
2718            )?;
2719            Ok(id.to_string())
2720        } else {
2721            // Insert new document with specified ID - validate unique constraints
2722            self.validate_unique_constraints(collection, &data_map)
2723                .await?;
2724
2725            let document = Document {
2726                id: id.to_string(),
2727                data: data_map,
2728            };
2729
2730            self.put(
2731                format!("{}:{}", collection, id),
2732                serde_json::to_vec(&document)?,
2733                None,
2734            )?;
2735            Ok(id.to_string())
2736        }
2737    }
2738
2739    // Atomic increment/decrement
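    /// Add `amount` (which may be negative) to an integer field and return the new value
    ///
    /// A missing or non-integer field is treated as 0 before the increment.
    /// Returns `NotFound` if the document does not exist.
    ///
    /// # Examples
    ///
    /// ```
    /// let views = db.increment("posts", &post_id, "view_count", 1).await?;
    /// let stock = db.increment("products", &product_id, "stock", -3).await?;
    /// ```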
2740    pub async fn increment(
2741        &self,
2742        collection: &str,
2743        id: &str,
2744        field: &str,
2745        amount: i64,
2746    ) -> Result<i64> {
2747        if let Some(mut doc) = self.get_document(collection, id)? {
2748            // Get current value
2749            let current = match doc.data.get(field) {
2750                Some(Value::Int(i)) => *i,
2751                _ => 0,
2752            };
2753
2754            // Increment
2755            let new_value = current + amount;
2756            doc.data.insert(field.to_string(), Value::Int(new_value));
2757
2758            // Save changes
2759            self.put(
2760                format!("{}:{}", collection, id),
2761                serde_json::to_vec(&doc)?,
2762                None,
2763            )?;
2764
2765            Ok(new_value)
2766        } else {
2767            Err(AuroraError::NotFound(format!(
2768                "Document {}:{} not found",
2769                collection, id
2770            )))
2771        }
2772    }
2773
2774    // Delete documents by query
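    /// Delete every document matching a filter and return how many were removed
    ///
    /// # Examples
    ///
    /// ```
    /// let removed = db.delete_by_query("sessions", |f| {
    ///     f.eq("expired", Value::Bool(true))
    /// }).await?;
    /// println!("Removed {} expired sessions", removed);
    /// ```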
2775    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
2776    where
2777        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2778    {
2779        let docs = self.query(collection).filter(filter_fn).collect().await?;
2780
2781        let mut deleted_count = 0;
2782
2783        for doc in docs {
2784            let key = format!("{}:{}", collection, doc.id);
2785            self.delete(&key).await?;
2786            deleted_count += 1;
2787        }
2788
2789        Ok(deleted_count)
2790    }
2791
2792    /// Import documents from a JSON file into a collection
2793    ///
2794    /// Validates each document against the collection schema, skips duplicates (by ID),
2795    /// and provides detailed statistics about the import operation. Useful for restoring
2796    /// backups, migrating data, or seeding development databases.
2797    ///
2798    /// # Performance
2799    /// - Import speed: ~5,000 docs/sec (with validation)
2800    /// - Memory efficient: processes documents one at a time
2801    /// - Validates schema and unique constraints
2802    ///
2803    /// # Arguments
2804    /// * `collection` - Name of the collection to import into
2805    /// * `filename` - Path to the JSON file containing documents (array format)
2806    ///
2807    /// # Returns
2808    /// `ImportStats` containing counts of imported, skipped, and failed documents
2809    ///
2810    /// # Examples
2811    ///
2812    /// ```
2813    /// use aurora_db::Aurora;
2814    ///
2815    /// let db = Aurora::open("mydb.db")?;
2816    ///
2817    /// // Basic import
2818    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
2819    /// println!("Imported: {}, Skipped: {}, Failed: {}",
2820    ///     stats.imported, stats.skipped, stats.failed);
2821    ///
2822    /// // Restore from backup
2823    /// let backup_file = "./backups/users_2024-01-15.json";
2824    /// let stats = db.import_from_json("users", backup_file).await?;
2825    ///
2826    /// if stats.failed > 0 {
2827    ///     eprintln!("Warning: {} documents failed validation", stats.failed);
2828    /// }
2829    ///
2830    /// // Idempotent import - duplicates are skipped
2831    /// let stats = db.import_from_json("users", "./data/users.json").await?;
2832    /// // Running again will skip all existing documents
2833    /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
2834    /// assert_eq!(stats2.skipped, stats.imported);
2835    ///
2836    /// // Migration from another system
2837    /// db.new_collection("products", vec![
2838    ///     ("sku", FieldType::String),
2839    ///     ("name", FieldType::String),
2840    ///     ("price", FieldType::Float),
2841    /// ])?;
2842    ///
2843    /// let stats = db.import_from_json("products", "./migration/products.json").await?;
2844    /// println!("Migration complete: {} products imported", stats.imported);
2845    /// ```
2846    ///
2847    /// # Expected JSON Format
2848    ///
2849    /// The JSON file should contain an array of document objects:
2850    /// ```json
2851    /// [
2852    ///   { "id": "123", "name": "Alice", "email": "alice@example.com" },
2853    ///   { "id": "456", "name": "Bob", "email": "bob@example.com" },
2854    ///   { "name": "Carol", "email": "carol@example.com" }
2855    /// ]
2856    /// ```
2857    ///
2858    /// # Behavior
2859    /// - Documents with existing IDs are skipped (duplicate detection)
2860    /// - Documents without IDs get auto-generated UUIDs
2861    /// - Schema validation is performed on all fields
2862    /// - Failed documents are counted but don't stop the import
2863    /// - Unique constraints are checked
2864    ///
2865    /// # See Also
2866    /// - `export_as_json()` to create compatible backup files
2867    /// - `batch_insert()` for programmatic bulk inserts
2868    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
2869        // Validate that the collection exists
2870        let collection_def = self.get_collection_definition(collection)?;
2871
2872        // Load JSON file
2873        let json_string = read_to_string(filename)
2874            .await
2875            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
2876
2877        // Parse JSON
2878        let documents: Vec<JsonValue> = from_str(&json_string)
2879            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
2880
2881        let mut stats = ImportStats::default();
2882
2883        // Process each document
2884        for doc_json in documents {
2885            match self
2886                .import_document(collection, &collection_def, doc_json)
2887                .await
2888            {
2889                Ok(ImportResult::Imported) => stats.imported += 1,
2890                Ok(ImportResult::Skipped) => stats.skipped += 1,
2891                Err(_) => stats.failed += 1,
2892            }
2893        }
2894
2895        Ok(stats)
2896    }
2897
2898    /// Import a single document, performing schema validation and duplicate checking
2899    async fn import_document(
2900        &self,
2901        collection: &str,
2902        collection_def: &Collection,
2903        doc_json: JsonValue,
2904    ) -> Result<ImportResult> {
2905        if !doc_json.is_object() {
2906            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
2907        }
2908
2909        // Extract document ID if present
2910        let doc_id = doc_json
2911            .get("id")
2912            .and_then(|id| id.as_str())
2913            .map(|s| s.to_string())
2914            .unwrap_or_else(|| Uuid::new_v4().to_string());
2915
2916        // Check if document with this ID already exists
2917        if self.get_document(collection, &doc_id)?.is_some() {
2918            return Ok(ImportResult::Skipped);
2919        }
2920
2921        // Convert JSON to our document format and validate against schema
2922        let mut data_map = HashMap::new();
2923
2924        if let Some(obj) = doc_json.as_object() {
2925            for (field_name, field_def) in &collection_def.fields {
2926                if let Some(json_value) = obj.get(field_name) {
2927                    // Validate value against field type
2928                    if !self.validate_field_value(json_value, &field_def.field_type) {
2929                        return Err(AuroraError::InvalidOperation(format!(
2930                            "Field '{}' has invalid type",
2931                            field_name
2932                        )));
2933                    }
2934
2935                    // Convert JSON value to our Value type
2936                    let value = self.json_to_value(json_value)?;
2937                    data_map.insert(field_name.clone(), value);
2938                } else if field_def.unique {
2939                    // Missing required unique field
2940                    return Err(AuroraError::InvalidOperation(format!(
2941                        "Missing required unique field '{}'",
2942                        field_name
2943                    )));
2944                }
2945            }
2946        }
2947
2948        // Check for duplicates by unique fields
2949        let unique_fields = self.get_unique_fields(collection_def);
2950        for unique_field in &unique_fields {
2951            if let Some(value) = data_map.get(unique_field) {
2952                // Query for existing documents with this unique value
2953                let query_results = self
2954                    .query(collection)
2955                    .filter(move |f| f.eq(unique_field, value.clone()))
2956                    .limit(1)
2957                    .collect()
2958                    .await?;
2959
2960                if !query_results.is_empty() {
2961                    // Found duplicate by unique field
2962                    return Ok(ImportResult::Skipped);
2963                }
2964            }
2965        }
2966
2967        // Create and insert document
2968        let document = Document {
2969            id: doc_id,
2970            data: data_map,
2971        };
2972
2973        self.put(
2974            format!("{}:{}", collection, document.id),
2975            serde_json::to_vec(&document)?,
2976            None,
2977        )?;
2978
2979        Ok(ImportResult::Imported)
2980    }
2981
2982    /// Validate that a JSON value matches the expected field type
2983    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
2984        match field_type {
2985            FieldType::String => value.is_string(),
2986            FieldType::Int => value.is_i64() || value.is_u64(),
2987            FieldType::Float => value.is_number(),
2988            FieldType::Bool => value.is_boolean(),
2989            FieldType::Array => value.is_array(),
2990            FieldType::Object => value.is_object(),
2991            FieldType::Uuid => {
2992                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
2993            }
2994        }
2995    }
2996
2997    /// Convert a JSON value to our internal Value type
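    ///
    /// Note that strings which parse as canonical UUIDs become `Value::Uuid`
    /// rather than `Value::String`. A rough mapping:
    ///
    /// ```text
    /// null    -> Value::Null
    /// 42      -> Value::Int
    /// 2.5     -> Value::Float
    /// "hello" -> Value::String
    /// "550e8400-e29b-41d4-a716-446655440000" -> Value::Uuid
    /// [1, 2]  -> Value::Array (converted recursively)
    /// ```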
2998    #[allow(clippy::only_used_in_recursion)]
2999    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3000        match json_value {
3001            JsonValue::Null => Ok(Value::Null),
3002            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3003            JsonValue::Number(n) => {
3004                if let Some(i) = n.as_i64() {
3005                    Ok(Value::Int(i))
3006                } else if let Some(f) = n.as_f64() {
3007                    Ok(Value::Float(f))
3008                } else {
3009                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
3010                }
3011            }
3012            JsonValue::String(s) => {
3013                // Try parsing as UUID first
3014                if let Ok(uuid) = Uuid::parse_str(s) {
3015                    Ok(Value::Uuid(uuid))
3016                } else {
3017                    Ok(Value::String(s.clone()))
3018                }
3019            }
3020            JsonValue::Array(arr) => {
3021                let mut values = Vec::new();
3022                for item in arr {
3023                    values.push(self.json_to_value(item)?);
3024                }
3025                Ok(Value::Array(values))
3026            }
3027            JsonValue::Object(obj) => {
3028                let mut map = HashMap::new();
3029                for (k, v) in obj {
3030                    map.insert(k.clone(), self.json_to_value(v)?);
3031                }
3032                Ok(Value::Object(map))
3033            }
3034        }
3035    }
3036
3037    /// Get collection definition
3038    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3039        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3040            let collection_def: Collection = serde_json::from_slice(&data)?;
3041            Ok(collection_def)
3042        } else {
3043            Err(AuroraError::CollectionNotFound(collection.to_string()))
3044        }
3045    }
3046
3047    /// Get storage statistics and information about the database
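    ///
    /// # Examples
    ///
    /// A minimal sketch of reading the returned stats (fields as defined on
    /// `DatabaseStats` and `CollectionStats` at the bottom of this module):
    ///
    /// ```
    /// let stats = db.get_database_stats()?;
    /// println!("~{} bytes on disk", stats.estimated_size);
    /// for (name, c) in &stats.collections {
    ///     println!("{}: {} docs, avg {} bytes", name, c.count, c.avg_doc_size);
    /// }
    /// ```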
3048    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3049        let hot_stats = self.hot.get_stats();
3050        let cold_stats = self.cold.get_stats()?;
3051
3052        Ok(DatabaseStats {
3053            hot_stats,
3054            cold_stats,
3055            estimated_size: self.cold.estimated_size(),
3056            collections: self.get_collection_stats()?,
3057        })
3058    }
3059
3060    /// Check if a key is currently stored in the hot cache
3061    pub fn is_in_hot_cache(&self, key: &str) -> bool {
3062        self.hot.is_hot(key)
3063    }
3064
3065    /// Start background cleanup of hot cache with specified interval
3066    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
3067        let hot_store = Arc::new(self.hot.clone());
3068        hot_store.start_cleanup_with_interval(interval_secs).await;
3069    }
3070
3071    /// Clear the hot cache (useful when memory needs to be freed)
3072    pub fn clear_hot_cache(&self) {
3073        self.hot.clear();
3074        println!(
3075            "Hot cache cleared, current hit ratio: {:.2}%",
3076            self.hot.hit_ratio() * 100.0
3077        );
3078    }
3079
3080    /// Prewarm the cache by loading frequently accessed data from cold storage
3081    ///
3082    /// Loads documents from a collection into memory cache to eliminate cold-start
3083    /// latency. Dramatically improves initial query performance after database startup
3084    /// by preloading the most commonly accessed data.
3085    ///
3086    /// # Performance Impact
3087    /// - Prewarming speed: ~20,000 docs/sec
3088    /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3089    /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3090    /// - Memory cost: ~500 bytes per document average
3091    ///
3092    /// # Arguments
3093    /// * `collection` - The collection to prewarm
3094    /// * `limit` - Maximum number of documents to load (`None` = load all)
3095    ///
3096    /// # Returns
3097    /// Number of documents loaded into cache
3098    ///
3099    /// # Examples
3100    ///
3101    /// ```
3102    /// use aurora_db::Aurora;
3103    ///
3104    /// let db = Aurora::open("mydb.db")?;
3105    ///
3106    /// // Prewarm frequently accessed collection
3107    /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3108    /// println!("Prewarmed {} user documents", loaded);
3109    ///
3110    /// // Now queries are fast from the start
3111    /// let stats_before = db.get_cache_stats();
3112    /// let users = db.query("users").collect().await?;
3113    /// let stats_after = db.get_cache_stats();
3114    ///
3115    /// // High hit rate thanks to prewarming
3116    /// assert!(stats_after.hit_rate > 0.95);
3117    ///
3118    /// // Startup optimization pattern
3119    /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3120    ///     println!("Prewarming caches...");
3121    ///
3122    ///     // Prewarm most frequently accessed collections
3123    ///     db.prewarm_cache("users", Some(5000)).await?;
3124    ///     db.prewarm_cache("sessions", Some(1000)).await?;
3125    ///     db.prewarm_cache("products", Some(500)).await?;
3126    ///
3127    ///     let stats = db.get_cache_stats();
3128    ///     println!("Cache prewarmed: {} entries loaded", stats.size);
3129    ///
3130    ///     Ok(())
3131    /// }
3132    ///
3133    /// // Web server startup
3134    /// #[tokio::main]
3135    /// async fn main() {
3136    ///     let db = Aurora::open("app.db").unwrap();
3137    ///
3138    ///     // Prewarm before accepting requests
3139    ///     db.prewarm_cache("users", Some(10000)).await.unwrap();
3140    ///
3141    ///     // Server is now ready with hot cache
3142    ///     start_web_server(db).await;
3143    /// }
3144    ///
3145    /// // Prewarm all documents (for small collections)
3146    /// let all_loaded = db.prewarm_cache("config", None).await?;
3147    /// // All config documents now in memory
3148    ///
3149    /// // Selective prewarming based on access patterns
3150    /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3151    ///     // Load recent users (they're accessed most)
3152    ///     db.prewarm_cache("users", Some(1000)).await?;
3153    ///
3154    ///     // Load active sessions only
3155    ///     let active_sessions = db.query("sessions")
3156    ///         .filter(|f| f.eq("active", Value::Bool(true)))
3157    ///         .limit(500)
3158    ///         .collect()
3159    ///         .await?;
3160    ///
3161    ///     // Manually populate cache with hot data
3162    ///     for session in active_sessions {
3163    ///         // Reading automatically caches
3164    ///         db.get_document("sessions", &session.id)?;
3165    ///     }
3166    ///
3167    ///     Ok(())
3168    /// }
3169    /// ```
3170    ///
3171    /// # Typical Prewarming Scenarios
3172    ///
3173    /// **Web Application Startup:**
3174    /// ```
3175    /// // Load user data, sessions, and active content
3176    /// db.prewarm_cache("users", Some(5000)).await?;
3177    /// db.prewarm_cache("sessions", Some(2000)).await?;
3178    /// db.prewarm_cache("posts", Some(1000)).await?;
3179    /// ```
3180    ///
3181    /// **E-commerce Site:**
3182    /// ```
3183    /// // Load products, categories, and user carts
3184    /// db.prewarm_cache("products", Some(500)).await?;
3185    /// db.prewarm_cache("categories", None).await?;  // All categories
3186    /// db.prewarm_cache("active_carts", Some(1000)).await?;
3187    /// ```
3188    ///
3189    /// **API Server:**
3190    /// ```
3191    /// // Load authentication data and rate limits
3192    /// db.prewarm_cache("api_keys", None).await?;
3193    /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3194    /// ```
3195    ///
3196    /// # When to Use
3197    /// - At application startup to eliminate cold-start latency
3198    /// - After cache clear operations
3199    /// - Before high-traffic events (product launches, etc.)
3200    /// - When deploying new instances (load balancer warm-up)
3201    ///
3202    /// # Memory Considerations
3203    /// - 1,000 docs ≈ 500 KB memory
3204    /// - 10,000 docs ≈ 5 MB memory
3205    /// - 100,000 docs ≈ 50 MB memory
3206    /// - Stay within configured cache capacity
3207    ///
3208    /// # See Also
3209    /// - `get_cache_stats()` to monitor cache effectiveness
3210    /// - `prewarm_all_collections()` to prewarm all collections
3211    /// - `Aurora::with_config()` to adjust cache capacity
3212    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
3213        let limit = limit.unwrap_or(usize::MAX); // `None` means no cap: load everything
3214        let prefix = format!("{}:", collection);
3215        let mut loaded = 0;
3216
3217        for entry in self.cold.scan_prefix(&prefix) {
3218            if loaded >= limit {
3219                break;
3220            }
3221
3222            if let Ok((key, value)) = entry {
3223                // Load into hot cache
3224                self.hot.set(key.clone(), value, None);
3225                loaded += 1;
3226            }
3227        }
3228
3229        println!("Prewarmed {} with {} documents", collection, loaded);
3230        Ok(loaded)
3231    }
3232
3233    /// Prewarm cache for all collections
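    ///
    /// Returns a map of collection name to the number of documents loaded.
    /// A usage sketch:
    ///
    /// ```
    /// // Load up to 1000 documents from every collection
    /// let loaded = db.prewarm_all_collections(Some(1000)).await?;
    /// for (name, count) in &loaded {
    ///     println!("{}: {} docs prewarmed", name, count);
    /// }
    /// ```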
3234    pub async fn prewarm_all_collections(
3235        &self,
3236        docs_per_collection: Option<usize>,
3237    ) -> Result<HashMap<String, usize>> {
3238        let mut stats = HashMap::new();
3239
3240        // Get all collections
3241        let collections: Vec<String> = self
3242            .cold
3243            .scan()
3244            .filter_map(|r| r.ok())
3245            .map(|(k, _)| k)
3246            .filter(|k| k.starts_with("_collection:"))
3247            .map(|k| k.trim_start_matches("_collection:").to_string())
3248            .collect();
3249
3250        for collection in collections {
3251            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3252            stats.insert(collection, loaded);
3253        }
3254
3255        Ok(stats)
3256    }
3257
3258    /// Store multiple key-value pairs efficiently in a single batch operation
3259    ///
3260    /// Low-level batch write operation that bypasses document validation and
3261    /// writes raw byte data directly to storage. Useful for advanced use cases,
3262    /// custom serialization, or maximum performance scenarios.
3263    ///
3264    /// # Performance
3265    /// - Write speed: ~100,000 writes/sec
3266    /// - Single disk fsync for entire batch
3267    /// - No validation or schema checking
3268    /// - Direct storage access
3269    ///
3270    /// # Arguments
3271    /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3272    ///
3273    /// # Returns
3274    /// Success or an error
3275    ///
3276    /// # Examples
3277    ///
3278    /// ```
3279    /// use aurora_db::Aurora;
3280    ///
3281    /// let db = Aurora::open("mydb.db")?;
3282    ///
3283    /// // Low-level batch write
3284    /// let pairs = vec![
3285    ///     ("users:123".to_string(), b"raw data 1".to_vec()),
3286    ///     ("users:456".to_string(), b"raw data 2".to_vec()),
3287    ///     ("cache:key1".to_string(), b"cached value".to_vec()),
3288    /// ];
3289    ///
3290    /// db.batch_write(pairs)?;
3291    ///
3292    /// // Custom binary serialization
3293    /// use serde::{Serialize, Deserialize};
3294    ///
3295    /// #[derive(Serialize, Deserialize)]
3296    /// struct CustomData {
3297    ///     id: u64,
3298    ///     payload: Vec<u8>,
3299    /// }
3300    ///
3301    /// let custom_data = vec![
3302    ///     CustomData { id: 1, payload: vec![1, 2, 3] },
3303    ///     CustomData { id: 2, payload: vec![4, 5, 6] },
3304    /// ];
3305    ///
3306    /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3307    ///     .iter()
3308    ///     .map(|data| {
3309    ///         let key = format!("binary:{}", data.id);
3310    ///         let value = bincode::serialize(data).unwrap();
3311    ///         (key, value)
3312    ///     })
3313    ///     .collect();
3314    ///
3315    /// db.batch_write(pairs)?;
3316    ///
3317    /// // Bulk cache population
3318    /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3319    ///     .map(|i| {
3320    ///         let key = format!("cache:item_{}", i);
3321    ///         let value = format!("value_{}", i).into_bytes();
3322    ///         (key, value)
3323    ///     })
3324    ///     .collect();
3325    ///
3326    /// db.batch_write(cache_entries)?;
3327    /// // Writes 10,000 entries in ~100ms
3328    /// ```
3329    ///
3330    /// # Important Notes
3331    /// - No schema validation performed
3332    /// - No unique constraint checking
3333    /// - No automatic indexing
3334    /// - Keys must follow "collection:id" format for proper grouping
3335    /// - Values are raw bytes - you handle serialization
3336    /// - Use `batch_insert()` for validated document inserts
3337    ///
3338    /// # When to Use
3339    /// - Maximum write performance needed
3340    /// - Custom serialization formats (bincode, msgpack, etc.)
3341    /// - Cache population
3342    /// - Low-level database operations
3343    /// - You're bypassing the document model
3344    ///
3345    /// # When NOT to Use
3346    /// - Regular document inserts → Use `batch_insert()` instead
3347    /// - Need validation → Use `batch_insert()` instead
3348    /// - Need indexing → Use `batch_insert()` instead
3349    ///
3350    /// # See Also
3351    /// - `batch_insert()` for validated document batch inserts
3352    /// - `put()` for single key-value writes
3353    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3354        // Group pairs by collection name
3355        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3356        for (key, value) in &pairs {
3357            if let Some(collection_name) = key.split(':').next() {
3358                collections
3359                    .entry(collection_name.to_string())
3360                    .or_default()
3361                    .push((key.clone(), value.clone()));
3362            }
3363        }
3364
3365        // First, do the batch write to cold storage for all pairs
3366        self.cold.batch_set(pairs)?;
3367
3368        // Then, process each collection for in-memory updates
3369        for (collection_name, batch) in collections {
3370            // --- Optimized Batch Indexing ---
3371
3372            // 1. Get schema once for the entire collection batch
3373            let collection_obj = match self.schema_cache.get(&collection_name) {
3374                Some(cached_schema) => Arc::clone(cached_schema.value()),
3375                None => {
3376                    let collection_key = format!("_collection:{}", collection_name);
3377                    match self.get(&collection_key)? {
3378                        Some(data) => {
3379                            let obj: Collection = serde_json::from_slice(&data)?;
3380                            let arc_obj = Arc::new(obj);
3381                            self.schema_cache
3382                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3383                            arc_obj
3384                        }
3385                        None => continue,
3386                    }
3387                }
3388            };
3389
3390            let indexed_fields: Vec<String> = collection_obj
3391                .fields
3392                .iter()
3393                .filter(|(_, def)| def.indexed || def.unique)
3394                .map(|(name, _)| name.clone())
3395                .collect();
3396
3397            let primary_index = self
3398                .primary_indices
3399                .entry(collection_name.to_string())
3400                .or_default();
3401
3402            for (key, value) in batch {
3403                // 2. Update hot cache
3404                self.hot.set(key.clone(), value.clone(), None);
3405
3406                // 3. Update primary index with metadata only
3407                let location = DiskLocation::new(value.len());
3408                primary_index.insert(key.clone(), location);
3409
3410                // 4. Update secondary indices
3411                if !indexed_fields.is_empty()
3412                    && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
3413                        for (field, field_value) in doc.data {
3414                            if indexed_fields.contains(&field) {
3415                                let value_str = match &field_value {
3416                                    Value::String(s) => s.clone(),
3417                                    _ => field_value.to_string(),
3418                                };
3419                                let index_key = format!("{}:{}", collection_name, field);
3420                                let secondary_index = self
3421                                    .secondary_indices
3422                                    .entry(index_key)
3423                                    .or_default();
3424
3425                                let max_entries = self.config.max_index_entries_per_field;
3426                                secondary_index
3427                                    .entry(value_str)
3428                                    .and_modify(|doc_ids| {
3429                                        if doc_ids.len() < max_entries {
3430                                            doc_ids.push(key.to_string());
3431                                        }
3432                                    })
3433                                    .or_insert_with(|| vec![key.to_string()]);
3434                            }
3435                        }
3436                    }
3437            }
3438        }
3439
3440        Ok(())
3441    }
3442
3443    /// Scan for keys with a specific prefix
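    ///
    /// Document keys follow the `collection:id` layout, so scanning `"users:"`
    /// yields every document in the `users` collection as raw bytes:
    ///
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{} ({} bytes)", key, bytes.len());
    /// }
    /// ```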
3444    pub fn scan_with_prefix(
3445        &self,
3446        prefix: &str,
3447    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3448        self.cold.scan_prefix(prefix)
3449    }
3450
3451    /// Get per-collection storage statistics (document count, total and average size)
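    ///
    /// A usage sketch:
    ///
    /// ```
    /// for (name, stats) in db.get_collection_stats()? {
    ///     println!("{}: {} docs, {} bytes total", name, stats.count, stats.size_bytes);
    /// }
    /// ```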
3452    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
3453        let mut stats = HashMap::new();
3454
3455        // Scan all collections
3456        let collections: Vec<String> = self
3457            .cold
3458            .scan()
3459            .filter_map(|r| r.ok())
3460            .map(|(k, _)| k)
3461            .filter(|k| k.starts_with("_collection:"))
3462            .map(|k| k.trim_start_matches("_collection:").to_string())
3463            .collect();
3464
3465        for collection in collections {
3466            // Use primary index for fast stats (count + size from DiskLocation)
3467            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
3468                let count = index.len();
3469                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
3470                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
3471                (count, size)
3472            } else {
3473                // Fallback: scan from cold storage if index not available
3474                let prefix = format!("{}:", collection);
3475                let count = self.cold.scan_prefix(&prefix).count();
3476                let size: usize = self
3477                    .cold
3478                    .scan_prefix(&prefix)
3479                    .filter_map(|r| r.ok())
3480                    .map(|(_, v)| v.len())
3481                    .sum();
3482                (count, size)
3483            };
3484
3485            stats.insert(
3486                collection,
3487                CollectionStats {
3488                    count,
3489                    size_bytes: size,
3490                    avg_doc_size: if count > 0 { size / count } else { 0 },
3491                },
3492            );
3493        }
3494
3495        Ok(stats)
3496    }
3497
3498    /// Search for documents by exact value using an index
3499    ///
3500    /// This method performs a fast lookup using a pre-created index
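    ///
    /// A usage sketch, assuming an index on the field already exists
    /// (see `create_indices`):
    ///
    /// ```
    /// let admins = db.search_by_value("users", "role", &Value::String("admin".to_string()))?;
    /// ```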
3501    pub fn search_by_value(
3502        &self,
3503        collection: &str,
3504        field: &str,
3505        value: &Value,
3506    ) -> Result<Vec<Document>> {
3507        let index_key = format!("_index:{}:{}", collection, field);
3508
3509        if let Some(index_data) = self.get(&index_key)? {
3510            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
3511            let index = Index::new(index_def);
3512
3513            // Look up the document IDs that match this exact value
3514            if let Some(doc_ids) = index.search(value) {
3515                // Load the documents by ID
3516                let mut docs = Vec::new();
3517                for id in doc_ids {
3518                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
3519                        let doc: Document = serde_json::from_slice(&doc_data)?;
3520                        docs.push(doc);
3521                    }
3522                }
3523                return Ok(docs);
3524            }
3525        }
3526
3527        // Return empty result if no index or no matches
3528        Ok(Vec::new())
3529    }
3530
3531    /// Perform a full-text search on an indexed text field
3532    ///
3533    /// This provides more advanced text search capabilities including
3534    /// relevance ranking of results
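    ///
    /// A usage sketch (the field must have a full-text index first):
    ///
    /// ```
    /// db.create_text_index("articles", "body", true)?;
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// ```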
3535    pub fn full_text_search(
3536        &self,
3537        collection: &str,
3538        field: &str,
3539        query: &str,
3540    ) -> Result<Vec<Document>> {
3541        let index_key = format!("_index:{}:{}", collection, field);
3542
3543        if let Some(index_data) = self.get(&index_key)? {
3544            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
3545
3546            // Ensure this is a full-text index
3547            if !matches!(index_def.index_type, IndexType::FullText) {
3548                return Err(AuroraError::InvalidOperation(format!(
3549                    "Field '{}' is not indexed as full-text",
3550                    field
3551                )));
3552            }
3553
3554            let index = Index::new(index_def);
3555
3556            // Run the relevance-ranked text search over the index
3557            if let Some(doc_id_scores) = index.search_text(query) {
3558                // Load the documents by ID, preserving score order
3559                let mut docs = Vec::new();
3560                for (id, _score) in doc_id_scores {
3561                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
3562                        let doc: Document = serde_json::from_slice(&doc_data)?;
3563                        docs.push(doc);
3564                    }
3565                }
3566                return Ok(docs);
3567            }
3568        }
3569
3570        // Return empty result if no index or no matches
3571        Ok(Vec::new())
3572    }
3573
3574    /// Create a full-text search index on a text field
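    ///
    /// The `enable_stop_words` flag is currently accepted but not applied
    /// (note the underscore-prefixed parameter below).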
3575    pub fn create_text_index(
3576        &self,
3577        collection: &str,
3578        field: &str,
3579        _enable_stop_words: bool,
3580    ) -> Result<()> {
3581        // Check if collection exists
3582        if self.get(&format!("_collection:{}", collection))?.is_none() {
3583            return Err(AuroraError::CollectionNotFound(collection.to_string()));
3584        }
3585
3586        // Create index definition
3587        let index_def = IndexDefinition {
3588            name: format!("{}_{}_fulltext", collection, field),
3589            collection: collection.to_string(),
3590            fields: vec![field.to_string()],
3591            index_type: IndexType::FullText,
3592            unique: false,
3593        };
3594
3595        // Store index definition
3596        let index_key = format!("_index:{}:{}", collection, field);
3597        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;
3598
3599        // Create the actual index
3600        let index = Index::new(index_def);
3601
3602        // Index all existing documents in the collection
3603        let prefix = format!("{}:", collection);
3604        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
3605            let doc: Document = serde_json::from_slice(&data)?;
3606            index.insert(&doc)?;
3607        }
3608
3609        Ok(())
3610    }
3611
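    /// Execute a `SimpleQueryBuilder` against this database
    ///
    /// Two execution paths are possible: a secondary-index lookup when one of
    /// the filters matches an indexed field, or a primary-index scan with
    /// early termination otherwise (see the inline "query planner" notes).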
3612    pub async fn execute_simple_query(
3613        &self,
3614        builder: &SimpleQueryBuilder,
3615    ) -> Result<Vec<Document>> {
3616        // Ensure indices are initialized
3617        self.ensure_indices_initialized().await?;
3618
3619        // A place to store the IDs of the documents we need to fetch
3620        let mut doc_ids_to_load: Option<Vec<String>> = None;
3621
3622        // --- The "Query Planner" ---
3623        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
3624        // than collecting millions of IDs from secondary index
3625        let use_index_for_range = if let Some(limit) = builder.limit {
3626            // If limit is small (< 1000), prefer full scan for range queries
3627            // The secondary index would scan all entries anyway, might as well
3628            // scan documents directly and benefit from early termination
3629            limit >= 1000
3630        } else {
3631            // No limit? Index might still help if result set is small
3632            true
3633        };
3634
3635        // Look for an opportunity to use an index
3636        for filter in &builder.filters {
3637            match filter {
3638                Filter::Eq(field, value) => {
3639                    let index_key = format!("{}:{}", &builder.collection, field);
3640
3641                    // Do we have a secondary index for this field?
3642                    if let Some(index) = self.secondary_indices.get(&index_key) {
3643                        // Yes! Let's use it.
3644                        if let Some(matching_ids) = index.get(&value.to_string()) {
3645                            doc_ids_to_load = Some(matching_ids.clone());
3646                            break; // Stop searching for other indexes for now
3647                        }
3648                    }
3649                }
3650                Filter::Gt(field, value)
3651                | Filter::Gte(field, value)
3652                | Filter::Lt(field, value)
3653                | Filter::Lte(field, value) => {
3654                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
3655                    if !use_index_for_range {
3656                        continue;
3657                    }
3658
3659                    let index_key = format!("{}:{}", &builder.collection, field);
3660
3661                    // Do we have a secondary index for this field?
3662                    if let Some(index) = self.secondary_indices.get(&index_key) {
3663                        // For range queries, we need to scan through the index values
3664                        let mut matching_ids = Vec::new();
3665
3666                        for entry in index.iter() {
3667                            let index_value_str = entry.key();
3668
3669                            // Try to parse the index value to compare with our filter value
3670                            if let Ok(index_value) =
3671                                self.parse_value_from_string(index_value_str, value)
3672                            {
3673                                let matches = match filter {
3674                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
3675                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
3676                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
3677                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
3678                                    _ => false,
3679                                };
3680
3681                                if matches {
3682                                    matching_ids.extend(entry.value().clone());
3683                                }
3684                            }
3685                        }
3686
3687                        if !matching_ids.is_empty() {
3688                            doc_ids_to_load = Some(matching_ids);
3689                            break;
3690                        }
3691                    }
3692                }
3693                Filter::Contains(field, search_term) => {
3694                    let index_key = format!("{}:{}", &builder.collection, field);
3695
3696                    // Do we have a secondary index for this field?
3697                    if let Some(index) = self.secondary_indices.get(&index_key) {
3698                        let mut matching_ids = Vec::new();
3699
3700                        for entry in index.iter() {
3701                            let index_value_str = entry.key();
3702
3703                            // Check if this indexed value contains our search term
3704                            if index_value_str
3705                                .to_lowercase()
3706                                .contains(&search_term.to_lowercase())
3707                            {
3708                                matching_ids.extend(entry.value().clone());
3709                            }
3710                        }
3711
3712                        if !matching_ids.is_empty() {
3713                            // Remove duplicates since a document could match multiple indexed values
3714                            matching_ids.sort();
3715                            matching_ids.dedup();
3716
3717                            doc_ids_to_load = Some(matching_ids);
3718                            break;
3719                        }
3720                    }
3721                }
3722            }
3723        }
3724
3725        let mut final_docs: Vec<Document>;
3726
3727        if let Some(ids) = doc_ids_to_load {
3728            // --- Path 1: Index Lookup ---
3729            use std::io::Write; // for the debug log write below
3730            if let Ok(mut file) = std::fs::OpenOptions::new()
3731                .create(true)
3732                .append(true)
3733                .open("/tmp/aurora_query_stats.log") {
3734                let _ = writeln!(file, "[INDEX PATH] IDs to load: {} | Collection: {}",
3735                    ids.len(), builder.collection);
3736            }
3737
3738            final_docs = Vec::with_capacity(ids.len());
3739
3740            for id in ids {
3741                let doc_key = format!("{}:{}", &builder.collection, id);
3742                if let Some(data) = self.get(&doc_key)?
3743                    && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
3744                        final_docs.push(doc);
3745                    }
3746            }
3747        } else {
3748            // --- Path 2: Full Collection Scan with Early Termination ---
3749
3750            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
3751            // as soon as we have enough matching documents
3752            let early_termination_target = if builder.order_by.is_none() {
3753                builder.limit.map(|l| l.saturating_add(builder.offset.unwrap_or(0)))
3754            } else {
3755                // With ORDER BY, we need all matching docs to sort correctly
3756                None
3757            };
3758
3759            // Smart scan with early termination support
3760            final_docs = Vec::new();
3761            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
3762
3763            if let Some(index) = self.primary_indices.get(&builder.collection) {
3764                for entry in index.iter() {
3765                    let key = entry.key();
3766                    scan_stats.0 += 1; // keys scanned
3767
3768                    // Early termination check
3769                    if let Some(target) = early_termination_target
3770                        && final_docs.len() >= target
3771                    {
3772                        break; // We have enough documents!
3773                    }
3774
3775                    // Fetch and filter document
3776                    if let Some(data) = self.get(key)? {
3777                        scan_stats.1 += 1; // docs fetched
3778
3779                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
3780                            // Apply all filters
3781                            let matches_all_filters = builder.filters.iter().all(|filter| {
3782                                match filter {
3783                                    Filter::Eq(field, value) => doc.data.get(field) == Some(value),
3784                                    Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
3785                                    Filter::Gte(field, value) => doc.data.get(field).is_some_and(|v| v >= value),
3786                                    Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
3787                                    Filter::Lte(field, value) => doc.data.get(field).is_some_and(|v| v <= value),
3788                                    Filter::Contains(field, value_str) => {
3789                                        doc.data.get(field).is_some_and(|v| match v {
3790                                            Value::String(s) => s.contains(value_str),
3791                                            Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
3792                                            _ => false,
3793                                        })
3794                                    }
3795                                }
3796                            });
3797
3798                            if matches_all_filters {
3799                                scan_stats.2 += 1; // matches found
3800                                final_docs.push(doc);
3801                            }
3802                        }
3803                    }
3804                }
3805
3806                // Debug logging for query performance analysis
3807                use std::io::Write;
3808                if let Ok(mut file) = std::fs::OpenOptions::new()
3809                    .create(true)
3810                    .append(true)
3811                    .open("/tmp/aurora_query_stats.log") {
3812                    let _ = writeln!(file, "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
3813                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection);
3814                }
3815            } else {
3816                // Fallback: scan from cold storage if index not initialized
3817                final_docs = self.get_all_collection(&builder.collection).await?;
3818
3819                // Apply filters
3820                final_docs.retain(|doc| {
3821                    builder.filters.iter().all(|filter| {
3822                        match filter {
3823                            Filter::Eq(field, value) => doc.data.get(field) == Some(value),
3824                            Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
3825                            Filter::Gte(field, value) => doc.data.get(field).is_some_and(|v| v >= value),
3826                            Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
3827                            Filter::Lte(field, value) => doc.data.get(field).is_some_and(|v| v <= value),
3828                            Filter::Contains(field, value_str) => {
3829                                doc.data.get(field).is_some_and(|v| match v {
3830                                    Value::String(s) => s.contains(value_str),
3831                                    Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
3832                                    _ => false,
3833                                })
3834                            }
3835                        }
3836                    })
3837                });
3838            }
3839        }
3840
3841        // Apply ordering
3842        if let Some((field, ascending)) = &builder.order_by {
3843            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
3844                (Some(v1), Some(v2)) => {
3845                    let cmp = v1.cmp(v2);
3846                    if *ascending {
3847                        cmp
3848                    } else {
3849                        cmp.reverse()
3850                    }
3851                }
3852                (None, Some(_)) => std::cmp::Ordering::Less,
3853                (Some(_), None) => std::cmp::Ordering::Greater,
3854                (None, None) => std::cmp::Ordering::Equal,
3855            });
3856        }
3857
3858        // Apply offset and limit
3859        let start = builder.offset.unwrap_or(0);
3860        let end = builder
3861            .limit
3862            .map(|l| start.saturating_add(l))
3863            .unwrap_or(final_docs.len());
3864
3865        let end = end.min(final_docs.len());
3866        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
3867    }
3868
3869    /// Helper method to parse a string value back to a Value for comparison
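    ///
    /// The `reference_value` only supplies the target type; e.g. with an
    /// `Int` reference, `"42"` parses to `Value::Int(42)`.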
3870    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
3871        match reference_value {
3872            Value::Int(_) => {
3873                if let Ok(i) = value_str.parse::<i64>() {
3874                    Ok(Value::Int(i))
3875                } else {
3876                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
3877                }
3878            }
3879            Value::Float(_) => {
3880                if let Ok(f) = value_str.parse::<f64>() {
3881                    Ok(Value::Float(f))
3882                } else {
3883                    Err(AuroraError::InvalidOperation(
3884                        "Failed to parse float".into(),
3885                    ))
3886                }
3887            }
3888            Value::String(_) => Ok(Value::String(value_str.to_string())),
3889            _ => Ok(Value::String(value_str.to_string())),
3890        }
3891    }
3892
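    /// Execute an HTTP-style query described by a `QueryPayload`
    ///
    /// Applies, in order: filters, sorting, offset/limit pagination, and
    /// field projection. A sketch of an equivalent JSON payload (field names
    /// follow `QueryPayload`; the exact operator spelling depends on how
    /// `FilterOperator` is serialized):
    ///
    /// ```json
    /// {
    ///   "filters": [{ "field": "age", "operator": "gt", "value": 18 }],
    ///   "sort": { "field": "name", "ascending": true },
    ///   "offset": 0,
    ///   "limit": 20,
    ///   "select": ["name", "email"]
    /// }
    /// ```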
3893    pub async fn execute_dynamic_query(
3894        &self,
3895        collection: &str,
3896        payload: &QueryPayload,
3897    ) -> Result<Vec<Document>> {
3898        let mut docs = self.get_all_collection(collection).await?;
3899
3900        // 1. Apply Filters
3901        if let Some(filters) = &payload.filters {
3902            docs.retain(|doc| {
3903                filters.iter().all(|filter| {
3904                    doc.data
3905                        .get(&filter.field)
3906                        .is_some_and(|doc_val| check_filter(doc_val, filter))
3907                })
3908            });
3909        }
3910
3911        // 2. Apply Sorting
3912        if let Some(sort_options) = &payload.sort {
3913            docs.sort_by(|a, b| {
3914                let a_val = a.data.get(&sort_options.field);
3915                let b_val = b.data.get(&sort_options.field);
3916                let ordering = a_val
3917                    .partial_cmp(&b_val)
3918                    .unwrap_or(std::cmp::Ordering::Equal);
3919                if sort_options.ascending {
3920                    ordering
3921                } else {
3922                    ordering.reverse()
3923                }
3924            });
3925        }
3926
3927        // 3. Apply Pagination
3928        if let Some(offset) = payload.offset {
3929            docs = docs.into_iter().skip(offset).collect();
3930        }
3931        if let Some(limit) = payload.limit {
3932            docs = docs.into_iter().take(limit).collect();
3933        }
3934
3935        // 4. Apply Field Selection (Projection)
3936        if let Some(select_fields) = &payload.select
3937            && !select_fields.is_empty() {
3938                docs = docs
3939                    .into_iter()
3940                    .map(|mut doc| {
3941                        doc.data.retain(|key, _| select_fields.contains(key));
3942                        doc
3943                    })
3944                    .collect();
3945            }
3946
3947        Ok(docs)
3948    }
3949
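    /// Dispatch a wire-protocol `Request` to the matching database operation
    /// and wrap the outcome in a `Response`.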
3950    pub async fn process_network_request(
3951        &self,
3952        request: crate::network::protocol::Request,
3953    ) -> crate::network::protocol::Response {
3954        use crate::network::protocol::Response;
3955
3956        match request {
3957            crate::network::protocol::Request::Get(key) => match self.get(&key) {
3958                Ok(value) => Response::Success(value),
3959                Err(e) => Response::Error(e.to_string()),
3960            },
3961            crate::network::protocol::Request::Put(key, value) => {
3962                match self.put(key, value, None) {
3963                    Ok(_) => Response::Done,
3964                    Err(e) => Response::Error(e.to_string()),
3965                }
3966            }
3967            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
3968                Ok(_) => Response::Done,
3969                Err(e) => Response::Error(e.to_string()),
3970            },
3971            crate::network::protocol::Request::NewCollection { name, fields } => {
3972                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
3973                    .iter()
3974                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
3975                    .collect();
3976
3977                match self.new_collection(&name, fields_for_db) {
3978                    Ok(_) => Response::Done,
3979                    Err(e) => Response::Error(e.to_string()),
3980                }
3981            }
3982            crate::network::protocol::Request::Insert { collection, data } => {
3983                match self.insert_map(&collection, data).await {
3984                    Ok(id) => Response::Message(id),
3985                    Err(e) => Response::Error(e.to_string()),
3986                }
3987            }
3988            crate::network::protocol::Request::GetDocument { collection, id } => {
3989                match self.get_document(&collection, &id) {
3990                    Ok(doc) => Response::Document(doc),
3991                    Err(e) => Response::Error(e.to_string()),
3992                }
3993            }
3994            crate::network::protocol::Request::Query(builder) => {
3995                match self.execute_simple_query(&builder).await {
3996                    Ok(docs) => Response::Documents(docs),
3997                    Err(e) => Response::Error(e.to_string()),
3998                }
3999            }
4000            crate::network::protocol::Request::BeginTransaction => {
4001                let tx_id = self.begin_transaction();
4002                Response::TransactionId(tx_id.as_u64())
4003            }
4004            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4005                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4006                match self.commit_transaction(tx_id) {
4007                    Ok(_) => Response::Done,
4008                    Err(e) => Response::Error(e.to_string()),
4009                }
4010            }
4011            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4012                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4013                match self.rollback_transaction(tx_id) {
4014                    Ok(_) => Response::Done,
4015                    Err(e) => Response::Error(e.to_string()),
4016                }
4017            }
4018        }
4019    }
4020
4021    /// Create indices for commonly queried fields automatically
4022    ///
4023    /// This is a convenience method that creates indices for fields that are
4024    /// likely to be queried frequently, improving performance.
4025    ///
4026    /// # Arguments
4027    /// * `collection` - Name of the collection
4028    /// * `fields` - List of field names to create indices for
4029    ///
4030    /// # Examples
4031    /// ```
4032    /// // Create indices for commonly queried fields
4033    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4034    /// ```
4035    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4036        for field in fields {
4037            if let Err(e) = self.create_index(collection, field).await {
4038                eprintln!(
4039                    "Warning: Failed to create index for {}.{}: {}",
4040                    collection, field, e
4041                );
4042            } else {
4043                println!("Created index for {}.{}", collection, field);
4044            }
4045        }
4046        Ok(())
4047    }
4048
4049    /// Get index statistics for a collection
4050    ///
4051    /// This helps understand which indices exist and how effective they are.
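    ///
    /// A usage sketch (fields as defined on `IndexStats` below):
    ///
    /// ```
    /// for (field, s) in db.get_index_stats("users") {
    ///     println!("{}: {} unique values across {} docs", field, s.unique_values, s.total_documents);
    /// }
    /// ```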
4052    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4053        let mut stats = HashMap::new();
4054
4055        for entry in self.secondary_indices.iter() {
4056            let key = entry.key();
4057            if key.starts_with(&format!("{}:", collection)) {
4058                let field = key.split(':').nth(1).unwrap_or("unknown");
4059                let index = entry.value();
4060
4061                let unique_values = index.len();
4062                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4063
4064                stats.insert(
4065                    field.to_string(),
4066                    IndexStats {
4067                        unique_values,
4068                        total_documents,
4069                        avg_docs_per_value: if unique_values > 0 {
4070                            total_documents / unique_values
4071                        } else {
4072                            0
4073                        },
4074                    },
4075                );
4076            }
4077        }
4078
4079        stats
4080    }
4081
4082    /// Optimize a collection by creating indices on its schema fields
4083    ///
4084    /// Currently a blunt heuristic: an index is created for every field defined in the collection's schema.
4085    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4086        if let Ok(collection_def) = self.get_collection_definition(collection) {
4087            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4088            self.create_indices(collection, &field_names).await?;
4089        }
4090
4091        Ok(())
4092    }
4093
4094    // Helper method to get unique fields from a collection
4095    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4096        collection
4097            .fields
4098            .iter()
4099            .filter(|(_, def)| def.unique)
4100            .map(|(name, _)| name.clone())
4101            .collect()
4102    }
4103
4104    // Update the validation method to use the helper
4105    async fn validate_unique_constraints(
4106        &self,
4107        collection: &str,
4108        data: &HashMap<String, Value>,
4109    ) -> Result<()> {
4110        self.ensure_indices_initialized().await?;
4111        let collection_def = self.get_collection_definition(collection)?;
4112        let unique_fields = self.get_unique_fields(&collection_def);
4113
4114        for unique_field in &unique_fields {
4115            if let Some(value) = data.get(unique_field) {
4116                let index_key = format!("{}:{}", collection, unique_field);
4117                if let Some(index) = self.secondary_indices.get(&index_key) {
4118                    // Get the raw string value without JSON formatting
4119                    let value_str = match value {
4120                        Value::String(s) => s.clone(),
4121                        _ => value.to_string(),
4122                    };
4123                    if index.contains_key(&value_str) {
4124                        return Err(AuroraError::UniqueConstraintViolation(
4125                            unique_field.clone(),
4126                            value_str,
4127                        ));
4128                    }
4129                }
4130            }
4131        }
4132        Ok(())
4133    }
4134
4135    /// Validate unique constraints excluding a specific document ID (for updates)
4136    async fn validate_unique_constraints_excluding(
4137        &self,
4138        collection: &str,
4139        data: &HashMap<String, Value>,
4140        exclude_id: &str,
4141    ) -> Result<()> {
4142        self.ensure_indices_initialized().await?;
4143        let collection_def = self.get_collection_definition(collection)?;
4144        let unique_fields = self.get_unique_fields(&collection_def);
4145
4146        for unique_field in &unique_fields {
4147            if let Some(value) = data.get(unique_field) {
4148                let index_key = format!("{}:{}", collection, unique_field);
4149                if let Some(index) = self.secondary_indices.get(&index_key) {
4150                    // Get the raw string value without JSON formatting
4151                    let value_str = match value {
4152                        Value::String(s) => s.clone(),
4153                        _ => value.to_string(),
4154                    };
4155                    if let Some(doc_ids) = index.get(&value_str) {
4156                        // Check if any document other than the excluded one has this value
4157                        let exclude_key = format!("{}:{}", collection, exclude_id);
4158                        for doc_key in doc_ids.value() {
4159                            if doc_key != &exclude_key {
4160                                return Err(AuroraError::UniqueConstraintViolation(
4161                                    unique_field.clone(),
4162                                    value_str,
4163                                ));
4164                            }
4165                        }
4166                    }
4167                }
4168            }
4169        }
4170        Ok(())
4171    }
4172}
4173
4174impl Drop for Aurora {
4175    fn drop(&mut self) {
4176        // Signal checkpoint task to shutdown gracefully
4177        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
4178            let _ = shutdown_tx.send(());
4179        }
4180        // Signal compaction task to shutdown gracefully
4181        if let Some(ref shutdown_tx) = self.compaction_shutdown {
4182            let _ = shutdown_tx.send(());
4183        }
4184    }
4185}
4186
4187fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
4188    let filter_val = match json_to_value(&filter.value) {
4189        Ok(v) => v,
4190        Err(_) => return false,
4191    };
4192
4193    match filter.operator {
4194        FilterOperator::Eq => doc_val == &filter_val,
4195        FilterOperator::Ne => doc_val != &filter_val,
4196        FilterOperator::Gt => doc_val > &filter_val,
4197        FilterOperator::Gte => doc_val >= &filter_val,
4198        FilterOperator::Lt => doc_val < &filter_val,
4199        FilterOperator::Lte => doc_val <= &filter_val,
4200        FilterOperator::Contains => match (doc_val, &filter_val) {
4201            (Value::String(s), Value::String(fv)) => s.contains(fv),
4202            (Value::Array(arr), _) => arr.contains(&filter_val),
4203            _ => false,
4204        },
4205    }
4206}
4207
4208/// Result of importing a single document
4209enum ImportResult {
4210    Imported,
4211    Skipped,
4212}
4213
4214/// Statistics from an import operation
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}

/// Statistics for a specific collection
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}

/// Statistics for an index
#[derive(Debug)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}

/// Combined database statistics
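///
/// A sketch of reporting database-wide stats, assuming a hypothetical
/// `get_database_stats` accessor (not confirmed by this module):
///
/// ```rust,ignore
/// // `get_database_stats` is illustrative, not a confirmed API
/// let stats: DatabaseStats = db.get_database_stats().await?;
/// println!("estimated size: {} bytes", stats.estimated_size);
/// for (name, cs) in &stats.collections {
///     println!("{}: {} docs, avg {} bytes", name, cs.count, cs.avg_doc_size);
/// }
/// ```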
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create the collection
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        // Insert test documents
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query with filter and ordering
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // 13-byte content + 5-byte "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that exceeds the blob size limit (201 MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Inserting a second document with the same email should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Test upsert with unique constraint:
        // it should succeed for a new document...
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // ...fail when upserting a different document with a duplicate email...
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // ...and succeed when an existing document keeps its own email
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}